package engine // subagent_registry.go 实现子 Agent 注册表. // // 注册表维护所有子 Agent 的引用,支持查找,列出和监控. // 主要用于后台运行的子 Agent 的管理--前台同步子 Agent 通常不需要注册. // // 线程安全:所有操作都通过 sync.RWMutex 保护. import ( "fmt" "sync" ) // SubAgentRegistry 是子 Agent 注册表,管理所有活跃的子 Agent. // // 生命周期:创建后使用,不再需要时调用 Close() 释放资源. type SubAgentRegistry struct { mu sync.RWMutex agents map[string]*SubAgent // id -> SubAgent onComplete []func(sa *SubAgent) // 子 Agent 完成时的回调列表 // 升华改进(ELEVATED): stopCh 用于 Close() 时通知所有 watchCompletion goroutine 退出-- // 若某个 SubAgent 因 bug 永远不关闭 done channel,goroutine 会永久阻塞. // stopCh 提供最后一道退出路径:Registry 关闭时,未完成的 watchCompletion 全部退出. // 替代方案:<不加 stopCh,依赖 SubAgent 保证 done 总会关闭> // - 否决:依赖 SubAgent 实现正确性,防御深度不足;Close() 是标准 Go 资源管理惯例. stopCh chan struct{} stopOnce sync.Once } // NewSubAgentRegistry 创建一个新的子 Agent 注册表. func NewSubAgentRegistry() *SubAgentRegistry { return &SubAgentRegistry{ agents: make(map[string]*SubAgent), stopCh: make(chan struct{}), } } // Close 停止注册表,通知所有 watchCompletion goroutine 退出. // 幂等(多次调用安全).通常在 Engine.Close() 时调用. func (r *SubAgentRegistry) Close() { r.stopOnce.Do(func() { close(r.stopCh) }) } // Register 注册一个子 Agent. // 如果同 ID 的子 Agent 已存在,返回错误. func (r *SubAgentRegistry) Register(sa *SubAgent) error { r.mu.Lock() defer r.mu.Unlock() if _, exists := r.agents[sa.ID]; exists { return fmt.Errorf("subagent %s already registered", sa.ID) } r.agents[sa.ID] = sa // 启动监控 goroutine,当子 Agent 完成时触发回调 go r.watchCompletion(sa) return nil } // Get 按 ID 获取子 Agent. func (r *SubAgentRegistry) Get(id string) (*SubAgent, bool) { r.mu.RLock() defer r.mu.RUnlock() sa, ok := r.agents[id] return sa, ok } // List 列出所有注册的子 Agent. func (r *SubAgentRegistry) List() []*SubAgent { r.mu.RLock() defer r.mu.RUnlock() result := make([]*SubAgent, 0, len(r.agents)) for _, sa := range r.agents { result = append(result, sa) } return result } // ListByStatus 列出指定状态的子 Agent. func (r *SubAgentRegistry) ListByStatus(status SubAgentStatus) []*SubAgent { r.mu.RLock() defer r.mu.RUnlock() var result []*SubAgent for _, sa := range r.agents { if sa.GetStatus() == status { result = append(result, sa) } } return result } // Remove 从注册表中移除子 Agent. // 不会取消正在运行的子 Agent--调用者应先调用 Cancel(). func (r *SubAgentRegistry) Remove(id string) bool { r.mu.Lock() defer r.mu.Unlock() if _, exists := r.agents[id]; !exists { return false } delete(r.agents, id) return true } // CancelAll 取消所有正在运行的子 Agent. func (r *SubAgentRegistry) CancelAll() { r.mu.RLock() agents := make([]*SubAgent, 0, len(r.agents)) for _, sa := range r.agents { agents = append(agents, sa) } r.mu.RUnlock() for _, sa := range agents { if sa.GetStatus() == SubAgentStatusRunning { sa.Cancel() } } } // Count 返回注册表中的子 Agent 数量. func (r *SubAgentRegistry) Count() int { r.mu.RLock() defer r.mu.RUnlock() return len(r.agents) } // OnComplete 注册一个子 Agent 完成时的回调函数. // 当任何注册的子 Agent 完成运行(成功,失败或取消)时,回调会被调用. func (r *SubAgentRegistry) OnComplete(fn func(sa *SubAgent)) { r.mu.Lock() defer r.mu.Unlock() r.onComplete = append(r.onComplete, fn) } // watchCompletion 监控子 Agent 的完成状态,完成时触发回调. // // 精妙之处(CLEVER): select 同时监听 sa.done 和 r.stopCh-- // 正常路径:sa.done 关闭(runLoop defer 保证),触发回调. // 异常路径:r.Close() 关闭 stopCh(Registry 生命周期结束),goroutine 安全退出, // 避免因 SubAgent 意外挂起导致永久泄漏. func (r *SubAgentRegistry) watchCompletion(sa *SubAgent) { select { case <-sa.done: // 正常完成,继续触发回调 case <-r.stopCh: // Registry 已关闭,静默退出(不触发回调,SubAgent 可能处于未定义状态) return } // 精妙之处(CLEVER): 在锁内复制 callbacks slice,锁外执行回调-- // 锁内持有期间执行回调可能导致死锁(回调内部可能尝试加锁). // 复制 slice 是 O(n),但回调数量通常极少(<5),开销可忽略. r.mu.RLock() callbacks := make([]func(sa *SubAgent), len(r.onComplete)) copy(callbacks, r.onComplete) r.mu.RUnlock() for _, fn := range callbacks { fn(sa) } } // Summary 返回注册表的摘要信息. func (r *SubAgentRegistry) Summary() map[SubAgentStatus]int { r.mu.RLock() defer r.mu.RUnlock() summary := make(map[SubAgentStatus]int) for _, sa := range r.agents { status := sa.GetStatus() summary[status]++ } return summary }