// API 客户端 -- 封装与 Messages API 的 HTTP 通信(纯 transport 层). // // 这是引擎与 Anthropic Messages API 之间的 HTTP 传输桥梁. // 负责构建 HTTP 请求,鉴权,发送请求,处理非 2xx 错误. // SSE 协议解析已迁移至 internal/wire/anthropic.go. // // 升华改进(ELEVATED): 早期方案将 HTTP transport + SSE 解析混杂在本文件中, // 导致单元测试需要 mock HTTP 客户端.现在 transport 和解析分离: // - client.go: 纯 HTTP transport,职责是"把字节发出去,把响应体拿回来" // - wire/anthropic.go: 纯 SSE 解析,职责是"把字节流变成 flyto.Event" // // 好处:wire 包可以独立测试(传入 strings.NewReader 即可). // // 历史包袱(LEGACY): StreamEvent / StreamEventType / UsageInfo 类型仍保留-- // engine.go / subagent.go / classifier_ai.go 等尚未迁移到 flyto.Event. // 待 Phase 5(engine.go 全面迁移)完成后,这些类型将随之删除. package api import ( "bytes" "context" "encoding/json" "fmt" "io" "net/http" "strings" "time" "git.flytoex.net/yuanwei/flyto-agent/internal/transport/retry" "git.flytoex.net/yuanwei/flyto-agent/internal/wire" "git.flytoex.net/yuanwei/flyto-agent/pkg/flyto" ) // L1330 (2026-04-13): maxErrorBodyBytes / maxStreamingBodyBytes 已迁移到 // internal/wire/limits.go 并导出为 wire.MaxErrorBodyBytes / wire.MaxStreamingBodyBytes, // 消除 transport (本包) 和 wire 包两处重复定义的漂移风险.transport 已经依赖 wire // (下方 wire.ParseAnthropicStream), 所以提取方向是 transport → wire, 不破坏包依赖. // DefaultResponseHeaderTimeout is the safety net applied to the internal default // http.Client when no WithResponseHeaderTimeout option is supplied. Provider // packages (anthropic, openai, ...) override this explicitly via their own // Config.Timeout field; this constant only matters for direct transport.NewClient // callers (internal tests, experimentation). // // 精妙之处(CLEVER): We target http.Transport.ResponseHeaderTimeout, NOT // http.Client.Timeout. The latter caps TOTAL request duration including the // streaming body read - which silently kills long SSE responses. The former // only caps "time from request send to first response byte", leaving long // streams alive. Setting http.Client.Timeout=60s on a streaming LLM provider // is a classic way to break prod in a subtle, hard-to-debug manner. // // 反向思维 (2026-04-13): Should this be mutable (var) for global override? // Rejected - encourages process-global tuning which violates Provider 必填 // principle (see memory project_architecture_decisions.md). Per-instance // tuning belongs in provider Config.Timeout. const DefaultResponseHeaderTimeout = 60 * time.Second // newDefaultHTTPClient constructs the internal default *http.Client with a // ResponseHeaderTimeout safety net. Kept as a helper so WithResponseHeaderTimeout // can introspect the Transport type safely. func newDefaultHTTPClient() *http.Client { return &http.Client{ Transport: &http.Transport{ ResponseHeaderTimeout: DefaultResponseHeaderTimeout, }, } } // Client 是 Messages API 的客户端. type Client struct { apiKey string baseURL string httpClient *http.Client // messagePath 是 API 消息路径(如 "/v1/messages"),空字符串时不拼接. // 升华改进(ELEVATED): 早期方案硬编码 "/v1/messages"--不同供应商路径不同 // (Anthropic: /v1/messages, MiniMax: /anthropic/v1/messages 等), // 由调用方通过 WithMessagePath 注入,引擎核心不假设任何特定路径. messagePath string // apiVersion 是 API 版本 header 值(如 "2023-06-01"),空字符串时不发此 header. // 升华改进(ELEVATED): 早期方案硬编码 "anthropic-version: 2023-06-01"-- // 非 Anthropic 供应商不需要此 header,由调用方通过 WithAPIVersion 注入. apiVersion string // bearerAuth 控制鉴权头格式. // false(默认): "x-api-key: "(Anthropic 原生格式) // true : "Authorization: Bearer "(通用 Bearer Token 格式) // // 升华改进(ELEVATED): 引擎不绑定特定供应商-- // 只要端点实现 Messages API 格式,任何鉴权方式均可对接. // 替代方案:<硬编码供应商 URL 判断鉴权方式> - 否决:供应商无限增加,白名单永远落后. bearerAuth bool // classifier 是错误分类器--将原始 HTTP 错误转化为结构化 APIError. // 升华改进(ELEVATED): 可插拔分类器,不同供应商(Anthropic/OpenAI/Bedrock) // 各自实现自己的错误分类逻辑,引擎只依赖 ErrorCategory 枚举. // 替代方案:<原方案硬编码所有供应商的逻辑在一个 500 行 if-else 中> classifier ErrorClassifier // streamGuardConfig 是 SSE 流守卫配置. // 升华改进(ELEVATED): 覆盖 5 种生产边界情况: // 空响应,部分流,空闲挂起,停顿诊断,Scanner 错误. // 替代方案:<原方案不做边界检测,流异常时静默返回空结果> streamGuardConfig *StreamGuardConfig // overflowHandler 处理 max_tokens 溢出修正(可选). overflowHandler *retry.ContextOverflowHandler // retryer 执行带重试的 HTTP 操作. retryer retry.Retryer } // ClientOption 是 Client 的配置选项. type ClientOption func(*Client) // WithClassifier 设置自定义错误分类器. func WithClassifier(c ErrorClassifier) ClientOption { return func(client *Client) { client.classifier = c } } // WithHTTPClient 设置自定义 HTTP 客户端(用于代理,超时等配置). // // 注意: WithHTTPClient 会替换整个 httpClient,包括 Transport.如果同时使用 // WithResponseHeaderTimeout,options 的调用顺序决定最终结果: // - WithHTTPClient 在后: 最终使用消费者的 client, WithResponseHeaderTimeout 被覆盖 // - WithResponseHeaderTimeout 在后: 会尝试修改消费者的 transport (如果是 *http.Transport) // // 推荐: provider.New() 应二选一,不混用.消费者给了 HTTPClient 就完全交出超时控制权. func WithHTTPClient(hc *http.Client) ClientOption { return func(client *Client) { client.httpClient = hc } } // WithResponseHeaderTimeout 覆盖默认 http.Client 的 ResponseHeaderTimeout. // // 这是 "从请求发出到收到响应首字节" 的时间上限,**不影响** SSE 流式响应的后续 body 读取. // LLM provider 的正确超时语义: 捕捉服务端不响应的死等,放行正常的长流式输出. // // 精妙之处(CLEVER): 不要误用 http.Client.Timeout (它会砍死 SSE 流). // 详见 DefaultResponseHeaderTimeout 注释的反向思维段. // // 安全兜底: 如果 httpClient.Transport 不是 *http.Transport (消费者用了自定义 RoundTripper // 或先调用了 WithHTTPClient 替换了整个 client),此 option silent no-op 而不是 panic. // provider.New() 约定二选一使用,此 no-op 只在 transport 包被其他路径滥用时触发. func WithResponseHeaderTimeout(d time.Duration) ClientOption { return func(client *Client) { if t, ok := client.httpClient.Transport.(*http.Transport); ok && t != nil { t.ResponseHeaderTimeout = d } } } // HTTPClient 返回底层 *http.Client,用于测试,introspection 或需要直接发请求的场景. // 升华改进(ELEVATED): 暴露这个 getter 是为了让 provider 包的单元测试能断言超时配置 // 是否正确传递 (通过 .Transport.(*http.Transport).ResponseHeaderTimeout). // 同时也为未来可能的 introspection 场景 (比如 metrics observer 读取 transport 配置) 预留接口. func (c *Client) HTTPClient() *http.Client { return c.httpClient } // WithStreamGuard 设置自定义 SSE 流守卫配置. func WithStreamGuard(cfg *StreamGuardConfig) ClientOption { return func(client *Client) { client.streamGuardConfig = cfg } } // WithBearerAuth 切换为 "Authorization: Bearer " 鉴权格式. // 适用于使用 Bearer Token 的兼容端点. func WithBearerAuth() ClientOption { return func(client *Client) { client.bearerAuth = true } } // WithOverflowHandler 设置 max_tokens 溢出修正器. func WithOverflowHandler(h *retry.ContextOverflowHandler) ClientOption { return func(client *Client) { client.overflowHandler = h client.retryer.OverflowHandler = h } } // WithMessagePath 设置 API 消息路径(如 "/v1/messages"). // 空字符串表示 baseURL 已包含完整路径,不需要拼接. func WithMessagePath(path string) ClientOption { return func(c *Client) { c.messagePath = path } } // WithAPIVersion 设置 API 版本 header 值(如 "2023-06-01"). // 空字符串表示不发送 "anthropic-version" header. func WithAPIVersion(version string) ClientOption { return func(c *Client) { c.apiVersion = version } } // WithRetryPolicy 设置自定义重试策略. // nil 表示不重试(首次失败即返回). func WithRetryPolicy(p retry.RetryPolicy) ClientOption { return func(c *Client) { c.retryer.Policy = p } } // NewClient 创建一个新的 API 客户端. // 默认使用 DefaultClassifier + DefaultHinter,无重试策略. // 调用方必须传入 baseURL(不再默认指向任何供应商). // 供应商特有配置(messagePath,apiVersion,retryPolicy)通过 ClientOption 注入. func NewClient(apiKey, baseURL string, opts ...ClientOption) *Client { c := &Client{ apiKey: apiKey, baseURL: baseURL, httpClient: newDefaultHTTPClient(), // 升华改进(ELEVATED): 默认使用 DefaultClassifier(纯 HTTP 状态码分类)-- // 不假设任何特定供应商.Anthropic provider 通过 WithClassifier 注入 AnthropicClassifier. // 替代方案:<原方案默认 AnthropicClassifier> - 否决:引擎核心不应绑定特定供应商. classifier: &DefaultClassifier{}, streamGuardConfig: DefaultStreamGuardConfig(), } for _, opt := range opts { opt(c) } return c } // --- 请求体类型定义 --- // SystemContentBlock 是系统提示中的内容块. // 支持 prompt caching:通过 CacheControl 标记静态内容为可缓存. type SystemContentBlock struct { Type string `json:"type"` // "text" Text string `json:"text"` CacheControl *CacheControl `json:"cache_control,omitempty"` // 缓存控制标记 } // CacheControl 是缓存控制配置. type CacheControl struct { Type string `json:"type"` // "ephemeral" } // ThinkingConfig 是 extended thinking 配置. type ThinkingConfig struct { Type string `json:"type"` // "enabled" 或 "disabled" BudgetTokens int `json:"budget_tokens,omitempty"` // thinking token 预算(0=不限制) } // ResponseFormat 是结构化输出的格式配置. type ResponseFormat struct { Type string `json:"type"` // "json_schema" JSONSchema json.RawMessage `json:"json_schema,omitempty"` // JSON Schema 定义 } // BetaFeatures 控制要启用的 Beta 功能. // 各功能对应特定的 anthropic-beta header 值. type BetaFeatures struct { // PromptCaching 启用 prompt caching PromptCaching bool // PromptCachingScope 缓存范围 ("global" 或 "org") PromptCachingScope string // ExtendedThinking 启用 extended thinking ExtendedThinking bool // FastMode 启用快速模式 FastMode bool // Effort 努力级别 ("low"/"medium"/"high") Effort string // ContextManagement 启用上下文管理 ContextManagement bool // StructuredOutput 启用结构化输出 StructuredOutput bool // TaskBudgets 启用任务预算 TaskBudgets bool } // betaHeaderValues 返回需要添加的 beta header 值列表. func (b *BetaFeatures) betaHeaderValues() []string { if b == nil { return nil } var values []string if b.PromptCaching { values = append(values, "prompt-caching-2024-07-31") } if b.PromptCachingScope != "" { values = append(values, "prompt-caching-scope-2025-02-19") } if b.ExtendedThinking { values = append(values, "extended-thinking-2025-01-24") } if b.FastMode { values = append(values, "fast-mode-2025-04-01") } if b.Effort != "" { values = append(values, "effort-2025-04-01") } if b.ContextManagement { values = append(values, "context-management-2025-03-01") } if b.StructuredOutput { values = append(values, "structured-output-2025-03-01") } if b.TaskBudgets { values = append(values, "task-budgets-2025-04-01") } return values } // MessageRequest 是 Messages API 的请求体. type MessageRequest struct { Model string `json:"model"` MaxTokens int `json:"max_tokens"` System json.RawMessage `json:"system,omitempty"` // 支持字符串或 SystemContentBlock 数组 Messages []RequestMessage `json:"messages"` Tools []ToolDef `json:"tools,omitempty"` Stream bool `json:"stream"` Thinking *ThinkingConfig `json:"thinking,omitempty"` // extended thinking 配置 // 升华改进(ELEVATED): stop_sequences 是 API 的基础能力,引擎应完整支持. // 不应因为"当前只有分类器用"就砍掉--跨场景谁知道哪个场景需要. // 例如仓储场景可能需要模型生成到 "---订单结束---" 就停. // 分类器 Stage 1 用 stop_sequences=[""] 可以从 64 token 省到 5 token. // 替代方案:不支持,靠 max_tokens 兜底(能用但浪费 token). StopSequences []string `json:"stop_sequences,omitempty"` // Beta 功能配置(不序列化到请求体,通过 header 传递) Beta *BetaFeatures `json:"-"` ResponseFormat *ResponseFormat `json:"response_format,omitempty"` // 结构化输出格式 // Temperature is the per-request sampling temperature; nil omits the // field on the wire so the upstream uses its default. Anthropic accepts // [0, 1]; anthropic provider also enforces extended-thinking mode's // hard constraint of temperature=1.0 before reaching this point. // // Temperature 是本次请求的采样温度; nil 时 wire 上不传, 上游用默认. // Anthropic 接受 [0, 1]; anthropic provider 在 extended thinking 模式 // 下会在到达此处前强制覆盖为 1.0 (服务端硬约束). Temperature *float64 `json:"temperature,omitempty"` // TopP is the per-request nucleus sampling cutoff; nil omits the field. // Anthropic accepts [0, 1]; in thinking mode the API restricts to // [0.95, 1.0] (anthropic provider pre-handles the override). // // TopP 是本次请求的 nucleus 采样阈值; nil 时 wire 上不传. // Anthropic 接受 [0, 1]; thinking 模式下 API 限制 [0.95, 1.0] // (anthropic provider 提前覆盖). TopP *float64 `json:"top_p,omitempty"` } // SetSystemString 设置纯文本系统提示(向后兼容). func (r *MessageRequest) SetSystemString(text string) { if text == "" { r.System = nil return } r.System, _ = json.Marshal(text) } // SetSystemBlocks 设置带缓存控制的系统提示内容块. func (r *MessageRequest) SetSystemBlocks(blocks []SystemContentBlock) { if len(blocks) == 0 { r.System = nil return } r.System, _ = json.Marshal(blocks) } // RequestMessage 是请求中的消息. type RequestMessage struct { Role string `json:"role"` Content json.RawMessage `json:"content"` } // NewTextMessage 创建一个纯文本消息. func NewTextMessage(role, text string) RequestMessage { raw, _ := json.Marshal(text) return RequestMessage{Role: role, Content: raw} } // NewBlockMessage 创建一个包含多个 content block 的消息. func NewBlockMessage(role string, blocks []ContentBlock) RequestMessage { raw, _ := json.Marshal(blocks) return RequestMessage{Role: role, Content: raw} } // ContentBlock 是消息中的内容块(用于构建请求体). type ContentBlock struct { Type string `json:"type"` Text string `json:"text,omitempty"` ID string `json:"id,omitempty"` Name string `json:"name,omitempty"` Input map[string]any `json:"input,omitempty"` ToolUseID string `json:"tool_use_id,omitempty"` Content string `json:"content,omitempty"` IsError bool `json:"is_error,omitempty"` Signature string `json:"signature,omitempty"` // thinking block 的签名(Anthropic API 要求) // Source 携带 type=="image" 块的图片数据, 对齐 Anthropic image block // source spec (base64 内联或 url). 非 image 块留 nil (omitempty). // // Source carries image data for type=="image" blocks, matching // Anthropic's image block source spec (base64 inline or url). Nil // for non-image blocks (omitempty on the wire). Source *ImageSource `json:"source,omitempty"` // ContentItems carries array-form tool_result content (text + image // mixed). MarshalJSON picks between Content (string) and ContentItems // (array) for the "content" wire field. Non-tool_result blocks leave // it nil. // // ContentItems 承载 array 形式的 tool_result 内容 (text + image 混排). // MarshalJSON 在 Content 字符串 / ContentItems 数组间选一个做 "content" // wire 字段. 非 tool_result 块留 nil. ContentItems []ContentBlock `json:"-"` } // ImageSource 是 Anthropic image block 的 source 子对象. // // ImageSource is the source sub-object of an Anthropic image block. type ImageSource struct { Type string `json:"type"` // "base64" 或 "url" MediaType string `json:"media_type,omitempty"` // "image/png" 等, base64 时必填 Data string `json:"data,omitempty"` // base64 编码字节, base64 时必填 URL string `json:"url,omitempty"` // HTTPS URL, url 时必填 } // MarshalJSON overrides the default for tool_use blocks: the Anthropic spec // requires tool_use to always carry an "input" object, even when the tool // takes no arguments. The default map[string]any + omitempty drops nil/empty // maps, producing {"type":"tool_use","id":...,"name":...} with no input key. // Real Anthropic endpoints are lenient (auto-substitute {}), but the MiniMax // Anthropic-compat endpoint rejects it with: // // "invalid params, invalid function arguments json string, tool_call_id: ..." // // Round 1 returns a valid tool_use → Round 2 re-sends with input dropped → // MiniMax 400 → engine wraps as "API 调用在 1 次尝试后失败". All three wire // call sites (pkg/providers/anthropic, pkg/providers/minimax, pkg/engine's // query.Content → api.ContentBlock compaction conversion) hit this, so the // fix lives on the type rather than being duplicated in each caller. // // Non-tool_use blocks fall through to the default alias marshaler -- omitempty // is the correct behavior for text/thinking/tool_result (their optional fields // really are optional on the wire). // // MarshalJSON 覆盖 tool_use 块的默认序列化: Anthropic spec 要求 tool_use 永远 // 包含 "input" 对象, 即使工具无参数. 默认 map[string]any + omitempty 组合会把 // nil/空 map 丢掉, 产出 {"type":"tool_use","id":...,"name":...} 无 input. // 真 Anthropic 端宽容 (自动补 {}), 但 MiniMax Anthropic-compat 端严格拒绝: // // "invalid params, invalid function arguments json string, tool_call_id: ..." // // Round 1 返有效 tool_use → Round 2 重发时 input 被丢 → MiniMax 400 → engine // 包成 "API 调用在 1 次尝试后失败". 三处 wire 调用点 (anthropic / minimax / // engine 的 query.Content → api.ContentBlock 压缩转换) 都中招, 修在类型上而不 // 在每个 caller 里重复. // // 非 tool_use 块 fall through 到默认 alias 序列化 -- omitempty 对 // text/thinking/tool_result 是正确行为 (它们的可选字段在 wire 上真的可选). func (c ContentBlock) MarshalJSON() ([]byte, error) { // tool_result with ContentItems: emit "content":[array] instead of // "content":"string". Lets tool_result carry image blocks per Anthropic // spec (content field accepts string or array). Shared between // anthropic provider and minimax anthropic-compat path. // // tool_result 带 ContentItems: 发 "content":[array], 让 tool_result // 承载 image block (Anthropic spec content 字段接受 string 或 array). // anthropic provider 和 minimax anthropic-compat 共用. if c.Type == "tool_result" && len(c.ContentItems) > 0 { return json.Marshal(struct { Type string `json:"type"` ToolUseID string `json:"tool_use_id"` Content []ContentBlock `json:"content"` IsError bool `json:"is_error,omitempty"` }{ Type: c.Type, ToolUseID: c.ToolUseID, Content: c.ContentItems, IsError: c.IsError, }) } if c.Type != "tool_use" { type alias ContentBlock return json.Marshal(alias(c)) } input := c.Input if input == nil { input = map[string]any{} } return json.Marshal(struct { Type string `json:"type"` ID string `json:"id"` Name string `json:"name"` Input map[string]any `json:"input"` }{ Type: c.Type, ID: c.ID, Name: c.Name, Input: input, }) } // ToolDef 是工具定义(提供给 API 的 JSON Schema). // // CacheControl 是可选的缓存控制标记. // 升华改进(ELEVATED): 工具列表也可以按 cache_control 分段缓存-- // 把稳定工具排在前面,在最后一个稳定工具处加 cache_control, // 不稳定工具排在后面(不带 cache_control),则稳定工具缓存不受不稳定工具影响. // 早期实现 只在系统提示词上加 cache_control,工具列表整体无分段保护. // 替代方案:<只缓存全部工具或不缓存> - 否决:任何工具变化都让全部工具缓存失效, // 大量稳定工具的 token 费用浪费在一个不稳定工具身上. type ToolDef struct { Name string `json:"name"` Description string `json:"description"` InputSchema json.RawMessage `json:"input_schema"` CacheControl *CacheControl `json:"cache_control,omitempty"` } // --- 流式响应事件类型 --- // StreamEvent 是从 SSE 流中解析出的结构化事件. type StreamEvent struct { Type StreamEventType Index int // content block 的索引 Delta string // text / thinking 增量内容 // content_block_start 时携带完整的 block 信息 BlockType string // "text", "thinking", "tool_use" BlockID string // tool_use block 的 ID BlockName string // tool_use block 的工具名称 // tool_use delta 时累积的 JSON 片段 PartialJSON string // message_start / message_delta 时携带的 usage 信息 Usage *UsageInfo // message_delta 时携带的 stop_reason StopReason string } // StreamEventType 是流式事件类型枚举. type StreamEventType string const ( EventMessageStart StreamEventType = "message_start" EventContentBlockStart StreamEventType = "content_block_start" EventContentBlockDelta StreamEventType = "content_block_delta" EventContentBlockStop StreamEventType = "content_block_stop" EventMessageDelta StreamEventType = "message_delta" EventMessageStop StreamEventType = "message_stop" EventPing StreamEventType = "ping" EventError StreamEventType = "error" ) // UsageInfo 是 API 返回的 token 使用统计. type UsageInfo struct { InputTokens int `json:"input_tokens"` OutputTokens int `json:"output_tokens"` CacheReadTokens int `json:"cache_read_input_tokens"` CacheWriteTokens int `json:"cache_creation_input_tokens"` } // --- 核心方法 --- // CreateMessageStream 发起流式 Messages API 调用,返回 flyto.Event channel. // // 升华改进(ELEVATED): 早期方案返回 <-chan StreamEvent(Anthropic 专有中间类型), // 消费者(provider 层)还需要二次转换为 flyto.Event. // 现在直接返回 flyto.Event,消费者零转换成本,且 StreamEvent 类型从公共 API 消失. // 替代方案:<保留 StreamEvent 减少改动> - 否决:这正是架构不一致的根源. func (c *Client) CreateMessageStream(ctx context.Context, req *MessageRequest) (<-chan flyto.Event, error) { req.Stream = true // 用于在重试成功后传递 event channel var eventCh <-chan flyto.Event rctx := &retry.RetryContext{ Model: req.Model, // QuerySource from ctx so retry-layer diagnostics can attribute // failed retries to their origin. Empty string when the caller // did not inject (retry layer handles that gracefully). // // QuerySource 从 ctx 取, 重试层诊断可把失败归因到来源. 调用方 // 未注入时为空串 (重试层降级处理). QuerySource: retry.QuerySourceFromCtx(ctx), } err := c.retryer.Do(ctx, rctx, func(attempt int, rctx *retry.RetryContext) error { // 如果有 max_tokens 溢出修正,使用修正后的值 if rctx.MaxTokensOverride > 0 { req.MaxTokens = rctx.MaxTokensOverride } ch, err := c.doCreateMessageStreamOnce(ctx, req) if err != nil { return err } eventCh = ch return nil }) if err != nil { return nil, err } return eventCh, nil } // doCreateMessageStreamOnce 执行单次 HTTP 请求,返回 event channel. // 成功后 channel 归调用方管理,失败时方法内部关闭 body. func (c *Client) doCreateMessageStreamOnce(ctx context.Context, req *MessageRequest) (<-chan flyto.Event, error) { body, err := json.Marshal(req) if err != nil { return nil, fmt.Errorf("api: marshal request: %w", err) } httpReq, err := http.NewRequestWithContext(ctx, "POST", c.baseURL+c.messagePath, bytes.NewReader(body)) if err != nil { return nil, fmt.Errorf("api: create request: %w", err) } httpReq.Header.Set("Content-Type", "application/json") httpReq.Header.Set("Accept", "text/event-stream") if c.bearerAuth { httpReq.Header.Set("Authorization", "Bearer "+c.apiKey) } else { httpReq.Header.Set("x-api-key", c.apiKey) } if c.apiVersion != "" { httpReq.Header.Set("anthropic-version", c.apiVersion) } // 添加 Beta headers(根据请求配置) if req.Beta != nil { betaValues := req.Beta.betaHeaderValues() if len(betaValues) > 0 { httpReq.Header.Set("anthropic-beta", strings.Join(betaValues, ",")) } } resp, err := c.httpClient.Do(httpReq) if err != nil { // 精妙之处(CLEVER): 网络错误时 statusCode=0,headers=nil, // 分类器通过 cause 参数检测 SSL/超时/DNS 等具体错误类型. return nil, c.classifier.Classify(0, nil, nil, err) } // 检查 HTTP 状态码--非 200 一律通过分类器生成结构化 APIError. // 升华改进(ELEVATED): 调用者可以用 errors.As(err, &apiErr) 获取结构化信息 // (Category,RetryInfo,TokenGap 等),也可以直接用 err.Error() 获取字符串. // 替代方案:<原方案 fmt.Errorf("api: HTTP %d: %s") 丢失了所有结构化信息> if resp.StatusCode != http.StatusOK { defer resp.Body.Close() errBody, _ := io.ReadAll(io.LimitReader(resp.Body, wire.MaxErrorBodyBytes)) return nil, c.classifier.Classify(resp.StatusCode, resp.Header, errBody, nil) } // 升华改进(ELEVATED): wire.ParseAnthropicStream 负责 SSE 解析, // StreamGuard 叠加可靠性检测(空响应/截断/空闲超时). // client.go 只做 HTTP transport,不再内嵌任何协议解析逻辑. // 精妙之处(CLEVER): io.LimitReader 包裹流式响应体-- // 恶意服务端发送超大响应时,LimitReader 在 100MB 处触发 EOF, // ParseAnthropicStream 视为流结束正常关闭 channel,goroutine 不会泄漏. rawCh := wire.ParseAnthropicStream(ctx, io.NopCloser(io.LimitReader(resp.Body, wire.MaxStreamingBodyBytes))) guard := NewStreamGuard(c.streamGuardConfig) return guard.Watch(ctx, rawCh), nil }