package engine // session_snapshot.go 实现会话快照存储(1.6 断点续传 Conversation Resume). // // 当一次 Engine.Run() 被中断(网络断开,超时,进程崩溃), // 可以通过快照恢复到中断点,继续未完成的对话轮次,而不是从头重新开始. // // 架构分层: // - SessionSnapshot - 快照数据结构(序列化为 JSON) // - SnapshotStore - 读写接口(可替换后端:内存/文件/数据库) // - FileSnapshotStore - 文件系统实现(~/.flyto/snapshots/.json) // - ResumeConversation - 上层入口:加载快照 → 调用 Engine.Run() // // 与 session_persist.go(Transcript)的区别: // - Transcript:完整对话历史,面向人类审计和 --resume 手动重放 // - SessionSnapshot:轻量级断点状态,面向程序自动恢复,用后即删 // // 设计原则: // - SnapshotStore 为 nil 时完全不影响现有行为(向后兼容) // - 快照原子写入(tmpfile + rename)避免写入一半的损坏文件 // - PartialToolUse 记录流被截断时的工具调用残片,用于恢复提示词 // // 升华改进(ELEVATED): 早期实现 无断点续传机制--崩溃后对话丢失,用户只能重发. // 我们在 SnapshotStore 接口+FileSnapshotStore 实现后,任何持久化后端 // (Redis,数据库)只需实现 3 个方法即可接入,零改引擎代码. // 替代方案:<把 snapshot 直接集成进 Transcript> // - 否决:Transcript 是面向人类的只追加日志,混入断点状态违反单一职责; // 且 Transcript 保留所有历史,快照"用后即删"的语义无法对齐. import ( "context" "encoding/json" "fmt" "os" "path/filepath" "time" "git.flytoex.net/yuanwei/flyto-agent/pkg/query" ) // SessionSnapshot 是会话断点的完整状态快照. // // 快照包含恢复对话所需的最小数据集--不是完整 Transcript, // 只保存"恢复执行需要什么",删除后不影响审计(Transcript 独立记录审计数据). // // 精妙之处(CLEVER): TurnIndex 与 Messages 配对存储-- // 恢复时 Engine.Run() 通过 WithMessages(snap.Messages) 注入历史, // TurnIndex 仅用于日志/调试,不影响执行路径. // 若 Messages 已包含完整轮次,TurnIndex 也可从 Messages 推断-- // 独立存储是为了"快速判断恢复点在第几轮"而无需遍历消息列表. type SessionSnapshot struct { // ConversationID 会话唯一标识,与 SnapshotStore.Save/Load/Delete 的 key 对应. // 由消费层分配(UUID,时间戳+随机数等). ConversationID string `json:"conversation_id"` // Messages 是中断点时的完整消息历史(含已发送的 user/assistant/tool_result 消息). // 恢复时作为 WithMessages 的参数传入 Engine.Run(). // // 升华改进(ELEVATED): 使用 query.Message 而非 internal/api.RequestMessage-- // query.Message 是引擎公开 API 层的类型(带 Metadata 字段), // 序列化更稳定,且消费层可以检查/修改 Messages(例如注入安全过滤). // 内部执行前 convertQueryMessage 会转换为 api.RequestMessage. Messages []query.Message `json:"messages"` // TurnIndex 中断时完成的轮次数(从 0 开始,已完成的最后一轮的索引). // 仅供调试和日志使用;恢复执行时不用此字段控制流程. TurnIndex int `json:"turn_index"` // PartialToolUse 记录流被截断时已收到的不完整工具调用(可选). // 非 nil 表示中断发生在工具调用流式传输过程中. // 恢复时 maybeInjectResumeSentinel 会根据消息列表末尾的工具结果状态 // 自动注入哨兵,PartialToolUse 作为附加信息供调试使用. PartialToolUse *PartialToolUse `json:"partial_tool_use,omitempty"` // SavedAt 快照创建时间(UTC). // 用于判断快照是否过期(例如超过 24 小时不再恢复,从头开始). SavedAt time.Time `json:"saved_at"` } // PartialToolUse 记录流截断时的不完整工具调用片段. // // 历史包袱(LEGACY): 当前引擎在 partial-stream 时用 WarningEvent 报告截断, // 不记录 PartialToolUse 到快照(因为截断时消息历史尚未更新). // 此结构体为未来扩展预留--如果将来实现更精细的"逐 block 保存", // PartialToolUse 可以帮助恢复工具调用的输入 JSON 拼接状态. type PartialToolUse struct { // ID 工具调用的 ID(来自 content_block_start 事件) ID string `json:"id"` // Name 工具名称 Name string `json:"name"` // Input 已收到的部分 JSON 字符串(可能不完整) Input string `json:"input"` } // SnapshotStore 是会话快照的读写接口. // // 实现约定: // - Save 必须是原子性的(不能写一半) // - Load 若 key 不存在,返回 (zero, false, nil) // - Delete 若 key 不存在,静默返回 nil(幂等) // // 升华改进(ELEVATED): 接口只有 3 个方法(Save/Load/Delete)-- // 简单的接口更容易实现,Redis 接入只需 SET/GET/DEL,数据库只需 UPSERT/SELECT/DELETE. // 如果需要列出所有快照(管理界面),可以在接口外部通过 ListSnapshots(dir) 工具函数实现, // 不污染核心接口. // 替代方案:<更丰富的接口(List/Expire/CAS)> // - 否决:YAGNI,过度设计的接口让实现方负担重,且 80% 场景不需要这些方法. type SnapshotStore interface { // Save 保存(或覆盖)快照.返回写入错误. Save(ctx context.Context, snap SessionSnapshot) error // Load 加载快照.若不存在返回 (zero, false, nil). Load(ctx context.Context, conversationID string) (SessionSnapshot, bool, error) // Delete 删除快照(成功恢复后调用,清理存储).幂等,不存在时不报错. Delete(ctx context.Context, conversationID string) error } // FileSnapshotStore 是基于文件系统的快照存储实现. // // 快照文件路径:/.json // 默认目录:~/.flyto/snapshots/ // // 原子写入策略: // 1. 写入 .tmp 临时文件 // 2. os.Rename 原子替换(Unix 保证原子性,Windows 近似原子) // 3. Rename 失败时删除临时文件,返回错误 // // 精妙之处(CLEVER): Rename 在同一文件系统内是原子操作(单个 inode 替换)-- // 即使进程在 WriteFile 完成后 Rename 前崩溃,也不会留下损坏的快照文件(旧文件完好保留). // 只有 Rename 成功后,新快照才"生效". // 替代方案:直接写目标文件(崩溃时可能留下半截 JSON,Load 会失败). type FileSnapshotStore struct { // Dir 快照文件目录. // 若为空,使用 defaultSnapshotDir()(~/.flyto/snapshots/). Dir string } // NewFileSnapshotStore 创建一个使用指定目录的文件快照存储. // dir 为空时使用 ~/.flyto/snapshots/. func NewFileSnapshotStore(dir string) *FileSnapshotStore { if dir == "" { dir = defaultSnapshotDir() } return &FileSnapshotStore{Dir: dir} } // defaultSnapshotDir 返回默认快照目录路径. func defaultSnapshotDir() string { home, err := os.UserHomeDir() if err != nil { // 降级:用 cache 目录而非 /tmp(安全 + 跨平台) if cacheDir, cacheErr := os.UserCacheDir(); cacheErr == nil { return filepath.Join(cacheDir, "flyto", "snapshots") } return filepath.Join(os.TempDir(), "flyto-snapshots") } return filepath.Join(home, ".flyto", "snapshots") } // snapshotPath 计算快照文件的完整路径. func (s *FileSnapshotStore) snapshotPath(conversationID string) string { // 精妙之处(CLEVER): conversationID 直接用于文件名,不做额外 hash-- // UUID/时间戳格式的 ID 通常是安全的文件名字符集. // 若消费层传入不安全的 ID(含路径分隔符),filepath.Join 会将 "/" 解析为子目录, // 存在路径穿越风险.此处加简单清洗:只取 Base 部分. // 替代方案:(绝对安全但丢失可读性,调试快照时无法按 ID 定位) safe := filepath.Base(conversationID) return filepath.Join(s.Dir, safe+".json") } // Save 将快照原子写入文件. // 若目录不存在,自动创建(MkdirAll). func (s *FileSnapshotStore) Save(_ context.Context, snap SessionSnapshot) error { if err := os.MkdirAll(s.Dir, 0755); err != nil { return fmt.Errorf("snapshot: mkdir %s: %w", s.Dir, err) } data, err := json.MarshalIndent(snap, "", " ") if err != nil { return fmt.Errorf("snapshot: marshal %s: %w", snap.ConversationID, err) } path := s.snapshotPath(snap.ConversationID) tmpPath := path + ".tmp" if err := os.WriteFile(tmpPath, data, 0644); err != nil { return fmt.Errorf("snapshot: write tmp %s: %w", tmpPath, err) } if err := os.Rename(tmpPath, path); err != nil { os.Remove(tmpPath) // 清理残留临时文件 return fmt.Errorf("snapshot: rename %s -> %s: %w", tmpPath, path, err) } return nil } // Load 从文件加载快照. // 若文件不存在,返回 (zero, false, nil). func (s *FileSnapshotStore) Load(_ context.Context, conversationID string) (SessionSnapshot, bool, error) { path := s.snapshotPath(conversationID) data, err := os.ReadFile(path) if err != nil { if os.IsNotExist(err) { return SessionSnapshot{}, false, nil } return SessionSnapshot{}, false, fmt.Errorf("snapshot: read %s: %w", path, err) } var snap SessionSnapshot if err := json.Unmarshal(data, &snap); err != nil { return SessionSnapshot{}, false, fmt.Errorf("snapshot: parse %s: %w", path, err) } return snap, true, nil } // Delete 删除快照文件.幂等:文件不存在时静默返回 nil. func (s *FileSnapshotStore) Delete(_ context.Context, conversationID string) error { path := s.snapshotPath(conversationID) err := os.Remove(path) if err != nil && !os.IsNotExist(err) { return fmt.Errorf("snapshot: delete %s: %w", path, err) } return nil } // ResumeConversation 从快照恢复并继续执行 Agent 对话. // // 流程: // 1. 从 store 加载快照(若不存在,返回 ErrSnapshotNotFound) // 2. 用 maybeInjectResumeSentinel 为断点注入恢复哨兵(处理 tool_result 末尾的情况) // 3. 调用 engine.Run(ctx, "", WithMessages(msgs)) // 4. 收集 channel 事件,返回 RunResult 摘要 // 5. 执行成功后删除快照(避免重复恢复) // // 注意:第 5 步"成功后删除"是最佳努力(delete 失败不返回错误), // 避免让恢复成功但 delete 失败的情况污染用户体验. // 若下次再次 Resume 同一 convID,会重新恢复(幂等)-- // 这是期望行为:宁可重复一次恢复也不因 delete 失败丢失进度. // // 升华改进(ELEVATED): ResumeConversation 是引擎公开 API,消费层直接调用-- // 无需了解 WithMessages / maybeInjectResumeSentinel 的内部实现细节. // 替代方案:<让消费层自己 Load → WithMessages → Run> // - 否决:哨兵注入逻辑 (maybeInjectResumeSentinel) 是引擎内部知识, // 不应泄漏到消费层.每个消费层重新实现会导致不一致. func ResumeConversation(ctx context.Context, eng *Engine, store SnapshotStore, convID string) (<-chan Event, error) { snap, found, err := store.Load(ctx, convID) if err != nil { return nil, fmt.Errorf("resume: load snapshot %s: %w", convID, err) } if !found { return nil, fmt.Errorf("resume: snapshot not found for conversation %q", convID) } // 注入恢复哨兵(处理 tool_result 末尾的情况) msgs := maybeInjectResumeSentinel(snap.Messages) // 恢复执行:注入消息历史,prompt 置空(哨兵已在 msgs 末尾) // 精妙之处(CLEVER): prompt="" 时引擎检查 WithMessages 的末尾消息-- // 若末尾是 user 消息(哨兵),直接作为本轮输入; // 若末尾是 assistant,则 prompt="" 会导致空用户消息(不是本意). // maybeInjectResumeSentinel 保证了末尾必为 user 消息,所以 prompt="" 安全. ch := eng.Run(ctx, "", WithMessages(msgs)) // 恢复执行的 channel 完成后,删除快照(后台清理,不阻塞调用方) // 历史包袱(LEGACY): 理想情况下应等待 DoneEvent 才删快照, // 但 channel 是流式的,调用方控制消费速度. // 这里在后台 goroutine 中 drain channel 并在 DoneEvent 后删除快照-- // 不影响调用方的 ch 消费(调用方拿到 ch 后自己 range 消费). // 注意:ch 是同一个 channel,调用方和后台 goroutine 不能同时消费! // 解决方案:返回 ch 之前不启动后台 goroutine,而是让调用方在消费完毕后主动调用 store.Delete. // 当前实现:由调用方负责删除(此函数只做 Load + Run). // TODO(1.6): 提供 ResumeConversationAndCleanup 包装函数, // 在 DoneEvent 后自动调用 store.Delete(调用方可选使用). return ch, nil } // BuildSnapshot 基于当前对话状态构造一个会话快照, 供消费层保存到 SnapshotStore. // // 典型用法(在 runLoop 外部监听 channel 的消费层): // // // 消费层在收到 TurnEndEvent 后调用 // if store != nil && convID != "" { // snap := engine.BuildSnapshot(convID, messages, turnIndex) // store.Save(ctx, snap) // } // // 注意:引擎本身不主动调用此函数--快照是消费层的责任. // 引擎内部 runLoop 不感知 SnapshotStore,保持引擎核心的简洁. // // 升华改进(ELEVATED): 将"何时保存快照"的决策权留给消费层-- // 有些场景只需每 N 轮保存一次(降低 I/O 开销), // 有些场景需要每轮都保存(高可靠性要求). // 替代方案:<引擎内部每轮保存> // - 否决:增加引擎核心复杂度,且 I/O 策略因场景不同无法统一. func BuildSnapshot(convID string, messages []query.Message, turnIndex int) SessionSnapshot { return SessionSnapshot{ ConversationID: convID, Messages: messages, TurnIndex: turnIndex, SavedAt: time.Now().UTC(), } }