# INF-2 操作历史/回滚 - 完整调研报告 > 内部文档.指导 INF-2 的实现设计. ## 执行摘要 本报告基于对业界操作历史和回滚机制的深入调研,针对 Flyto Agent Engine 的三大场景(编程,仓储,企业API)提供设计建议.**核心观点**:不存在"一个方案适应所有场景"的银弹,应采取**分层混合架构**: - **编程场景**:采用 Git 风格的**内容寻址 + 快照**(当前已有基础) - **仓储场景**:结合**审计日志 + 补偿事务 + 版本向量**,优先保障数据一致性和防冲突 - **企业 API**:基于 **Saga 补偿 + 事件流**,因不可逆操作多 --- ## 第一部分:业界方案深度分析 ### 1. 文件操作历史 #### 1.1 Git Object Model(内容寻址存储) **核心原理** Git 是内容寻址文件系统(Content-Addressable Filesystem): - 每个对象(blob/tree/commit/tag)由其内容的 SHA-1 哈希标识 - 三层模型:blob(文件内容) → tree(目录结构) → commit(快照) - **关键特性**:相同内容自动去重(同一个 index.js 在多个 commit 中只存一份) 存储结构: ``` .git/objects/ ├── ab/ # 哈希前 2 位作为目录 │ └── cdef1234... # 剩余 38 位作为文件名 └── ... ``` **优点**: - 极端高效的存储利用(内容重复自动去重) - 完全的不可篡改性(改一个字节,哈希完全不同) - 天然支持时间旅行(DAG 结构) - 跨系统兼容(只用内容标识,不依赖文件路径) **缺点**: - 查询低效:找某个文件某个版本需要遍历整个 commit tree - 删除策略复杂:垃圾回收(GC)需要追踪可达对象 - 冲突解决复杂:需要手动合并 **对 Flyto 的适配** 早期方案 采用了简化版:`hash@v{n}` 命名方案,基于内容哈希创建快照文件,绑定消息 ID.建议 Flyto 沿用这个方案: ``` ~/.flyto/history/{project-hash}/ ├── {file-content-hash}@v1.bak ├── {file-content-hash}@v2.bak └── metadata.json ``` #### 1.2 Copy-on-Write(COW)快照 **核心原理** 修改时不覆盖原文件,而是创建新版本.Btrfs/ZFS 文件系统在块级别实现: 1. 创建快照:瞬间操作(O(1)),仅记录时间点 2. 修改时:新数据写入新块,元数据指向新块 3. 原快照:保持指向旧块 **优点**:快照创建极快,空间高效,天然原子性 **缺点**:数据库不兼容(随机写导致碎片化),文件系统依赖(不跨平台) **对 Flyto 的适配**:不推荐直接用.Flyto 需跨平台,但可以在应用层模拟 COW 的优点(延迟复制,差异记录). #### 1.3 文件方案总评 | 方案 | 文件历史 | 回滚 | 存储效率 | 跨平台 | 查询性能 | |------|---------|------|---------|--------|---------| | Git Model | ★★★★★ | ★★★★ | ★★★★★ | ★★★★★ | ★★ | | COW 快照 | ★★★★ | ★★★★★ | ★★★★★ | ❌ | ★★★ | | 朴素备份 | ★★ | ★★★ | ★ | ★★★★★ | ★★★ | **推荐**:Git Model + 应用层 diff 记录 --- ### 2. 数据库操作历史 数据库场景是仓储系统最关键的.库存数据改错会导致发错货--真金白银的损失. #### 2.1 Change Data Capture(CDC) **核心原理** 捕获数据库的行级变更事件: ``` MySQL 二进制日志: [INSERT] table=inventory, id=SKU-001, qty=100 [UPDATE] table=inventory, id=SKU-001, qty=100 → qty=95 [DELETE] table=inventory, id=SKU-002 ``` 主流工具:Debezium(业界标准),Maxwell(MySQL 专用) **优点**:零业务代码改动,不漏事件,实时,多消费者 **缺点**: - 只捕获数据变化,不捕获"为什么改"(语义丢失) - 和 Agent 工具调用的关联性弱 - 恢复操作复杂 **对 Flyto 的适配**:不直接依赖 Debezium,而是在**工具层**做类似的事--截获 SQL 执行前后的状态,记录 `(tool_call_id, sql, before_state, after_state)`.这等于"应用层的 CDC",带上下文. #### 2.2 Event Sourcing(事件溯源) **核心原理** 不存储最终状态,存储**所有事件序列**.任意时刻的状态 = 重放从起始到该时刻的所有事件. ``` 传统方式: inventory表: SKU-001 qty=95 Event Sourcing 方式: [1] InventoryCreated{sku="SKU-001", qty=100} [2] StockDecremented{sku="SKU-001", qty=-5, reason="发货"} 查询当前库存:重放所有事件 → qty=95 ``` **优点**:完整审计,任意时间点回滚,因果关系清晰 **缺点**:查询性能极差(需重放),存储膨胀,Schema 演化困难 **对 Flyto 的适配**:Event Sourcing 的核心思想("每个操作是一个事件,可重放")非常适合.但不需要完全采用(存储爆炸),只需记录"操作序列"和"补偿操作". #### 2.3 Audit Trail(审计表)- 推荐 **核心原理** 最简单,最实用:每次修改时在审计表记录 before/after 值. ```sql CREATE TABLE inventory_audit_log ( id BIGINT AUTO_INCREMENT PRIMARY KEY, inventory_id BIGINT, action VARCHAR(20), -- INSERT/UPDATE/DELETE old_qty INT, new_qty INT, changed_by VARCHAR(50), -- Agent ID changed_at TIMESTAMP, reason VARCHAR(500) -- 操作原因 ); ``` **优点**:实现极简,查询高效,灵活,互联网公司标配 **缺点**:双写复杂性,性能开销(每次修改 2 个写操作) **对 Flyto 的适配**:最推荐.Flyto 内部的 OperationLog: ```go type OperationLog struct { ID int64 MessageID string TurnNumber int ToolName string ToolInput string // JSON ToolOutput string // JSON BeforeState string // 修改前状态快照 AfterState string // 修改后状态快照 UndoInfo string // 回滚指令 JSON ChangedAt time.Time AgentID string Status string // success / failed / compensated } ``` #### 2.4 Temporal Tables(时序表) SQL:2011 标准,PostgreSQL/SQL Server 原生支持.数据库自动维护历史版本,可查询 `AS OF SYSTEM TIME`. **优点**:数据库原生,自动维护,查询简单 **缺点**:特定数据库,不记录"为什么",回滚困难 **对 Flyto 的适配**:不作为主要方案(需跨数据库),但 PostgreSQL 用户可选. #### 2.5 数据库方案总评 | 方案 | 审计完整性 | 查询性能 | 回滚难度 | 跨平台 | |------|----------|---------|--------|--------| | CDC | ★★ | ★★★★★ | ★★★ | ★★★★ | | Event Sourcing | ★★★★★ | ❌ | ★★★★★ | ★★★★★ | | Audit Trail | ★★★★ | ★★★★ | ★★★ | ★★★★★ | | Temporal Tables | ★★★ | ★★★★ | ★★ | ★★ | **推荐**:Audit Trail 作为基础,结合应用层的 Event Sourcing 思想 --- ### 3. API 操作回滚 API 场景特点:第三方系统,不可信,不可逆操作多. #### 3.1 Saga 模式 - 推荐 **核心原理** 分布式事务中每个步骤有对应的**补偿操作**.Orchestration 模式(集中协调): ``` Saga 协调者: ① 调用 PaymentService.charge(100) → OK ② 调用 OrderService.create(orderId) → OK ③ 调用 ShippingService.dispatch(orderId) → FAIL ④ 反向补偿: - OrderService.cancel(orderId) - PaymentService.refund(100) ``` **对 Flyto 的适配**:Flyto Agent 天然就是"协调者",每个工具调用是一个"步骤". ```go // 发货单处理 message { steps: [ {tool: "SQLQuery", input: "UPDATE inventory ...", undoInfo: "UPDATE inventory SET qty=qty+50 ..."}, {tool: "HTTPCall", input: "POST /shipment", undoInfo: "DELETE /shipment/xxx"}, {tool: "SendEmail", input: "to: customer@...", undoInfo: null}, // 不可逆 ] } // 回滚:倒序执行 undo for i = len(steps)-1; i >= 0; i-- { if steps[i].undoInfo != null { execute(steps[i].undoInfo) } else { log("WARNING: step %d 不可逆", i) } } ``` #### 3.2 Compensating Transaction(补偿事务) 不能真正"回滚",而是执行"反向业务操作": | 操作 | 补偿 | |------|------| | INSERT user | DELETE user | | TRANSFER A→B | TRANSFER B→A | | SendEmail | [不可补偿] | #### 3.3 Command Pattern(命令模式) GoF 设计模式:每个操作封装为对象,包含 execute() 和 undo(). **对 Flyto 的适配**:Tool 接口扩展 UndoInfo,但不强制所有工具都实现. --- ## 第二部分:关键设计问题 ### 问题 a)回滚粒度 | 粒度 | 描述 | 复杂度 | 推荐 | |------|------|--------|------| | 按操作 | 只撤销一个工具调用 | 中 | 不推荐(可能留下不一致状态) | | **按消息** | 撤销整条消息的所有操作 | 中 | **推荐**(原子性好,用户易理解) | | 按会话 | 撤销多条消息 | 高 | 因果链追踪困难 | | 按检查点 | 回到标记的稳定点 | 中 | 可选扩展 | **推荐按消息回滚**:语义清晰("撤销最后一条"),实现可行(倒序执行 UndoInfo). ### 问题 b)不可逆操作 **处理策略**: 1. 工具声明 `Irreversible: true` 2. 权限系统执行前警告用户 3. UndoInfo 不是反向操作,而是"人工操作清单" 4. 回滚时标记为"需要人工审查" ```go // SendEmail 的 UndoInfo UndoInfo: { action: "manual_followup_required", instruction: "邮件已发送无法撤回。建议发送补充邮件说明。", } ``` ### 问题 c)并发冲突 **场景**:Agent A 和 Agent B 同时修改同一行库存. **推荐乐观锁 + 版本号**: ```sql -- 库存表加版本号 ALTER TABLE inventory ADD COLUMN version INT DEFAULT 1; -- Agent A 更新(检查版本) UPDATE inventory SET qty=50, version=version+1 WHERE sku='SKU-001' AND version=100; -- 成功 → rows_affected=1 -- Agent B 更新(版本已变) UPDATE inventory SET qty=80, version=version+1 WHERE sku='SKU-001' AND version=100; -- 失败 → rows_affected=0 → 冲突检测 ``` 回滚时也检查版本号,防止覆盖其他 Agent 的修改. ### 问题 d)性能开销 **优化策略**: 1. **批量操作**:一次 INSERT INTO backup SELECT * FROM ... WHERE id IN (...) 而非逐行 2. **快照而非逐行备份**:O(1) 时间标记时间点 3. **异步审计日志**:后台写,不阻塞主线程 4. **内容去重**:SHA256 哈希,相同内容只存一份 5. **增量审计**:只记录改动的列,不记录整行 ### 问题 e)存储策略 **分级保留**: - 最近 7 天:全量备份,随时回滚(热数据) - 7 天 - 3 个月:每天一个快照(温数据) - 3 个月 - 5 年:每月一个快照,只保留关键表(冷数据) **垃圾回收**:每周运行,扫描活跃消息引用的备份,删除孤立版本. --- ## 第三部分:Flyto 三大场景具体实现 ### 场景 1:编程(文件编辑) ```go // FileEdit 执行前自动备份 func (t *FileEditTool) Execute(...) { backup := createBackup(filePath) // SHA256 内容寻址 result := executeEdit(...) result.UndoInfo = &UndoInfo{ ToolName: "FileRestore", Input: map[string]interface{}{"path": filePath, "backup_id": backup.ID}, } return result } ``` ### 场景 2:仓储(数据库库存) ```go // SQLQuery 执行前记录 before 状态 func (t *SQLQueryTool) Execute(...) { beforeState := captureState(sql) result := db.Exec(sql) afterState := captureState(sql) undoSQL := generateReverseSQL(beforeState, afterState) // 记录审计日志 auditLog.Insert(&OperationLog{ BeforeState: beforeState, AfterState: afterState, UndoInfo: undoSQL, }) result.UndoInfo = &UndoInfo{ToolName: "SQLQuery", Input: undoSQL} return result } ``` ### 场景 3:企业 API(Saga 补偿) ```go // Engine.Rollback 倒序执行补偿 func (e *Engine) Rollback(ctx context.Context, messageID string) error { ops := e.log.GetOperations(messageID) for i := len(ops) - 1; i >= 0; i-- { if ops[i].UndoInfo == nil { // 不可逆操作 → 生成人工操作清单 e.generateManualActionChecklist(ops[i]) continue } // 执行补偿 tool, _ := e.tools.Get(ops[i].UndoToolName) tool.Execute(ctx, ops[i].UndoInput, nil) } e.log.UpdateStatus(messageID, "compensated") return nil } ``` --- ## 第四部分:实现路线图 ### Phase 1:编程场景(2 周) - FileEdit/FileWrite 前自动备份 - 内容寻址存储(SHA256 去重) - FileRestore 工具 - OperationLog 内存模型 - 单消息回滚 API ### Phase 2:基础架构(3 周) - OperationLog 持久化(SQLite 或文件) - Observer 事件埋点 - Rollback API 完整实现 - 测试:文件回滚,并发修改,性能基准 ### Phase 3:仓储场景(4 周) - SQLQuery 工具扩展(before/after 状态捕获) - 审计表设计 - 版本号乐观锁集成 - Saga 补偿逻辑 - 并发冲突检测 ### Phase 4:企业 API 场景(3 周) - HTTPCall 工具 UndoInfo 生成 - 不可逆操作检测与标记 - 人工操作清单生成 ### Phase 5:存储优化(2 周) - 内容去重 - 分级保留策略 - 垃圾回收 ### Phase 6:生产就绪(2 周) - 性能测试(大表回滚) - 恢复测试(数据一致性) - 文档和监控告警 --- ## 第五部分:对现有代码的影响 ### Tool 接口扩展 ```go type Result struct { Output string IsError bool Data interface{} UndoInfo *UndoInfo // 新增 } type UndoInfo struct { ToolName string Input map[string]interface{} Description string Irreversible bool // 标记不可逆 } ``` ### Engine 新增 API ```go func (e *Engine) Rollback(ctx context.Context, messageID string) error func (e *Engine) ListSnapshots(ctx context.Context) ([]*Snapshot, error) func (e *Engine) CanRollback(ctx context.Context, messageID string) (bool, []string) ``` ### 风险与缓解 | 风险 | 缓解 | |------|------| | 大表回滚性能差 | 分批执行,异步化 | | 补偿失败无法恢复 | 人工清单,告警,重试 | | 存储爆炸 | GC,分级保留,去重 | | 并发冲突频繁 | 版本号,监控,重试 | | 不可逆操作丢失 | 预先声明,强制审批 | --- ## 参考资源 - [Git Internals - Git Objects](https://git-scm.com/book/en/v2/Git-Internals-Git-Objects) - [Event Sourcing - Redpanda Guide](https://www.redpanda.com/guides/event-stream-processing-event-sourcing-database) - [Debezium CDC](https://debezium.io/) - [Saga Pattern - Microsoft Learn](https://learn.microsoft.com/en-us/azure/architecture/patterns/saga) - [PostgreSQL MVCC](https://www.postgresql.org/docs/7.1/mvcc.html) - [Version Vectors for Conflict Resolution](https://shanoj.com/2024/11/14/distributed-systems-design-pattern-version-vector-for-conflict-resolution-supply-chain-use/)