package engine // subagent.go 实现子 Agent 运行器(Fork 模式). // // 精妙之处(CLEVER): ForkedAgent 共享父 agent 的 prompt cache. // API 层传完整工具列表(和父 agent 一样)→ cache key 一致 → 缓存命中. // 实际执行时用 canUseTool 函数限制 → 运行时拦截. // // 经济账:多传 3K 没用的工具描述,换来 10K 系统提示的缓存命中. // 按 Sonnet 定价:多花 $0.009(3K×$3/M),省 $0.027(9K×$3/M 变成 0.9K×$0.3/M). // 每个子 agent 调用净省 ~$0.018.一个复杂任务 10 个子 agent = 省 $0.18. // // 安全性不降低:canUseTool 在运行时拦截,子 agent 请求被禁工具会收到错误消息, // 模型会自己换工具. // // 替代方案:独立 Engine 实例(原来的设计),每个子 agent 独立缓存, // 首轮都要付全量系统提示 token. // // 关键设计: // - 工具过滤:API 层传完整列表(cache key 一致),运行时 canUseTool 限制 // - 缓存共享:复用父 engine 的 API 客户端和系统提示 // - 上下文隔离:独立的消息历史,独立的轮数限制 // - 进度跟踪:工具调用计数,token 计数,最近活动 // - 可取消:通过 context 传播取消信号 // - 支持同步和异步两种运行模式 import ( "context" "encoding/json" "fmt" "path/filepath" "strings" "sync" "time" "git.flytoex.net/yuanwei/flyto-agent/internal/transport" "git.flytoex.net/yuanwei/flyto-agent/internal/transport/retry" "git.flytoex.net/yuanwei/flyto-agent/pkg/flyto" "git.flytoex.net/yuanwei/flyto-agent/pkg/inbox" "git.flytoex.net/yuanwei/flyto-agent/pkg/permission" "git.flytoex.net/yuanwei/flyto-agent/pkg/query" "git.flytoex.net/yuanwei/flyto-agent/pkg/tools" ) // SubAgentStatus 是子 Agent 的运行状态. type SubAgentStatus string const ( SubAgentStatusPending SubAgentStatus = "pending" // 已创建,尚未启动 SubAgentStatusRunning SubAgentStatus = "running" // 正在运行 SubAgentStatusCompleted SubAgentStatus = "completed" // 已完成 SubAgentStatusFailed SubAgentStatus = "failed" // 运行失败 SubAgentStatusCancelled SubAgentStatus = "cancelled" // 被取消 ) // SubAgentProgress 追踪子 Agent 的运行进度. type SubAgentProgress struct { mu sync.Mutex ToolUseCount int // 工具调用计数 InputTokens int // 累计输入 token 数 OutputTokens int // 累计输出 token 数 Activities []string // 最近的工具调用描述(最多 5 个) } // AddActivity 添加一条活动记录(保留最近 5 条). func (p *SubAgentProgress) AddActivity(activity string) { p.mu.Lock() defer p.mu.Unlock() p.Activities = append(p.Activities, activity) if len(p.Activities) > 5 { p.Activities = p.Activities[len(p.Activities)-5:] } } // Snapshot 返回进度的快照副本(线程安全). func (p *SubAgentProgress) Snapshot() SubAgentProgress { p.mu.Lock() defer p.mu.Unlock() activities := make([]string, len(p.Activities)) copy(activities, p.Activities) return SubAgentProgress{ ToolUseCount: p.ToolUseCount, InputTokens: p.InputTokens, OutputTokens: p.OutputTokens, Activities: activities, } } // SubAgent 是一个子 Agent 实例(Fork 模式). // // 精妙之处(CLEVER): 不再创建独立的 Engine 实例,而是 fork 父 Engine 的关键资源. // 共享:API 客户端,系统提示,完整工具列表(缓存命中) // 独立:消息历史,权限限制,轮数限制,abort controller type SubAgent struct { // ID 是子 Agent 的唯一标识符 ID string // Description 是子 Agent 的任务描述 Description string // Model 是子 Agent 使用的模型 Model string // ParentEngine 是创建此子 Agent 的父引擎(fork 的源头). // 使用 EngineRef 接口而非 *Engine,使 SubAgent 可在测试中接受 mock. // 注:SpawnSubAgent 函数仍接受 *Engine;此处字段仅用于 activity/observer/context 访问. ParentEngine EngineRef // Status 是当前运行状态 Status SubAgentStatus // Progress 是运行进度跟踪器 Progress *SubAgentProgress // StartTime 是启动时间 StartTime time.Time // EndTime 是结束时间 EndTime time.Time // Result 是运行结果文本 Result string // Error 是运行错误(如果有) Error error // Cwd 是子 Agent 的工作目录(可能与父 Agent 不同,如 worktree 模式) Cwd string // fork 模式的核心字段 // 升华改进(ELEVATED): Provider 是必填--引擎不预设任何供应商. // 替代方案: - 否决:G9-C 移除了 Anthropic 耦合. provider flyto.ModelProvider // 注入的模型 Provider(必须非 nil) systemPrompt string // 复用父 engine 的系统提示(cache key 一致) allToolDefs []api.ToolDef // 完整工具定义列表(API 层传给模型,cache key 一致) allowedTools map[string]bool // 实际允许使用的工具(运行时拦截);nil = 不限制 toolRegistry *tools.Registry // 父 engine 的工具注册表(用于执行工具) maxTurns int // 最大轮数限制 chain *QueryChainTracking // 查询链追踪(从父 engine fork 而来) historyMessages []query.Message // 预置历史消息(在任务 prompt 之前;记忆提取用) memoryDirRestrict string // 非空时 Edit/Write 只允许写入此目录(记忆提取用) // sharedSystemPromptBytes 是父 Engine 实际发送给 API 的系统提示字节(模块 13.3). // // 精妙之处(CLEVER): nil 时用 systemPrompt string 字段(向后兼容旧路径), // 非 nil 时直接用此字节(绕过字符串转换,零拷贝)-- // 确保父子 Agent 发送给 Anthropic 的系统提示内容哈希完全一致, // 父 Engine 建立的 prompt cache slot 被 SubAgent 100% 命中. // 替代方案:<始终用 systemPrompt string> // - 否决:重新 buildSystemPrompt 时若有时间戳等动态内容,字节会不一致. sharedSystemPromptBytes []byte // silentEvents 由 SubAgentConfig.SilentEvents 注入, 见 SubAgentConfig // 上同名字段 godoc. runLoop 读此值决定是否 emit SubAgentStart/End 和 // 是否转发业务事件到父引擎 Run channel. // // silentEvents is populated from SubAgentConfig.SilentEvents; see the // godoc on SubAgentConfig.SilentEvents. runLoop reads this flag to // decide whether to emit SubAgentStart/End and whether to forward // business events to the parent engine's Run channel. silentEvents bool // allowedSubAgentTypes 限制此 SubAgent 可 spawn 的子 Agent 类型(来自 AllowedSubAgentTypes). // // 精妙之处(CLEVER): 在 SpawnSubAgent 时把 []string 转为 map[string]struct{} 存储-- // canUseTool 中每次检查 Agent 工具调用都是 O(1),而非 O(n) 遍历切片. // nil = 无限制(可 spawn 任何类型). allowedSubAgentTypes map[string]struct{} // cancel 用于取消运行 cancel context.CancelFunc // done 在运行完成时关闭 done chan struct{} // mu 保护状态字段的并发访问 mu sync.Mutex // incomingInbox 是 Agent Teams peer-to-peer 通讯的收件入口 (与 Engine.incomingInbox 等价). // // 精妙之处(CLEVER): 在 SubAgent 上单独存放 (而非复用 ParentEngine.incomingInbox) -- // SubAgent 是 Worker, 应该只收发自己邮箱的消息; 复用父 Engine 的会让 Worker // 收到 Leader 的消息, 跨身份污染. // // 由 engine.Team.runWorker 显式赋值: sa.incomingInbox = router.Inbox(sa.ID). // 普通 SubAgent (不在 Team 内的) 默认 nil, runLoop poll 会跳过, 零开销. // // 替代方案: <加到 SubAgentConfig 字段> - 否决: 把 Teams 概念硬塞进通用 SubAgent, // 不在 Team 场景的 SDK 用户也得理解这个字段. incomingInbox inbox.Inbox // permissionChecker is the permission engine consulted before each tool.Execute. // Set by Team.runWorker for Worker SubAgents -- its Handler bubbles // MsgPermissionRequest up to the Leader inbox. Standalone SubAgents // (Agent-tool spawned, not in a Team) leave this nil and rely on canUseTool // alone -- preserving zero-overhead path for non-Team callers. // // permissionChecker 是工具执行前查询的权限引擎. Team.runWorker 给 Worker // SubAgent 设置 -- 其 Handler 把 MsgPermissionRequest 冒泡到 Leader inbox. // 独立 SubAgent (Agent 工具 spawn, 不在 Team 内) 留 nil 仅依赖 canUseTool, // 保留非 Team 调用方的零开销路径. permissionChecker permission.Checker // pendingPermissions tracks outstanding permission-bubble requests keyed by // RequestID. Worker's bubblingHandler creates a 1-buffered chan, registers it // here, sends MsgPermissionRequest to Leader, then blocks on chan <-. // runLoop's inbox poll, on seeing MsgPermissionResponse with matching RequestID, // decodes the payload and writes it to the chan, unblocking the handler. // // pendingPermissions 记录已发出未响应的权限冒泡请求, key 为 RequestID. Worker // 的 bubblingHandler 建一个容量 1 的 chan 注册到这里, 发出 MsgPermissionRequest // 后阻塞等. runLoop poll 收到匹配 RequestID 的 MsgPermissionResponse 后解码 // 写入 chan 唤醒 handler. pendingPermissions map[string]chan inbox.PermissionResponsePayload permMu sync.Mutex } // SubAgentConfig 是创建子 Agent 的配置. type SubAgentConfig struct { // Description 任务描述 Description string // Model 使用的模型(为空则继承父 Agent 的模型) Model string // AllowedTools 实际允许使用的工具(权限层限制). // nil = 不限制(可使用所有工具),非 nil = 白名单. AllowedTools map[string]bool // MaxTurns 最大轮数(0 = 默认 10) MaxTurns int // Cwd 工作目录(为空则继承父 Agent 的工作目录) Cwd string // Chain 父查询链(用于 fork 子链追踪) Chain *QueryChainTracking // HistoryMessages 预置的对话历史消息(在任务 prompt 之前追加到消息列表). // // 升华改进(ELEVATED): 早期实现 runForkedAgent 通过 cacheSafeParams 传入父对话历史, // SubAgent 能看到完整的父对话消息,记忆提取才能分析"发生了什么". // 早期方案 Go 的 runMemoryExtraction 只传了一条 prompt 消息,SubAgent 没有任何历史可分析. // 我们把历史消息作为显式参数传给 SubAgent,修复这个核心缺口. // 替代方案:在 prompt 里把历史消息拼成文本(丢失结构,模型难以理解轮次边界). HistoryMessages []query.Message // MemoryDirRestrict 非空时,Edit/Write 工具只允许写入此目录下的路径. // // 升华改进(ELEVATED): 早期实现 createAutoMemCanUseTool 用 isAutoMemPath(filePath) // 在 canUseTool 中检查路径.我们把"允许写入哪个目录"提升到 SubAgentConfig 层, // 不是硬编码在 canUseTool 里,而是配置驱动--记忆提取传 memDir, // 其他场景传空字符串(不限制路径). // 替代方案:在 SubAgent 内部硬编码特殊规则(不通用,无法用于其他场景). MemoryDirRestrict string // SharedSystemPromptBytes 是父 Engine 已发送给 Anthropic API 的系统提示词字节(可选). // // 精妙之处(CLEVER): Prompt Cache 命中依赖内容字节完全一致-- // 父 Engine 把实际发送的系统提示字节直接传给 SubAgent, // SubAgent 用相同字节发请求,哈希必然一致,100% 命中缓存 slot. // // nil 时退化为独立渲染(向后兼容,SubAgent 自己调 buildSystemPrompt()). // 非 nil 时,SubAgent 的 runLoop 优先使用此字节,跳过渲染. // // 替代方案: // - 否决:时间戳,随机值等动态内容会导致字节不一致,缓存 miss. SharedSystemPromptBytes []byte // AllowedSubAgentTypes 限制此子 Agent 可以通过 Agent 工具 spawn 的子 Agent 类型. // // 升华改进(ELEVATED): 对应 AgentDefinition.AllowedSubAgentTypes 字段-- // 在 SpawnSubAgent 时从 AgentDefinition 传入,在 canUseTool 中运行时检查. // 例如:Explore agent 的 AllowedSubAgentTypes=["Plan","Verification"], // 防止只读 agent 通过 spawn general-purpose 子 agent 来绕过工具限制. // nil/空 = 无限制(可 spawn 任何 AgentType). // 注意:Agent 工具本身可能被 globalDisallowed 禁用(见 DefaultGlobalDisallowedTools), // 此字段仅在 Agent 工具可用时生效. AllowedSubAgentTypes []string // SilentEvents 非 true 时子 agent 不 emit SubAgentStart/End 生命周期事件 // 也不向父引擎 Run channel 转发业务事件 (sa.Run() 自己的 channel 仍收 // 业务事件供本地消费者 — RunSync 的 resultTexts 收集等). 用于后台静默 // 任务 (runMemoryExtraction) — 不污染用户可见的事件流. 默认 false // (正常子 agent 要可见). // // When SilentEvents is true, the sub-agent skips SubAgentStart/End // lifecycle events and does not forward business events to the parent // engine's Run channel (sa.Run()'s own channel still receives business // events for local consumers — e.g. RunSync's resultTexts collector). // Used for background silent tasks (runMemoryExtraction) to avoid // polluting user-visible event streams. Default false (normal // sub-agents should be visible). SilentEvents bool } // subAgentIDCounter 用于生成子 Agent 的唯一 ID. var subAgentIDCounter struct { mu sync.Mutex counter int } // nextSubAgentID 生成下一个子 Agent ID. // 精妙之处(CLEVER): 时间戳+自增计数器双重保证 ID 唯一性-- // 即使同一毫秒创建多个子 Agent,counter 也能区分它们. // 比纯 UUID 更可读,方便在日志和 UI 中追踪. func nextSubAgentID() string { subAgentIDCounter.mu.Lock() defer subAgentIDCounter.mu.Unlock() subAgentIDCounter.counter++ return fmt.Sprintf("subagent_%d_%d", time.Now().UnixMilli(), subAgentIDCounter.counter) } // SpawnSubAgent 从父 Engine fork 一个子 agent. // // 精妙之处(CLEVER): 共享父 engine 的 API 客户端,系统提示,完整工具列表(缓存命中), // 但独立消息历史和权限限制.这样每个子 agent 的首轮 API 调用都能命中 // 父 engine 已建立的 prompt cache,省下 10K+ 系统提示 token 的重新传输费用. // // 参数: // - parentEngine: 父 Engine 实例,fork 的源头 // - cfg: 子 Agent 的配置 // // 返回创建的子 Agent(尚未启动,需要调用 Run 或 RunBackground). func SpawnSubAgent(parentEngine *Engine, cfg *SubAgentConfig) *SubAgent { model := cfg.Model if model == "" { model = parentEngine.cfg.Model } cwd := cfg.Cwd if cwd == "" { cwd = parentEngine.cfg.Cwd } maxTurns := cfg.MaxTurns if maxTurns <= 0 { maxTurns = 10 // 子 Agent 默认 10 轮 } // Provider 继承:SubAgent 直接使用父 engine 的 Provider(在 sa 构造体中赋值). // 升华改进(ELEVATED): 引擎不再假设 Anthropic--Provider 是必填的注入依赖. // 替代方案: - 否决:G9-C 移除了 APIKey/BaseURL/BearerAuth. // 模块 13.3:Prompt Cache 共享--优先使用父 Engine 已渲染的系统提示字节. // // 精妙之处(CLEVER): 三条路径的优先级: // 1. cfg.SharedSystemPromptBytes 非 nil → 直接用(调用方显式传入,最高优先级) // 2. parentEngine.cfg.SharedSystemPromptBytes 非 nil → 父 Config 有共享字节 // 3. 都为 nil → 调用 buildSystemPrompt() 独立渲染(向后兼容旧行为) // // 使用 []byte 而非 string 存储--SubAgent.runLoop 中直接赋给 req.System(json.RawMessage), // 零拷贝传入 HTTP 请求体,省去 []byte→string→[]byte 的来回转换开销. // // 替代方案:<始终 buildSystemPrompt()> // - 否决:独立渲染可能引入微小差异(如时间戳注入),导致缓存 miss, // 每个 SubAgent 首轮都要付全量系统提示 token 费用. var sharedBytes []byte switch { case len(cfg.SharedSystemPromptBytes) > 0: sharedBytes = cfg.SharedSystemPromptBytes case len(parentEngine.cfg.SharedSystemPromptBytes) > 0: sharedBytes = parentEngine.cfg.SharedSystemPromptBytes } // systemPrompt 仅在 sharedBytes 为 nil 时使用(向后兼容路径) systemPrompt := "" if len(sharedBytes) == 0 { systemPrompt = parentEngine.buildSystemPrompt() } // 完整工具定义列表(API 层传给模型,cache key 一致) // // 升华改进(ELEVATED): 原实现调用 parentEngine.buildToolDefs(), // 但 buildToolDefs 内部会调用 toolSchemaTracker.Track(),每次都推进 turnCount. // SpawnSubAgent 额外调用一次 → turnCount+1 → 稳定性判断偏移 → stableCount 可能变化 // → cache_control 打到不同工具 → tools 数组与父 engine 实际发送的不一致 → cache miss. // 修复:读取 buildToolDefs 上一次调用的缓存快照(toolDefsSnapshot),纯读不推进 turnCount. // SpawnSubAgent 在工具执行阶段调用,父 engine 本轮 buildToolDefs 已运行,快照必然非 nil. // 替代方案:<拆出 buildToolDefsNoTrack> // - 否决:需要双入口维护,且调用方顺序错误时仍会 bug;缓存快照方案在引擎内部保证正确性. toolDefs := parentEngine.toolDefsSnapshot() // 处理 AllowedTools:如果传的是非 nil,过滤掉 Agent 工具防止递归 allowedTools := cfg.AllowedTools if allowedTools != nil { delete(allowedTools, "Agent") // 防止子 agent 递归创建子 agent } // 将 AllowedSubAgentTypes 切片转为 map,供 canUseTool O(1) 查找 // // 精妙之处(CLEVER): 转换在 SpawnSubAgent 时完成(一次性), // 而非在每次 canUseTool 调用时重建--对于频繁调用 Agent 工具的 agent 尤为重要. // 替代方案:在 canUseTool 里每次遍历 []string - 否决:O(n) 查找,工具调用热路径性能差. var allowedSubAgentTypes map[string]struct{} if len(cfg.AllowedSubAgentTypes) > 0 { allowedSubAgentTypes = make(map[string]struct{}, len(cfg.AllowedSubAgentTypes)) for _, t := range cfg.AllowedSubAgentTypes { allowedSubAgentTypes[t] = struct{}{} } } sa := &SubAgent{ ID: nextSubAgentID(), Description: cfg.Description, Model: model, ParentEngine: parentEngine, Status: SubAgentStatusPending, Progress: &SubAgentProgress{}, Cwd: cwd, provider: parentEngine.cfg.Provider, systemPrompt: systemPrompt, sharedSystemPromptBytes: sharedBytes, allToolDefs: toolDefs, allowedTools: allowedTools, allowedSubAgentTypes: allowedSubAgentTypes, toolRegistry: parentEngine.tools, maxTurns: maxTurns, done: make(chan struct{}), historyMessages: cfg.HistoryMessages, memoryDirRestrict: cfg.MemoryDirRestrict, silentEvents: cfg.SilentEvents, pendingPermissions: make(map[string]chan inbox.PermissionResponsePayload), } // 查询链追踪:从父链 fork 子链 // 精妙之处(CLEVER): 链追踪是可选的--cfg.Chain 为 nil 时子 agent 无追踪, // 不影响任何功能.这让追踪可以渐进式接入. if cfg.Chain != nil { sa.chain = cfg.Chain.Fork(sa.ID) } // 埋点:SubAgent 创建 parentEngine.observer.Event("subagent_spawned", map[string]any{ "subagent_id": sa.ID, "model": model, "description": cfg.Description, "max_turns": maxTurns, }) return sa } // canUseTool 检查子 agent 是否允许使用该工具. // // 精妙之处(CLEVER): API 层传完整工具列表给模型(和父 agent 一样,cache key 一致), // 但运行时通过此函数拦截不允许的工具.模型请求被禁工具时收到错误消息, // 会自己换工具--这个行为比在 API 层裁剪工具列表更好, // 因为保持了 cache key 一致性,节省了每个子 agent 的首轮费用. // // 升华改进(ELEVATED): 加入 rawInput 参数支持路径级别的访问控制-- // 当 MemoryDirRestrict 非空时,Edit/Write 工具的 file_path 必须在该目录内. // 早期实现 createAutoMemCanUseTool 用 isAutoMemPath(filePath) 做同样的检查, // 但是硬编码在专用的 canUseTool 工厂函数中.我们把限制提升为配置参数, // 通用于任何"只允许写特定目录"的场景(记忆/梦境/沙箱等). // 替代方案:专用 canUseTool 闭包(每种场景都要写一个,不可复用). func (sa *SubAgent) canUseTool(toolName string, rawInput json.RawMessage) bool { // MemoryDirRestrict:Edit/Write 只允许写入指定目录 if sa.memoryDirRestrict != "" && (toolName == "Edit" || toolName == "Write") { var inp map[string]any if err := json.Unmarshal(rawInput, &inp); err != nil || inp == nil { return false // 无法解析 = 拒绝 } fp, _ := inp["file_path"].(string) if !isUnderDir(fp, sa.memoryDirRestrict) { return false } } // AllowedSubAgentTypes:检查 Agent 工具调用的目标 agent 类型 // // 升华改进(ELEVATED): 早期实现 resolveAgentTools() 在工具列表解析时处理 allowedAgentTypes, // 通过字符串语法 "Agent(Explore)" 把约束嵌入工具名. // 我们在运行时 canUseTool 中检查-- // (1) 语法更简洁(结构化字段 vs 字符串解析), // (2) 和 allowedTools / memoryDirRestrict 检查统一在一处,逻辑内聚, // (3) 允许动态修改约束(将来 SubAgent 可以在运行时升级权限). // 替代方案:<在 API 层裁剪 Agent 工具的描述来隐式限制> // - 否决:模型依赖工具描述中的 agent_type 列表,裁剪会破坏模型行为. if toolName == "Agent" && len(sa.allowedSubAgentTypes) > 0 { // 解析 rawInput 取出 agent_type 字段 var inp struct { AgentType string `json:"agent_type"` } if err := json.Unmarshal(rawInput, &inp); err != nil { return false // 输入无法解析,拒绝 } agentType := inp.AgentType if agentType == "" { agentType = "general-purpose" // 空类型等同于 general-purpose } _, allowed := sa.allowedSubAgentTypes[agentType] return allowed } if sa.allowedTools == nil { // nil = 不限制,但始终排除 Agent 工具防止递归 return toolName != "Agent" } return sa.allowedTools[toolName] } // isUnderDir 检查 path 是否在 dir 目录下(含 dir 本身). // // 精妙之处(CLEVER): filepath.Clean 后用 HasPrefix(cleanDir+"/") 判断, // 而非直接 HasPrefix(cleanDir)--避免 dir="/foo/bar" 误匹配 path="/foo/barbaz". // 同时处理 path == dir 的边界情况(完全相等也算"在目录下"). func isUnderDir(path, dir string) bool { if path == "" || dir == "" { return false } cleanPath := filepath.Clean(path) cleanDir := filepath.Clean(dir) return cleanPath == cleanDir || strings.HasPrefix(cleanPath, cleanDir+string(filepath.Separator)) } // Run 执行子 agent 的查询循环. // 返回事件 channel,调用者可以从中读取子 Agent 产生的所有事件. // 运行完成后 channel 会被关闭. // // 和父 engine 的 runLoop 类似,但: // 1. 系统提示复用父 engine 的(cache 命中) // 2. API 请求传完整工具列表(cache key 一致) // 3. 工具执行前用 canUseTool 过滤(运行时拦截) // 4. 独立的消息历史和轮数限制 // 5. 不执行 Dream,Memory 提取等后台任务(精简版 runLoop) // // # Channel contract (2026-04-20 行为变更) // // 返回的 channel 发送**裸类型事件** (*TextEvent / *ToolUseEvent / // *TurnEndEvent / *ErrorEvent / *DoneEvent / *TextDeltaEvent / // *ToolResultEvent / *WarningEvent), 对齐父 Engine.Run() 的 channel // shape. 消费端可以直接 `for evt := range ch; switch v := evt.(type) { // case *TextEvent: ... }` 无需 unwrap. // // **Breaking**: 2026-04-20 之前本 channel 发送的是 `*SubAgentEvent` // 包装 (SubAgentID + Inner). 直接消费 sa.Run() channel 的外部 SDK 若 // 在 type switch 上 `case *SubAgentEvent:` 现在将收不到任何事件; 请 // 改为 type-switch 裸事件类型. SubAgentID 归属现在走父引擎 Run channel // 的 `*SubAgentEvent` 包装 (在父 engine 视角) 或 observer 路径 // (后台子 agent), 不在 sa.Run() 自己的 channel 上重复暴露. // // # Channel contract (2026-04-20 behavior change) // // The returned channel sends **bare-typed events** (*TextEvent, // *ToolUseEvent, *TurnEndEvent, *ErrorEvent, *DoneEvent, *TextDeltaEvent, // *ToolResultEvent, *WarningEvent), aligning with the parent // Engine.Run() channel shape. Consumers can directly // `for evt := range ch; switch v := evt.(type) { case *TextEvent: ... }` // without unwrapping. // // **Breaking**: prior to 2026-04-20 this channel sent `*SubAgentEvent` // wrappers (SubAgentID + Inner). External SDKs that consumed // sa.Run() channel and type-switched on `case *SubAgentEvent:` will now // receive nothing; update to type-switch the bare event types. // SubAgentID attribution is exposed via `*SubAgentEvent` wrappers on // the parent engine's Run channel (from the parent engine's viewpoint) // or via the observer path (background sub-agents), not re-emitted on // sa.Run()'s own channel. func (sa *SubAgent) Run(ctx context.Context, prompt string) <-chan Event { ch := make(chan Event, 64) go func() { defer close(ch) defer func() { // 精妙之处(CLEVER): recover 防止 SubAgent 内部 panic 崩溃整个引擎进程. // sa.done 在 runLoop 内部的 defer 中关闭,panic 同样会触发它, // 所以 Wait() 不会永久阻塞--但没有 recover,panic 仍会崩溃程序. if r := recover(); r != nil { sa.mu.Lock() if sa.Error == nil { sa.Error = fmt.Errorf("subagent panic: %v", r) } sa.Status = SubAgentStatusFailed sa.mu.Unlock() } }() // 升华改进(ELEVATED): ActivitySubAgent 追踪--SubAgent 执行期间引用计数 +1, // Engine.Close() 的空闲等待会感知到有活跃 SubAgent,避免过早关闭. // 替代方案:<不追踪 SubAgent,只追踪工具执行> // - 否决原因:SubAgent 可能运行数十秒,不追踪会导致引擎误判"空闲"并关闭. if sa.ParentEngine != nil && sa.ParentEngine.Activity() != nil { sa.ParentEngine.Activity().Start(ActivitySubAgent) defer sa.ParentEngine.Activity().Stop(ActivitySubAgent) } sa.runLoop(ctx, prompt, ch) }() return ch } // RunSync 同步运行子 Agent,阻塞直到完成,返回最终结果文本. // 这是最常用的同步模式--父 Agent 等待子 Agent 完成后使用结果. func (sa *SubAgent) RunSync(ctx context.Context, prompt string) (string, error) { return sa.RunSyncWithCallback(ctx, prompt, nil) } // ToolUseInfo 是工具调用的简化描述,供 RunSyncWithCallback 回调使用. // 只暴露 Dream/MemoryExtraction 等消费方关心的字段,不暴露完整 Input map. type ToolUseInfo struct { Name string // 工具名(如 "Edit", "Write", "Bash") FilePath string // file_path 参数(仅 Edit/Write 有值,其他工具为空) } // RunSyncWithCallback 同步运行子 Agent,每个 assistant 轮次完成后调用 onTurn 回调. // // onTurn 接收: // - text:本轮 assistant 的文本内容(可能为空,如纯工具调用轮次) // - toolUses:本轮工具调用列表(含 file_path 提取,用于 filesTouched 追踪) // // 升华改进(ELEVATED): 早期实现 runForkedAgent 接受 onMessage callback, // 但只能拿到完整 Message 对象(包含原始 content blocks). // 我们提供更高级别的抽象:text + []ToolUseInfo,消费方(Dream)不需要解析 blocks. // 替代方案:<暴露原始 []Event>--否决原因:消费方需要自己处理轮次边界(TurnEndEvent), // 增加了不必要的复杂度.onTurn 的语义更清晰:每个 assistant 轮次调用一次. // // onTurn 为 nil 时退化为 RunSync,无额外开销. func (sa *SubAgent) RunSyncWithCallback( ctx context.Context, prompt string, onTurn func(text string, toolUses []ToolUseInfo), ) (string, error) { events := sa.Run(ctx, prompt) var resultTexts []string // 当前轮次的累积状态(在 TurnEndEvent 时刷新) var turnText strings.Builder var turnToolUses []ToolUseInfo for evt := range events { switch e := evt.(type) { case *TextEvent: resultTexts = append(resultTexts, e.Text) if onTurn != nil { turnText.WriteString(e.Text) } case *ToolUseEvent: // 精妙之处(CLEVER): 从 tool_use Input map 提取 file_path-- // Edit 和 Write 工具都用 "file_path" 作为参数名(与早期方案一致). // 其他工具(Bash,Grep 等)也可能有路径参数,但我们只关心写操作. // 用类型断言从 any 安全提取字符串,空值 = 非文件工具,忽略. if onTurn != nil { info := ToolUseInfo{Name: e.ToolName} if fp, ok := e.Input["file_path"].(string); ok { info.FilePath = fp } turnToolUses = append(turnToolUses, info) } case *TurnEndEvent: // 精妙之处(CLEVER): 在 TurnEndEvent 时触发回调而非 TextEvent-- // 一个 assistant 轮次可能包含多个 text blocks(如 thinking + 回复), // 等轮次结束后合并调用一次,与早期方案 onMessage(msg: Message) 语义对齐. if onTurn != nil { onTurn(turnText.String(), turnToolUses) turnText.Reset() turnToolUses = nil } case *ErrorEvent: // 记录错误但继续处理(可能有部分结果) sa.mu.Lock() sa.Error = e.Err sa.mu.Unlock() } } sa.mu.Lock() defer sa.mu.Unlock() if sa.Error != nil && len(resultTexts) == 0 { return "", sa.Error } result := strings.Join(resultTexts, "") sa.Result = result return result, nil } // RunBackground 在后台 goroutine 中运行子 Agent. // 立即返回,不阻塞调用者. // 运行完成后可以通过 Wait() 等待结果,或通过 Status/Progress 查询进度. func (sa *SubAgent) RunBackground(prompt string) { // 精妙之处(CLEVER): 从父 Engine 的 rootCtx 派生-- // Engine.Close() cancel rootCtx 时,所有后台 SubAgent 自动收到取消信号. // 替代方案:<原方案 context.Background() 导致 SubAgent 不感知引擎关闭> ctx, cancel := context.WithCancel(sa.ParentEngine.Context()) sa.mu.Lock() sa.cancel = cancel sa.mu.Unlock() go func() { sa.ParentEngine.Observer().Event("subagent_background_start", map[string]any{ "subagent_id": sa.ID, }) events := sa.Run(ctx, prompt) var resultTexts []string for evt := range events { switch e := evt.(type) { case *TextEvent: resultTexts = append(resultTexts, e.Text) case *ErrorEvent: sa.mu.Lock() sa.Error = e.Err sa.mu.Unlock() } } sa.mu.Lock() sa.Result = strings.Join(resultTexts, "") sa.mu.Unlock() sa.ParentEngine.Observer().Event("subagent_background_complete", map[string]any{ "subagent_id": sa.ID, "status": string(sa.GetStatus()), }) }() } // Cancel 取消正在运行的子 Agent. func (sa *SubAgent) Cancel() { sa.mu.Lock() cancel := sa.cancel sa.mu.Unlock() if cancel != nil { cancel() } sa.mu.Lock() if sa.Status == SubAgentStatusRunning { sa.Status = SubAgentStatusCancelled sa.EndTime = time.Now() } sa.mu.Unlock() // 不在这里关闭 done channel,让 runLoop 来关闭 } // Wait 等待子 Agent 运行完成. // 返回运行过程中的错误(如果有). func (sa *SubAgent) Wait() error { <-sa.done sa.mu.Lock() defer sa.mu.Unlock() return sa.Error } // GetStatus 返回当前状态(线程安全). func (sa *SubAgent) GetStatus() SubAgentStatus { sa.mu.Lock() defer sa.mu.Unlock() return sa.Status } // GetResult 返回运行结果(线程安全). func (sa *SubAgent) GetResult() (string, error) { sa.mu.Lock() defer sa.mu.Unlock() return sa.Result, sa.Error } // registerPendingPermission allocates a 1-buffered chan keyed by requestID and // registers it in pendingPermissions. The bubblingHandler calls this before // sending MsgPermissionRequest, then blocks on chan <- in waitForPermissionResponse. // // registerPendingPermission 申请一个容量 1 的 chan, 以 requestID 为 key 注册到 // pendingPermissions. bubblingHandler 在发出 MsgPermissionRequest 前调用, // 然后通过 waitForPermissionResponse 阻塞等待. func (sa *SubAgent) registerPendingPermission(requestID string) chan inbox.PermissionResponsePayload { ch := make(chan inbox.PermissionResponsePayload, 1) sa.permMu.Lock() sa.pendingPermissions[requestID] = ch sa.permMu.Unlock() return ch } // dispatchPermissionResponse decodes a MsgPermissionResponse payload and writes // it to the matching pendingPermissions chan, unblocking the bubblingHandler. // Silently drops responses with no matching pending entry (late arrival after // ctx cancel cleaned up the entry, or stray broadcast) -- safer than panic. // // dispatchPermissionResponse 解码 MsgPermissionResponse payload 并写入匹配的 // pendingPermissions chan, 唤醒 bubblingHandler. 无匹配 entry 时静默丢弃 // (ctx 取消后清理迟到响应, 或误投), 比 panic 安全. func (sa *SubAgent) dispatchPermissionResponse(msg *inbox.Message) { var payload inbox.PermissionResponsePayload if err := json.Unmarshal(msg.Payload, &payload); err != nil { return } sa.permMu.Lock() ch, ok := sa.pendingPermissions[payload.RequestID] if ok { delete(sa.pendingPermissions, payload.RequestID) } sa.permMu.Unlock() if !ok { return } // chan is buffered (cap 1) and we just removed the only sender path -- writing // is non-blocking and the writer-once invariant holds. // // chan 容量 1, 上面已删除唯一发送路径 -- 写入非阻塞, "只写一次" 不变量成立. ch <- payload } // waitForPermissionResponse blocks on the response chan with a 3-way select // mirroring session.WaitForPermission: response, ctx cancel, sa.done teardown. // On cancel/teardown, deletes the pending entry to prevent a future stray // MsgPermissionResponse from blocking on a chan no one will read. // // waitForPermissionResponse 三路 select 阻塞等响应, 与 session.WaitForPermission // 一致: 响应到 / ctx 取消 / sa.done 关闭. 取消/关闭时删除 pending entry, 防止 // 未来的 MsgPermissionResponse 写入无人读的 chan. func (sa *SubAgent) waitForPermissionResponse(ctx context.Context, requestID string, ch chan inbox.PermissionResponsePayload) (inbox.PermissionResponsePayload, error) { select { case payload := <-ch: return payload, nil case <-ctx.Done(): sa.permMu.Lock() delete(sa.pendingPermissions, requestID) sa.permMu.Unlock() return inbox.PermissionResponsePayload{}, ctx.Err() case <-sa.done: sa.permMu.Lock() delete(sa.pendingPermissions, requestID) sa.permMu.Unlock() return inbox.PermissionResponsePayload{}, fmt.Errorf("subagent done before permission response") } } // runLoop 是子 Agent 的精简版查询循环. // // 升华改进(ELEVATED): 比父 engine 的 runLoop 精简很多-- // 不需要 Dream,Memory 提取,输入预处理,斜杠命令检测,模型降级等后台任务. // 子 agent 的职责单一:接收 prompt,调用 API,执行工具,返回结果. // 替代方案:复用父 engine 的完整 runLoop(职责过重,子 agent 不需要那些后台功能). func (sa *SubAgent) runLoop(ctx context.Context, prompt string, ch chan<- Event) { // Parent-engine event forwarding: two forward channels, picked in order. // // 1) Channel path (preferred): caller's tool-dispatch ctx carries an // EventEmitter injected by engine.runLoop before orchestrator // dispatch (see engine.go WithEventEmitter call site). Active when // the SubAgent runs synchronously under a live parent Run — // executeSync / executeWorktree / Team.runWorker / Dream / skill // fork. Forwards typed events straight to the parent Run channel, // giving SDK / bridge / TUI the richest shape. // // 2) Observer path (fallback): when no emitter in ctx but parent // engine has an observer, forward a lossy metadata-only payload // via observer.Event. This covers background sub-agents (bgCtx // derived from engine.Context() has no emitter) so background // tasks are no longer invisible to audit sinks / metrics. Payload // is metadata (subagent_id + event_type + lifecycle core fields), // not per-inner business fields — matches the Observer audit use // case, avoids duplicating the 17 case payload builder already // carried by pkg/bridge. // // silentEvents=true skips both paths (runMemoryExtraction). // // 父引擎事件转发: 两条转发通道按优先级选一. // // 1) Channel 路径 (首选): caller 的工具派发 ctx 带 engine.runLoop // 在 orchestrator 派发前注入的 EventEmitter (见 engine.go 的 // WithEventEmitter 调用点). 当 SubAgent 在活跃父 Run 下同步运行 // (executeSync / executeWorktree / Team.runWorker / Dream / // skill fork) 时启用. 把强类型事件直接转发到父 Run channel, // 给 SDK / bridge / TUI 最丰富的 shape. // // 2) Observer 路径 (兜底): ctx 无 emitter 但父 engine 有 observer // 时, 经 observer.Event 发出 lossy 元数据 payload. 覆盖后台子 // agent (bgCtx 从 engine.Context() 派生没 emitter), 后台任务 // 不再对审计 sink / 指标系统隐形. payload 是元数据 (subagent_id // + event_type + 生命周期核心字段), 不带内层业务字段 — 对齐 // Observer 审计用法, 避免重复 pkg/bridge 已有的 17 case // payload builder. // // silentEvents=true 两路都跳过 (runMemoryExtraction 专用). parentEmit := EventEmitterFromContext(ctx) var parentObserver EventObserver if sa.ParentEngine != nil { parentObserver = sa.ParentEngine.Observer() } canForwardChannel := parentEmit != nil && !sa.silentEvents canForwardObserver := parentEmit == nil && parentObserver != nil && !sa.silentEvents // pushEvt 是统一的事件派发辅助: 始终把 inner (bare Event) 送到 sa 自己 // 的 ch (RunSync / RunSyncWithCallback / RunBackground 等本地消费者 // type-switch *TextEvent / *ToolUseEvent / *TurnEndEvent / *ErrorEvent), // 同时在 forwardToParent=true 时把 SubAgentEvent 包装版本 emit 到父 // 引擎 Run channel (让父 SDK / SSE / TUI 看到带 SubAgentID 归属的子 // 活动). // // 历史对比: 原实现直接 `ch <- &SubAgentEvent{...}` 让 sa 自己的 ch 收 // 到的是 wrapper — RunSync/RunSyncWithCallback 的 switch 匹配不到 // *TextEvent 那一档, resultTexts 永远是空字符串 (潜在 bug). 改走 // pushEvt 后 bare event 走本地 ch, wrapper 只给父引擎用, 两侧职责分明. // // pushEvt is the unified event dispatch helper: always sends `inner` // (the bare Event) to sa's own ch (local consumers — RunSync / // RunSyncWithCallback / RunBackground — type-switch on *TextEvent / // *ToolUseEvent / *TurnEndEvent / *ErrorEvent), and additionally // forwards to the parent engine via the channel path (if available) // or the observer path (fallback for background sub-agents). // // Historical contrast: the prior implementation sent // `ch <- &SubAgentEvent{...}` directly, so sa's own ch received the // wrapper — RunSync/RunSyncWithCallback's type switch never matched // the *TextEvent case and resultTexts stayed empty (latent bug). The // pushEvt split sends the bare event locally and the wrapper upward, // separating the two responsibilities. pushEvt := func(inner Event) { ch <- inner if canForwardChannel { parentEmit(&SubAgentEvent{SubAgentID: sa.ID, Inner: inner}) } else if canForwardObserver { parentObserver.Event("subagent_event", map[string]any{ "subagent_id": sa.ID, "event_type": inner.EventType(), }) } } defer func() { // 确保 done channel 只关闭一次 select { case <-sa.done: // 已经关闭 default: close(sa.done) } // SubAgent 生命周期终态 emit 给父引擎. defer 保证从任何 return // 路径 (cancel / error / maxTurns / normal) 都能 emit, 不遗漏. // sa.EndTime / Status / Error / Result 已由各路径的 setter 写完, // 这里只做读和组装. // // Lifecycle end emission to parent engine. The defer guarantees // emission from every return path (cancel / error / maxTurns / // normal); sa.EndTime / Status / Error / Result are written by // those paths' setters — this closure only reads and assembles. if canForwardChannel || canForwardObserver { sa.mu.Lock() endTime := sa.EndTime if endTime.IsZero() { endTime = time.Now() sa.EndTime = endTime } status := sa.Status result := sa.Result var errMsg string if sa.Error != nil { errMsg = sa.Error.Error() } start := sa.StartTime sa.mu.Unlock() duration := endTime.Sub(start) truncated := truncateSubAgentResult(result) if canForwardChannel { parentEmit(&SubAgentEndEvent{ SubAgentID: sa.ID, Duration: duration, Status: status, Result: truncated, Error: errMsg, }) } else if canForwardObserver { parentObserver.Event("subagent_end", map[string]any{ "subagent_id": sa.ID, "duration_ms": duration.Milliseconds(), "status": string(status), "result": truncated, "error": errMsg, }) } } }() sa.mu.Lock() sa.Status = SubAgentStatusRunning sa.StartTime = time.Now() startTime := sa.StartTime sa.mu.Unlock() // SubAgent 生命周期起始 emit. 在 business 事件之前送达, 让父消费者先收到 // SubAgentStartEvent 再开始看到 SubAgentEvent 嵌入的 inner 事件, 消费端 // 可据此建立树形节点. // // Lifecycle start emission. Sent before any business event so parent // consumers receive SubAgentStartEvent first, then the SubAgentEvent // wrappers carrying inner events — consumers can use this to construct // tree nodes. if canForwardChannel { parentEmit(&SubAgentStartEvent{ SubAgentID: sa.ID, Description: sa.Description, Cwd: sa.Cwd, Model: sa.Model, StartTime: startTime, }) } else if canForwardObserver { parentObserver.Event("subagent_start", map[string]any{ "subagent_id": sa.ID, "description": sa.Description, "cwd": sa.Cwd, "model": sa.Model, "start_time_ms": startTime.UnixMilli(), }) } // 构建初始消息:先预置历史(来自父对话),再追加任务 prompt. // // 升华改进(ELEVATED): 早期实现 runForkedAgent 通过 cacheSafeParams 自动 // 把父对话历史注入给 fork 的子 agent,子 agent 能看到完整上下文再分析. // 我们用 historyMessages 显式传入,语义相同但更透明--调用方决定传多少历史. // 替代方案:把历史拼在 prompt 文本里(丢失消息结构,模型难以理解多轮边界). messages := make([]query.Message, 0, len(sa.historyMessages)+1) messages = append(messages, sa.historyMessages...) messages = append(messages, query.NewTextMsg(query.RoleUser, prompt)) // 追踪统计 var totalInputTokens, totalOutputTokens int turnCount := 0 const defaultMaxTokens = 8192 // 核心查询循环 for { select { case <-ctx.Done(): sa.mu.Lock() sa.Status = SubAgentStatusCancelled sa.EndTime = time.Now() sa.mu.Unlock() pushEvt(newErrorEvent(WrapError(ctx.Err(), ErrInternal, "子 agent 操作已取消"))) return default: } turnCount++ // 注入 Agent Teams peer-to-peer 消息 (来自同伴 Worker / Leader 的 teammate-message XML). // 精妙之处(CLEVER): 与 Engine.runLoop 对齐, Poll 而非 goroutine -- 每轮开始非阻塞 // 取走当前积压的所有同伴消息, 包装成 XML 加入 messages, 模型本轮请求自然读到. // 替代方案: <独立 goroutine + channel 桥接> - 否决: SubAgent 已经够多 goroutine // (cancel/done/Run/RunBackground), 加 Poll 同步注入复杂度最低. if sa.incomingInbox != nil { for { msg, err := sa.incomingInbox.Poll() if err != nil || msg == nil { break } // MsgPermissionResponse is the Leader's reply to a Worker permission // bubble (see permissionChecker / pendingPermissions). Route it to // the waiting bubblingHandler instead of injecting into the chat // stream -- the model never sees these control-protocol messages. // // MsgPermissionResponse 是 Leader 对 Worker 权限冒泡的响应 // (见 permissionChecker / pendingPermissions). 路由给等待中的 // bubblingHandler 而不是注入对话流 -- 模型不看这种 control 协议消息. if msg.Type == inbox.MsgPermissionResponse { sa.dispatchPermissionResponse(msg) continue } xml := formatTeammateMessageXML(msg) messages = append(messages, query.NewTextMsg(query.RoleUser, xml)) if sa.ParentEngine != nil { if obs := sa.ParentEngine.Observer(); obs != nil { obs.Event("teammate_message_received", map[string]any{ "from": msg.From, "to": msg.To, "type": string(msg.Type), "message_id": msg.ID, "payload_size": len(msg.Payload), "subagent_id": sa.ID, }) } } } } // 检查 maxTurns 限制 if turnCount > sa.maxTurns { sa.mu.Lock() sa.Status = SubAgentStatusCompleted sa.EndTime = time.Now() sa.mu.Unlock() pushEvt(&WarningEvent{ Code: "sub_agent_max_turns", Message: fmt.Sprintf("子 agent 已达最大轮次限制: %d", sa.maxTurns), }) break } // 调用模型 API(Provider 路径 vs legacy api.Client 路径) // 精妙之处(CLEVER): 工具列表传 allToolDefs(完整列表),不是 allowedTools 过滤后的. // 这样 cache key 与父 engine 一致 → 命中 prompt cache → 省钱. // 升华改进(ELEVATED): Provider 路径--flyto.Request 是 Provider 无关的抽象. // 不依赖 Anthropic 具体 HTTP 协议,openai/vertex/本地模型等 Provider 均可工作. // 替代方案: - 否决:G9-C 移除了 Anthropic 耦合. flytoMsgs := queryMessagesToFlyto(messages) systemStr := sa.systemPrompt if len(sa.sharedSystemPromptBytes) > 0 { systemStr = string(sa.sharedSystemPromptBytes) } flytoReq := &flyto.Request{ Model: sa.Model, MaxTokens: defaultMaxTokens, System: systemStr, Messages: flytoMsgs, } // Sub-agent path: user is indirectly waiting on this call through // the parent agent, so retry-layer failures should be attributed // to sub_agent (not main_thread or background). // // 子 agent 路径: 用户通过父 agent 间接等待此调用, 重试层失败应 // 归因到 sub_agent (非 main_thread 或 background). ctx = retry.WithQuerySource(ctx, SourceSubAgent.String()) streamCh, apiErr := sa.provider.Stream(ctx, flytoReq) if apiErr != nil { sa.mu.Lock() sa.Status = SubAgentStatusFailed sa.Error = apiErr sa.EndTime = time.Now() sa.mu.Unlock() pushEvt(&ErrorEvent{Err: apiErr}) return } // 处理流式事件 // 升华改进(ELEVATED): 早期方案需要维护 blocks 状态机(Start/Delta/Stop 三段式)-- // wire.ParseAnthropicStream 已在内部完成 block 聚合,这里直接处理语义完整的事件. var stopReason string var turnInputTokens, turnOutputTokens int hasToolUseBlocks := false completedToolUseCount := 0 // 收集 assistant 回复块(消息历史 + 工具调用列表) var assistantContent []query.Content var pendingToolCalls []tools.ToolCall for evt := range streamCh { switch e := evt.(type) { case *flyto.TextDeltaEvent: if e.Text != "" { pushEvt(&TextDeltaEvent{Text: e.Text}) } case *flyto.ThinkingDeltaEvent: // thinking 增量:SubAgent 不对外暴露(父 agent 感知即可) _ = e case *flyto.TextEvent: if e.Text != "" { pushEvt(&TextEvent{Text: e.Text}) assistantContent = append(assistantContent, query.Content{Type: query.ContentText, Text: e.Text}) } case *flyto.ThinkingEvent: // thinking 完整内容:SubAgent 不对外暴露,但追加到历史 if e.Text != "" { assistantContent = append(assistantContent, query.Content{Type: query.ContentThinking, Text: e.Text}) } case *flyto.ToolUseEvent: hasToolUseBlocks = true completedToolUseCount++ pushEvt(&ToolUseEvent{ ID: e.ID, ToolName: e.ToolName, Input: e.Input, }) assistantContent = append(assistantContent, query.Content{ Type: query.ContentToolUse, ID: e.ID, Name: e.ToolName, Input: e.Input, }) rawInput, _ := json.Marshal(e.Input) if rawInput == nil { rawInput = json.RawMessage("{}") } pendingToolCalls = append(pendingToolCalls, tools.ToolCall{ ID: e.ID, Name: e.ToolName, Input: rawInput, }) case *flyto.UsageEvent: stopReason = e.StopReason turnInputTokens += e.InputTokens turnOutputTokens += e.OutputTokens case *flyto.ErrorEvent: sa.mu.Lock() sa.Error = fmt.Errorf("stream error: %w", e.Err) sa.mu.Unlock() pushEvt(&ErrorEvent{Err: sa.Error}) return } } // 更新 token 统计 totalInputTokens += turnInputTokens totalOutputTokens += turnOutputTokens sa.Progress.mu.Lock() sa.Progress.InputTokens += turnInputTokens sa.Progress.OutputTokens += turnOutputTokens sa.Progress.mu.Unlock() // 推送轮次结束事件 pushEvt(&TurnEndEvent{ TurnNumber: turnCount, InputTokens: turnInputTokens, OutputTokens: turnOutputTokens, }) // 将 assistant 回复追加到消息历史(assistantContent 在流循环中已收集完毕) messages = append(messages, query.Message{Role: query.RoleAssistant, Content: assistantContent}) // 如果有工具调用,执行工具 needsToolExecution := (stopReason == "tool_use") || (hasToolUseBlocks && completedToolUseCount > 0) if needsToolExecution { toolCalls := pendingToolCalls if len(toolCalls) > 0 { var toolResultContent []query.Content for _, call := range toolCalls { // 精妙之处(CLEVER): 运行时拦截--canUseTool 决定是否允许执行. // 不允许的工具返回错误消息给模型,模型会自己换工具. // 这比在 API 层裁剪工具列表更好,因为保持了 cache key 一致性. // canUseTool 同时支持工具名白名单 + 路径级别的目录限制(MemoryDirRestrict). if !sa.canUseTool(call.Name, call.Input) { pushEvt(&ToolResultEvent{ ID: call.ID, ToolName: call.Name, Output: fmt.Sprintf("Error: tool %q is not available to this sub-agent. Please use a different approach.", call.Name), IsError: true, }) toolResultContent = append(toolResultContent, query.Content{ Type: query.ContentToolResult, ToolUseID: call.ID, Text: fmt.Sprintf("Error: tool %q is not available to this sub-agent. Please use a different approach.", call.Name), IsError: true, }) continue } // 执行允许的工具 sa.Progress.mu.Lock() sa.Progress.ToolUseCount++ sa.Progress.mu.Unlock() sa.Progress.AddActivity(fmt.Sprintf("调用工具: %s", call.Name)) tool, ok := sa.toolRegistry.Get(call.Name) if !ok { toolResultContent = append(toolResultContent, query.Content{ Type: query.ContentToolResult, ToolUseID: call.ID, Text: fmt.Sprintf("Error: tool %q not found", call.Name), IsError: true, }) continue } // Permission gate: when permissionChecker is set (Worker in a Team), // consult it before executing. The Checker's Handler bubbles the // request to Leader's inbox and blocks for the response. On Allow, // permResp.UpdatedInput (if non-nil) replaces call.Input to support // consumer-layer sanitization (e.g. Bash command rewrite). // // 权限闸: permissionChecker 非 nil 时 (Worker 在 Team 内), 执行前先 // 询问. Checker 的 Handler 把请求冒泡到 Leader inbox 并阻塞等响应. // Allow 时 permResp.UpdatedInput (非 nil) 替换 call.Input 支持消费层 // sanitize (如改写 Bash 命令). execInput := call.Input if sa.permissionChecker != nil { var inpMap map[string]any if len(call.Input) > 0 { _ = json.Unmarshal(call.Input, &inpMap) } permResp, perr := sa.permissionChecker.Check(ctx, &permission.Request{ ToolName: call.Name, ToolID: call.ID, Input: inpMap, }) if perr != nil { denyMsg := fmt.Sprintf("Error: permission check failed for tool %q: %v", call.Name, perr) pushEvt(&ToolResultEvent{ ID: call.ID, ToolName: call.Name, Output: denyMsg, IsError: true, }) toolResultContent = append(toolResultContent, query.Content{ Type: query.ContentToolResult, ToolUseID: call.ID, Text: denyMsg, IsError: true, }) continue } if permResp == nil || permResp.Decision != permission.DecisionAllow { reason := "" if permResp != nil { reason = permResp.Reason } denyMsg := fmt.Sprintf("Permission denied for tool %q: %s", call.Name, reason) pushEvt(&ToolResultEvent{ ID: call.ID, ToolName: call.Name, Output: denyMsg, IsError: true, }) toolResultContent = append(toolResultContent, query.Content{ Type: query.ContentToolResult, ToolUseID: call.ID, Text: denyMsg, IsError: true, }) continue } if permResp.UpdatedInput != nil { // Re-encode the modified input to json.RawMessage; tool.Execute // expects RawMessage. Marshal failure is unexpected (UpdatedInput // is map[string]any from a decoded handler response), but fall back // to original input rather than crash. // // 把修改后的 input 重新编码成 json.RawMessage; tool.Execute 接收 // RawMessage. Marshal 失败非预期 (UpdatedInput 来自已解码的 handler // 响应), 失败则回退原 input 而非 crash. if reEncoded, mErr := json.Marshal(permResp.UpdatedInput); mErr == nil { execInput = reEncoded } } } // Inject sa.Cwd so cwd-aware tools (Bash / FileEdit / // Gitignore / BashToolBackground) execute in the sub-agent's // working directory instead of the parent engine's. This is // how worktree isolation actually takes effect — previously // sa.Cwd was a dead label and tools kept running in parent // cwd. See pkg/tools/workdir.go. // // 注入 sa.Cwd, 让 cwd 感知工具 (Bash / FileEdit / Gitignore // / BashToolBackground) 在子 agent 工作目录执行而非父 engine // cwd. worktree 隔离真正生效的入口 — 之前 sa.Cwd 是死标签, // 工具仍跑在父 cwd. 见 pkg/tools/workdir.go. toolCtx := tools.WithWorkdir(ctx, sa.Cwd) result, err := tool.Execute(toolCtx, execInput, nil) output := "" isError := false if err != nil { output = fmt.Sprintf("Error: %v", err) isError = true } else if result != nil { output = result.Output isError = result.IsError } pushEvt(&ToolResultEvent{ ID: call.ID, ToolName: call.Name, Output: output, IsError: isError, }) toolResultContent = append(toolResultContent, query.Content{ Type: query.ContentToolResult, ToolUseID: call.ID, Text: output, IsError: isError, }) } // 将工具结果追加到消息历史 messages = append(messages, query.Message{Role: query.RoleUser, Content: toolResultContent}) // 回到循环顶部继续调用 API continue } } // 没有工具调用,子 agent 完成 break } // 设置最终状态 sa.mu.Lock() if sa.Status == SubAgentStatusRunning { sa.Status = SubAgentStatusCompleted } sa.EndTime = time.Now() sa.mu.Unlock() // 推送完成事件 pushEvt(&DoneEvent{ TotalInputTokens: totalInputTokens, TotalOutputTokens: totalOutputTokens, TurnCount: turnCount, }) } // FilterSubAgentTools 过滤出子 Agent 可以使用的工具列表. // 移除 Agent 工具(防止递归)和其他管理工具. // // 历史包袱(LEGACY): 保留此函数用于向后兼容.新代码应使用 SubAgentConfig.AllowedTools // 在 SpawnSubAgent 中通过 canUseTool 运行时拦截,而非在 API 层裁剪工具列表. // // 被排除的工具: // - Agent: 防止无限递归 func FilterSubAgentTools(allTools []tools.Tool) []tools.Tool { excludedTools := map[string]bool{ "Agent": true, } filtered := make([]tools.Tool, 0, len(allTools)) for _, t := range allTools { if !excludedTools[t.Name()] { filtered = append(filtered, t) } } return filtered } // ToolsToAllowedMap 将工具列表转换为 AllowedTools map. // 用于将旧版 []tools.Tool 参数转换为新版 map[string]bool 参数. func ToolsToAllowedMap(toolList []tools.Tool) map[string]bool { if len(toolList) == 0 { return nil // nil = 不限制 } m := make(map[string]bool, len(toolList)) for _, t := range toolList { m[t.Name()] = true } return m } // --- SubAgentEvent 事件包装 --- // SubAgentEvent 是子 Agent 产生的事件的包装. // 在子 Agent 的原始事件基础上增加了子 Agent 的 ID 信息, // 使父 Agent / 消费层能够区分来自不同子 Agent 的事件. type SubAgentEvent struct { // SubAgentID 是产生此事件的子 Agent 的 ID SubAgentID string // Inner 是原始事件 Inner Event } func (e *SubAgentEvent) EventType() string { return "subagent_" + e.Inner.EventType() } // SubAgentStartEvent 是子 agent 生命周期开始事件, 由 sa.runLoop 在首个业务 // 事件之前 emit 到父引擎 Run channel (通过 ctx 注入的 EventEmitter). 让父 // agent / SDK 消费者 / SSE / 审计 sink 知道子 agent 何时开工以及领了什么 // 任务. Description/Cwd/Model 快照来自 SubAgent.Description/Cwd/Model 字段 // (SpawnSubAgent 构造期 populate). // // SubAgentStartEvent is the sub-agent lifecycle start event emitted by // sa.runLoop before the first business event, forwarded to the parent // engine's Run channel via the ctx-injected EventEmitter. Lets parent // agent / SDK consumers / SSE / audit sinks know when a sub-agent began // and what task it took. Description/Cwd/Model are snapshots of the // SubAgent.* fields populated at SpawnSubAgent time. type SubAgentStartEvent struct { // SubAgentID 是子 agent 的唯一标识符 (nextSubAgentID 生成). SubAgentID string // Description 是子 agent 的任务描述 (来自 SubAgentConfig.Description). Description string // Cwd 是子 agent 的工作目录 (worktree 模式下为分支路径, 否则继承父 engine cwd). Cwd string // Model 是子 agent 使用的模型 (空串表示继承父 agent). Model string // StartTime 是子 agent 启动的 wall-clock 时间 (sa.StartTime 同步填值). StartTime time.Time } // EventType 实现 Event 接口. 返回 "subagent_start". // EventType implements the Event interface. Returns "subagent_start". func (e *SubAgentStartEvent) EventType() string { return "subagent_start" } // SubAgentEndEvent 是子 agent 生命周期结束事件, 由 sa.runLoop 在所有业务 // 事件之后 (包括取消 / 错误 / maxTurns 等各种退出路径) emit 到父引擎 Run // channel. Duration = EndTime - StartTime, Status 反映最终状态 (Completed/ // Failed/Cancelled), Result 是最终文本结果 (截断到 2KB 避免 SSE payload // 膨胀), Error 是错误消息字符串 (用 error.Error() 扁平化, 不序列化原始 // error 类型, 让 SSE JSON 稳定). // // SubAgentEndEvent is the sub-agent lifecycle end event emitted by // sa.runLoop after all business events (through any exit path: cancel / // error / maxTurns). Duration = EndTime - StartTime; Status reflects the // final state (Completed/Failed/Cancelled); Result is the final text // output (truncated to 2KB to keep SSE payload sizes sane); Error is the // flattened error message (via error.Error()) — the underlying error type // is not serialized so JSON stays stable across versions. type SubAgentEndEvent struct { // SubAgentID 是子 agent 的唯一标识符, 与对应 SubAgentStartEvent 配对. SubAgentID string // Duration 是子 agent 总运行时长 (EndTime - StartTime). Duration time.Duration // Status 是子 agent 退出时的最终状态 (Completed / Failed / Cancelled). Status SubAgentStatus // Result 是子 agent 的最终文本结果, 截断到 subAgentResultMaxBytes. Result string // Error 是子 agent 错误消息 (error.Error()); nil 错误为空串. Error string } // EventType 实现 Event 接口. 返回 "subagent_end". // EventType implements the Event interface. Returns "subagent_end". func (e *SubAgentEndEvent) EventType() string { return "subagent_end" } // subAgentResultMaxBytes 是 SubAgentEndEvent.Result 截断上限, 避免子 agent // 产出巨量文本时把 SSE payload 撑爆 (SSE 单事件约束通常 <64KB, 2KB 留足余地). // 超出会附加 "..." 后缀提示被截断. // // subAgentResultMaxBytes is the truncation ceiling for SubAgentEndEvent. // Result, guarding against huge sub-agent outputs bloating SSE payloads // (SSE single-event limits are typically <64KB; 2KB leaves ample headroom). // Over-limit results are suffixed with "..." to indicate truncation. const subAgentResultMaxBytes = 2048 // truncateSubAgentResult 按字节截断 s 到 subAgentResultMaxBytes, 超出则加 // "..." 后缀. truncateSubAgentResult byte-truncates s to // subAgentResultMaxBytes, appending "..." when it was over the limit. func truncateSubAgentResult(s string) string { if len(s) <= subAgentResultMaxBytes { return s } return s[:subAgentResultMaxBytes] + "..." }