// Package daemon implements the multi-session daemon manager for the
// Flyto Agent Platform.
//
// DaemonManager is the core of the engine's "service mode":
//   - accepts client connections from a BridgeTransport
//   - manages engine Session lifecycles (create/resume/close)
//   - bounds concurrent session count (SessionPool)
//   - tracks session heartbeats and detects idle timeouts
//   - crash recovery (automatic retry after an abnormal session end)
//
// Architectural position:
//
//	client → BridgeTransport → DaemonManager → engine.Session → LLM
//
// ELEVATED: an earlier design was a single 2999-line monolith with
// session management, heartbeat, capacity control and crash recovery
// all coupled together and impossible to test in isolation. Each
// concern is now a separate type (SessionPool/HeartbeatService/
// IdleTimer/CrashRecovery) and DaemonManager is merely their composer,
// so every sub-component is independently testable.
// Rejected alternative: a direct translation of the earlier flat
// layout — a 2999-line Go file is equally untestable technical debt.
package daemon

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"

	"git.flytoex.net/yuanwei/flyto-agent/pkg/bridge"
	"git.flytoex.net/yuanwei/flyto-agent/pkg/engine"
)

// DaemonConfig configures a DaemonManager.
type DaemonConfig struct {
	// MaxSessions is the maximum number of concurrent sessions;
	// 0 = unlimited (not recommended in production).
	MaxSessions int

	// IdleTimeout closes a session automatically after this much
	// inactivity. 0 = no timeout (LEGACY: an earlier design hard-coded
	// 24h; it is configurable now). Recommended: 30 minutes (in most
	// warehouse scenarios an operator does not step away longer).
	IdleTimeout time.Duration

	// HeartbeatInterval is the liveness scan interval; 0 disables the scan.
	HeartbeatInterval time.Duration

	// HeartbeatTimeout is the staleness threshold: a session with no
	// heartbeat for longer than this is considered disconnected.
	HeartbeatTimeout time.Duration

	// Isolation selects the session isolation mode; defaults to
	// SharedIsolation.
	Isolation SessionIsolation

	// BaseDir is the base path for working directories
	// (IsolatedIsolation creates per-session subdirectories under it).
	BaseDir string

	// CrashRecovery enables automatic prompt-run retry; nil = disabled.
	CrashRecovery *CrashRecoveryConfig
}

// ManagedSession is the per-session state tracked by DaemonManager.
type ManagedSession struct {
	session   *engine.Session
	isolation SessionIsolation
	workDir   string
	idleTimer *IdleTimer
	heartbeat *sessionHeartbeat // NOTE(review): never assigned anywhere in this file — looks unused; confirm before removing
	createdAt time.Time

	mu         sync.RWMutex
	lastActive time.Time
	conn       bridge.SessionConn // currently bound client connection (may be replaced after a disconnect)
}

// touch records client activity: it refreshes lastActive and resets the
// idle timer so an active session is not reaped.
func (ms *ManagedSession) touch() {
	ms.mu.Lock()
	ms.lastActive = time.Now()
	ms.mu.Unlock()
	if ms.idleTimer != nil {
		ms.idleTimer.Reset()
	}
}

// DaemonManager manages multiple concurrent agent sessions.
type DaemonManager struct {
	eng       *engine.Engine
	transport bridge.BridgeTransport
	pool      *SessionPool
	cfg       DaemonConfig

	// heartbeat is the single-ticker session liveness scanner. It is
	// constructed iff DaemonConfig.HeartbeatInterval > 0 (see
	// NewDaemonManager) and turns HeartbeatInterval + HeartbeatTimeout
	// into runtime behavior: Start/Stop follow the daemon lifecycle,
	// Register/Unregister follow session open/close, Beat fires on
	// every inbound client message. The timeout callback routes to
	// dm.closeSession, so stale "half-dead" connections are actually
	// culled — not just detectable on paper.
	heartbeat *HeartbeatService

	// crashRecovery is the exponential-backoff retry wrapper for
	// session prompt runs, constructed iff DaemonConfig.CrashRecovery
	// is non-nil. When present, receiveMessages wraps the handlePrompt
	// invocation in RunWithRecovery so a panicked prompt run — caught
	// by defer-recover and reified as an error — is retried under the
	// configured backoff policy. OnCrash/OnGiveUp callbacks surface to
	// operator audit sinks. This converts DaemonConfig.CrashRecovery
	// from "declared retry policy" into "actual retry behavior at the
	// prompt boundary".
	crashRecovery *CrashRecovery

	mu       sync.RWMutex
	sessions map[string]*ManagedSession

	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup
}

