package engine // FileHistory 文件操作历史 -- 内容寻址备份 + 按消息回滚. // // 在 FileEdit/FileWrite 执行前自动备份原文件,支持按消息回滚. // // 精妙之处(CLEVER): 内容寻址存储(和 Git 一样). // 文件内容做 SHA256 → hash 前 16 字符作为备份文件名. // 相同内容不重复存储(编辑 10 次但 3 次内容相同 = 只存 7 份). // // 备份绑定消息 ID:回滚时按消息 ID 找到该消息期间修改的所有文件, // 全部恢复到修改前的状态. // // 历史包袱(LEGACY): 数据库/API 的备份逻辑各不相同(Neon 快照,API 幂等键), // 但文件备份是最基础的--几乎所有 Agent 操作最终都会修改文件. // 所以文件历史先行,数据库/API 后续接入. import ( "context" "crypto/sha256" "fmt" "io/fs" "os" "path/filepath" "sort" "sync" "time" ) // FileHistoryView 是文件历史的只读查询接口, 供消费者绑定 Engine.FileHistoryView() 返回值. // // 升华改进(ELEVATED): L1186 修复 - 窄接口提取. // // 历史背景: Engine.FileHistoryRef() 返回具体 *FileHistory 指针, 消费者查询 // "能否回滚这轮对话" 时必须绑定具体类型, 不利于测试 mock 和 API 稳定性声明. // 此接口明确声明 "我们对消费者承诺这几个只读查询方法稳定" -- 未来 FileHistory // 内部重构时可以在不破坏该契约的前提下自由演化. // // 为什么不放 Rollback/Prune: 这两个是执行/管理动作, 应该通过 Engine.Rollback() // 走完整的 context + 事件发射 + 错误处理路径, 不应让消费者通过 FileHistoryView // 绕过引擎直接调用.View 的命名本身就暗示 "只读观察" 语义. // // 同源 precedent: // - memory.Backupper (1 方法, 写侧, for memory 消费者) // - tools/builtin.FileHistoryRecorder (2 方法, 写侧, for tool 消费者) // - FileHistoryView (2 方法, 读侧, 本接口, for 外部消费者的 "查询" 场景) // // 三个接口构成 FileHistory 面对不同消费场景的 role interface 分层, // 每个消费者只绑定他需要的窄契约, 不暴露完整的 FileHistory 结构. // // 替代方案 A: 改 FileHistoryRef() 签名为 FileHistoryView 返回类型 - 否决: // // 破坏现有 API (docs/configuration.md 有示例), 成本高于收益. // // 替代方案 B: 把 Rollback/Prune 也放进来 - 否决: // // 执行类方法应该只通过 Engine 层调用, 不给消费者 "半侧门" 绕过上下文管理. // // Shape: pull. Consumer calls Engine.FileHistoryView() to obtain the // read-only view, then queries CanRollback / SnapshotCount on demand // for UI rendering, monitoring, or audit. // // 形态: 调取 (pull). 消费者调 Engine.FileHistoryView() 拿只读视图, 按需 // 查 CanRollback / SnapshotCount 供 UI 渲染 / 监控 / 审计. type FileHistoryView interface { // CanRollback 检查指定 messageID 是否存在可回滚的快照. // 返回: (是否可回滚, 该次提交涉及的文件路径列表) CanRollback(messageID string) (bool, []string) // SnapshotCount 返回当前持有的快照总数. // 用于监控 / 诊断 / 容量告警场景. SnapshotCount() int } // FileHistory 文件操作历史管理器. type FileHistory struct { baseDir string // ~/.flyto/history// snapshots []*FileSnapshot // 所有快照(按时间顺序) maxSnapshots int // 最大快照数,默认 100 observer EventObserver // 可观测性接口 mu sync.Mutex } // FileSnapshot 一次消息期间的文件快照. type FileSnapshot struct { MessageID string // 关联的消息 ID Timestamp time.Time // 快照创建时间 FileBackups map[string]*FileBackup // filePath → backup } // FileBackup 单个文件的备份信息. type FileBackup struct { ContentHash string // SHA256 前 16 字符 BackupPath string // 备份文件的磁盘路径 OriginalExists bool // 原文件是否存在(新建文件为 false) FileMode fs.FileMode // 原文件的权限 } // NewFileHistory 创建文件历史管理器. // cwd 是当前工作目录,用于生成项目级的备份目录. func NewFileHistory(cwd string, observer EventObserver) *FileHistory { // 精妙之处(CLEVER): 用 cwd 的 hash 作为备份目录名-- // 不同项目的备份互不干扰,且路径固定可预测. cwdHash := fmt.Sprintf("%x", sha256.Sum256([]byte(cwd)))[:12] homeDir, _ := os.UserHomeDir() baseDir := filepath.Join(homeDir, ".flyto", "history", cwdHash) if observer == nil { observer = &NoopObserver{} } return &FileHistory{ baseDir: baseDir, snapshots: make([]*FileSnapshot, 0), maxSnapshots: 100, observer: observer, } } // NewFileHistoryWithDir 创建指定备份目录的文件历史管理器(主要用于测试). func NewFileHistoryWithDir(baseDir string, observer EventObserver) *FileHistory { if observer == nil { observer = &NoopObserver{} } return &FileHistory{ baseDir: baseDir, snapshots: make([]*FileSnapshot, 0), maxSnapshots: 100, observer: observer, } } // Backup 实现 memory.Backupper 接口. // 在写入记忆文件前调用,无需绑定消息 ID--使用固定的 "memory-autosave" 消息 ID. // // 精妙之处(CLEVER): memory.Save() 不知道当前消息 ID(记忆写入是异步的, // 可能发生在对话结束后),所以用固定的 "memory-autosave" 作为分组键. // 这意味着所有记忆文件备份归入同一个快照组--这是合理的, // 因为记忆文件的变化是增量的,用一个"记忆自动保存"组统一管理比每次生成随机 ID 更可预测. // 替代方案:让 Save() 传入 messageID(需要 Store 接口改动,破坏封装). // 替代方案:每次生成 UUID 作为 messageID(快照爆炸,每次 Save 都新建一个组). func (h *FileHistory) Backup(ctx context.Context, filePath string) error { return h.BeforeWrite(filePath, "memory-autosave") } // BeforeEdit 编辑前调用,备份原文件. // 如果文件不存在,返回错误(FileEdit 要求文件已存在). func (h *FileHistory) BeforeEdit(filePath string, messageID string) error { h.mu.Lock() defer h.mu.Unlock() if _, err := os.Stat(filePath); os.IsNotExist(err) { return fmt.Errorf("file_history: file not found: %s", filePath) } return h.createSnapshotEntry(filePath, messageID) } // BeforeWrite 写入前调用,备份原文件(如果存在). // 如果文件不存在(新建),记录一个 OriginalExists=false 的条目. func (h *FileHistory) BeforeWrite(filePath string, messageID string) error { h.mu.Lock() defer h.mu.Unlock() return h.createSnapshotEntry(filePath, messageID) } // Rollback 按消息 ID 回滚所有文件修改. // 倒序恢复该消息期间修改的所有文件到修改前的状态. func (h *FileHistory) Rollback(messageID string) error { h.mu.Lock() defer h.mu.Unlock() // 找到对应的快照 var snapshot *FileSnapshot var snapshotIdx int for i, s := range h.snapshots { if s.MessageID == messageID { snapshot = s snapshotIdx = i break } } if snapshot == nil { return fmt.Errorf("file_history: no snapshot found for message %s", messageID) } // 倒序恢复每个文件 var lastErr error restoredCount := 0 for filePath, backup := range snapshot.FileBackups { if !backup.OriginalExists { // 原文件不存在(是新建的),回滚 = 删除 if err := os.Remove(filePath); err != nil && !os.IsNotExist(err) { lastErr = fmt.Errorf("file_history: failed to remove %s: %w", filePath, err) h.observer.Error(lastErr, map[string]any{ "file": filePath, "message_id": messageID, }) continue } restoredCount++ continue } // 从备份恢复 backupData, err := os.ReadFile(backup.BackupPath) if err != nil { lastErr = fmt.Errorf("file_history: failed to read backup %s: %w", backup.BackupPath, err) h.observer.Error(lastErr, map[string]any{ "file": filePath, "backup_path": backup.BackupPath, "message_id": messageID, }) continue } // 确保目录存在 dir := filepath.Dir(filePath) if err := os.MkdirAll(dir, 0755); err != nil { lastErr = fmt.Errorf("file_history: failed to create dir %s: %w", dir, err) continue } if err := os.WriteFile(filePath, backupData, backup.FileMode); err != nil { lastErr = fmt.Errorf("file_history: failed to restore %s: %w", filePath, err) h.observer.Error(lastErr, map[string]any{ "file": filePath, "message_id": messageID, }) continue } restoredCount++ } // 标记快照为已回滚(从列表中移除) h.snapshots = append(h.snapshots[:snapshotIdx], h.snapshots[snapshotIdx+1:]...) h.observer.Event("file_history_rollback", map[string]any{ "message_id": messageID, "files_restored": restoredCount, "files_total": len(snapshot.FileBackups), }) return lastErr } // CanRollback 检查是否可以回滚指定消息的文件修改. // 返回是否可回滚,以及会影响的文件列表. func (h *FileHistory) CanRollback(messageID string) (bool, []string) { h.mu.Lock() defer h.mu.Unlock() for _, s := range h.snapshots { if s.MessageID == messageID { files := make([]string, 0, len(s.FileBackups)) for filePath := range s.FileBackups { files = append(files, filePath) } return true, files } } return false, nil } // SnapshotCount 返回当前快照数量(主要用于测试). func (h *FileHistory) SnapshotCount() int { h.mu.Lock() defer h.mu.Unlock() return len(h.snapshots) } // createSnapshotEntry 为指定文件在指定消息下创建备份条目. // 必须在持有锁的情况下调用. func (h *FileHistory) createSnapshotEntry(filePath string, messageID string) error { // 查找或创建该消息的快照 var snapshot *FileSnapshot for _, s := range h.snapshots { if s.MessageID == messageID { snapshot = s break } } if snapshot == nil { snapshot = &FileSnapshot{ MessageID: messageID, Timestamp: time.Now(), FileBackups: make(map[string]*FileBackup), } h.snapshots = append(h.snapshots, snapshot) h.enforceMaxSnapshots() } // 如果该文件已经在此快照中备份过,跳过(只保留第一次修改前的状态) if _, exists := snapshot.FileBackups[filePath]; exists { return nil } // 创建备份 backup, err := h.createBackup(filePath) if err != nil { return err } snapshot.FileBackups[filePath] = backup h.observer.Event("file_history_backup", map[string]any{ "file": filePath, "message_id": messageID, "content_hash": backup.ContentHash, "original_exists": backup.OriginalExists, }) return nil } // createBackup 内容寻址备份. // 精妙之处(CLEVER): SHA256 哈希作为文件名,相同内容自动去重. // 10 次编辑如果 3 次内容相同,只会存储 7 个不同的备份文件. func (h *FileHistory) createBackup(filePath string) (*FileBackup, error) { info, err := os.Stat(filePath) if os.IsNotExist(err) { // 文件不存在(将要新建),记录为 OriginalExists=false return &FileBackup{ OriginalExists: false, }, nil } if err != nil { return nil, fmt.Errorf("file_history: stat %s: %w", filePath, err) } // 读取文件内容 data, err := os.ReadFile(filePath) if err != nil { return nil, fmt.Errorf("file_history: read %s: %w", filePath, err) } // 计算内容哈希 hash := fmt.Sprintf("%x", sha256.Sum256(data))[:16] // 确保备份目录存在 if err := os.MkdirAll(h.baseDir, 0755); err != nil { return nil, fmt.Errorf("file_history: mkdir %s: %w", h.baseDir, err) } // 内容寻址:如果备份文件已存在(相同内容),跳过写入 backupPath := filepath.Join(h.baseDir, hash) if _, err := os.Stat(backupPath); os.IsNotExist(err) { // 升华改进(ELEVATED): 早期方案直接 os.WriteFile(backupPath, ...)-- // 若进程在写入中途崩溃,会留下一个截断的备份文件(内容与 hash 不符). // 下次 Stat 发现文件已存在就跳过,回滚时读出的却是损坏数据,静默地还原成垃圾. // 修复:先写 .tmp 临时文件,再 os.Rename 原子替换. // Rename 在同一文件系统内是原子 inode 替换(参考 session_snapshot.go 的正确模式): // 崩溃时旧文件完好或临时文件孤立,下次清理时 Prune 会自动扫描并删除 .tmp 文件. // 替代方案:写入后校验 SHA256(额外一次全量读,高频备份时性能影响明显). // 替代方案:直接写(现有行为,崩溃后损坏文件被内容寻址索引"认领",回滚必现 bug). tmpPath := backupPath + ".tmp" if err := os.WriteFile(tmpPath, data, 0644); err != nil { return nil, fmt.Errorf("file_history: write backup tmp %s: %w", tmpPath, err) } if err := os.Rename(tmpPath, backupPath); err != nil { _ = os.Remove(tmpPath) // 清理临时文件,忽略二次错误 return nil, fmt.Errorf("file_history: rename backup %s: %w", backupPath, err) } } return &FileBackup{ ContentHash: hash, BackupPath: backupPath, OriginalExists: true, FileMode: info.Mode(), }, nil } // enforceMaxSnapshots 确保快照数不超过上限. // 超过时删除最旧的快照(FIFO). func (h *FileHistory) enforceMaxSnapshots() { for len(h.snapshots) > h.maxSnapshots { // 移除最旧的快照 h.snapshots = h.snapshots[1:] } } // RetentionPolicy 定义备份文件的保留策略. // 满足任一条件的备份都会被删除(OR 逻辑). // // 升华改进(ELEVATED): 双维度保留策略(时间 + 版本数)满足不同场景需求-- // 研发机:MaxVersions=50 防止磁盘爆满; // 审计场景:MaxAgeDays=90 合规保留; // 资源受限设备:MaxAgeDays=7 + MaxVersions=20 双重限制. // 0 表示不限制该维度,可以灵活组合. // 替代方案:只支持 MaxAgeDays(无法控制高频修改文件的磁盘占用). // 替代方案:只支持 MaxVersions(时间无限延伸,审计场景不可用). type RetentionPolicy struct { // MaxAgeDays 超过 N 天的备份文件删除,0 表示不按时间清理. MaxAgeDays int // MaxVersions 每个原始文件保留最近 N 个版本,0 表示不按版本数清理. MaxVersions int } // DefaultRetentionPolicy 是 Engine.Close() 使用的默认保留策略. // // 精妙之处(CLEVER): 30 天 + 50 版本是经验值-- // 30 天覆盖一个迭代周期,足以回滚任何合理的"最近修改"; // 50 版本限制单文件备份上限,防止频繁修改的文件(如日志,配置)无限累积. // 两个条件 OR:任意一个满足就删除--更激进,磁盘占用更可控. var DefaultRetentionPolicy = RetentionPolicy{ MaxAgeDays: 30, MaxVersions: 50, } // Prune 清理超出策略的备份文件. // 在 Engine.Close() 时自动调用,也可以手动调用(如定期维护). // // 清理逻辑: // 1. 遍历 baseDir 目录下的所有备份文件 // 2. 按 mtime 排序,删除超过 MaxAgeDays 的文件 // 3. 备份文件是内容寻址的(hash 为文件名),无法直接按"原始文件"分组; // 因此 MaxVersions 按全局备份文件总数控制--超过上限时删除最旧的. // 4. 删除空目录 // // 精妙之处(CLEVER): 内容寻址备份的 MaxVersions 语义是"全局版本数上限"而非 // "每文件版本数上限"--因为 hash 文件名无法反向映射原始文件. // 这是内容寻址存储的固有权衡:节省空间(去重)换取无法按文件分组. // 如果需要"每文件版本数",需要额外的索引文件记录 hash→原始路径的映射. // 替代方案:维护 hash→原始路径的 JSON 索引(复杂度高,失去内容寻址的简洁性). // // 历史包袱(LEGACY): 第一版不处理 hash→原始路径映射,MaxVersions 语义是全局总数. // 未来版本可以加索引支持真正的"每文件版本数". func (h *FileHistory) Prune(ctx context.Context, policy RetentionPolicy) error { // 策略全为 0 表示不清理 if policy.MaxAgeDays == 0 && policy.MaxVersions == 0 { return nil } // 读取备份目录 entries, err := os.ReadDir(h.baseDir) if err != nil { if os.IsNotExist(err) { return nil // 目录不存在无需清理 } return fmt.Errorf("file_history: prune read dir %s: %w", h.baseDir, err) } // 收集备份文件信息(只处理普通文件,非目录) type backupFile struct { path string mtime time.Time } var files []backupFile for _, entry := range entries { select { case <-ctx.Done(): return ctx.Err() default: } if entry.IsDir() { continue } info, err := entry.Info() if err != nil { continue // 无法获取信息,跳过 } files = append(files, backupFile{ path: filepath.Join(h.baseDir, entry.Name()), mtime: info.ModTime(), }) } if len(files) == 0 { return nil } // 按 mtime 升序排序(最旧的在前) sort.Slice(files, func(i, j int) bool { return files[i].mtime.Before(files[j].mtime) }) now := time.Now() deleted := 0 var lastErr error // 阶段一:按时间清理(MaxAgeDays > 0) if policy.MaxAgeDays > 0 { cutoff := now.AddDate(0, 0, -policy.MaxAgeDays) for _, f := range files { select { case <-ctx.Done(): return ctx.Err() default: } if f.mtime.Before(cutoff) { if err := os.Remove(f.path); err != nil && !os.IsNotExist(err) { lastErr = fmt.Errorf("file_history: prune remove %s: %w", f.path, err) h.observer.Error(lastErr, map[string]any{ "path": f.path, "reason": "max_age", }) } else { deleted++ } } } } // 阶段二:按版本数清理(MaxVersions > 0) // 精妙之处(CLEVER): 重新扫描目录(阶段一可能已删除部分文件), // 避免基于已删除文件的 stale 列表做决策. if policy.MaxVersions > 0 { remaining, err := os.ReadDir(h.baseDir) if err != nil && !os.IsNotExist(err) { return fmt.Errorf("file_history: prune re-read dir: %w", err) } // 重新排序剩余文件 var remainFiles []backupFile for _, entry := range remaining { if entry.IsDir() { continue } info, err := entry.Info() if err != nil { continue } remainFiles = append(remainFiles, backupFile{ path: filepath.Join(h.baseDir, entry.Name()), mtime: info.ModTime(), }) } sort.Slice(remainFiles, func(i, j int) bool { return remainFiles[i].mtime.Before(remainFiles[j].mtime) }) // 超过 MaxVersions 的最旧文件删除 toDelete := len(remainFiles) - policy.MaxVersions for i := 0; i < toDelete; i++ { select { case <-ctx.Done(): return ctx.Err() default: } if err := os.Remove(remainFiles[i].path); err != nil && !os.IsNotExist(err) { lastErr = fmt.Errorf("file_history: prune remove %s: %w", remainFiles[i].path, err) h.observer.Error(lastErr, map[string]any{ "path": remainFiles[i].path, "reason": "max_versions", }) } else { deleted++ } } } h.observer.Event("file_history_pruned", map[string]any{ "deleted": deleted, "max_age_days": policy.MaxAgeDays, "max_versions": policy.MaxVersions, }) // 阶段三:删除空目录(baseDir 本身不删) // 精妙之处(CLEVER): 只删一级--baseDir 已经是最深的目录(文件平铺存储), // 所以不需要递归删除.如果未来改为按项目子目录组织,这里需要扩展. if afterEntries, err := os.ReadDir(h.baseDir); err == nil && len(afterEntries) == 0 { // baseDir 为空但不删除--保留目录结构,下次备份直接用 // (删除后重建目录有竞态风险) _ = afterEntries } return lastErr }