package engine // session_persist.go 实现会话持久化功能. // // 支持将对话消息保存为 JSON 文件,以及从文件恢复会话. // 实现 transcript / --resume 的功能. // // 设计要点: // - 使用 JSON 格式存储,人类可读 // - 保存完整的消息历史和元数据(model,时间戳,统计等) // - 恢复时只需要消息历史,其他信息用于审计 // - 文件路径由消费层决定(可以是 ~/.flyto/projects/xxx/transcript.json) import ( "encoding/json" "fmt" "os" "path/filepath" "sort" "time" "git.flytoex.net/yuanwei/flyto-agent/pkg/query" ) // Transcript 是会话记录,包含完整的对话历史和元数据. // 序列化为 JSON 文件用于持久化. // // 版本兼容设计(INF-6): // // FormatVersion - 格式 schema 版本(int,迁移用). // 单调递增,每次 breaking change 时 bump. // LoadTranscript 用此字段决定是否需要迁移. // // EngineVersion - 写入此文件的引擎应用版本(string,仅 audit 用). // 格式如 "1.2.3",只用于问题排查,不参与迁移逻辑. // // 升华改进(ELEVATED): 早期实现 SerializedMessage.version 只存应用版本号(如 "1.2.3"), // 无法区分"格式变了"和"应用版本升了".我们将两层分开: // // FormatVersion int → schema 迁移用(int 比较一行,无需 semver) // EngineVersion string → audit 用(人类可读) // // 替代方案:<只保留 Version string 存应用版本> - // 否决原因:无法在不引入 semver 解析的情况下做格式迁移判断. type Transcript struct { // FormatVersion 是文件格式的 schema 版本号(int,从 1 开始). // 对应迁移表 transcriptMigrations 的 key. // 旧文件(缺少此字段)反序列化为 0,LoadTranscript 会将其规范化为 1. FormatVersion int `json:"format_version"` // EngineVersion 是写入此文件的引擎应用版本,仅用于 audit/排查. // 不参与迁移逻辑.空字符串表示未知(旧文件). EngineVersion string `json:"engine_version,omitempty"` // SessionID 会话 ID SessionID string `json:"session_id"` // Model 使用的模型 ID Model string `json:"model"` // CreatedAt 会话创建时间 CreatedAt time.Time `json:"created_at"` // UpdatedAt 最后更新时间 UpdatedAt time.Time `json:"updated_at"` // Messages 完整的消息历史 Messages []query.Message `json:"messages"` // Stats 会话统计信息 Stats TranscriptStats `json:"stats"` } // TranscriptStats 是会话统计信息. type TranscriptStats struct { // TurnCount 总轮次数 TurnCount int `json:"turn_count"` // TotalInputTokens 总输入 token 数 TotalInputTokens int `json:"total_input_tokens"` // TotalOutputTokens 总输出 token 数 TotalOutputTokens int `json:"total_output_tokens"` // TotalCostUSD 总花费(美元) TotalCostUSD float64 `json:"total_cost_usd"` } // SaveTranscript 将会话消息保存为 JSON 文件. // // 参数: // - path: 保存路径(如 ~/.flyto/projects/xxx/transcript.json) // - sessionID: 会话 ID // - model: 使用的模型 // - messages: 消息历史 // - stats: 统计信息 // // 如果文件已存在,会被覆盖. // 会自动创建父目录. func SaveTranscript(path string, sessionID string, model string, messages []query.Message, stats TranscriptStats) error { transcript := &Transcript{ FormatVersion: transcriptCurrentVersion, SessionID: sessionID, Model: model, CreatedAt: time.Now(), UpdatedAt: time.Now(), Messages: messages, Stats: stats, } return saveTranscriptFile(path, transcript) } // UpdateTranscript 更新已有的会话记录文件. // 如果文件已存在,保留 CreatedAt,更新其他字段. func UpdateTranscript(path string, sessionID string, model string, messages []query.Message, stats TranscriptStats) error { // 尝试读取已有记录 existing, err := LoadTranscript(path) if err != nil { // 文件不存在或读取失败,创建新记录 return SaveTranscript(path, sessionID, model, messages, stats) } // 保留创建时间,更新其他字段 existing.UpdatedAt = time.Now() existing.Messages = messages existing.Stats = stats existing.Model = model return saveTranscriptFile(path, existing) } // saveTranscriptFile 将 Transcript 序列化并写入文件. func saveTranscriptFile(path string, transcript *Transcript) error { // 确保父目录存在 dir := filepath.Dir(path) if err := os.MkdirAll(dir, 0755); err != nil { return fmt.Errorf("create directory %s: %w", dir, err) } // 序列化为格式化 JSON(便于人类阅读和调试) data, err := json.MarshalIndent(transcript, "", " ") if err != nil { return fmt.Errorf("marshal transcript: %w", err) } // 写入文件(原子写入:先写临时文件,再重命名) tmpPath := path + ".tmp" if err := os.WriteFile(tmpPath, data, 0644); err != nil { return fmt.Errorf("write file %s: %w", tmpPath, err) } if err := os.Rename(tmpPath, path); err != nil { // 重命名失败,清理临时文件 os.Remove(tmpPath) return fmt.Errorf("rename %s -> %s: %w", tmpPath, path, err) } return nil } // LoadTranscript 从 JSON 文件加载会话记录. // 返回 Transcript 或错误(文件不存在,格式错误等). // // 用于 --resume 场景:从上次保存的位置恢复对话. func LoadTranscript(path string) (*Transcript, error) { data, err := os.ReadFile(path) if err != nil { return nil, fmt.Errorf("read file %s: %w", path, err) } var transcript Transcript if err := json.Unmarshal(data, &transcript); err != nil { return nil, fmt.Errorf("parse transcript %s: %w", path, err) } // 精妙之处(CLEVER): FormatVersion == 0 表示旧格式文件(写入时无此字段). // JSON omitempty 不适用于 int(零值不省略),旧文件反序列化为 0, // 规范化为 1 以便迁移逻辑正确运行. // 之所以是 1 而非"直接跳过迁移":将来 v1→v2 迁移函数如果存在, // 旧文件也应该经历该迁移,而不是被当作"已是最新版本"跳过. if transcript.FormatVersion == 0 { transcript.FormatVersion = 1 } // MaxSupportedVersion 保护(INF-6 核心防线): // 若文件版本高于本引擎最高支持版本,说明此文件由更新版本的引擎写入. // 静默读取并在保存时以旧格式回写 = 数据损坏(新字段丢失). // 明确报错迫使用户升级引擎或不要降级读取新格式文件. // // 升华改进(ELEVATED): 早期实现 loadStatsCache 对 version > STATS_CACHE_VERSION // 返回空缓存(丢弃数据),我们返回明确错误--丢弃对 stats cache 可接受 // (可重建),但对 Transcript(用户对话历史)不可接受. // 替代方案:<返回空 Transcript,让调用方开新会话> - // 否决原因:静默丢失对话历史比报错更难排查. if transcript.FormatVersion > transcriptMaxSupportedVersion { return nil, fmt.Errorf( "transcript %s has format version %d, but this engine only supports up to v%d; "+ "please upgrade the engine to read this file", path, transcript.FormatVersion, transcriptMaxSupportedVersion, ) } // 运行迁移(当前 transcriptMigrations 为空,此调用是 no-op). // 未来有 breaking change 时,在 migrate.go 中注册迁移函数即可, // 此处代码无需修改--开闭原则. if err := migrateTranscript(&transcript); err != nil { return nil, fmt.Errorf("migrate transcript %s: %w", path, err) } return &transcript, nil } // TranscriptPath 生成默认的会话记录路径. // 格式:~/.flyto/projects//transcripts/.json // // 参数: // - cwd: 当前工作目录(用于生成项目 hash) // - sessionID: 会话 ID func TranscriptPath(cwd, sessionID string) string { // 使用工作目录的简单 hash 作为项目标识 projectHash := simpleHash(cwd) homeDir, err := os.UserHomeDir() if err != nil { // 降级:用 cache 目录而非 /tmp(安全 + 跨平台) if cacheDir, cacheErr := os.UserCacheDir(); cacheErr == nil { homeDir = cacheDir } else { homeDir = os.TempDir() // 最后一道防线 } } // 升华改进(ELEVATED): 防止 sessionID 含路径分隔符(如 "../../evil")导致路径穿越. // filepath.Base 只取最后一个路径段:"../../evil" → "evil",不影响 UUID 格式 ID. // 早期方案直接拼接 sessionID+".json",5 层 "../" 可逃出 .flyto 目录写任意文件. safeID := filepath.Base(sessionID) return filepath.Join(homeDir, ".flyto", "projects", projectHash, "transcripts", safeID+".json") } // simpleHash 生成字符串的简单 hash(用于目录名). // 使用 FNV-1a 算法,不需要密码安全性. func simpleHash(s string) string { // FNV-1a 32-bit var hash uint32 = 2166136261 for i := 0; i < len(s); i++ { hash ^= uint32(s[i]) hash *= 16777619 } return fmt.Sprintf("%08x", hash) } // ListTranscripts 列出指定项目目录下的所有会话记录. // 返回文件路径列表,按修改时间倒序排列. func ListTranscripts(cwd string) ([]string, error) { projectHash := simpleHash(cwd) homeDir, err := os.UserHomeDir() if err != nil { return nil, fmt.Errorf("get home dir: %w", err) } transcriptDir := filepath.Join(homeDir, ".flyto", "projects", projectHash, "transcripts") entries, err := os.ReadDir(transcriptDir) if err != nil { if os.IsNotExist(err) { return nil, nil // 目录不存在,返回空列表 } return nil, fmt.Errorf("read dir %s: %w", transcriptDir, err) } // 收集 JSON 文件路径 var paths []string for _, entry := range entries { if entry.IsDir() { continue } name := entry.Name() if len(name) > 5 && name[len(name)-5:] == ".json" { paths = append(paths, filepath.Join(transcriptDir, name)) } } // 按修改时间倒序排列(最新的在前). // 早期方案用冒泡排序且在内层循环反复调用 os.Stat(O(n²) 次 Stat). // 改进:先一次性收集所有 mtime,再 sort.Slice(O(n log n) + O(n) Stat). type pathInfo struct { path string mtime time.Time } pinfos := make([]pathInfo, 0, len(paths)) for _, p := range paths { info, err := os.Stat(p) if err != nil { pinfos = append(pinfos, pathInfo{path: p}) // mtime 为零值,排到最后 continue } pinfos = append(pinfos, pathInfo{path: p, mtime: info.ModTime()}) } sort.Slice(pinfos, func(i, j int) bool { return pinfos[i].mtime.After(pinfos[j].mtime) }) for i, pi := range pinfos { paths[i] = pi.path } return paths, nil }