package engine // 会话管理器. // // 统一管理所有会话的生命周期,提供创建,恢复,列出,清理等功能. // 替代原来散落在 Engine 中的简单 map 管理. // // 功能: // - 创建和获取会话 // - 从磁盘恢复已保存的会话 // - 列出所有活跃会话的摘要信息 // - 自动保存(每 N 轮自动保存 transcript,防崩溃丢数据) // - 会话过期清理(超过指定时间未活跃的会话自动释放内存) // - 最大会话数限制 import ( "fmt" "sort" "sync" "time" "git.flytoex.net/yuanwei/flyto-agent/pkg/query" ) // 默认配置常量 const ( defaultMaxSessions = 100 // 默认最大会话数 defaultAutoSaveEvery = 5 // 默认每 5 轮自动保存 defaultExpireAfter = 24 * time.Hour // 默认 24 小时过期 defaultCleanupInterval = 10 * time.Minute // 默认每 10 分钟检查一次过期 ) // SessionManagerConfig 是会话管理器的配置. type SessionManagerConfig struct { // MaxSessions 最大并发会话数,0 使用默认值 (100) MaxSessions int // AutoSaveEvery 每多少轮自动保存一次 transcript,0 表示不自动保存 AutoSaveEvery int // ExpireAfter 会话过期时间,超过此时间未活跃的会话将被清理. // 0 表示不过期 ExpireAfter time.Duration // Cwd 工作目录,用于生成 transcript 路径 Cwd string // Model 模型名称,用于保存 transcript 元数据 Model string } // SessionManager 管理所有会话的生命周期. type SessionManager struct { mu sync.Mutex engine EngineRef sessions map[string]*ManagedSession cfg *SessionManagerConfig stopCh chan struct{} // 停止清理 goroutine 的信号 cleanupDone chan struct{} // cleanupLoop 退出信号(Close 等待后再清 sessions map) stopped bool } // ManagedSession 是被管理器管理的会话,包含额外的元数据. type ManagedSession struct { // Session 是底层会话对象 Session *Session // Info 是会话摘要信息 Info SessionInfo // turnsUnsaved 是自上次保存以来的轮次数 turnsUnsaved int } // SessionInfo 是会话的摘要信息,用于列表展示. type SessionInfo struct { // ID 会话 ID ID string // Title 会话标题(从第一次对话自动生成或用户指定) Title string // CreatedAt 创建时间 CreatedAt time.Time // LastActiveAt 最后活跃时间 LastActiveAt time.Time // TurnCount 总轮次数 TurnCount int // InputTokens 累计输入 token 数 InputTokens int // OutputTokens 累计输出 token 数 OutputTokens int // CostUSD 累计花费(美元) CostUSD float64 // MessageCount 消息数量 MessageCount int // Closed 会话是否已关闭 Closed bool } // NewSessionManager 创建一个新的会话管理器. func NewSessionManager(engine EngineRef, cfg *SessionManagerConfig) *SessionManager { if cfg == nil { cfg = &SessionManagerConfig{} } if cfg.MaxSessions <= 0 { cfg.MaxSessions = defaultMaxSessions } if cfg.AutoSaveEvery <= 0 { cfg.AutoSaveEvery = defaultAutoSaveEvery } if cfg.ExpireAfter <= 0 { cfg.ExpireAfter = defaultExpireAfter } sm := &SessionManager{ engine: engine, sessions: make(map[string]*ManagedSession), cfg: cfg, stopCh: make(chan struct{}), cleanupDone: make(chan struct{}), } // 启动后台过期清理 goroutine go sm.cleanupLoop() return sm } // CreateSession 创建一个新的会话. // // 如果已达到最大会话数限制,会先清理最不活跃的会话. // 如果指定 ID 的会话已存在,返回已有会话. func (sm *SessionManager) CreateSession(id string) (*Session, error) { sm.mu.Lock() defer sm.mu.Unlock() // 如果已存在,直接返回 if ms, ok := sm.sessions[id]; ok { ms.Info.LastActiveAt = time.Now() return ms.Session, nil } // 检查最大会话数限制 if len(sm.sessions) >= sm.cfg.MaxSessions { // 清理最不活跃的会话 sm.evictLeastActive() } // 再次检查(evict 可能失败) if len(sm.sessions) >= sm.cfg.MaxSessions { return nil, NewEngineError(ErrInternal, fmt.Sprintf("已达到最大会话数限制 (%d)", sm.cfg.MaxSessions), nil) } // 创建新会话 session := sm.engine.Session(id) now := time.Now() ms := &ManagedSession{ Session: session, Info: SessionInfo{ ID: id, Title: "", CreatedAt: now, LastActiveAt: now, }, } sm.sessions[id] = ms return session, nil } // GetSession 获取已有的会话. // 如果会话不存在,返回 nil 和错误. func (sm *SessionManager) GetSession(id string) (*Session, error) { sm.mu.Lock() defer sm.mu.Unlock() ms, ok := sm.sessions[id] if !ok { return nil, NewEngineError(ErrSessionNotFound, fmt.Sprintf("会话 %s 不存在", id), nil) } ms.Info.LastActiveAt = time.Now() return ms.Session, nil } // ResumeSession 从磁盘恢复一个会话. // // 读取指定路径的 transcript 文件,恢复消息历史和统计信息. func (sm *SessionManager) ResumeSession(transcriptPath string) (*Session, error) { // 加载 transcript transcript, err := LoadTranscript(transcriptPath) if err != nil { return nil, WrapError(err, ErrSessionNotFound, fmt.Sprintf("无法加载会话记录: %s", transcriptPath)) } sm.mu.Lock() defer sm.mu.Unlock() // 检查是否已存在同 ID 的会话 if ms, ok := sm.sessions[transcript.SessionID]; ok { ms.Info.LastActiveAt = time.Now() return ms.Session, nil } // 检查最大会话数限制 if len(sm.sessions) >= sm.cfg.MaxSessions { sm.evictLeastActive() } // 创建新会话并恢复消息历史 session := sm.engine.Session(transcript.SessionID) // 恢复并规范化消息历史. // // 升华改进(ELEVATED): 早期方案直接 session.messages = transcript.Messages, // 不做任何清理.磁盘上的 transcript 可能包含: // 1. 孤立 thinking 块(压缩或中断时产生)→ API 返回 400 // 2. 未配对的 tool_result(工具执行后 AI 崩溃)→ API 返回 400 // 3. 空 assistant 消息 → API 返回 400 // 规范化管道在恢复时统一清理,保证恢复后的会话可以直接继续. // 替代方案:<在 Run() 入口处规范化> - 否决:问题暴露太晚, // 出错时调试困难(无法区分是恢复阶段还是运行阶段引入的问题). restoredMessages := DefaultNormalizePipeline().Run(transcript.Messages) // 中断哨兵注入:如果会话在"工具执行完成但 AI 未响应"时中断, // 注入 "Continue from where you left off." 用户消息,避免 AI 对着 // 工具结果发呆不响应. // // 精妙之处(CLEVER): 只在最后一条非系统消息是 tool_result 角色时注入-- // 区分两种 user 消息: // a) 最后消息是"用户真实请求" → AI 崩溃前未响应 → 不需要哨兵, // Run() 会直接调用 API 响应该请求. // b) 最后消息是"工具结果" → AI 崩溃在工具处理阶段 → 需要哨兵, // 否则 API 会收到 [tool_result] 结尾的序列,模型不知道该继续. // 不区分这两种情况会导致:用户请求前多出一条多余的 "Continue", // 让 AI 产生无意义的"continue"响应而不是直接回答用户. restoredMessages = maybeInjectResumeSentinel(restoredMessages) session.mu.Lock() session.messages = restoredMessages session.mu.Unlock() now := time.Now() ms := &ManagedSession{ Session: session, Info: SessionInfo{ ID: transcript.SessionID, Title: "", CreatedAt: transcript.CreatedAt, LastActiveAt: now, TurnCount: transcript.Stats.TurnCount, InputTokens: transcript.Stats.TotalInputTokens, OutputTokens: transcript.Stats.TotalOutputTokens, CostUSD: transcript.Stats.TotalCostUSD, MessageCount: len(restoredMessages), }, } sm.sessions[transcript.SessionID] = ms return session, nil } // ListSessions 列出所有活跃会话的摘要信息. // // 返回按最后活跃时间倒序排列的列表. func (sm *SessionManager) ListSessions() []SessionInfo { sm.mu.Lock() defer sm.mu.Unlock() infos := make([]SessionInfo, 0, len(sm.sessions)) for _, ms := range sm.sessions { // 更新消息计数 ms.Info.MessageCount = len(ms.Session.Messages()) // 升华改进(ELEVATED): 早期方案直接裸读 Session.closed 字段(此处持有 sm.mu, // 而 Session.closed 由 s.mu 保护),存在数据竞争. // 改为调用 IsClosed()(持有 s.mu 的方法),消除 data race. ms.Info.Closed = ms.Session.IsClosed() infos = append(infos, ms.Info) } // 按最后活跃时间倒序排列(O(n log n)) sort.Slice(infos, func(i, j int) bool { return infos[i].LastActiveAt.After(infos[j].LastActiveAt) }) return infos } // UpdateSessionInfo 更新会话的统计信息. // // 每轮对话结束后由引擎调用,更新 token 计数,花费等信息. func (sm *SessionManager) UpdateSessionInfo(id string, inputTokens, outputTokens int, costUSD float64) { sm.mu.Lock() defer sm.mu.Unlock() ms, ok := sm.sessions[id] if !ok { return } ms.Info.TurnCount++ ms.Info.InputTokens += inputTokens ms.Info.OutputTokens += outputTokens ms.Info.CostUSD += costUSD ms.Info.LastActiveAt = time.Now() // 自动保存检查 ms.turnsUnsaved++ if sm.cfg.AutoSaveEvery > 0 && ms.turnsUnsaved >= sm.cfg.AutoSaveEvery { sm.autoSave(ms) ms.turnsUnsaved = 0 } } // SetSessionTitle 设置会话标题. func (sm *SessionManager) SetSessionTitle(id, title string) { sm.mu.Lock() defer sm.mu.Unlock() if ms, ok := sm.sessions[id]; ok { ms.Info.Title = title } } // RemoveSession 移除并关闭一个会话. func (sm *SessionManager) RemoveSession(id string) { sm.mu.Lock() defer sm.mu.Unlock() if ms, ok := sm.sessions[id]; ok { ms.Session.Close() delete(sm.sessions, id) } } // Close 关闭会话管理器,保存所有未保存的会话,停止后台清理. // // 升华改进(ELEVATED): 等待 cleanupLoop goroutine 真正退出后再清空 sessions map, // 避免 cleanupLoop 的最后一次 cleanupExpired() 与 Close() 的清空操作发生竞争. // 原方案:close(stopCh) 后立刻清空 map,cleanupLoop 若刚好持有 sm.mu 等待则在 // // Close() 释放锁后会访问一个空 map(语义错误,虽不 panic 但日志可能产生噪音). // // 新方案:release mu → 等待 cleanupDone → 重新加锁 → 清空 map. // 替代方案:<在 cleanupExpired 中检查 sm.stopped,有则直接返回> // - 否决:仍有极短窗口期(close(stopCh) 之前 ticker 触发),且需要在 cleanupExpired // 里增加额外读锁判断,不如直接 drain goroutine 更干净. func (sm *SessionManager) Close() { sm.mu.Lock() if sm.stopped { sm.mu.Unlock() return } sm.stopped = true close(sm.stopCh) sm.mu.Unlock() // 等待 cleanupLoop goroutine 退出,再执行最终清理. // 精妙之处(CLEVER): 释放锁后等待,让 cleanupLoop 有机会拿到锁(如正在等 sm.mu) // 并完成当前执行,然后收到 stopCh 信号安全退出. <-sm.cleanupDone sm.mu.Lock() defer sm.mu.Unlock() // 保存所有未保存的会话(此时 cleanupLoop 已退出,无并发访问 sessions 风险) for _, ms := range sm.sessions { if ms.turnsUnsaved > 0 { sm.autoSave(ms) } ms.Session.Close() } sm.sessions = make(map[string]*ManagedSession) } // autoSave 自动保存会话到磁盘. // 调用前必须持有 sm.mu 锁. // // 升华改进(ELEVATED): 原方案静默丢弃所有保存错误(`_ = UpdateTranscript(...)`), // 磁盘满,权限不足等错误完全不可见,导致"自动保存"承诺形同虚设. // 新方案:保存失败时通过 observer 上报错误,至少让监控系统感知到. // 替代方案:<返回 error 给调用方> - 否决:autoSave 从 goroutine 内调用时无处返回, // // 且保存失败不应中断主流程(仍是 fail-open 设计),observer 上报是最低代价的可见化. func (sm *SessionManager) autoSave(ms *ManagedSession) { if sm.cfg.Cwd == "" { return } path := TranscriptPath(sm.cfg.Cwd, ms.Info.ID) messages := ms.Session.Messages() sessionID := ms.Info.ID stats := TranscriptStats{ TurnCount: ms.Info.TurnCount, TotalInputTokens: ms.Info.InputTokens, TotalOutputTokens: ms.Info.OutputTokens, TotalCostUSD: ms.Info.CostUSD, } // 在后台执行保存,不阻塞主流程. // 精妙之处(CLEVER): 捕获 sessionID 而非 ms 指针,避免 ms 在 goroutine 执行期间被修改. obs := sm.engine.Observer() go func() { if err := UpdateTranscript(path, sessionID, sm.cfg.Model, messages, stats); err != nil { obs.Error(err, map[string]any{ "op": "auto_save", "session_id": sessionID, "path": path, }) } }() } // evictLeastActive 驱逐最不活跃的会话以释放空间. // 优先驱逐已关闭的会话,其次驱逐最久未活跃的会话. // 调用前必须持有 sm.mu 锁. func (sm *SessionManager) evictLeastActive() { // 先查找已关闭的会话 for id, ms := range sm.sessions { if ms.Session.IsClosed() { // 避免裸读 Session.closed 造成 data race if ms.turnsUnsaved > 0 { sm.autoSave(ms) } delete(sm.sessions, id) return } } // 没有已关闭的,找最久未活跃的 var oldestID string var oldestTime time.Time first := true for id, ms := range sm.sessions { if first || ms.Info.LastActiveAt.Before(oldestTime) { oldestID = id oldestTime = ms.Info.LastActiveAt first = false } } if oldestID != "" { ms := sm.sessions[oldestID] if ms.turnsUnsaved > 0 { sm.autoSave(ms) } ms.Session.Close() delete(sm.sessions, oldestID) } } // cleanupLoop 后台定期清理过期会话. // 退出时关闭 cleanupDone,通知 Close() 可以安全执行最终清理. func (sm *SessionManager) cleanupLoop() { defer close(sm.cleanupDone) ticker := time.NewTicker(defaultCleanupInterval) defer ticker.Stop() for { select { case <-sm.stopCh: return case <-ticker.C: sm.cleanupExpired() } } } // cleanupExpired 清理所有过期的会话. func (sm *SessionManager) cleanupExpired() { sm.mu.Lock() defer sm.mu.Unlock() if sm.cfg.ExpireAfter <= 0 { return } expireBefore := time.Now().Add(-sm.cfg.ExpireAfter) var toRemove []string for id, ms := range sm.sessions { if ms.Info.LastActiveAt.Before(expireBefore) { toRemove = append(toRemove, id) } } for _, id := range toRemove { ms := sm.sessions[id] if ms.turnsUnsaved > 0 { sm.autoSave(ms) } ms.Session.Close() delete(sm.sessions, id) } } // SessionCount 返回当前活跃会话数. func (sm *SessionManager) SessionCount() int { sm.mu.Lock() defer sm.mu.Unlock() return len(sm.sessions) } // maybeInjectResumeSentinel 检测会话是否在"工具执行完成但 AI 未响应"时中断, // 如果是则注入续行哨兵 "Continue from where you left off.". // // 升华改进(ELEVATED): 早期实现的 detectTurnInterruption + // 哨兵注入逻辑.Go 版本只处理与 API 交互相关的核心场景: // - tool_result 结尾:AI 在处理工具结果过程中崩溃 → 需要哨兵 // - user 消息结尾(非 tool_result):用户发送了请求,AI 还没响应 → 不需要哨兵 // - assistant 消息结尾:AI 正常完成上一轮 → 不需要哨兵 // // 精妙之处(CLEVER): isToolResultOnly 检查区分了两种 user 消息-- // 真实用户请求(text content)vs. 工具执行结果(tool_result content). // TS 版本通过 isToolUseResultMessage(lastMessage) 做相同判断. func maybeInjectResumeSentinel(messages []query.Message) []query.Message { if len(messages) == 0 { return messages } // 找最后一条有意义的消息(跳过 system 消息) lastIdx := -1 for i := len(messages) - 1; i >= 0; i-- { if messages[i].Role != query.RoleSystem { lastIdx = i break } } if lastIdx < 0 { return messages // 全是 system 消息,不做任何处理 } last := messages[lastIdx] // 情形 1:最后是 assistant 消息 → 正常完成的轮次,无需哨兵 if last.Role == query.RoleAssistant { return messages } // 情形 2:最后是 user 消息,但内容是"真实用户请求"(text) // → AI 崩溃前未响应,Run() 会直接用这些消息调 API,无需哨兵 if last.Role == query.RoleUser && !isToolResultOnlyMessage(last) { return messages } // 情形 3:最后是 user 消息且全部内容是 tool_result → 中途中断,注入哨兵 sentinel := query.Message{ Role: query.RoleUser, Content: []query.Content{ { Type: query.ContentText, Text: "Continue from where you left off.", }, }, Metadata: map[string]any{ "is_resume_sentinel": true, }, } return append(messages, sentinel) } // isToolResultOnlyMessage 检查 user 消息是否全部由 tool_result 块组成. // // 精妙之处(CLEVER): 空消息(len(Content)==0)返回 false-- // 空 user 消息是规范化管道应该删除的异常情况,不应被识别为工具结果消息, // 避免为空消息误注入哨兵. func isToolResultOnlyMessage(msg query.Message) bool { if len(msg.Content) == 0 { return false } for _, c := range msg.Content { if c.Type != query.ContentToolResult { return false } } return true }