package engine

// OperationLog is the unified operation log: it records every tool operation
// and supports rollback by message ID.
//
// ELEVATED: one log format for file / database / API operations. The rollback
// logic for database and API operations is implemented by the respective
// tools (via UndoInfo); the engine only orchestrates, running the
// compensations in reverse order.
// Alternative: each operation type manages its own rollback (scattered, not
// composable).
//
// Relationship with FileHistory:
//   - FileHistory owns the physical backup of file contents (disk level).
//   - OperationLog owns the logical record of all operations (orchestration
//     level).
//   - For a file rollback the two cooperate: OperationLog supplies the order,
//     FileHistory supplies the content.

import (
	"context"
	"encoding/json"
	"fmt"
	"sync"
	"time"

	"git.flytoex.net/yuanwei/flyto-agent/pkg/tools"
)

// Operation status values stored in OperationEntry.Status.
const (
	opStatusSuccess    = "success"
	opStatusFailed     = "failed"
	opStatusRolledBack = "rolled_back"
)

// Observer event names emitted by OperationLog.
const (
	opEventRecorded   = "operation_recorded"
	opEventRolledBack = "operation_rolled_back"
)

// OperationLog is the unified operation log.
type OperationLog struct {
	entries  []*OperationEntry // recorded operations, in insertion order
	observer EventObserver     // receives record / rollback events; never nil after NewOperationLog
	mu       sync.Mutex        // guards entries and the Status writes made during rollback
}

// OperationEntry is a single operation record in OperationLog.
//
// Field consumption forms:
//   - ID / MessageID / ToolName / UndoInfo / Status drive the rollback path:
//     RollbackMessage selects entries by MessageID, hands UndoInfo to the
//     UndoExecutor, reports ID and ToolName in its events and sets Status to
//     opStatusRolledBack once a compensation succeeds.
//   - Record publishes ID, MessageID, ToolName, Status, the raw Input (plus
//     its length) and whether UndoInfo is set in the opEventRecorded observer
//     event. Timestamp is filled in by Record when it is left zero.
//   - Output / TurnNumber are pull-API fields intended for audit consumers
//     that read entries through OperationLog.GetByMessage() /
//     .GetByMessageLocked(); the rollback path does not read them. The
//     observer path (push) and the OperationEntry path (pull on demand) are
//     two separate sinks, not redundant copies.
//     NOTE(review): earlier documentation states that the opEventRecorded
//     event also carries a turn_number key read by AuditObserver
//     (audit_observer.go); the payload built by Record in this file does not
//     set that key -- confirm where it is attached.
//   - Truncated / StoredPath: see the field-level comments below.
type OperationEntry struct {
	ID         string          // unique operation ID (usually equal to the tool_use ID)
	MessageID  string          // ID of the message this operation belongs to
	TurnNumber int             // conversation turn number (pull API, read by external audit via GetByMessage)
	ToolName   string          // tool name
	Input      json.RawMessage // tool input parameters
	Output     string          // tool output (pull API; may be a short summary truncated by the orchestrator, see Truncated)
	UndoInfo   *tools.UndoInfo // undo information (optional)
	Timestamp  time.Time       // time of the operation
	Status     string          // opStatusSuccess / opStatusFailed / opStatusRolledBack

	// Truncated marks that Output is a short summary -- the full tool
	// result was persisted to StoredPath by the orchestrator. Audit
	// consumers walking OperationLog need this flag to avoid treating
	// the summary as the authoritative record.
	Truncated bool

	// StoredPath is the persistence location of the full tool result.
	// Path shape is consumer-defined (local path / sandbox path / object
	// key). Empty when Truncated is false. SECURITY: same constraints as
	// flyto.ToolResultEvent.StoredPath -- treat as caller-specific data.
	StoredPath string
}

// NewOperationLog creates a unified operation log that reports to observer.
// A nil observer is replaced with a NoopObserver so the log can always emit
// events without nil checks.
func NewOperationLog(observer EventObserver) *OperationLog {
	log := &OperationLog{
		entries:  []*OperationEntry{},
		observer: observer,
	}
	if log.observer == nil {
		log.observer = &NoopObserver{}
	}
	return log
}