// NewDaemonManager creates a DaemonManager.
//
// eng: engine instance shared by all sessions (each session still gets
// its own engine.Session).
// transport: source of client connections (SSE/WebSocket/...).
// cfg: daemon configuration.
func NewDaemonManager( eng *engine.Engine, transport bridge.BridgeTransport, cfg DaemonConfig, ) *DaemonManager { ctx, cancel := context.WithCancel(context.Background()) if cfg.Isolation == nil { cfg.Isolation = SharedIsolation{} } dm := &DaemonManager{ eng: eng, transport: transport, pool: NewSessionPool(cfg.MaxSessions), cfg: cfg, sessions: make(map[string]*ManagedSession), ctx: ctx, cancel: cancel, } // Wire heartbeat iff interval is configured. Reading // cfg.HeartbeatInterval + cfg.HeartbeatTimeout here is the single // production read-site that turns these two DaemonConfig fields // from "declared intent" into "runtime behavior": the ticker scan // frequency and the staleness threshold respectively. Timeout // callback points at dm.closeSession so expiring a session actually // releases pool slot + cleans isolation + drops client conn. // // 仅在配置了 interval 时 wire heartbeat. 这里读 // cfg.HeartbeatInterval + cfg.HeartbeatTimeout 是把这两个 // DaemonConfig 字段从 "声明意图" 变 "运行时行为" 的唯一产线 // 读点: 分别驱动 ticker 扫描频率和超时阈值. 超时回调指向 // dm.closeSession, 会话超时真释放 pool 槽 + 清 isolation + // 断客户端连接. if cfg.HeartbeatInterval > 0 { dm.heartbeat = NewHeartbeatService( HeartbeatConfig{ Interval: cfg.HeartbeatInterval, Timeout: cfg.HeartbeatTimeout, }, dm.closeSession, ) } // Wire crash recovery iff operator provided a policy. Reading // cfg.CrashRecovery here is the single production read-site that // turns the DaemonConfig field from "declared retry policy" into // "actual retry behavior" -- the wrapper is used later in // receiveMessages to surround handlePrompt invocations. // // 仅当运维提供策略时 wire crash recovery. 这里读 cfg.CrashRecovery // 是把 DaemonConfig 字段从 "声明的重试策略" 变 "prompt 边界真 // 重试行为" 的唯一产线读点 -- 包装器稍后在 receiveMessages 里 // 包裹 handlePrompt 调用. if cfg.CrashRecovery != nil { dm.crashRecovery = NewCrashRecovery(*cfg.CrashRecovery) } return dm } // Start 启动 DaemonManager,开始接受客户端连接. // 非阻塞,返回后 DaemonManager 在后台运行. 
func (dm *DaemonManager) Start() { if dm.heartbeat != nil { dm.heartbeat.Start() } dm.wg.Add(1) go dm.acceptLoop() } // acceptLoop 持续从 BridgeTransport 接受新连接并分发给会话处理协程. func (dm *DaemonManager) acceptLoop() { defer dm.wg.Done() for { select { case <-dm.ctx.Done(): return case conn, ok := <-dm.transport.Accept(): if !ok { return // transport 已关闭 } dm.wg.Add(1) go func() { defer dm.wg.Done() dm.handleConn(conn) }() } } } // handleConn 处理一个新的客户端连接. // 获取或创建对应会话,然后将连接绑定到会话,开始事件转发循环. func (dm *DaemonManager) handleConn(conn bridge.SessionConn) { sessionID := conn.SessionID() // 获取或创建会话 ms, err := dm.getOrCreateSession(sessionID) if err != nil { log.Printf("daemon: failed to handle conn for session %s: %v", sessionID, err) conn.Close() return } // 绑定新连接(可能替换旧的断线连接) ms.mu.Lock() ms.conn = conn ms.mu.Unlock() ms.touch() // 事件转发:引擎 → 客户端 dm.wg.Add(1) go func() { defer dm.wg.Done() dm.forwardEvents(ms, conn) }() // 消息接收:客户端 → 引擎 dm.wg.Add(1) go func() { defer dm.wg.Done() dm.receiveMessages(ms, conn) }() } // getOrCreateSession 获取已有会话或创建新会话. // // 精妙之处(CLEVER): 双重检查锁定(先 RLock 检查,未命中再 Lock 创建)-- // 热路径(会话已存在的重连)只需读锁,不阻塞其他并发操作. // 只有第一次创建时才需要写锁,是少数情况. 
func (dm *DaemonManager) getOrCreateSession(sessionID string) (*ManagedSession, error) { // 快路径:读锁检查 dm.mu.RLock() if ms, exists := dm.sessions[sessionID]; exists { dm.mu.RUnlock() return ms, nil } dm.mu.RUnlock() // 慢路径:写锁创建 dm.mu.Lock() defer dm.mu.Unlock() // 再次检查(防止并发创建) if ms, exists := dm.sessions[sessionID]; exists { return ms, nil } // 申请会话槽位(可能阻塞等待容量) if err := dm.pool.Acquire(dm.ctx); err != nil { return nil, fmt.Errorf("acquire slot: %w", err) } // 创建工作目录(按隔离策略) workDir, err := dm.cfg.Isolation.Setup(dm.ctx, sessionID, dm.cfg.BaseDir) if err != nil { dm.pool.Release() return nil, fmt.Errorf("setup isolation: %w", err) } // 创建引擎会话 session := dm.eng.Session(sessionID) _ = workDir // workDir 通过引擎 Config.Cwd 配置,这里记录备用 ms := &ManagedSession{ session: session, isolation: dm.cfg.Isolation, workDir: workDir, createdAt: time.Now(), lastActive: time.Now(), } // 配置空闲超时 if dm.cfg.IdleTimeout > 0 { ms.idleTimer = NewIdleTimer(dm.cfg.IdleTimeout, func() { log.Printf("daemon: session %s idle timeout, closing", sessionID) dm.closeSession(sessionID) }) } dm.sessions[sessionID] = ms // Register with heartbeat scanner so the ticker loop can detect // staleness. The scan thread will call dm.closeSession on timeout. // // 注册到心跳扫描器, 让 ticker 循环检测超时. 扫描线程超时时 // 会调 dm.closeSession. if dm.heartbeat != nil { dm.heartbeat.Register(sessionID) } return ms, nil } // forwardEvents 将引擎会话事件转发给客户端. // 在独立 goroutine 中运行,直到连接断开或会话结束. func (dm *DaemonManager) forwardEvents(ms *ManagedSession, conn bridge.SessionConn) { serializer := bridge.NewEventSerializer(ms.session.ID()) // 历史包袱(LEGACY): 引擎 Session 目前没有独立的 Events() channel-- // 事件通过 Session.Send() 返回的 channel 获取. // 这意味着每次 Send() 调用都有独立的 channel,转发逻辑在 receiveMessages 中. // 未来改进:引擎增加 Session.Events() 全局事件 channel, // 让 forwardEvents 可以独立运行,不依赖每次 Send 的 channel. _ = serializer _ = conn // forwardEvents 目前是占位实现,实际转发在 receiveMessages 的 Send() 回调中. } // receiveMessages 接收客户端消息并调用引擎处理. 
func (dm *DaemonManager) receiveMessages(ms *ManagedSession, conn bridge.SessionConn) { serializer := bridge.NewEventSerializer(ms.session.ID()) for { select { case <-dm.ctx.Done(): return case <-conn.Done(): return case msg, ok := <-conn.Recv(): if !ok { return } ms.touch() // Beat on any inbound client message. This is the "keep // alive" signal -- as long as the client is sending // prompts/replies/close, the session stays fresh. // // 任何客户端入站消息都算心跳. 只要客户端在发 // prompt/reply/close, 会话就视为活跃. if dm.heartbeat != nil { dm.heartbeat.Beat(ms.session.ID()) } switch msg.Type { case bridge.ClientMessagePrompt: // Wrap handlePrompt under CrashRecovery iff configured. // Without a policy we keep the original contract (a // panic propagates, daemon-level supervisor decides // fate). With a policy, panics become retryable and // RunWithRecovery applies exponential backoff + // OnCrash / OnGiveUp callbacks. // // 若配置了 CrashRecovery 就在其下包 handlePrompt. // 无策略时保持原语义 (panic 继续传播, 由 daemon 级 // 监管决定去留); 有策略时 panic 变可重试, // RunWithRecovery 应用指数退避 + OnCrash / OnGiveUp // 回调. promptMsg := msg.Prompt runFn := func() error { return runWithRecover(func() { dm.handlePrompt(ms, conn, serializer, promptMsg) }) } if dm.crashRecovery != nil { if err := dm.crashRecovery.RunWithRecovery(dm.ctx, ms.session.ID(), runFn); err != nil { log.Printf("daemon: session %s prompt gave up: %v", ms.session.ID(), err) } } else { _ = runFn() } case bridge.ClientMessagePermissionReply: ms.session.ResolvePermission(msg.RequestID, msg.Allow) case bridge.ClientMessageClose: dm.closeSession(ms.session.ID()) return } } } } // handlePrompt 处理客户端 prompt,运行引擎并将事件流推给客户端. 
func (dm *DaemonManager) handlePrompt( ms *ManagedSession, conn bridge.SessionConn, serializer *bridge.EventSerializer, prompt string, ) { eventCh := ms.session.Send(dm.ctx, prompt) for evt := range eventCh { bridgeEvt := serializer.Serialize(evt) if err := conn.Send(dm.ctx, bridgeEvt); err != nil { // 连接断开,停止转发但不关闭会话(客户端可能重连) log.Printf("daemon: send failed for session %s: %v", ms.session.ID(), err) return } } } // runWithRecover runs fn and converts a panic into an error. This is // the panic-to-error bridge that lets CrashRecovery.RunWithRecovery // treat a panicked prompt-handler goroutine as a retryable crash: // RunWithRecovery needs fn to return non-nil error to trigger a // retry, but handlePrompt has void signature and any real crash is // a panic rather than a returned error. // // Normal completion (no panic) returns nil so CrashRecovery does // *not* retry -- retries only fire for genuine crashes, not for // regular prompt completion. Connection-level errors inside // handlePrompt are intentionally logged-and-ignored (they represent // client disconnect, not a session crash) and still reach this // function's caller as a normal return. // // 运行 fn 并把 panic 转为 error. 是 CrashRecovery.RunWithRecovery // 能把 panic 的 prompt 处理 goroutine 视作可重试崩溃的桥接: // RunWithRecovery 需要 fn 返回 non-nil error 才触发重试, 但 // handlePrompt 是 void 签名, 真正的崩溃表现为 panic 而非 error. // // 正常完成 (无 panic) 返回 nil, CrashRecovery **不**重试 -- 重试 // 只对真正崩溃开火, 不对正常 prompt 完成. handlePrompt 内部的 // 连接级错误有意 log-and-ignore (代表客户端断线, 非会话崩溃), // 仍作为正常 return 到达此函数调用方. func runWithRecover(fn func()) (err error) { defer func() { if r := recover(); r != nil { err = fmt.Errorf("prompt handler panicked: %v", r) } }() fn() return nil } // closeSession 关闭并清理指定会话. 
func (dm *DaemonManager) closeSession(sessionID string) { dm.mu.Lock() ms, exists := dm.sessions[sessionID] if !exists { dm.mu.Unlock() return } delete(dm.sessions, sessionID) dm.mu.Unlock() // Unregister from heartbeat: normal close path (idle timeout, // client close) must not leave the sessionID tracked, or the next // timeout scan would fire closeSession again for a non-existent // session (benign no-op but noisy log). // // 从心跳摘掉: 正常关闭路径 (idle 超时 / 客户端主动关闭) 不能留着 // sessionID 在 heartbeat 里, 否则下次超时扫描会对已失踪会话再调 // closeSession (虽无害但日志噪音大). if dm.heartbeat != nil { dm.heartbeat.Unregister(sessionID) } // 停止空闲计时器 if ms.idleTimer != nil { ms.idleTimer.Stop() } // 关闭客户端连接 ms.mu.RLock() conn := ms.conn ms.mu.RUnlock() if conn != nil { conn.Close() } // 清理隔离环境 if err := ms.isolation.Teardown(context.Background(), sessionID); err != nil { log.Printf("daemon: teardown isolation for %s: %v", sessionID, err) } // 释放槽位(唤醒等待容量的其他请求) dm.pool.Release() } // Shutdown 优雅关闭 DaemonManager. // 停止接受新连接,等待现有会话完成或超时. func (dm *DaemonManager) Shutdown(ctx context.Context) error { if dm.heartbeat != nil { dm.heartbeat.Stop() } dm.cancel() // 通知所有内部 goroutine 退出 dm.pool.Close() // 让阻塞的 Acquire 立即返回 dm.transport.Close() // 关闭传输层,停止接受新连接 // 关闭所有现有会话 dm.mu.Lock() sessionIDs := make([]string, 0, len(dm.sessions)) for id := range dm.sessions { sessionIDs = append(sessionIDs, id) } dm.mu.Unlock() for _, id := range sessionIDs { dm.closeSession(id) } // 等待所有 goroutine 退出 done := make(chan struct{}) go func() { dm.wg.Wait() close(done) }() select { case <-done: return nil case <-ctx.Done(): return ctx.Err() } } // SessionCount 返回当前活跃会话数(监控用). func (dm *DaemonManager) SessionCount() int { dm.mu.RLock() defer dm.mu.RUnlock() return len(dm.sessions) } // sessionHeartbeat 是每个会话的心跳状态(预留,由 HeartbeatService 管理). 
type sessionHeartbeat struct {
	mu       sync.Mutex
	lastBeat time.Time
}

// update records a heartbeat at the current time.
func (sh *sessionHeartbeat) update() {
	sh.mu.Lock()
	defer sh.mu.Unlock()
	sh.lastBeat = time.Now()
}

// isAlive reports whether the last recorded beat is more recent than
// the given timeout.
func (sh *sessionHeartbeat) isAlive(timeout time.Duration) bool {
	sh.mu.Lock()
	defer sh.mu.Unlock()
	return time.Since(sh.lastBeat) < timeout
}