// Package wire 提供独立于 provider 身份的 SSE 协议解析器. // // anthropic.go - Anthropic Messages API SSE 格式解析器. // // 输入:io.ReadCloser(HTTP 响应体) // 输出:<-chan flyto.Event(直接产出公共事件,无中间类型) // // 升华改进(ELEVATED): 早期方案将 SSE 解析嵌在 internal/api/client.go 中, // 和 HTTP transport 混杂,也产出 Anthropic 专有的 StreamEvent 中间类型. // 现在解析器独立为本文件,client.go 仅做 HTTP transport. // 好处: // 1. 可独立测试(传入任意 io.Reader 即可,无需 mock HTTP) // 2. 多 provider 共享同一个 Anthropic 解析器(MiniMax Anthropic-compat 端点也用此) // 3. flyto.Event 直接产出,provider 层无需二次转换 // // 关键修复(BUGFIX): cache token 字段只从 message_start 读取. // Anthropic 文档确认:message_delta 中的 usage 也含 cache 相关字段, // 但那是累计值,与 message_start 中的值相加会导致重复计数. // 参考:https://github.com/langchain-ai/langchainjs/issues/10249 package wire import ( "bufio" "context" "encoding/json" "fmt" "io" "strings" "git.flytoex.net/yuanwei/flyto-agent/pkg/flyto" ) // ParseAnthropicStream 解析 Anthropic Messages API 的 SSE 流. // // 调用方负责在不需要时 cancel ctx 以停止解析. // channel 关闭表示解析结束;最后一个事件可能是 *flyto.ErrorEvent(出错时) // 或 *flyto.UsageEvent(正常结束时). // // 精妙之处(CLEVER): 接受 io.ReadCloser 而非 *http.Response-- // 解析器只关心字节流,不关心 HTTP 细节. // 这使得单元测试可以直接传入 strings.NewReader 而无需 mock HTTP 客户端. func ParseAnthropicStream(ctx context.Context, body io.ReadCloser) <-chan flyto.Event { ch := make(chan flyto.Event, 64) go func() { defer close(ch) defer body.Close() parseAnthropicSSE(ctx, body, ch) }() return ch } // --- Anthropic SSE JSON 结构(仅用于本文件内部解析)--- type antMsgStart struct { Message struct { Usage antUsage `json:"usage"` } `json:"message"` } type antUsage struct { InputTokens int `json:"input_tokens"` OutputTokens int `json:"output_tokens"` CacheReadTokens int `json:"cache_read_input_tokens"` CacheWriteTokens int `json:"cache_creation_input_tokens"` } type antBlockStart struct { Index int `json:"index"` ContentBlock struct { Type string `json:"type"` // "text" / "thinking" / "tool_use" ID string `json:"id"` Name string `json:"name"` } `json:"content_block"` } type antBlockDelta struct { Index int `json:"index"` Delta struct { Type string `json:"type"` // "text_delta" / "thinking_delta" / "input_json_delta" Text string `json:"text"` Thinking string `json:"thinking"` PartialJSON string `json:"partial_json"` } `json:"delta"` } type antBlockStop struct { Index int `json:"index"` } type antMsgDelta struct { Delta struct { StopReason string `json:"stop_reason"` } `json:"delta"` // 精妙之处(CLEVER): message_delta 的 usage 只取 output_tokens-- // cache 相关字段(cache_read/cache_creation)不从这里读, // 只从 message_start 读取,否则会重复计数. Usage struct { OutputTokens int `json:"output_tokens"` } `json:"usage"` } // blockState 追踪一个 content block 的累积状态. type blockState struct { blockType string // "text" / "thinking" / "tool_use" id string // tool_use block 的 ID name string // tool_use block 的工具名 text string // text / thinking 累积 partialJSON string // tool_use input JSON 累积 } // parseAnthropicSSE 是核心解析循环. func parseAnthropicSSE(ctx context.Context, body io.Reader, ch chan<- flyto.Event) { // 历史包袱(LEGACY): 1MB Scanner buffer-- // tool_use 的 input_json_delta 可能一次推送几百 KB, // bufio.Scanner 默认 64KB 会静默截断. // 未来:考虑 bufio.Reader.ReadBytes('\n') 以彻底去除上限. scanner := bufio.NewScanner(body) scanner.Buffer(make([]byte, 0, 1<<20), 1<<20) blocks := make(map[int]*blockState) // 仅从 message_start 采集的 cache token 计数 var cacheReadTokens, cacheCreationTokens int // 从 message_start 采集的 input tokens(不包含 cache 的普通 input) var inputTokens int // 从 message_delta 累积的 output tokens var outputTokens int var stopReason string var eventType string var dataLines []string send := func(evt flyto.Event) { select { case ch <- evt: case <-ctx.Done(): } } for scanner.Scan() { select { case <-ctx.Done(): return default: } line := scanner.Text() if line == "" { // 空行 = 事件分隔符 if eventType != "" && len(dataLines) > 0 { data := strings.Join(dataLines, "\n") handleAnthropicEvent(ctx, eventType, data, ch, blocks, &inputTokens, &outputTokens, &cacheReadTokens, &cacheCreationTokens, &stopReason, send) } eventType = "" dataLines = nil continue } if strings.HasPrefix(line, "event: ") { eventType = strings.TrimPrefix(line, "event: ") } else if strings.HasPrefix(line, "data: ") { dataLines = append(dataLines, strings.TrimPrefix(line, "data: ")) } else if line == "data:" { dataLines = append(dataLines, "") } // 注释行(": ...")和 retry: 行忽略 } // 处理末尾未终止的事件 if eventType != "" && len(dataLines) > 0 { data := strings.Join(dataLines, "\n") handleAnthropicEvent(ctx, eventType, data, ch, blocks, &inputTokens, &outputTokens, &cacheReadTokens, &cacheCreationTokens, &stopReason, send) } if err := scanner.Err(); err != nil && ctx.Err() == nil { send(&flyto.ErrorEvent{ Err: fmt.Errorf("anthropic: sse scan error: %w", err), Code: "stream_error", Retryable: true, }) } } // handleAnthropicEvent 处理单个 SSE 事件,转换为 flyto.Event. func handleAnthropicEvent( ctx context.Context, eventType, data string, ch chan<- flyto.Event, blocks map[int]*blockState, inputTokens, outputTokens, cacheReadTokens, cacheCreationTokens *int, stopReason *string, send func(flyto.Event), ) { switch eventType { case "message_start": var msg antMsgStart if err := json.Unmarshal([]byte(data), &msg); err != nil { send(&flyto.ErrorEvent{Err: fmt.Errorf("anthropic: parse message_start: %w", err), Code: "parse_error"}) return } // 精妙之处(CLEVER): 只在 message_start 读 cache token 字段-- // message_delta 的 usage 也有这些字段但是累计值,双读会重复计数. // 研究文档确认(Anthropic 官方 + LangChain issue #10249). *inputTokens = msg.Message.Usage.InputTokens *cacheReadTokens = msg.Message.Usage.CacheReadTokens *cacheCreationTokens = msg.Message.Usage.CacheWriteTokens case "content_block_start": var block antBlockStart if err := json.Unmarshal([]byte(data), &block); err != nil { send(&flyto.ErrorEvent{Err: fmt.Errorf("anthropic: parse content_block_start: %w", err), Code: "parse_error"}) return } blocks[block.Index] = &blockState{ blockType: block.ContentBlock.Type, id: block.ContentBlock.ID, name: block.ContentBlock.Name, } case "content_block_delta": var delta antBlockDelta if err := json.Unmarshal([]byte(data), &delta); err != nil { send(&flyto.ErrorEvent{Err: fmt.Errorf("anthropic: parse content_block_delta: %w", err), Code: "parse_error"}) return } b, ok := blocks[delta.Index] if !ok { return } switch delta.Delta.Type { case "text_delta": b.text += delta.Delta.Text if delta.Delta.Text != "" { send(&flyto.TextDeltaEvent{Text: delta.Delta.Text}) } case "thinking_delta": b.text += delta.Delta.Thinking if delta.Delta.Thinking != "" { send(&flyto.ThinkingDeltaEvent{Text: delta.Delta.Thinking}) } case "input_json_delta": b.partialJSON += delta.Delta.PartialJSON } case "content_block_stop": var stop antBlockStop if err := json.Unmarshal([]byte(data), &stop); err != nil { send(&flyto.ErrorEvent{Err: fmt.Errorf("anthropic: parse content_block_stop: %w", err), Code: "parse_error"}) return } b, ok := blocks[stop.Index] if !ok { return } switch b.blockType { case "text": if b.text != "" { send(&flyto.TextEvent{Text: b.text}) } case "thinking": if b.text != "" { send(&flyto.ThinkingEvent{Text: b.text}) } case "tool_use": // 精妙之处(CLEVER): json.Valid 预检工具输入-- // 流中断或模型缺陷可能导致残缺 JSON. // 预检失败时仍发出 ToolUseEvent(空 input),引擎层可检测并生成有意义的错误. // 替代方案:<跳过工具调用或直接 panic> - 否决:静默失败比有意义的错误更难调试. var toolInput map[string]any if b.partialJSON != "" && json.Valid([]byte(b.partialJSON)) { _ = json.Unmarshal([]byte(b.partialJSON), &toolInput) } if toolInput == nil { toolInput = make(map[string]any) } send(&flyto.ToolUseEvent{ ID: b.id, ToolName: b.name, Input: toolInput, }) } delete(blocks, stop.Index) // 释放已完成的 block 状态 case "message_delta": var md antMsgDelta if err := json.Unmarshal([]byte(data), &md); err != nil { send(&flyto.ErrorEvent{Err: fmt.Errorf("anthropic: parse message_delta: %w", err), Code: "parse_error"}) return } *stopReason = md.Delta.StopReason // 只取 output_tokens,不取 cache 字段(见 message_start 注释) *outputTokens += md.Usage.OutputTokens case "message_stop": send(&flyto.UsageEvent{ InputTokens: *inputTokens, OutputTokens: *outputTokens, CacheReadTokens: *cacheReadTokens, CacheCreationTokens: *cacheCreationTokens, StopReason: *stopReason, }) case "ping": // 心跳,忽略 case "error": send(&flyto.ErrorEvent{ Err: fmt.Errorf("anthropic: api error: %s", data), Code: "api_error", Retryable: true, }) } }