// Record records a single operation.
func (l *OperationLog) Record(entry *OperationEntry) { l.mu.Lock() defer l.mu.Unlock() if entry.Timestamp.IsZero() { entry.Timestamp = time.Now() } l.entries = append(l.entries, entry) l.observer.Event(opEventRecorded, map[string]any{ "id": entry.ID, "message_id": entry.MessageID, "tool": entry.ToolName, "status": entry.Status, "has_undo": entry.UndoInfo != nil, "input_len": len(entry.Input), // L1223 修复: Input 透传给 AuditObserver, 消费侧按 Config.AuditIncludeToolInput 决定是否落地. // 约定: 调用方 (engine.go:3806 附近) 必须传入已由 SecretStore.Redact 脱敏的 Input. // OperationLog 保持通用结构不跨包依赖 SecretStore, 与 Output 同层入口统一脱敏. "tool_input": string(entry.Input), }) } // GetByMessage 获取某消息的所有操作. func (l *OperationLog) GetByMessage(messageID string) []*OperationEntry { l.mu.Lock() defer l.mu.Unlock() var result []*OperationEntry for _, e := range l.entries { if e.MessageID == messageID { result = append(result, e) } } return result } // EntryCount 返回操作记录总数(主要用于测试). func (l *OperationLog) EntryCount() int { l.mu.Lock() defer l.mu.Unlock() return len(l.entries) } // RollbackMessage 按消息 ID 回滚(倒序执行 UndoInfo). // 精妙之处(CLEVER): Saga 补偿模式--倒序执行补偿操作,保证逆序一致性. // 如果某个补偿失败,继续执行后续补偿(best-effort),不中断. 
func (l *OperationLog) RollbackMessage(ctx context.Context, messageID string, executor UndoExecutor) error { l.mu.Lock() entries := l.GetByMessageLocked(messageID) l.mu.Unlock() if len(entries) == 0 { return fmt.Errorf("operation_log: no operations found for message %s", messageID) } var lastErr error rolledBack := 0 // 倒序执行补偿操作 for i := len(entries) - 1; i >= 0; i-- { entry := entries[i] if entry.UndoInfo == nil { continue // 没有撤销信息,跳过 } if entry.UndoInfo.Irreversible { l.observer.Event("operation_rollback_skip_irreversible", map[string]any{ "id": entry.ID, "tool": entry.ToolName, "manual_guide": entry.UndoInfo.ManualGuide, }) continue } if err := executor.ExecuteUndo(ctx, entry.UndoInfo); err != nil { lastErr = fmt.Errorf("operation_log: undo failed for %s (%s): %w", entry.ID, entry.ToolName, err) l.observer.Error(lastErr, map[string]any{ "id": entry.ID, "message_id": messageID, "tool": entry.ToolName, }) continue } // 标记为已回滚 l.mu.Lock() entry.Status = opStatusRolledBack l.mu.Unlock() rolledBack++ } l.observer.Event("operation_rollback_complete", map[string]any{ "message_id": messageID, "rolled_back": rolledBack, "total": len(entries), }) return lastErr } // GetByMessageLocked 获取某消息的所有操作(必须在持有锁的情况下调用或无锁场景). // 注意:RollbackMessage 内部使用,避免死锁. func (l *OperationLog) GetByMessageLocked(messageID string) []*OperationEntry { var result []*OperationEntry for _, e := range l.entries { if e.MessageID == messageID { result = append(result, e) } } return result } // UndoExecutor 撤销执行器(Engine 提供). // 升华改进(ELEVATED): 用接口而非函数--方便测试时 mock,也方便跨进程执行. // 替代方案:直接传 func(简单但不可测试,不可序列化). type UndoExecutor interface { ExecuteUndo(ctx context.Context, undo *tools.UndoInfo) error }