package inbox // router.go 实现 Agent 收件箱路由器. // // 模块定位: // // InboxRouter 管理 agentName → Inbox 的映射, // 是 Agent 间通信的中间邮局--知道每个 Agent 的"收件地址". // // 核心设计决策: // 1. 自动创建(lazy init):Inbox(agentName) 若不存在则创建-- // 精妙之处(CLEVER): 无需提前注册所有 Agent,支持动态加入的 Worker. // Send 也通过 Inbox() 自动创建,确保消息不会因收件人尚未注册而丢失. // 替代方案:显式 Register(需要提前注册,启动顺序有依赖). // 2. factory 函数可替换--测试时可注入 mock Inbox,生产用 MemoryInbox. // 精妙之处(CLEVER): 工厂模式让 Router 本身不依赖具体实现, // 是依赖倒置的又一应用. // 替代方案:Router 内部直接 NewMemoryInbox()(无法测试替换). // 3. Close 关闭所有 Inbox--Router 生命周期结束时统一清理, // 避免 goroutine 泄漏(等待中的 Recv 都会收到 ErrInboxClosed). import ( "sync" ) // Router 管理 agentName → Inbox 的映射. // // 升华改进(ELEVATED): Router 是 Agent 协作的中枢-- // 未来可在 Send 前做消息审计,优先级排序,流量控制. // 替代方案:每个 Agent 直接持有对方的 Inbox 引用(耦合高,无法集中管理). type Router struct { mu sync.RWMutex inboxes map[string]Inbox factory func() Inbox // Inbox 创建工厂,默认 NewMemoryInbox } // NewRouter 创建 Router,默认使用 MemoryInbox. func NewRouter() *Router { return &Router{ inboxes: make(map[string]Inbox), factory: func() Inbox { return NewMemoryInbox() }, } } // NewRouterWithFactory 使用自定义 Inbox 工厂创建 Router(主要用于测试). func NewRouterWithFactory(factory func() Inbox) *Router { return &Router{ inboxes: make(map[string]Inbox), factory: factory, } } // Inbox 获取指定 Agent 的收件箱,不存在则自动创建. // // 精妙之处(CLEVER): 双重检查锁(读锁快路径 + 写锁慢路径)-- // 大多数情况下 Inbox 已存在,读锁足够; // 只有首次访问才需要写锁,争用极少. // 替代方案:始终持有写锁(性能损失大). func (r *Router) Inbox(agentName string) Inbox { // 快路径:读锁检查 r.mu.RLock() box, ok := r.inboxes[agentName] r.mu.RUnlock() if ok { return box } // 慢路径:写锁创建 r.mu.Lock() defer r.mu.Unlock() // 再次检查(避免重复创建) if box, ok = r.inboxes[agentName]; ok { return box } box = r.factory() r.inboxes[agentName] = box return box } // Send 向指定 Agent 发送消息. // 若目标 Agent 不存在,自动创建其 Inbox. func (r *Router) Send(to string, msg *Message) error { return r.Inbox(to).Send(msg) } // Remove 移除指定 Agent 的 Inbox 记录(不关闭 Inbox). // 适用于 Agent 退出但 Inbox 还有等待者的场景. // // 注意:调用者负责在适当时机调用 box.Close(). func (r *Router) Remove(agentName string) { r.mu.Lock() defer r.mu.Unlock() delete(r.inboxes, agentName) } // RemoveAndClose 移除并关闭指定 Agent 的 Inbox. func (r *Router) RemoveAndClose(agentName string) { r.mu.Lock() box, ok := r.inboxes[agentName] if ok { delete(r.inboxes, agentName) } r.mu.Unlock() if ok { _ = box.Close() } } // Close 关闭所有 Inbox. // Router 生命周期结束时调用,确保所有等待中的 Recv goroutine 能退出. // // 精妙之处(CLEVER): 先快照再关闭,避免持锁时调用 Close(可能导致死锁 // 如果 Close 内部也需要加锁的话). // 替代方案:持写锁逐一关闭(风险:Close 内部有锁时死锁). func (r *Router) Close() error { r.mu.Lock() snapshot := make([]Inbox, 0, len(r.inboxes)) for _, box := range r.inboxes { snapshot = append(snapshot, box) } r.inboxes = make(map[string]Inbox) // 清空 r.mu.Unlock() var lastErr error for _, box := range snapshot { if err := box.Close(); err != nil { lastErr = err } } return lastErr } // AgentNames 返回当前注册的所有 Agent 名称(按任意顺序,仅供调试). func (r *Router) AgentNames() []string { r.mu.RLock() defer r.mu.RUnlock() names := make([]string, 0, len(r.inboxes)) for name := range r.inboxes { names = append(names, name) } return names }