package engine // norm_tool_result_pairing.go -- 完整的 tool_use / tool_result 配对修复. // // 升华改进(ELEVATED): 从早期方案 OrphanToolResultRemover 的单一清理升级为 4 种 case 的完整修复. // 早期方案只处理"孤立 tool_result"(case 2),但生产中还会遇到: // - tool_use 没有 tool_result(会话中断,压缩截断)→ API 400 // - 重复 tool_use ID(压缩合并,消息回放)→ API 行为未定义 // - 重复 tool_result ID(网络重传,SDK bug)→ 浪费 token // // 所有修复都通过 Observer 记录,严格模式下 panic. // 替代方案:只处理 case 2(早期方案做法,其他 3 种 case 靠祈祷). // // 4 种 case: // 1. tool_use 无 tool_result → 注入合成 tool_result // 2. tool_result 无 tool_use → 剥离(升级自 OrphanToolResultRemover) // 3. 重复 tool_use ID → 去重(保留第一个) // 4. 重复 tool_result ID → 去重(保留第一个) import ( "fmt" "strings" "git.flytoex.net/yuanwei/flyto-agent/pkg/query" ) // syntheticToolResultText 合成 tool_result 的文案. // 精妙之处(CLEVER): 措辞经过仔细设计-- // 明确告诉模型"工具未执行"(防止模型假设工具已执行并基于此推理), // 暗示"会话中断"(给模型足够上下文做合理决策). const syntheticToolResultText = "[Tool result not available - session may have been interrupted. The tool was not executed.]" // ToolResultPairingNormalizer 完整的 tool_use / tool_result 配对修复器. // // Priority: 8(在 OrphanToolResultRemover(10) 之前执行). // 精妙之处(CLEVER): 必须在 OrphanToolResultRemover 之前-- // 本步骤先注入合成 tool_result(case 1),然后 OrphanToolResultRemover // 不会误删它们(因为此时已有配对的 tool_use). // 如果顺序反过来,OrphanToolResultRemover 会先跑, // 本步骤注入的合成 tool_result 不会被二次检查. type ToolResultPairingNormalizer struct { Observer EventObserver // 可观测性接口(nil 安全) StrictMode *StrictMode // 严格模式(nil 安全) } func (n *ToolResultPairingNormalizer) Name() string { return "tool_result_pairing" } func (n *ToolResultPairingNormalizer) Priority() int { return 8 } // Normalize 执行 4 种配对修复. func (n *ToolResultPairingNormalizer) Normalize(messages []query.Message) []query.Message { if len(messages) == 0 { return messages } // --- 阶段 1:收集所有 tool_use 和 tool_result 的 ID --- // tool_use ID → 是否已见过(用于去重) toolUseIDs := make(map[string]bool) // tool_result 引用的 tool_use ID → 是否已见过(用于去重) toolResultIDs := make(map[string]bool) // 记录所有修复操作 var repairs []string repaired := false // --- 阶段 2:第一遍扫描--去重 tool_use(case 3)和去重 tool_result(case 4)--- result := make([]query.Message, 0, len(messages)) for _, msg := range messages { var filteredContent []query.Content for _, c := range msg.Content { switch c.Type { case query.ContentToolUse: if c.ID == "" { // 无 ID 的 tool_use,保留(不做处理) filteredContent = append(filteredContent, c) continue } if toolUseIDs[c.ID] { // case 3: 重复 tool_use ID → 去重 // 埋点说明:重复 tool_use 是压缩合并或消息回放导致的, // API 对重复 ID 的行为未定义,必须在此去重. repairs = append(repairs, fmt.Sprintf("duplicate_tool_use:%s", c.ID)) repaired = true continue } toolUseIDs[c.ID] = true filteredContent = append(filteredContent, c) case query.ContentToolResult: if c.ToolUseID == "" { // 无 ToolUseID 的 tool_result,保留(OrphanToolResultRemover 会处理) filteredContent = append(filteredContent, c) continue } if toolResultIDs[c.ToolUseID] { // case 4: 重复 tool_result ID → 去重 // 埋点说明:重复 tool_result 通常是网络重传或 SDK 重试导致的, // 保留第一个,后续的浪费 token 且可能混淆模型. repairs = append(repairs, fmt.Sprintf("duplicate_tool_result:%s", c.ToolUseID)) repaired = true continue } toolResultIDs[c.ToolUseID] = true filteredContent = append(filteredContent, c) default: filteredContent = append(filteredContent, c) } } // 保留非空消息 if len(filteredContent) > 0 { result = append(result, query.Message{ Role: msg.Role, Content: filteredContent, Time: msg.Time, Metadata: msg.Metadata, }) } else if len(msg.Content) > 0 { // 整条消息的内容都被去重掉了 repairs = append(repairs, "removed_empty_message_after_dedup") repaired = true } } // --- 阶段 3:找缺失的 tool_result(case 1)和孤立的 tool_result(case 2)--- // // 精妙之处(CLEVER): 阶段 2 已在去重过程中积累了 toolUseIDs 和 toolResultIDs, // 两者等价于"去重后的全量 ID 集合",直接复用,无需再遍历 result 一次. // 原方案:重新扫描 result 构建 allToolUseIDs/allToolResultIDs(冗余扫描,已删除). // case 1: tool_use 无 tool_result → 注入合成 tool_result // 埋点说明:缺失的 tool_result 会导致 API 400 错误. // 在会话中断,压缩截断,异常退出时都可能出现. // 注入合成 tool_result 让 API 请求合法,同时告知模型工具未执行. var missingToolUseIDs []string for id := range toolUseIDs { if !toolResultIDs[id] { missingToolUseIDs = append(missingToolUseIDs, id) } } if len(missingToolUseIDs) > 0 { repaired = true // 为每个缺失的 tool_result 构建合成 content block var syntheticResults []query.Content for _, id := range missingToolUseIDs { repairs = append(repairs, fmt.Sprintf("synthetic_tool_result:%s", id)) syntheticResults = append(syntheticResults, query.Content{ Type: query.ContentToolResult, ToolUseID: id, Text: syntheticToolResultText, IsError: true, }) } // 合成 tool_result 追加到消息列表末尾(作为 user 消息) // 精妙之处(CLEVER): 放在末尾而不是尝试找"正确位置"-- // API 只要求 tool_result 在某个 user 消息中且引用合法的 tool_use ID, // 不要求 tool_result 紧跟 tool_use.放末尾最安全,最简单. result = append(result, query.Message{ Role: query.RoleUser, Content: syntheticResults, Metadata: map[string]any{ "synthetic": true, "reason": "tool_result_pairing_repair", }, }) } // case 2: tool_result 无 tool_use → 剥离 // 这个 case 由后续的 OrphanToolResultRemover(Priority 10)处理. // 本步骤不重复实现,保持单一职责. // 但我们统计一下,用于诊断. for id := range toolResultIDs { if !toolUseIDs[id] { repairs = append(repairs, fmt.Sprintf("orphan_tool_result:%s(delegated)", id)) repaired = true } } // --- 阶段 4:记录修复并检查严格模式 --- if repaired { obs := n.observer() diagnostic := buildDiagnosticDetail(messages) // 埋点说明:这是最关键的可观测性埋点之一. // tool_result 配对错误是生产中最常见的 API 400 根因, // 每次修复都必须记录,用于事后分析根因(是压缩导致的?是网络重传?是 SDK bug?) obs.Event("tool_result_pairing_repaired", map[string]any{ "repairs": repairs, "repair_count": len(repairs), "message_count_before": len(messages), "message_count_after": len(result), "diagnostic": diagnostic, }) // 严格模式检查 if n.StrictMode != nil { n.StrictMode.CheckToolResultPairing(obs, diagnostic) } } return result } // observer 返回安全的 observer(nil 时返回 NoopObserver). func (n *ToolResultPairingNormalizer) observer() EventObserver { if n.Observer != nil { return n.Observer } return &NoopObserver{} } // buildDiagnosticDetail 构建消息序列的诊断快照. // // 精妙之处(CLEVER): 不序列化完整消息内容(可能包含敏感数据), // 只提取结构信息(角色,content 类型,tool_use/tool_result ID). // 这样诊断信息可以安全地发到外部监控系统,不会泄露用户数据. // // 输出示例: // // [0] user(text) // [1] assistant(tool_use=[tu_001,tu_002]) // [2] user(tool_result=[tu_001]) ← tu_002 missing func buildDiagnosticDetail(messages []query.Message) string { var sb strings.Builder for i, msg := range messages { if i > 0 { sb.WriteString("; ") } sb.WriteString(fmt.Sprintf("[%d] %s(", i, msg.Role)) var parts []string var toolUseIDs []string var toolResultIDs []string hasText := false for _, c := range msg.Content { switch c.Type { case query.ContentToolUse: if c.ID != "" { toolUseIDs = append(toolUseIDs, c.ID) } case query.ContentToolResult: if c.ToolUseID != "" { toolResultIDs = append(toolResultIDs, c.ToolUseID) } case query.ContentText: hasText = true case query.ContentThinking: parts = append(parts, "thinking") case query.ContentImage: parts = append(parts, "image") } } if hasText { parts = append(parts, "text") } if len(toolUseIDs) > 0 { parts = append(parts, fmt.Sprintf("tool_use=[%s]", strings.Join(toolUseIDs, ","))) } if len(toolResultIDs) > 0 { parts = append(parts, fmt.Sprintf("tool_result=[%s]", strings.Join(toolResultIDs, ","))) } sb.WriteString(strings.Join(parts, ",")) sb.WriteString(")") } return sb.String() }