package permission // 权限规则学习系统. // // 当用户反复允许同类操作时,自动建议添加永久规则, // 减少重复授权的烦扰,提升使用体验. // // 设计思路: // - 记录每次权限决策的工具名,输入特征和用户决策 // - 当同类操作被允许超过阈值次数时,建议永久规则 // - 阈值可配置,默认 3 次 // - 线程安全,可在并发环境下使用 import ( "sync" ) // 默认建议阈值:同类操作被允许达到此次数后建议添加规则. const defaultSuggestThreshold = 3 // LearningTracker 追踪权限决策历史,用于建议永久规则. type LearningTracker struct { mu sync.Mutex decisions []decisionRecord // 所有决策记录 prefixCounts map[string]*ruleCount // Bash 前缀被允许的次数 pathPatternCounts map[string]*ruleCount // 文件路径模式被允许的次数 domainCounts map[string]*ruleCount // 域名被允许的次数 threshold int // 建议阈值 } // decisionRecord 是一条权限决策记录. type decisionRecord struct { ToolName string Input map[string]any Decision Decision Feature string // 提取的特征(命令前缀,文件路径模式,域名等) } // ruleCount 追踪某类规则被允许的次数. type ruleCount struct { AllowCount int // 被允许的次数 DenyCount int // 被拒绝的次数 RuleString string // 对应的规则字符串 Description string // 规则描述 Suggested bool // 是否已经建议过 } // NewLearningTracker 创建一个新的权限学习追踪器. // // threshold 指定建议阈值,即同类操作被允许多少次后建议添加永久规则. // 传入 0 使用默认值(3 次). func NewLearningTracker(threshold int) *LearningTracker { if threshold <= 0 { threshold = defaultSuggestThreshold } return &LearningTracker{ decisions: make([]decisionRecord, 0), prefixCounts: make(map[string]*ruleCount), pathPatternCounts: make(map[string]*ruleCount), domainCounts: make(map[string]*ruleCount), threshold: threshold, } } // TrackDecision 记录一次权限决策. // // 在用户做出允许或拒绝决策后调用此方法,系统将提取操作特征并累计计数. // 只记录用户主动做出的决策(Allow 和 Deny),自动决策不记录. func (lt *LearningTracker) TrackDecision(toolName string, input map[string]any, decision Decision) { // 只追踪用户主动做出的 allow/deny 决策 if decision != DecisionAllow && decision != DecisionDeny { return } lt.mu.Lock() defer lt.mu.Unlock() feature := lt.extractFeature(toolName, input) record := decisionRecord{ ToolName: toolName, Input: input, Decision: decision, Feature: feature, } lt.decisions = append(lt.decisions, record) // 按工具类型累计计数 switch toolName { case "Bash": lt.trackBashDecision(input, decision) case "Edit", "FileEdit", "Write", "FileWrite": lt.trackFileDecision(toolName, input, decision) case "WebFetch": lt.trackWebFetchDecision(input, decision) } } // trackBashDecision 追踪 Bash 命令的权限决策. func (lt *LearningTracker) trackBashDecision(input map[string]any, decision Decision) { command, _ := input["command"].(string) if command == "" { return } // 提取命令前缀作为特征 cmdName, _ := ExtractCommandName(command) if cmdName == "" { return } key := "bash:" + cmdName rc, ok := lt.prefixCounts[key] if !ok { rc = &ruleCount{ RuleString: "Bash(prefix:" + cmdName + ")", Description: "允许所有 " + cmdName + " 命令", } lt.prefixCounts[key] = rc } if decision == DecisionAllow { rc.AllowCount++ } else { rc.DenyCount++ } } // trackFileDecision 追踪文件操作的权限决策. func (lt *LearningTracker) trackFileDecision(toolName string, input map[string]any, decision Decision) { path := ExtractFilePath(input) if path == "" { return } // 使用目录作为路径模式特征 dir := extractDirPattern(path) if dir == "" { return } key := toolName + ":" + dir rc, ok := lt.pathPatternCounts[key] if !ok { rc = &ruleCount{ RuleString: toolName + "(" + dir + "/**)", Description: "允许 " + toolName + " 操作 " + dir + "/ 下的文件", } lt.pathPatternCounts[key] = rc } if decision == DecisionAllow { rc.AllowCount++ } else { rc.DenyCount++ } } // trackWebFetchDecision 追踪网页访问的权限决策. func (lt *LearningTracker) trackWebFetchDecision(input map[string]any, decision Decision) { url := ExtractURL(input) if url == "" { return } domain := ExtractDomainFromURL(url) if domain == "" { return } key := "webfetch:" + domain rc, ok := lt.domainCounts[key] if !ok { rc = &ruleCount{ RuleString: "WebFetch(domain:" + domain + ")", Description: "允许访问 " + domain, } lt.domainCounts[key] = rc } if decision == DecisionAllow { rc.AllowCount++ } else { rc.DenyCount++ } } // SuggestPermanentRules 基于历史决策建议永久规则. // // 返回所有满足建议阈值且未曾建议过的规则. // 每条规则只会被建议一次,调用此方法后会标记已建议的规则. func (lt *LearningTracker) SuggestPermanentRules() []SuggestedRule { lt.mu.Lock() defer lt.mu.Unlock() var suggestions []SuggestedRule // 检查 Bash 前缀规则 for _, rc := range lt.prefixCounts { if rc.Suggested { continue } // 精妙之处(CLEVER): 建议条件要求 AllowCount > DenyCount*2-- // 不仅看绝对次数达到阈值,还要求允许次数远超拒绝次数. // 如果用户允许了 3 次又拒绝了 2 次同类操作,说明态度不一致,不应建议自动放行. // 这个 2:1 的比例是经验值,避免在用户犹豫不决时过早建议. if rc.AllowCount >= lt.threshold && rc.AllowCount > rc.DenyCount*2 { suggestions = append(suggestions, SuggestedRule{ RuleString: rc.RuleString, Description: rc.Description, }) rc.Suggested = true } } // 检查文件路径模式规则 for _, rc := range lt.pathPatternCounts { if rc.Suggested { continue } if rc.AllowCount >= lt.threshold && rc.AllowCount > rc.DenyCount*2 { suggestions = append(suggestions, SuggestedRule{ RuleString: rc.RuleString, Description: rc.Description, }) rc.Suggested = true } } // 检查域名规则 for _, rc := range lt.domainCounts { if rc.Suggested { continue } if rc.AllowCount >= lt.threshold && rc.AllowCount > rc.DenyCount*2 { suggestions = append(suggestions, SuggestedRule{ RuleString: rc.RuleString, Description: rc.Description, }) rc.Suggested = true } } return suggestions } // Reset 重置所有追踪数据. // 通常在会话结束或用户显式请求时调用. func (lt *LearningTracker) Reset() { lt.mu.Lock() defer lt.mu.Unlock() lt.decisions = make([]decisionRecord, 0) lt.prefixCounts = make(map[string]*ruleCount) lt.pathPatternCounts = make(map[string]*ruleCount) lt.domainCounts = make(map[string]*ruleCount) } // Stats 返回当前追踪统计信息. func (lt *LearningTracker) Stats() LearningStats { lt.mu.Lock() defer lt.mu.Unlock() stats := LearningStats{ TotalDecisions: len(lt.decisions), } for _, d := range lt.decisions { if d.Decision == DecisionAllow { stats.TotalAllowed++ } else { stats.TotalDenied++ } } return stats } // LearningStats 是学习追踪器的统计信息. type LearningStats struct { TotalDecisions int // 总决策次数 TotalAllowed int // 总允许次数 TotalDenied int // 总拒绝次数 } // extractFeature 从工具调用中提取用于分类的特征字符串. func (lt *LearningTracker) extractFeature(toolName string, input map[string]any) string { switch toolName { case "Bash": command, _ := input["command"].(string) cmdName, _ := ExtractCommandName(command) return "bash:" + cmdName case "Edit", "FileEdit", "Write", "FileWrite": path := ExtractFilePath(input) dir := extractDirPattern(path) return toolName + ":" + dir case "WebFetch": url := ExtractURL(input) return "webfetch:" + ExtractDomainFromURL(url) default: return toolName } } // extractDirPattern 从文件路径中提取目录模式. // 返回路径的目录部分,去掉文件名. func extractDirPattern(path string) string { if path == "" { return "" } // 找到最后一个 / 的位置 lastSlash := -1 for i := len(path) - 1; i >= 0; i-- { if path[i] == '/' || path[i] == '\\' { lastSlash = i break } } if lastSlash <= 0 { return "." } return path[:lastSlash] }