package engine // audit_observer.go - 将 EventObserver 事件桥接到 AuditSink. // // 设计定位: // // AuditObserver 是 EventObserver 的一个实现,它"监听"引擎内部事件, // 将关键操作事件翻译为 AuditEntry 并写入 AuditSink. // // 这样设计的好处: // 1. 工具层(FileWrite/FileEdit)无需直接依赖 AuditSink,保持接口单一 // 2. 引擎层通过 CompositeObserver 把 AuditObserver 与其他 Observer 叠加 // 3. 新增审计点只需在 Event() 里加一个 case,不需要改工具代码 // // 监听的事件: // - "operation_recorded" (status=success/failed) - 工具执行完成 // - "secret_scan_blocked" - 写入被秘密扫描拦截(工具主动上报) // // 升华改进(ELEVATED): 早期方案通过分析服务(Statsig)记录 tengu_file_operation 事件, // 强依赖网络和 Statsig 账号.AuditObserver 在本地完成转换, // 零网络依赖,离线场景同样有效. // 替代方案:<在 OperationLog.Record() 内部直接写 AuditSink> // - 否决原因:将审计落地逻辑耦合进 OperationLog,违反单一职责. import ( "strconv" "time" "git.flytoex.net/yuanwei/flyto-agent/pkg/security" "git.flytoex.net/yuanwei/flyto-agent/pkg/tools" ) // AuditObserver 实现 EventObserver,将引擎事件翻译为审计记录. // 通过 NewCompositeObserver(engineObserver, auditObserver) 插入事件流. type AuditObserver struct { sink security.AuditSink sessionID string // 当前会话 ID(可选) // registry 用于查询工具声明的 AuditOperation (见 SetToolRegistry 注释). // nil 时 operationFromTool 回退到 fallbackOperationFromTool 启发式. registry *tools.Registry // includeToolInput 由 Config.AuditIncludeToolInput 驱动.false 时 handleOperationRecorded // 不写入 Extra["tool_input"], 对齐早期实现 默认行为 (betaSessionTracing 双 env 才启). includeToolInput bool // inputMaxBytes <=0 = 不截断; >0 = 超限时截断并加 Extra["tool_input_truncated"]="true". inputMaxBytes int } // NewAuditObserver 创建 AuditObserver. // sink 是审计落地后端;sessionID 可以为空(单用户 CLI 场景通常不需要). // // 构造后通常立即调用 SetToolRegistry 注入工具注册表, 启用工具自描述的 // AuditOperation 查询 (L1191 修复).未调用 SetToolRegistry 时回退到 // 硬编码启发式, 行为等同重构前, 零回归. func NewAuditObserver(sink security.AuditSink, sessionID string) *AuditObserver { if sink == nil { sink = security.NoopAuditSink{} } return &AuditObserver{sink: sink, sessionID: sessionID} } // SetToolRegistry 后置注入工具注册表, 启用工具自描述的 AuditOperation 查询. // // 升华改进(ELEVATED): P1 L1191 修复 - 原 operationFromTool 是硬编码 switch, // 违反开闭原则, 消费者加新工具 (DatabaseWrite / SlackSend 等) 无法正确生成 // 审计 operation 标签.改为运行时查询 Registry 的 Metadata.AuditOperation, // 消费者在工具注册时声明一次, 审计系统零侵入扩展. // // 设计选择: 后置注入而非构造参数, 为了保持 NewAuditObserver 签名不变 // (避免破坏现有 8 个 call site + doc 例子), 同时让 engine.go 初始化时 // 在构造后调用一次 SetToolRegistry 完成注入. // // 替代方案 A: <改 NewAuditObserver 为 3 参数> - 否决: 破坏现有 API. // 替代方案 B: - 否决: 过度设计, // // 单个可选依赖不值得引入 option 类型. // // 线程安全: 仅在 engine 初始化阶段调用一次, 后续只读.如果未来有运行时 // 动态替换 registry 的需求, 需要加锁 (但目前没有此需求). func (a *AuditObserver) SetToolRegistry(r *tools.Registry) { a.registry = r } // SetInputAudit 配置工具 Input 审计策略 (L1223 修复). // // include: 是否把工具 Input 写入 Extra["tool_input"].默认 false. // maxBytes: 写入上限 (<=0 不截断).超限会截断并加 Extra["tool_input_truncated"]="true". // // 前置条件: 调用方 (engine.go) 负责在 OperationEntry.Input 进入 OperationLog 前 // 完成 SecretStore.Redact 脱敏, 与 Output 脱敏走同一路径 (engine.go:3769 precedent). // AuditObserver 不重复脱敏, 避免跨包依赖 SecretStore. // // 替代方案 A: - 否决: 与 Output 现有 // "入口统一脱敏" 原则冲突, 也让 AuditObserver 跨越单一职责. // 替代方案 B: <加 InputRedactor 接口允许多实现叠加> - 否决: SecretStore.Redact 已 // 覆盖 99% 实际凭据 (value 匹配), 叠 SecretGuard regex 兜底边际收益太低. // // 线程安全: 与 SetToolRegistry 同约定, 仅在 engine 初始化阶段调用一次. func (a *AuditObserver) SetInputAudit(include bool, maxBytes int) { a.includeToolInput = include a.inputMaxBytes = maxBytes } // Event 接收引擎事件,转换为审计记录. // // 精妙之处(CLEVER): 用 switch 而非 if-else chain-- // 随着事件类型增加,switch 可读性优于连续 if,且 Go 编译器对 switch 有优化. // 未匹配的事件静默忽略,不影响其他 Observer 的处理. func (a *AuditObserver) Event(name string, data map[string]any) { switch name { case "operation_recorded": a.handleOperationRecorded(data) case "secret_scan_blocked": a.handleSecretScanBlocked(data) } // 其他事件静默忽略--AuditObserver 只关心操作审计相关事件 } // Error 接收错误事件,记录到审计日志(outcome=error). func (a *AuditObserver) Error(err error, ctx map[string]any) { // 历史包袱(LEGACY): 目前把 observer.Error 也记录为审计条目, // 但 error 事件粒度太粗(不一定是文件操作错误). // 未来改进:只在 ctx 中有 tool_name 和 resource 字段时才记录. toolName, _ := ctx["tool"].(string) resource, _ := ctx["resource"].(string) if toolName == "" && resource == "" { return // 非工具相关的错误不记录审计 } reason := "" if err != nil { reason = err.Error() } //nolint:errcheck - 审计写入失败不应阻断错误处理流程 a.sink.Write(security.AuditEntry{ SessionID: a.sessionID, Timestamp: time.Now().UTC(), ToolName: toolName, Operation: extractString(ctx, "operation"), Resource: resource, Outcome: "error", Reason: reason, }) } // handleOperationRecorded 处理 "operation_recorded" 事件. // 对应 OperationLog.Record() 触发的事件. func (a *AuditObserver) handleOperationRecorded(data map[string]any) { toolName := extractString(data, "tool") status := extractString(data, "status") messageID := extractString(data, "message_id") if toolName == "" { return // 没有工具信息,不记录 } // status → outcome 映射 // 精妙之处(CLEVER): "rolled_back" 也是 allowed(操作成功了,后来被撤销), // 与 "failed"(操作本身失败)语义不同.合规审计需要区分两者. outcome := "allowed" if status == "failed" { outcome = "blocked" } entry := security.AuditEntry{ SessionID: a.sessionID, TurnNumber: extractInt(data, "turn_number"), Timestamp: time.Now().UTC(), ToolName: toolName, Operation: a.operationFromTool(toolName), Resource: extractString(data, "resource"), Outcome: outcome, Reason: extractString(data, "reason"), } // message_id 和 input_len 存入 Extra(不是核心字段,但调试时有用) if entry.Extra == nil { entry.Extra = make(map[string]string) } if messageID != "" { entry.Extra["message_id"] = messageID } if inputLen := extractInt(data, "input_len"); inputLen > 0 { entry.Extra["tool_input_bytes"] = strconv.Itoa(inputLen) } // L1223 修复: Input 内容审计 (opt-in). // 前提: tool_input 已由 engine.go:3806 附近经 SecretStore.Redact 脱敏. // 默认 false 保守对齐早期实现 双 env 才开启的策略. if a.includeToolInput { if input := extractString(data, "tool_input"); input != "" { if a.inputMaxBytes > 0 && len(input) > a.inputMaxBytes { input = input[:a.inputMaxBytes] entry.Extra["tool_input_truncated"] = "true" } entry.Extra["tool_input"] = input } } //nolint:errcheck - 审计写入失败不应阻断正常操作流程 a.sink.Write(entry) } // handleSecretScanBlocked 处理 "secret_scan_blocked" 事件. // 由工具层(FileWrite/FileEdit)在秘密扫描拦截时主动触发. func (a *AuditObserver) handleSecretScanBlocked(data map[string]any) { //nolint:errcheck a.sink.Write(security.AuditEntry{ SessionID: a.sessionID, TurnNumber: extractInt(data, "turn_number"), Timestamp: time.Now().UTC(), ToolName: extractString(data, "tool"), Operation: "write", Resource: extractString(data, "path"), Outcome: "blocked", // ELEVATED: 早期方案把 rule_ids(如 "github-pat,aws-access-token")直接写入审计日志, // 等于把规则库暴露给所有有日志读权限的角色--攻击者可借此枚举检测盲区, // 按规则名精心构造绕过 payload. // 改为只记录命中数量(count),审计目的("写入被拦截")完全满足, // 规则名留在内存中,不落盘,不出网. // 替代方案:<加密存储 rule_ids,解密密钥由 AuditSink 持有> // - 否决原因:引入密钥管理复杂度,且 count 已足够 SLA 报警和趋势分析. Reason: "secret_detected:count=" + strconv.Itoa(extractInt(data, "count")), }) } // ─── 辅助函数 ───────────────────────────────────────────────────────────────── // extractString 从 map 中安全提取字符串值(不存在或类型不符时返回空串). func extractString(data map[string]any, key string) string { if v, ok := data[key]; ok { if s, ok := v.(string); ok { return s } } return "" } // extractInt 从 map 中安全提取整数值(不存在或类型不符时返回 0). func extractInt(data map[string]any, key string) int { if v, ok := data[key]; ok { switch n := v.(type) { case int: return n case int64: return int(n) case float64: return int(n) } } return 0 } // operationFromTool 返回工具的审计操作类型. // // 升华改进(ELEVATED): P1 L1191 修复 - 查询顺序: // 1. Registry.MetadataFor(toolName).AuditOperation (工具自声明, 优先) // 2. fallbackOperationFromTool(toolName) (启发式, 兜底) // // 消费者工具声明 AuditOperation 后直接走路径 1; 未声明或 registry==nil // 时走路径 2 (与重构前行为等价, 零回归). // // 精妙之处(CLEVER): 方法而非包级函数 - 需要访问 a.registry, 且未来可能 // 加 observer-level 的覆盖映射 (比如 a.operationOverrides map), 方法形式 // 为扩展预留空间, 包级函数改为 method 后只有这一处内部调用受影响 (由 // handleOperationRecorded 直接调用, 无第三方调用, 破坏面为零). func (a *AuditObserver) operationFromTool(tool string) string { if a.registry != nil { if meta, ok := a.registry.MetadataFor(tool); ok && meta.AuditOperation != "" { return meta.AuditOperation } } return fallbackOperationFromTool(tool) } // fallbackOperationFromTool 是硬编码启发式兜底, 用于: // 1. 未设置 Registry 的场景 (测试 / 嵌入式) // 2. 工具未声明 Metadata.AuditOperation 的场景 // // 历史包袱(LEGACY): 本函数保留为 trip-wire - 如果未来有"新工具都应声明" // 的要求, 可以在 fallback 命中时打 warning / panic 驱动全量迁移. // 当前保留零回归兜底, 不打警告. // // 精妙之处(CLEVER): 不依赖工具实现--通过命名约定推断. // "Write"/"Edit" = 写; "Read"/"Glob"/"Grep" = 读; "Bash" = 执行; 其他 = invoke. // 这避免了工具层和审计层的强耦合. func fallbackOperationFromTool(tool string) string { switch tool { case "Write", "FileWrite": return "write" case "Edit", "FileEdit": return "edit" case "Read", "FileRead", "Glob", "Grep": return "read" case "Bash": return "execute" default: return "invoke" } }