package engine // team.go 实现 Team 协调器(Leader + N Workers 模式). // // 模块定位: // // Team = Leader Engine + N Worker SubAgents + InboxRouter(同进程,MemoryInbox). // Leader 是拥有完整上下文的主 Agent; // Workers 是并发执行的功能专用子 Agent(能力由 AgentDefinition 约束). // // 核心设计决策: // 1. P0:Worker 工具集由 AgentDefinition 预先确定,无动态权限请求. // 权限冒泡(Worker→Leader 申请权限)是 P1 特性-- // 精妙之处(CLEVER): P0 先跑通"并发 Workers + 结果汇报"主链路, // 权限冒泡作为可选扩展插入,不破坏现有接口. // 替代方案:P0 就实现权限冒泡(复杂度翻倍,验证主链路困难). // 2. task-notification XML 注入 Leader 消息流-- // 精妙之处(CLEVER): 不用 channel/回调通知 Leader,而是作为"user 消息"注入-- // 让 Leader 在下一轮对话中自然读到 Worker 完成状态,无需额外接口. // XML 格式让模型可以结构化解析,纯文本容易混淆. // 替代方案:直接 callback 函数(打破 Leader 对话流,需要额外同步). // 3. PermissionHandler nil = 自动批准-- // 精妙之处(CLEVER): nil 语义明确(自动批准),而非"未配置报错"-- // 单测和快速原型不需要实现 PermissionHandler,降低接入门槛. // 替代方案:nil 时报错(增加测试负担). import ( "bytes" "context" cryptorand "crypto/rand" "encoding/hex" "encoding/json" "encoding/xml" "fmt" "sync" "time" "git.flytoex.net/yuanwei/flyto-agent/pkg/inbox" "git.flytoex.net/yuanwei/flyto-agent/pkg/permission" "git.flytoex.net/yuanwei/flyto-agent/pkg/tasklist" "git.flytoex.net/yuanwei/flyto-agent/pkg/tools/builtin" ) // contextWithTeammateMessageSender 是 builtin.WithTeammateMessageSender 的 engine 包别名. // // 精妙之处(CLEVER): 重新导出符号让 team.go 内部代码无需直接 import builtin // (其他地方已导入), 同时保持调用点语义清晰. teamMessageSender 实现 builtin // 包定义的接口, 通过此函数注入到 ctx, 跨包传递的胶水层。 func contextWithTeammateMessageSender(ctx context.Context, sender builtin.TeammateMessageSender) context.Context { return builtin.WithTeammateMessageSender(ctx, sender) } // PermissionHandler is the consumer-layer permission confirmation interface. // CLI consumers pop a terminal dialog; SDK consumers can auto-approve / deny; // HTTP consumers can push to a frontend over WebSocket. // // PermissionHandler 是消费层实现的权限确认接口. // CLI 消费层弹出确认框; SDK 消费层可以自动批准或拒绝; // HTTP 消费层可通过 WebSocket 推到前端. // // 升华改进(ELEVATED): 接口而非函数指针-- // 消费层可以实现有状态的 PermissionHandler(如记录审批历史,UI 对话框). // 替代方案:func(ctx, req) (bool, string)(无状态,无法携带 UI 上下文). // // Shape: synchronous callback. Team Leader calls // HandlePermissionRequest synchronously when a Worker SubAgent bubbles up // a tool permission ask; Leader blocks for approval/deny + optional // updatedInput rewrite. // // 形态: 同步回调. Worker SubAgent 冒泡权限请求时, Team Leader 同步调 // HandlePermissionRequest, 阻塞等批准/拒绝 + 可选 updatedInput 改写. type PermissionHandler interface { // HandlePermissionRequest handles a Worker's permission request. // Returns approved=true to allow, false to deny. // updatedInput, when non-nil, replaces the original tool input (consumer-layer // sanitization, e.g. rewriting Bash commands or restricting file paths). // reason is a human-readable explanation surfaced in logs and tool-result errors. // // HandlePermissionRequest 处理来自 Worker 的权限请求. // approved=true 批准, false 拒绝. // updatedInput 非 nil 时替换原始 tool input (消费层 sanitize, 如改写 Bash 命令 // 或限制文件路径). // reason 是人类可读说明, 写入日志并显示在 tool-result 错误里. HandlePermissionRequest(ctx context.Context, req inbox.PermissionRequestPayload) (approved bool, updatedInput map[string]any, reason string) } // TeamConfig 是创建 Team 的配置. type TeamConfig struct { // LeaderEngine 是 Leader Agent 引擎 LeaderEngine *Engine // PermissionHandler 消费层实现的权限确认(nil = 自动批准所有请求) PermissionHandler PermissionHandler // SharedTaskList 是 Team 共享任务清单 (可选). // 非 nil 时, Worker/Leader 的 ctx 会携带 TaskListProvider, 四个 shared_task_* // 工具可正常使用 (add_shared_task / list_shared_tasks / claim_shared_task / // complete_shared_task). nil 时工具返回 "not in a Team" 错误. // // 跨行业扩展: 消费层可传入任意 Store 实现 (tasklist.New(myStore)) -- // 医疗用 HIPAA 合规存储, 金融用 PostgreSQL, 仓储用 WMS DB 等. // 编程客户可用 tasklist.NewMarkdownStore 和 Anthropic Claude Code 互操作. // // 升华改进(ELEVATED): 可选字段 -- 不配置时 Team 退化为 P0 (Leader+Workers 单向 // 汇报), 配置后升级为 peer-to-peer + shared task board 完整 Agent Teams 体验. SharedTaskList *tasklist.TaskList } // WorkerSpec 定义一个 Worker 的规格. type WorkerSpec struct { // AgentType 使用的 Agent 类型(从 LeaderEngine.agentRegistry 查找) // 空字符串默认为 "general-purpose" AgentType string // Prompt 分配给该 Worker 的任务提示 Prompt string // Description 任务描述(用于追踪) Description string // Model 覆盖默认模型("" = 使用 AgentDefinition 的 Model 或继承 Leader) Model string } // WorkerResult 是单个 Worker 的执行结果. type WorkerResult struct { WorkerID string AgentType string Description string Result string Error error Duration time.Duration } // taskNotificationTmpl is the XML template for Worker completion notifications. // // carries the caller-supplied task intent (WorkerSpec.Description) // so the Leader can reverse-map a result to the business task that was // assigned. Without it, three concurrent Explore-type Workers running // different prompts look identical in Leader's inbox (random WorkerID + // same agent-type + different summary), and Leader cannot reliably decide // which path a summary belongs to. // // taskNotificationTmpl 是 Worker 完成通知的 XML 模板. // // 字段携带调用方填入的任务意图 (WorkerSpec.Description), 让 // Leader 能把一条结果反向映射回"这是哪个业务任务". 没有它, 三个并发跑不同 // 提示的 Explore 类型 Worker 在 Leader 收件箱里完全同形 (随机 WorkerID + // 同 agent-type + 不同 summary), Leader 无法可靠判定一条 summary 属于哪条路径. // // 精妙之处(CLEVER): XML 格式让 LLM 可以结构化解析-- // 模型内置了 XML 理解能力,比 JSON 更自然地嵌入对话消息中. // 替代方案:JSON(不如 XML 在对话消息中直观可读). const taskNotificationTmpl = ` %s %s %s %s %s %d ` // Team 管理一组并行 Worker,协调 Leader 与 Workers 的通信. // // 升华改进(ELEVATED): Team 是 Agent 协作的第一层抽象-- // 上层可以组合多个 Team(嵌套协调),实现层级式 Agent 网络. // 跨行业扩展:制造场景的"工程师 + 多个机器人臂",金融场景的"分析师 + 多个数据抓取器". // 替代方案:直接在 Engine 中内置 Worker 调度(耦合高,难以测试). type Team struct { cfg TeamConfig // router 是 Agent Teams peer-to-peer 通讯的消息总线 (对齐 Anthropic Claude Code // Agent Teams v2.1.32, 2026-02-05 发布). // // 当前状态 (2026-04-16 L1330 MVP 接线完成): // - Worker 构造时 (runWorker) 注入 IncomingInbox = router.Inbox(workerID), // 并通过 context 传递 teamMessageSender (from=workerID, router=t.router). // - Leader Engine 同样挂载 router.Inbox("leader") 作为 IncomingInbox, 接受 // Worker 主动发来的消息 (不止 task-notification, 还可以是中途求助/提问). // - send_message 内置工具从 ctx 读 sender, 调用 router.Send 投递消息. // - Worker/Leader 的 runLoop 每轮开始 Poll IncomingInbox, 收到消息包装成 // XML 注入对话流, 同时发 flyto.Event 给 Observer. // // 主动拒绝接线的 Anthropic 特性 (已评估, 登记于 core/docs/agent_teams.md): // - shared task list 走文件 + file locking → 改为 TaskList 接口 + 多 Store 实现 // (L1330 C2/C3 交付), 避免硬编码路径对医疗/金融合规的伤害. // - TeammateIdle/TaskCreated/TaskCompleted 三个独立 shell hook → 复用 // flyto.EventObserver 事件总线, 不架构回退 L1326 收敛. // - FLYTO_EXPERIMENTAL_AGENT_TEAMS=1 env flag → 默认启用, "不假设场景" 原则. // - no nested teams / lead fixed / no session resumption → 不照抄研究阶段保守限制. // // 2026-04-14 session 早期曾判断 router 是 dead code 建议删除, 该判断错误: // 错误基础是不知道 Anthropic 已在 2 个月前推出 Agent Teams. 路径订正后 // 确认保留并登记 MVP 接线 TODO, 于 2026-04-16 交付. router *inbox.Router mu sync.Mutex workers []*SubAgent } // NewTeam 创建 Team 实例, 同时布线 Leader 的 Agent Teams 收件箱. // // 精妙之处(CLEVER): Leader 的 IncomingInbox 在 NewTeam 里直接赋值 -- // 调用方只需在构造 Engine 时不设 IncomingInbox (默认), 由 NewTeam 接管. // 已设置的 IncomingInbox 会被保留 (允许 SaaS 场景注入自定义 Inbox 实现). // // 替代方案: <要求调用方显式调用 Engine.SetIncomingInbox()> - // 否决: 多一步骤容易漏, 默认不接线的 Team 等于退化成 P0 单向模式. func NewTeam(cfg TeamConfig) *Team { t := &Team{ cfg: cfg, router: inbox.NewRouter(), } // Leader 收件箱自动接线 (Worker → Leader 主动求助 / 补充材料等场景). // Engine 字段为 nil 时赋值; 非 nil 则保留调用方的显式选择 (例如注入审计 Inbox). if cfg.LeaderEngine != nil { if cfg.LeaderEngine.incomingInbox == nil { cfg.LeaderEngine.incomingInbox = t.router.Inbox("leader") } if cfg.LeaderEngine.agentName == "" { cfg.LeaderEngine.agentName = "leader" } // Wire the Leader Engine for the Worker→Leader permission bubble path: // teamRouter lets runLoop send MsgPermissionResponse back; teamPermissionHandler // is the consumer-layer decision maker (nil = auto-approve, see godoc). // // 把 Leader Engine 接到 Worker→Leader 权限冒泡路径: teamRouter 让 runLoop // 能发 MsgPermissionResponse 回去; teamPermissionHandler 是消费层决策者 // (nil = 自动批准, 见 godoc). cfg.LeaderEngine.teamRouter = t.router cfg.LeaderEngine.teamPermissionHandler = cfg.PermissionHandler } return t } // ContextForLeader 返回一个包装了 Leader TeammateMessageSender + TaskListProvider // 的 context. 调用方在 Engine.Run 之前用这个 context 替换原 ctx, Leader 即可 // 调用 send_message / add_shared_task / list_shared_tasks / 等工具. // // 使用示例: // // team := engine.NewTeam(teamCfg) // ctx := team.ContextForLeader(context.Background()) // ch := leaderEngine.Run(ctx, "协调 Worker 完成 X") // // 精妙之处(CLEVER): 把"给 Leader 绑定发件身份 + 共享清单"的复杂性藏在 // Team 对象里 -- 调用方不需要知道 builtin.WithTeammateMessageSender 等底层 // API, 也不需要手动构造 teamMessageSender. func (t *Team) ContextForLeader(ctx context.Context) context.Context { ctx = contextWithTeammateMessageSender(ctx, teamMessageSender{ router: t.router, from: "leader", }) ctx = builtin.WithTaskListProvider(ctx, t.providerFor("leader")) return ctx } // providerFor 创建一个绑定到指定 agent 名的 TaskListProvider. // 返回 nil 时表示 Team 未配置 SharedTaskList, 调用方应安全处理. // // 精妙之处(CLEVER): Provider 是值语义 (teamTaskListProvider struct 直接返回), // 每个 Worker 拿到的 Provider 只绑定自己的 actor 名, 不会污染其他 Worker. func (t *Team) providerFor(actorName string) builtin.TaskListProvider { return teamTaskListProvider{ list: t.cfg.SharedTaskList, actor: actorName, } } // teamTaskListProvider 实现 builtin.TaskListProvider 接口. // 值类型 + 不可变字段: 无需锁, 跨 goroutine 安全. type teamTaskListProvider struct { list *tasklist.TaskList actor string } // SharedTaskList 返回 Team 共享 TaskList (可能为 nil, 工具需自行处理). func (p teamTaskListProvider) SharedTaskList() *tasklist.TaskList { return p.list } // ActorName 返回当前 Agent 名 (用于 Claim 时的 by 字段). func (p teamTaskListProvider) ActorName() string { return p.actor } // RunWorkers 并发启动所有 Worker,等待全部完成(或 ctx 取消),返回结果列表. // // 执行流程: // 1. 为每个 WorkerSpec 创建 SubAgent(从 Leader 的 AgentRegistry 解析工具集) // 2. 并发 RunSync 所有 Worker // 3. 每个 Worker 完成后构建 task-notification XML,注入 Leader 消息队列 // 4. 等待所有 Worker 完成(或 ctx 取消) // // 精妙之处(CLEVER): 用 WaitGroup 管理并发,用独立 goroutine 收集结果-- // 不需要复杂的 fan-out/fan-in channel 设计. // 替代方案:errgroup(需要在第一个错误时取消,但 Team 希望收集所有结果). func (t *Team) RunWorkers(ctx context.Context, specs []WorkerSpec) ([]WorkerResult, error) { if len(specs) == 0 { return nil, nil } leader := t.cfg.LeaderEngine if leader == nil { return nil, fmt.Errorf("team: LeaderEngine is required") } results := make([]WorkerResult, len(specs)) var wg sync.WaitGroup for i, spec := range specs { wg.Add(1) go func(idx int, ws WorkerSpec) { defer wg.Done() results[idx] = t.runWorker(ctx, ws) // Worker 完成后注入 task-notification 到 Leader 消息队列 r := results[idx] status := "completed" summary := r.Result if r.Error != nil { status = "failed" summary = r.Error.Error() } // 截断摘要(避免 XML 过长) if len(summary) > 300 { summary = summary[:300] + "..." } agentType := r.AgentType if agentType == "" { agentType = "general-purpose" } // 升华改进(ELEVATED): 使用 xml.EscapeText 转义字符串字段-- // summary 来自 Worker 输出(用户/模型可控),agentType/WorkerID 来自配置, // 任何含 <>&"' 的内容若不转义都会破坏 XML 结构甚至允许注入. // 替代方案: - // 否决:encoding/xml 实现更全面,覆盖所有 XML 1.0 非法控制字符. // // r.Description is escaped for the same reason: caller-supplied // strings can contain XML metacharacters. // // r.Description 同样转义: 调用方传入字符串可能含 XML 元字符. xml := fmt.Sprintf(taskNotificationTmpl, xmlEscape(r.WorkerID), status, // 来自代码常量,无需转义 xmlEscape(r.Description), xmlEscape(summary), xmlEscape(agentType), r.Duration.Milliseconds(), ) leader.enqueueTeamNotification(xml) }(i, spec) } wg.Wait() return results, nil } // runWorker 运行单个 Worker,返回其执行结果. func (t *Team) runWorker(ctx context.Context, spec WorkerSpec) WorkerResult { start := time.Now() leader := t.cfg.LeaderEngine // 默认 AgentType agentType := spec.AgentType if agentType == "" { agentType = "general-purpose" } desc := spec.Description if desc == "" { desc = fmt.Sprintf("Worker (%s)", agentType) } // 解析工具集 var allowedMap map[string]bool if leader.agentRegistry != nil { def, ok := leader.agentRegistry.Get(agentType) if ok { // 从 Leader 注册表中获取父工具集名称 parentTools := leader.tools.Names() resolved := leader.agentRegistry.ResolveToolset(def, parentTools) if len(resolved) > 0 { allowedMap = make(map[string]bool, len(resolved)) for _, name := range resolved { allowedMap[name] = true } } } } // 确定模型 model := spec.Model if model == "" && leader.agentRegistry != nil { if def, ok := leader.agentRegistry.Get(agentType); ok { model = def.Model } } // 创建 SubAgent 配置 cfg := &SubAgentConfig{ Description: desc, Model: model, AllowedTools: allowedMap, MaxTurns: 10, } // Fork SubAgent sa := SpawnSubAgent(leader, cfg) // Agent Teams peer-to-peer 接线: Worker 的 incomingInbox = router.Inbox(workerID). // 精妙之处(CLEVER): 赋值在 SpawnSubAgent 之后, 不侵入 SpawnSubAgent 签名 -- // SDK 用户直接用 SpawnSubAgent 时默认无 Teams 能力, 只有 Team.runWorker // 显式接线. 职责隔离清晰. // 替代方案: 把 router 塞进 SubAgentConfig (所有 SubAgent 都背上 Teams 依赖). sa.incomingInbox = t.router.Inbox(sa.ID) // Permission-bubble wiring: build a permission.Checker whose Handler turns // permission.Request into MsgPermissionRequest, sends to Leader, blocks for // MsgPermissionResponse, returns permission.Response. SubAgent.runLoop calls // Check() before each tool.Execute (see subagent.go permission gate). // // Mode=ModeDefault forces every tool through the Ask path so the Leader-side // PermissionHandler always gets a chance to decide. Worker SubAgents in a // Team have stricter intent than standalone ones -- the team contract is // "Leader supervises Worker tool use" -- so default-Ask is the right baseline. // // 权限冒泡接线: 构造一个 permission.Checker, 它的 Handler 把 // permission.Request 转成 MsgPermissionRequest 发给 Leader, 阻塞等 // MsgPermissionResponse, 返回 permission.Response. SubAgent.runLoop 在每次 // tool.Execute 前调 Check() (见 subagent.go 权限闸). // // Mode=ModeDefault 强制每个工具走 Ask 路径, 让 Leader 侧 PermissionHandler // 总有机会决策. Team 内的 Worker SubAgent 比独立 SubAgent 意图更严 // (team 契约是 "Leader 监督 Worker 工具使用"), 默认 Ask 是合适基线. sa.permissionChecker = permission.NewEngine(permission.ModeDefault, t.bubblingHandlerFor(sa)) // 记录到 Team workers 列表 t.mu.Lock() t.workers = append(t.workers, sa) t.mu.Unlock() // 同步运行 -- ctx 携带 teamMessageSender + TaskListProvider 供内置工具使用. // 精妙之处(CLEVER): 通过 context 注入 request-scoped 值 (而非 SubAgentConfig 字段) -- // SubAgent.runLoop 不需要知道 sender/provider 的存在, 工具自己从 ctx 读, // 遵守 Go 惯例, 解耦彻底. workerCtx := contextWithTeammateMessageSender(ctx, teamMessageSender{ router: t.router, from: sa.ID, }) workerCtx = builtin.WithTaskListProvider(workerCtx, t.providerFor(sa.ID)) result, err := sa.RunSync(workerCtx, spec.Prompt) duration := time.Since(start) return WorkerResult{ WorkerID: sa.ID, AgentType: agentType, Description: desc, Result: result, Error: err, Duration: duration, } } // --- Engine 层 team-notification 注入 --- // enqueueTeamNotification 将 task-notification XML 加入待注入队列. // 线程安全:多个 Worker goroutine 可并发调用. // // 注意:方法名故意拼写为 "enqueu"(而非 "enqueue")-- // 这是内部 API,与 Go 惯例一致(无需 exported). // 精妙之处(CLEVER): 持有 notifState.mu 锁追加,drain 时原子交换-- // 生产者(Worker goroutine)和消费者(runLoop 主 goroutine)完全解耦. func (e *Engine) enqueueTeamNotification(xml string) { e.notifState.mu.Lock() defer e.notifState.mu.Unlock() e.notifState.pendingNotifications = append(e.notifState.pendingNotifications, xml) } // xmlEscape 转义 XML 特殊字符,防止 XML 注入. // // 精妙之处(CLEVER): 用 encoding/xml.EscapeText 而非手写替换规则-- // 除了 &<>"' 五个基本字符,XML 1.0 还有控制字符需要处理, // 手写规则遗漏控制字符会导致解析器行为未定义. // 替代方案:strings.NewReplacer 手写 5 条规则(遗漏控制字符,不够全面). func xmlEscape(s string) string { var buf bytes.Buffer if err := xml.EscapeText(&buf, []byte(s)); err != nil { // EscapeText 理论上不会出错(写 bytes.Buffer 不会失败),这里是防御性处理 return s } return buf.String() } // bubblingHandlerFor returns a permission.Handler closure bound to a SubAgent // (Worker). The handler converts permission.Request → MsgPermissionRequest, // routes it to the Leader via t.router.Send("leader", ...), then blocks on the // SubAgent's pendingPermissions chan for the matching MsgPermissionResponse. // // The mapping (approved bool, updatedInput map[string]any, reason string) → // permission.Response is lossy by design: the Leader-side handler has already // produced the terminal Allow/Deny -- no DecisionAsk further bubble-up. // RiskLevel and SuggestedRules are left zero (consumer-layer terminal decisions // don't surface those engine-internal hints). // // The returned closure is value-bound to (t, sa); each Worker gets its own // instance so concurrent Workers don't share pending-map keys or Send paths. // // bubblingHandlerFor 返回一个绑到 SubAgent (Worker) 的 permission.Handler 闭包. // handler 把 permission.Request 转成 MsgPermissionRequest, 经 // t.router.Send("leader", ...) 路由给 Leader, 然后阻塞在 SubAgent 的 // pendingPermissions chan 上等匹配的 MsgPermissionResponse. // // (approved bool, updatedInput map[string]any, reason string) → // permission.Response 的映射故意是 lossy 的: Leader 侧 handler 已产生终态 // Allow/Deny -- 不存在 DecisionAsk 继续冒泡. RiskLevel / SuggestedRules 留零 // (消费层终态决策不暴露这些引擎内部 hint). // // 返回的闭包绑 (t, sa) 值; 每个 Worker 拿独立实例, 并发 Worker 不共享 pending // map key 或 Send 路径. func (t *Team) bubblingHandlerFor(sa *SubAgent) permission.Handler { return func(ctx context.Context, req *permission.Request) (*permission.Response, error) { requestID, err := newPermissionRequestID() if err != nil { return nil, fmt.Errorf("permission bubble: generate request id: %w", err) } payload := inbox.PermissionRequestPayload{ RequestID: requestID, ToolName: req.ToolName, ToolUseID: req.ToolID, Description: req.Message, Input: req.Input, } ch := sa.registerPendingPermission(requestID) msg, err := inbox.NewMessage(sa.ID, "leader", inbox.MsgPermissionRequest, payload) if err != nil { // Clean up the pending entry we just registered before returning -- // otherwise a future stray response would block on a chan no one reads. // // 清理刚注册的 pending entry, 否则未来误投响应会卡死无人读的 chan. sa.permMu.Lock() delete(sa.pendingPermissions, requestID) sa.permMu.Unlock() return nil, fmt.Errorf("permission bubble: build message: %w", err) } if err := t.router.Send("leader", msg); err != nil { sa.permMu.Lock() delete(sa.pendingPermissions, requestID) sa.permMu.Unlock() return nil, fmt.Errorf("permission bubble: route to leader: %w", err) } respPayload, err := sa.waitForPermissionResponse(ctx, requestID, ch) if err != nil { return nil, err } decision := permission.DecisionDeny if respPayload.Approved { decision = permission.DecisionAllow } return &permission.Response{ Decision: decision, Reason: respPayload.Reason, UpdatedInput: respPayload.UpdatedInput, }, nil } } // newPermissionRequestID returns a random 16-byte hex string suitable for a // PermissionRequestPayload.RequestID. crypto/rand keeps it unguessable so the // inbox response routing can't be spoofed by Worker-side adversarial inputs. // // newPermissionRequestID 返回 16 字节 hex 随机串, 给 // PermissionRequestPayload.RequestID 使用. crypto/rand 保证不可猜, Worker 侧 // 恶意输入无法伪造 inbox 响应路由. func newPermissionRequestID() (string, error) { buf := make([]byte, 16) if _, err := cryptorand.Read(buf); err != nil { return "", err } return hex.EncodeToString(buf), nil } // handleWorkerPermissionRequest resolves a Worker permission bubble end-to-end: // decode the payload, ask the consumer handler (or auto-approve when nil), // build the response payload, send it back through teamRouter to msg.From. // // Called by Engine.runLoop's incomingInbox poll when msg.Type == // MsgPermissionRequest. Errors at any stage emit a deny response so the Worker // doesn't hang indefinitely -- fail-closed matches permission engine semantics // (no handler / handler timeout / decode failure all result in deny). // // handleWorkerPermissionRequest 端到端解决 Worker 权限冒泡: 解码 payload, // 询问消费层 handler (nil 时自动批准), 构造响应 payload, 经 teamRouter 发回 // msg.From. // // Engine.runLoop 的 incomingInbox poll 收到 msg.Type == MsgPermissionRequest 时 // 调用. 任一阶段出错都发 deny 响应让 Worker 不卡死 -- fail-closed 对齐 // permission engine 语义 (无 handler / handler 超时 / 解码失败都拒绝). func (e *Engine) handleWorkerPermissionRequest(ctx context.Context, msg *inbox.Message) { if e.teamRouter == nil { // No router = can't reply. Should not happen if NewTeam wired things, but // silently dropping is safer than panicking on a control-protocol path. // // 没 router 就回不了消息. NewTeam 接线后不该出现, 但 control 协议路径上 // 静默丢弃比 panic 安全. return } var payload inbox.PermissionRequestPayload if err := json.Unmarshal(msg.Payload, &payload); err != nil { e.sendPermissionResponse(msg.From, inbox.PermissionResponsePayload{ RequestID: payload.RequestID, Approved: false, Reason: fmt.Sprintf("leader: decode permission request failed: %v", err), }) return } approved := true var updatedInput map[string]any reason := "auto-approved (no PermissionHandler configured)" if e.teamPermissionHandler != nil { approved, updatedInput, reason = e.teamPermissionHandler.HandlePermissionRequest(ctx, payload) } if e.observer != nil { // Emit the full request input alongside the decision so audit logs / SaaS // dashboards can review what arguments the Worker was about to use -- // without this, deny postmortems would have to ask the Worker to repeat. // SECURITY: "input" may carry secrets (Bash argv with tokens, file_path // with internal paths). Consumer-supplied observers MUST redact before // persisting to shared destinations. // // 把完整请求 input 和决策一起发出去, 让审计日志 / SaaS 面板能复盘 Worker // 准备使用的参数 -- 没这个的话, 拒绝事故复盘只能让 Worker 重述. // 安全: "input" 可能带敏感信息 (Bash argv 含 token, file_path 含内部路径). // 消费层 observer 写入共享目的地前必须脱敏. e.observer.Event("team_permission_resolved", map[string]any{ "request_id": payload.RequestID, "worker_id": msg.From, "tool_name": payload.ToolName, "tool_use_id": payload.ToolUseID, "description": payload.Description, "input": payload.Input, "approved": approved, "reason": reason, }) } e.sendPermissionResponse(msg.From, inbox.PermissionResponsePayload{ RequestID: payload.RequestID, Approved: approved, UpdatedInput: updatedInput, Reason: reason, }) } // sendPermissionResponse wraps the response payload into a Message and routes // it to the target Worker via teamRouter. Errors are logged via observer if // available -- there is no recovery path beyond logging (the Worker will // eventually time out via its own ctx). // // sendPermissionResponse 把响应 payload 包成 Message, 经 teamRouter 路由给目标 // Worker. 错误若有 observer 则记录 -- 除了日志没别的恢复路径 (Worker 最终会 // 通过自己的 ctx 超时). func (e *Engine) sendPermissionResponse(workerID string, payload inbox.PermissionResponsePayload) { msg, err := inbox.NewMessage("leader", workerID, inbox.MsgPermissionResponse, payload) if err != nil { if e.observer != nil { e.observer.Event("team_permission_send_error", map[string]any{ "worker_id": workerID, "phase": "build_message", "error": err.Error(), }) } return } if err := e.teamRouter.Send(workerID, msg); err != nil { if e.observer != nil { e.observer.Event("team_permission_send_error", map[string]any{ "worker_id": workerID, "phase": "route_send", "error": err.Error(), }) } } } // drainTeamNotifications 取走并清空待注入的 task-notification 列表. // 线程安全. func (e *Engine) drainTeamNotifications() []string { e.notifState.mu.Lock() defer e.notifState.mu.Unlock() if len(e.notifState.pendingNotifications) == 0 { return nil } drained := e.notifState.pendingNotifications e.notifState.pendingNotifications = nil return drained } // --- Agent Teams peer-to-peer 通讯 --- // teammateMessageTmpl 是同伴消息的 XML 模板. // // 精妙之处(CLEVER): 与 task-notification 使用不同的 XML 标签 -- // 模型通过标签区分"Worker 完成汇报"和"同伴主动发消息", 语义不混淆. // 跨行业扩展: XML 格式中立, 金融/医疗/仓储场景都可直接解析. // 替代方案: JSON payload 嵌入 user message (模型解析 JSON 不如 XML 自然). const teammateMessageTmpl = ` %s ` // formatTeammateMessageXML 将 inbox.Message 格式化为 teammate-message XML, // 供 runLoop 注入到对话流. // // 精妙之处(CLEVER): 把 Payload (json.RawMessage) 转字符串后 XML 转义 -- // 保留 JSON 结构供模型解读, 同时转义防止内容包含 等 // 序列破坏标签边界 (安全防护, 对齐 team.go xmlEscape 相同防御). func formatTeammateMessageXML(msg *inbox.Message) string { payload := string(msg.Payload) return fmt.Sprintf(teammateMessageTmpl, xmlEscape(msg.From), xmlEscape(msg.To), xmlEscape(string(msg.Type)), xmlEscape(msg.ID), xmlEscape(payload), ) } // teamMessageSender 实现 builtin.TeammateMessageSender 接口. // 每个 Worker 创建时绑定自己的 from 字段, 通过 router 投递消息. // // 精妙之处(CLEVER): 值类型 (非指针) + 不可变字段 -- // Worker 并发调 send_message 时无需锁, from/router 只读. // 替代方案: 全局单例 + Worker ID 从 ctx 读取 (额外 ctx.Value 调用开销). type teamMessageSender struct { router *inbox.Router from string } // SendTeammateMessage 实现 builtin.TeammateMessageSender 接口. // content 会被包装成 JSON 对象 ({"content": "..."}) 作为 Message.Payload. // // 升华改进(ELEVATED): 消息结构统一 JSON 封装, 消费层/观察者无需猜测 payload 格式 -- // 所有 send_message 产生的消息都是 {"content": string} 结构, 可预测可解析. // 跨行业扩展: 金融场景想带结构化字段 (价格/数量/标的), 可扩展 SendTeammateMessageStructured // 接口接受 any payload, 不影响 send_message 基础用法. // 替代方案: 直接把 content 作为 raw bytes 塞进 Payload (消费者需要猜测编码). func (s teamMessageSender) SendTeammateMessage(ctx context.Context, to, content string, msgType inbox.MessageType) error { if s.router == nil { return fmt.Errorf("teamMessageSender: router is nil") } if to == "" { return fmt.Errorf("teamMessageSender: to is required") } payload := map[string]string{"content": content} msg, err := inbox.NewMessage(s.from, to, msgType, payload) if err != nil { return fmt.Errorf("teamMessageSender: build message: %w", err) } return s.router.Send(to, msg) }