package tools // 工具编排器 -- 管理工具的并发执行和调度. // // 对应原项目中同名模块的功能. // 原项目用 partitionToolCalls() 将工具调用分组,然后用 Promise.all 并发执行. // Go 版本用 goroutine + errgroup 实现,更自然. // // 核心算法不变: // - 连续的 ConcurrencySafe 工具分为一组,并行执行 // - 非 ConcurrencySafe 工具单独执行(串行) // - 同一组内最大并发数由 MaxConcurrency 控制 import ( "context" "encoding/json" "fmt" "strings" "sync" "unicode/utf8" ) // ResultProcessor 是大结果处理的接口. // Orchestrator 通过此接口处理工具输出,避免直接依赖 engine 包. type ResultProcessor interface { ProcessResult(toolUseID, toolName, output string) (processedOutput string, storedPath string) } // Orchestrator 负责工具调用的并发调度. type Orchestrator struct { registry *Registry maxConcurrency int resultProcessor ResultProcessor // 大结果处理器(可选) } // NewOrchestrator 创建一个工具编排器. func NewOrchestrator(registry *Registry, maxConcurrency int) *Orchestrator { if maxConcurrency <= 0 { maxConcurrency = 10 // 对应原项目的默认值 } return &Orchestrator{ registry: registry, maxConcurrency: maxConcurrency, } } // NewOrchestratorWithResultProcessor 创建一个带大结果处理器的工具编排器. // processor 为 nil 时退化为无处理行为,向后兼容. func NewOrchestratorWithResultProcessor(registry *Registry, maxConcurrency int, processor ResultProcessor) *Orchestrator { o := NewOrchestrator(registry, maxConcurrency) o.resultProcessor = processor return o } // ToolCall 表示模型发起的一次工具调用请求. type ToolCall struct { ID string // API 生成的工具调用 ID Name string // 工具名称 Input json.RawMessage // 工具输入参数(JSON) } // ToolCallResult 表示一次工具调用的结果. // // Truncated / StoredPath surface the large-result handling decision so // downstream consumers (engine → flyto.ToolResultEvent / OperationEntry / // SSE bridge → UI / audit sink) can honor the "click to open full content" // affordance and keep audit logs pointing at the authoritative artifact. // When the orchestrator's pluggable ResultProcessor rewrites Output to a // short summary, these fields are the only breadcrumb back to the full // result. // // Truncated / StoredPath 把大结果处理决策透传给下游消费者 (engine → // flyto.ToolResultEvent / OperationEntry / SSE bridge → UI / 审计 sink), // 让消费者能兑现 "点击查看完整内容" 入口并让审计日志指向权威产物. 当 // orchestrator 的可插拔 ResultProcessor 把 Output 重写成短摘要时, 这两个 // 字段是回溯完整结果的唯一线索. type ToolCallResult struct { ID string // 对应的工具调用 ID Name string // 工具名称 Output string // 工具输出(可能被截断, 见 Truncated) IsError bool // 是否出错 // Truncated marks that Output is a short summary and the full result // has been persisted to StoredPath. // // Truncated 标记 Output 是短摘要, 完整结果已落盘到 StoredPath. Truncated bool // StoredPath is where the ResultProcessor persisted the full output. // Empty when Truncated is false. Path shape is consumer-defined // (local path / sandbox path / object-storage key). // // SECURITY: path may carry sandbox-internal structure. Downstream // observers / SSE bridges treat it as caller-specific, untrusted data // (e.g. do not log to shared destinations without review). Same // constraint applies in flyto.ToolResultEvent.StoredPath and // OperationEntry.StoredPath; all three share one trust boundary. // // StoredPath 是 ResultProcessor 落盘完整输出的位置. Truncated=false 时 // 为空串. Path shape 由消费层定义 (本地路径 / sandbox 路径 / 对象存储 // key). // // 安全: path 可能带 sandbox 内部结构. 下游 observer / SSE bridge 当作 // 调用方特定的不可信数据对待 (未审查前不写入共享目的地). 此约束同样 // 适用于 flyto.ToolResultEvent.StoredPath 和 OperationEntry.StoredPath, // 三者共享同一条信任边界. StoredPath string UndoInfo *UndoInfo // 撤销信息(由 Reversible 工具生成) // Data is the structured payload that the tool's Result.Data carries // through to the engine. Primary consumer (2026-04): engine wraps an // *tools/builtin.ImageResult into an array-form tool_result so vision // models see the real image bytes (see engine.go tool_result // construction). Future non-image structured payloads (e.g. document, // chart) can plug in without another field. // // Orchestrator preserves the reference verbatim -- no copy, no // serialization. Consumers MUST type-assert (`if img, ok := res.Data.(*ImageResult); ok`) // and handle the nil case. // // Data 是工具 Result.Data 透传到引擎的结构化载荷. 主消费者 (2026-04): // 引擎把 *tools/builtin.ImageResult 包成 array-form tool_result 让 vision // 模型真读到图 (见 engine.go tool_result 构造点). 未来非图结构化载荷 // (文档 / 图表 / 结构化表格) 可直接沿用, 不用再加字段. // // Orchestrator 原样透传引用 -- 不拷贝不序列化. 消费方必须 type-assert // (`if img, ok := res.Data.(*ImageResult); ok`) 并处理 nil 情况. Data any } // ExecuteBatch 执行一批工具调用. // 自动根据工具的并发安全性分组,安全的并行执行,不安全的串行执行. // // 对应原项目中 runTools() 异步生成器. // 原项目用 yield 逐个返回结果,Go 版本通过 results channel 推送. func (o *Orchestrator) ExecuteBatch( ctx context.Context, calls []ToolCall, results chan<- ToolCallResult, ) { // 第一步:将工具调用按并发安全性分批 // 对应原项目的 partitionToolCalls() batches := o.partition(calls) // 第二步:逐批执行 for _, batch := range batches { if len(batch) == 0 { continue } if len(batch) == 1 || !batch[0].concurrent { // 单个工具或不可并发 -- 串行执行 for _, item := range batch { // 升华改进(ELEVATED): 串行执行也检查 context 取消-- // 在非编程场景(如流水线调度,审批链),一旦整体任务被取消, // 后续步骤应立即停止而不是继续执行再丢弃结果. // 替代方案:<原方案不检查 ctx,所有串行工具逐个执行到底> select { case <-ctx.Done(): return default: } result := o.executeSingle(ctx, item.call) results <- result } } else { // 可并发工具 -- 并行执行(受 maxConcurrency 限制) o.executeConcurrent(ctx, batch, results) } } } // batchItem 是分批后的工具调用条目. type batchItem struct { call ToolCall concurrent bool // 是否可并发执行 } // partition 将工具调用按并发安全性分批. // 连续的并发安全工具归为一组,非安全工具单独一组. // // 示例: // // 输入: [Glob, Grep, FileEdit, Glob, Grep] // 输出: [[Glob, Grep](并发), [FileEdit](串行), [Glob, Grep](并发)] func (o *Orchestrator) partition(calls []ToolCall) [][]batchItem { var batches [][]batchItem var currentBatch []batchItem lastWasConcurrent := false for _, call := range calls { tool, ok := o.registry.Get(call.Name) if !ok { // 未知工具,单独串行执行(会在 executeSingle 中报错) if len(currentBatch) > 0 { batches = append(batches, currentBatch) currentBatch = nil } batches = append(batches, []batchItem{{call: call, concurrent: false}}) lastWasConcurrent = false continue } meta := GetMetadata(tool) isConcurrent := meta.ConcurrencySafe if isConcurrent { if !lastWasConcurrent && len(currentBatch) > 0 { batches = append(batches, currentBatch) currentBatch = nil } currentBatch = append(currentBatch, batchItem{call: call, concurrent: true}) lastWasConcurrent = true } else { if len(currentBatch) > 0 { batches = append(batches, currentBatch) currentBatch = nil } batches = append(batches, []batchItem{{call: call, concurrent: false}}) lastWasConcurrent = false } } if len(currentBatch) > 0 { batches = append(batches, currentBatch) } return batches } // executeSingle 执行单个工具调用. // 如果配置了 resultProcessor,工具输出会经过大结果处理. // // 防御层: // // a) 工具不存在 → 友好提示 + 可用工具列表 // b) 工具输入 JSON 无效 → 提供该工具的 InputSchema 作为参考 // c) 工具输出含非 UTF-8 字节 → 替换为 '?' 并追加警告 func (o *Orchestrator) executeSingle(ctx context.Context, call ToolCall) ToolCallResult { tool, ok := o.registry.Get(call.Name) if !ok { // 防御点 a: 工具不存在时,提供可用工具列表,帮助模型自我纠错. // // 精妙之处(CLEVER): 返回可用工具列表而非仅报告错误-- // 模型在下一轮看到这个响应时,可以选择正确的工具名,无需人工干预. // 对 Agent 来说,这是"自愈型错误消息"而非"终止型错误消息". // 替代方案:仅返回 "tool not found: "(原方案)--模型不知道有什么工具可用, // 往往会重试同一个错误名或要求用户帮忙. availableNames := o.registry.Names() var friendlyMsg string if len(availableNames) > 0 { friendlyMsg = fmt.Sprintf( "tool %q not found. Available tools: %s", call.Name, strings.Join(availableNames, ", "), ) } else { friendlyMsg = fmt.Sprintf("tool %q not found (no tools registered)", call.Name) } return ToolCallResult{ ID: call.ID, Name: call.Name, Output: "error: " + friendlyMsg, IsError: true, } } // 防御点 b: 工具输入 JSON 无效时,提供该工具的 InputSchema 帮助模型修正. // // 升华改进(ELEVATED): 注入 InputSchema 作为修复建议-- // 模型在看到 schema 后通常能直接生成符合要求的 JSON, // 比让模型盲猜"正确的参数格式是什么"效率高出数个 RTT. // 跨行业场景:仓储工具 schema 包含条码格式规范,模型报错后能立即参考. // 替代方案:仅返回 JSON 解析错误信息(模型不知如何修正). if len(call.Input) > 0 { var probe map[string]any if err := json.Unmarshal(call.Input, &probe); err != nil { schemaBytes := tool.InputSchema() schemaHint := "" if len(schemaBytes) > 0 { schemaHint = "\nExpected schema: " + string(schemaBytes) } return ToolCallResult{ ID: call.ID, Name: call.Name, Output: fmt.Sprintf("error: invalid input for tool %q: %v%s", call.Name, err, schemaHint), IsError: true, } } } // Agent Tool Safety Protocol 置信度门控: // 对声明了 MinConfidence>0 的工具, 要求 LLM 在 input 中附 _flyto_confidence // (0-100). 低于阈值 → 返回工具级错误让 LLM 重新评估; 放行前剥除保留字段, // 保持工具 InputSchema 契约不被侵入. // // Agent Tool Safety Protocol confidence gate: // For tools declaring MinConfidence>0, require LLM to attach // _flyto_confidence (0-100) in input. Below threshold -> return tool // error letting LLM reconsider; strip reserved field on pass so tool // InputSchema contract stays clean. toolCap := GetCapability(tool) pass, gateMsg, strippedInput := CheckConfidenceGate(toolCap, call.Input) if !pass { return ToolCallResult{ ID: call.ID, Name: call.Name, Output: "error: " + gateMsg, IsError: true, } } result, err := tool.Execute(ctx, strippedInput, nil) if err != nil { return ToolCallResult{ ID: call.ID, Name: call.Name, Output: "error: " + err.Error(), IsError: true, } } // 升华改进(ELEVATED): 工具执行成功后,自动为 Reversible 工具生成 UndoInfo. // 这样编排器收集结果时自然就拿到了撤销信息,不需要外部额外调用. // 替代方案:在 Engine 层单独调用 GenerateUndo(多一次调用,时序更复杂). var undoInfo *UndoInfo if !result.IsError { if rev, ok := tool.(Reversible); ok { undo, undoErr := rev.GenerateUndo(ctx, call.Input, result) if undoErr == nil { undoInfo = undo } } // Result 中如果已经有 UndoInfo(工具自己填的),优先使用 if result.UndoInfo != nil { undoInfo = result.UndoInfo } } // 防御点 c: 工具输出含非 UTF-8 字节时,替换为 '?' 并追加警告. // // 历史包袱(LEGACY): 某些工具(Bash 执行二进制命令,grep 扫描二进制文件)可能输出 // 非 UTF-8 字节序列.早期方案未做检测,直接发给 API 会导致 JSON 序列化失败或 // API 返回 400 错误(JSON 字符串中的非 UTF-8 字节是非法的). // 精妙之处(CLEVER): strings.ToValidUTF8 是 O(n) 单次扫描,对纯 ASCII/UTF-8 输出近乎零开销. // 替代方案:base64 编码整个输出(安全但膨胀 33%,且模型难以理解 base64 内容). output := result.Output if !result.IsError && !utf8.ValidString(output) { output = strings.ToValidUTF8(output, "?") + "\n[warning: output contained non-UTF-8 bytes, replaced with ?]" } // 通过大结果处理器处理输出 var truncated bool var storedPath string if o.resultProcessor != nil && !result.IsError { output, storedPath = o.resultProcessor.ProcessResult(call.ID, call.Name, output) truncated = storedPath != "" } return ToolCallResult{ ID: call.ID, Name: call.Name, Output: output, IsError: result.IsError, Truncated: truncated, StoredPath: storedPath, UndoInfo: undoInfo, Data: result.Data, // path B 入口: 结构化载荷原样透传 } } // executeConcurrent 并行执行一批可并发的工具调用. // 使用信号量控制最大并发数. func (o *Orchestrator) executeConcurrent( ctx context.Context, batch []batchItem, results chan<- ToolCallResult, ) { // 精妙之处(CLEVER): 用 buffered channel 做信号量--比 sync.WaitGroup + 计数器更简洁. // channel 的缓冲区大小就是最大并发数,获取信号量是写入,释放是读出. // 这是 Go 中实现有界并行的惯用模式,比 errgroup.SetLimit 出现得更早. sem := make(chan struct{}, o.maxConcurrency) var wg sync.WaitGroup for _, item := range batch { wg.Add(1) sem <- struct{}{} // 获取信号量 go func(call ToolCall) { defer wg.Done() defer func() { <-sem }() // 释放信号量 defer func() { // 精妙之处(CLEVER): recover 防止单个工具执行 panic 崩溃整个进程. // panic 时补发一条 error 结果,确保调用方 for-range 能收到与发起数相符的结果数, // 不会因少一条结果而产生逻辑缺漏(engine 侧不依赖精确计数,但明确报错更好). if r := recover(); r != nil { results <- ToolCallResult{ ID: call.ID, Name: call.Name, Output: fmt.Sprintf("error: tool executor panic: %v", r), IsError: true, } } }() result := o.executeSingle(ctx, call) results <- result }(item.call) } wg.Wait() }