package permission // 完整权限检查入口. // // 这是权限系统的核心检查器,整合规则匹配,Bash 安全分析, // 文件系统权限检查和域名权限检查,提供统一的权限检查入口. // // 对应原项目中 hasPermissionsToUseTool() 和 // hasPermissionsToUseToolInner() 的功能. import ( "fmt" "strings" "sync" "git.flytoex.net/yuanwei/flyto-agent/internal/syslib/bash" ) // PermissionClass 是工具的权限检查类型. // 工具在 Metadata.PermissionClass 中声明,权限引擎据此分派检查逻辑. // // 升华改进(ELEVATED): P0-2 修复--第三方工具通过 RegisterToolClass 声明权限类型, // 所有工具必须在 Metadata.PermissionClass 中声明类型,注册时自动写入 toolClassRegistry. // 替代方案:修改 CheckToolPermission 签名传入 ToolRegistry(破坏所有调用点). type PermissionClass = string const ( // PermClassBash 命令执行类工具(Bash 语义:子命令分割,前缀匹配,危险命令检测). PermClassBash PermissionClass = "bash" // PermClassFile 文件操作类工具(路径权限检查,危险路径检测). PermClassFile PermissionClass = "file" // PermClassWebFetch Web 请求类工具(域名权限检查). PermClassWebFetch PermissionClass = "webfetch" // PermClassGeneric 通用工具(只读自动放行,其余默认 Ask). PermClassGeneric PermissionClass = "generic" // PermClassReadOnly 只读通用工具(等同 PermClassGeneric 但永远放行). // 用于明确声明工具无副作用,无需用户确认. PermClassReadOnly PermissionClass = "readonly" ) // toolClassMu 保护动态工具权限类型注册表的并发读写. var toolClassMu sync.RWMutex // toolClassRegistry 存储动态注册的工具名 → 权限类型映射. // 由 tools.Registry.Register 在注册工具时自动填充(如果工具声明了 Metadata.PermissionClass). var toolClassRegistry = map[string]PermissionClass{} // RegisterToolClass 注册工具的权限检查类型. // 通常由 tools.Registry 在注册工具时自动调用,第三方工具也可手动调用. // // 精妙之处(CLEVER): 解耦 permission ↔ tools 双向依赖-- // permission 包不 import tools,tools.Registry 调用此函数注入映射. // 工具新增时只需在 Metadata.PermissionClass 声明一次,无需改权限核心代码. func RegisterToolClass(toolName string, class PermissionClass) { toolClassMu.Lock() defer toolClassMu.Unlock() toolClassRegistry[toolName] = class } // UnregisterToolClass 从动态注册表中移除工具(测试清理用). func UnregisterToolClass(toolName string) { toolClassMu.Lock() defer toolClassMu.Unlock() delete(toolClassRegistry, toolName) } // lookupToolClass 查询工具的权限类型(动态注册表优先,找不到返回空字符串). func lookupToolClass(toolName string) PermissionClass { toolClassMu.RLock() defer toolClassMu.RUnlock() return toolClassRegistry[toolName] } // MaxSubcommands 是单条 Bash 输入允许的最大子命令数量. // // 精妙之处(CLEVER): 50 子命令上限不是性能优化,是安全防线. // 恶意注入可以构造 cmd1; cmd2; ... cmd1000 的超长命令串, // 逐个检查时分类器调用次数爆炸.50 是"正常不会超过,恶意会超过"的经验值. // 正常 shell one-liner 一般 5-15 个子命令. const MaxSubcommands = 50 // CheckToolPermission 是完整的权限检查入口. // // 综合以下检查维度: // 1. 模式检查(bypass / plan 快速路径) // 2. 规则匹配(按优先级从低到高) // 3. 工具特定检查: // - Bash:分割复合命令,逐个检查;检测危险命令 // - 文件操作:检查路径权限和危险路径 // - WebFetch:检查域名权限 // 4. 危险检测(在规则匹配之后的额外安全层) // // 返回值说明: // - DecisionAllow: 允许执行 // - DecisionDeny: 拒绝执行 // - DecisionAsk: 需要询问用户确认 func CheckToolPermission(toolName string, input map[string]any, rules []Rule, mode Mode) *Response { // 1. 快速路径:bypass 模式直接放行 if mode == ModeBypass { return &Response{Decision: DecisionAllow, Reason: "bypass mode"} } // 2. 快速路径:plan 模式直接拒绝 if mode == ModePlan { return &Response{Decision: DecisionDeny, Reason: "plan mode"} } // 3. 根据工具类型分派检查逻辑. // // 所有工具必须通过 Metadata.PermissionClass 声明权限类型, // 注册时自动写入 toolClassRegistry.无 fallback 硬编码 map. // // Bash 注意事项(ELEVATED-REVIEWED 保留): // PermClassBash 语义依赖 shell AST 解析,非 shell 的命令执行工具(如 PowerShell) // 应声明为独立的 PermissionClass 并新增专用分支,而非复用 PermClassBash. dynClass := lookupToolClass(toolName) switch { case dynClass == PermClassBash: return checkBashPermission(input, rules, mode) case dynClass == PermClassFile: return checkFilePermission(toolName, input, rules, mode) case dynClass == PermClassWebFetch: return checkWebFetchPermission(input, rules, mode) case dynClass == PermClassReadOnly: // 明确声明只读:直接放行,无需经过 checkGenericPermission 的规则匹配. return &Response{Decision: DecisionAllow, Reason: "readonly tool: " + toolName} default: return checkGenericPermission(toolName, input, rules, mode) } } // checkBashPermission 检查 Bash 工具的权限. // // 特殊处理: // - 分割复合命令,逐个检查 // - 任何子命令被拒绝 → 整体拒绝 // - 任何子命令需要 ask → 整体 ask // - 全部允许 → 整体允许 func checkBashPermission(input map[string]any, rules []Rule, mode Mode) *Response { command, _ := input["command"].(string) if command == "" { return &Response{Decision: DecisionAsk, Reason: "empty command"} } // AST 解析,提取子命令 root := bash.Parse(command) commands := bash.ExtractCommands(root) if len(commands) == 0 { return &Response{Decision: DecisionAsk, Reason: "no commands parsed"} } // 5.2A: 子命令数量上限检查 if len(commands) > MaxSubcommands { return &Response{ Decision: DecisionAsk, Reason: fmt.Sprintf("Command contains %d sub-commands (limit: %d). Please simplify or confirm.", len(commands), MaxSubcommands), } } // 构建子命令文本列表 subCommands := make([]string, 0, len(commands)) for _, c := range commands { raw := strings.TrimSpace(c.RawText) if raw != "" { subCommands = append(subCommands, raw) } } if len(subCommands) == 0 { return &Response{Decision: DecisionAsk, Reason: "no commands parsed"} } // 筛选出适用于 Bash 的规则 bashRules := filterRulesForTool("Bash", rules) overallDecision := DecisionAllow var overallReason string for i, subCmd := range subCommands { // 5.3 集成: 如果子命令是 sed,额外过 sed 安全验证 if i < len(commands) { cmdName, _ := bash.ExtractCommandName(commands[i]) if cmdName == "sed" { sedResult := CheckSedCommand(subCmd, false) if !sedResult.Safe { return &Response{ Decision: DecisionAsk, Reason: "sed security: " + sedResult.Reason, } } } } // 5.4: 对于含动态重定向目标的命令,在 reason 中保留完整信息 if i < len(commands) && hasDynamicRedirect(commands[i]) { overallDecision = DecisionAsk if overallReason == "" { overallReason = "command has dynamic redirect target: " + subCmd } continue } // 检查单个子命令 resp := checkSingleBashCommand(subCmd, bashRules, mode) switch resp.Decision { case DecisionDeny: // 任何子命令被拒绝,整体拒绝 return resp case DecisionAsk: // 记录需要 ask,继续检查其他命令 overallDecision = DecisionAsk if overallReason == "" { overallReason = resp.Reason } } // DecisionAllow:继续检查下一个子命令 } // 升华改进(ELEVATED): 故意不实现逻辑短路语义. // 正向看:cmd1 && cmd2 中 cmd1 失败则 cmd2 不执行,可以降低 cmd2 的风险评估. // 反向看:权限检查是静态分析,不知道运行时 cmd1 会不会成功. // 攻击者可以构造 true && rm -rf /(true 一定成功,短路失效). // 结论:每个子命令独立评估,不利用短路语义.安全第一. // 替代方案:利用短路语义降低非首命令的风险评估(省一些 AI 调用,但有安全风险). if overallDecision == DecisionAsk { return &Response{Decision: DecisionAsk, Reason: overallReason} } return &Response{Decision: DecisionAllow, Reason: "all sub-commands allowed"} } // hasDynamicRedirect 检查命令是否包含动态重定向目标. // 动态重定向目标无法在静态分析时确定文件路径,需要用户确认. func hasDynamicRedirect(cmd *bash.CommandInfo) bool { for _, redir := range cmd.Redirections { if !redir.IsStatic { return true } } return false } // checkSingleBashCommand 检查单个 Bash 命令的权限. func checkSingleBashCommand(cmd string, bashRules []Rule, mode Mode) *Response { // 1. 尝试规则匹配 ruleResp := matchBashRules(cmd, bashRules) if ruleResp != nil { // 规则有明确的 allow/deny 结果 if ruleResp.Decision == DecisionAllow || ruleResp.Decision == DecisionDeny { return ruleResp } } // 2. 危险命令检测 if IsDangerousCommand(cmd) { return &Response{ Decision: DecisionAsk, Reason: "dangerous command detected: " + cmd, } } // 3. accept_edits 模式不自动放行 Bash 命令 // (Bash 命令总是需要检查,不受 accept_edits 影响) // 4. 如果有规则匹配到 ask,返回 ask if ruleResp != nil && ruleResp.Decision == DecisionAsk { return ruleResp } // 5. 默认需要询问 return &Response{Decision: DecisionAsk, Reason: "no matching rule for bash command: " + cmd} } // matchBashRules 对 Bash 命令匹配规则. // // 支持两种匹配方式: // - 无 Content 的规则:匹配所有 Bash 命令 // - prefix: 开头的规则:前缀匹配命令 func matchBashRules(cmd string, bashRules []Rule) *Response { var bestMatch *Rule bestPriority := -1 bestSpecificity := -1 // 规则的具体程度(越具体越优先) for i := range bashRules { rule := &bashRules[i] parsed := ParseContent(rule.Content) specificity := 0 matched := false switch parsed.Type { case ContentNone: // 无条件匹配所有 Bash 命令 matched = true specificity = 0 case ContentPrefix: // 前缀匹配 if matchCommandPrefix(cmd, parsed.Value) { matched = true specificity = len(parsed.Value) // 更长的前缀更具体 } } if !matched { continue } priority := SourcePriority(rule.Source) // 同优先级下,更具体的规则优先 if priority > bestPriority || (priority == bestPriority && specificity > bestSpecificity) { bestMatch = rule bestPriority = priority bestSpecificity = specificity } } if bestMatch == nil { return nil } return &Response{ Decision: bestMatch.Behavior, Reason: "bash rule: " + SerializeRule(*bestMatch), } } // matchCommandPrefix 检查命令是否匹配前缀规则. // // 前缀匹配规则:命令以指定前缀开头(空格分词边界或完全匹配). // 示例: // - prefix "npm" 匹配 "npm install", "npm test" // - prefix "git push" 匹配 "git push origin main" // - prefix "npm" 不匹配 "npmore"(需要完整词匹配) func matchCommandPrefix(cmd, prefix string) bool { cmd = strings.TrimSpace(cmd) prefix = strings.TrimSpace(prefix) if prefix == "" { return true } if cmd == prefix { return true } // 前缀匹配,要求在词边界处 if strings.HasPrefix(cmd, prefix) { // 检查前缀后面是空格或行末 nextChar := cmd[len(prefix)] if nextChar == ' ' || nextChar == '\t' { return true } } return false } // checkFilePermission 检查文件操作工具的权限. func checkFilePermission(toolName string, input map[string]any, rules []Rule, mode Mode) *Response { path := ExtractFilePath(input) if path == "" { // 无法提取路径,需要询问 return &Response{Decision: DecisionAsk, Reason: "cannot extract file path"} } // 1. 尝试规则匹配(路径 glob) fileRules := filterRulesForTool(toolName, rules) ruleResp := matchFileRules(path, fileRules) if ruleResp != nil && (ruleResp.Decision == DecisionAllow || ruleResp.Decision == DecisionDeny) { return ruleResp } // 2. 危险路径检测 if IsDangerousPath(path) { return &Response{ Decision: DecisionAsk, Reason: "dangerous file path: " + path, } } // 3. accept_edits 模式自动放行文件编辑(动态注册表优先) dynClass := lookupToolClass(toolName) if mode == ModeAcceptEdits && (dynClass == PermClassFile || isEditTool(toolName)) { return &Response{Decision: DecisionAllow, Reason: "accept_edits mode"} } // 4. 如果有规则匹配到 ask if ruleResp != nil { return ruleResp } // 精妙之处(CLEVER): 只读工具默认放行--Read/FileRead 不修改文件系统, // 如果也要用户确认会极大降低体验(Agent 读几十个文件每个都要确认). // 但注意 Read 仍受危险路径检测约束(步骤 2),读 ~/.ssh/id_rsa 仍会触发 ask. if toolName == "Read" || toolName == "FileRead" { return &Response{Decision: DecisionAllow, Reason: "read-only tool"} } // 6. 默认需要询问 return &Response{Decision: DecisionAsk, Reason: "no matching rule for file operation: " + path} } // matchFileRules 匹配文件路径相关的规则. func matchFileRules(path string, fileRules []Rule) *Response { var bestMatch *Rule bestPriority := -1 for i := range fileRules { rule := &fileRules[i] parsed := ParseContent(rule.Content) matched := false switch parsed.Type { case ContentNone: // 无条件匹配 matched = true case ContentPath: // 路径 glob 匹配 if GlobMatch(parsed.Value, path) { matched = true } } if !matched { continue } priority := SourcePriority(rule.Source) if priority > bestPriority { bestMatch = rule bestPriority = priority } } if bestMatch == nil { return nil } return &Response{ Decision: bestMatch.Behavior, Reason: "file rule: " + SerializeRule(*bestMatch), } } // checkWebFetchPermission 检查 WebFetch 工具的权限. func checkWebFetchPermission(input map[string]any, rules []Rule, mode Mode) *Response { url := ExtractURL(input) if url == "" { return &Response{Decision: DecisionAsk, Reason: "cannot extract URL"} } domain := ExtractDomainFromURL(url) // 1. 尝试规则匹配(域名) webRules := filterRulesForTool("WebFetch", rules) ruleResp := matchDomainRules(domain, webRules) if ruleResp != nil && (ruleResp.Decision == DecisionAllow || ruleResp.Decision == DecisionDeny) { return ruleResp } // 2. 如果有规则匹配到 ask if ruleResp != nil { return ruleResp } // 3. WebFetch 是只读操作,默认允许(但需要规则明确) return &Response{Decision: DecisionAsk, Reason: "no matching rule for domain: " + domain} } // matchDomainRules 匹配域名相关的规则. func matchDomainRules(domain string, webRules []Rule) *Response { var bestMatch *Rule bestPriority := -1 for i := range webRules { rule := &webRules[i] parsed := ParseContent(rule.Content) matched := false switch parsed.Type { case ContentNone: // 无条件匹配 matched = true case ContentDomain: // 域名匹配 if MatchDomain(parsed.Value, domain) { matched = true } } if !matched { continue } priority := SourcePriority(rule.Source) if priority > bestPriority { bestMatch = rule bestPriority = priority } } if bestMatch == nil { return nil } return &Response{ Decision: bestMatch.Behavior, Reason: "domain rule: " + SerializeRule(*bestMatch), } } // checkGenericPermission 检查通用工具的权限. func checkGenericPermission(toolName string, input map[string]any, rules []Rule, mode Mode) *Response { // 1. 尝试规则匹配 toolRules := filterRulesForTool(toolName, rules) ruleResp := matchGenericRules(toolRules) if ruleResp != nil { return ruleResp } // 2. 只读工具默认允许 dynC := lookupToolClass(toolName) if dynC == PermClassReadOnly { return &Response{Decision: DecisionAllow, Reason: "read-only tool: " + toolName} } // 3. accept_edits 模式下编辑工具自动放行(动态注册表优先) if mode == ModeAcceptEdits && (dynC == PermClassFile || isEditTool(toolName)) { return &Response{Decision: DecisionAllow, Reason: "accept_edits mode"} } // 4. 默认需要询问 return &Response{Decision: DecisionAsk, Reason: "no matching rule for tool: " + toolName} } // matchGenericRules 匹配通用规则(无内容条件). func matchGenericRules(toolRules []Rule) *Response { var bestMatch *Rule bestPriority := -1 for i := range toolRules { rule := &toolRules[i] priority := SourcePriority(rule.Source) if priority > bestPriority { bestMatch = rule bestPriority = priority } } if bestMatch == nil { return nil } return &Response{ Decision: bestMatch.Behavior, Reason: "generic rule: " + SerializeRule(*bestMatch), } } // filterRulesForTool 筛选适用于指定工具的规则. // // 包括: // - 工具名完全匹配的规则 // - 通配符 * 规则 func filterRulesForTool(toolName string, rules []Rule) []Rule { var filtered []Rule for _, rule := range rules { if rule.ToolName == toolName || rule.ToolName == "*" { filtered = append(filtered, rule) } } return filtered }