// Package wire 提供内部共享协议适配器. // // openai.go - OpenAI Chat Completions SSE 客户端. // // 多家 provider 使用相同的 OpenAI 流式协议: // - OpenAI 官方 API // - OpenRouter(聚合网关,完全兼容 OpenAI) // - Ollama(本地部署,OpenAI 兼容模式) // - LM Studio(本地部署,OpenAI 兼容) // - MiniMax native API(SSE 格式与 OpenAI 相同,仅端点路径不同) // // 设计: // - 本包仅被 pkg/providers/* 调用,不对外暴露(internal 包规则) // - 接受 flyto.Request 输入,输出 flyto.Event channel // - provider 层只需配置 baseURL / headers / modelTable,不需要重复 SSE 解析逻辑 // // 关键修复(BUGFIX): reasoning/thinking 字段各家不同,存在三种格式: // 1. OpenAI o1/o3: `choices[0].delta.reasoning_content`(字符串) // 2. OpenRouter: `choices[0].delta.reasoning_details`(对象数组,含 type 字段) // 旧版代码用 `reasoning` 字段(非标准),两种格式都无法正确解析. // // 升华改进(ELEVATED): 早期实现 没有统一 OpenAI 兼容层--每个 provider // 各自实现完整的 SSE 解析,存在大量重复代码. // 我们将公共部分提取到此文件,各 provider 保持薄封装(<150行). // 替代方案:<各 provider 自行实现完整 SSE 解析> - 否决:4个 provider 各500行, // 任何 bug 修复都要改4处,维护成本线性增长. package wire import ( "bufio" "bytes" "context" "encoding/json" "fmt" "io" "net/http" "strings" "time" "git.flytoex.net/yuanwei/flyto-agent/pkg/flyto" ) // L1330 (2026-04-13): maxErrorBodyBytes / maxStreamingBodyBytes 已迁移到 limits.go, // 统一导出为 MaxErrorBodyBytes / MaxStreamingBodyBytes, 消除 wire 和 transport/api 两处重复定义. // defaultResponseHeaderTimeout 是 newDefaultHTTPClient 的 safety net. // Provider 包 (openai/minimax/openrouter/ollama/lmstudio) 通过 WithResponseHeaderTimeout // 覆盖; 此常量只在 provider 不传选项时生效 (内部测试 / 直接调用场景). // // 精妙之处(CLEVER): 我们设置的是 http.Transport.ResponseHeaderTimeout, **不是** // http.Client.Timeout.后者约束"整个请求总时长"包括 SSE 流式 body 读取 - // 对 LLM 流式响应是致命的 (60s 超时会砍死 2-5 分钟的长回复).前者只约束 // "请求发出到收到响应首字节"的时间,流式 body 后续读取不受限. // 详见 internal/transport/client.go DefaultResponseHeaderTimeout 的反向思维段 - // 两处保持等价语义,避免不同 provider 对同一概念的行为漂移. const defaultResponseHeaderTimeout = 60 * time.Second // newDefaultHTTPClient 构造 wire 层默认 *http.Client, 带 ResponseHeaderTimeout 兜底. // 与 internal/transport/client.go 的同名 helper 并列: 两个包各自独立, 不跨包依赖. // 替代方案: <提取共享 helper 到 internal/httputil> - 否决: 只为 3 行 helper 引入 // 新包 + 跨包依赖, Go idiom 允许 "a little copying is better than a little dependency". func newDefaultHTTPClient() *http.Client { return &http.Client{ Transport: &http.Transport{ ResponseHeaderTimeout: defaultResponseHeaderTimeout, }, } } // OpenAICompatClient 是 OpenAI Chat Completions SSE 客户端. // // 支持: // - 流式文本输出(choices[].delta.content) // - 工具调用(choices[].delta.tool_calls) // - Thinking/Reasoning(choices[].delta.reasoning,OpenRouter 及部分 provider) // - mid-stream error 格式(OpenRouter 的错误推送方式) // - usage 字段(最后一个 chunk 中,通过 stream_options.include_usage 开启) type OpenAICompatClient struct { apiKey string baseURL string chatPath string // 默认 "/v1/chat/completions",MiniMax native 不同 httpClient *http.Client extraHeaders map[string]string // 如 OpenRouter 的 HTTP-Referer / X-Title } // OpenAICompatOption 是 OpenAICompatClient 的配置选项. type OpenAICompatOption func(*OpenAICompatClient) // WithHTTPClient 注入自定义 HTTP 客户端(代理,超时等). // // 注意: WithHTTPClient 会替换整个 httpClient 包括 Transport.与 // WithResponseHeaderTimeout 推荐二选一使用, 见 provider.New() 实现. func WithHTTPClient(hc *http.Client) OpenAICompatOption { return func(c *OpenAICompatClient) { c.httpClient = hc } } // WithResponseHeaderTimeout 覆盖默认 http.Client 的 ResponseHeaderTimeout. // // 这是"请求发出到收到响应首字节"的时间上限, **不影响** SSE 流式 body 后续读取. // LLM provider 的正确超时语义: 捕捉服务端死等, 放行长流式输出. // // 精妙之处(CLEVER): 绝对不要用 http.Client.Timeout 代替此函数 - 那会砍死流式调用. // 详见 defaultResponseHeaderTimeout 常量注释. // // 安全兜底: 如果 httpClient.Transport 不是 *http.Transport (消费者自定义 RoundTripper // 或先调用了 WithHTTPClient 替换整个 client), 此 option silent no-op 不 panic. // provider.New() 约定二选一使用, no-op 只在 wire 包被其他路径滥用时触发. func WithResponseHeaderTimeout(d time.Duration) OpenAICompatOption { return func(c *OpenAICompatClient) { if t, ok := c.httpClient.Transport.(*http.Transport); ok && t != nil { t.ResponseHeaderTimeout = d } } } // HTTPClient 返回底层 *http.Client, 用于测试,introspection 或需要直接发请求的场景. // 升华改进(ELEVATED): 暴露此 getter 让 provider 包单元测试能断言超时配置是否正确传递. // 与 internal/transport/client.go 的同名 getter 对称. func (c *OpenAICompatClient) HTTPClient() *http.Client { return c.httpClient } // WithChatPath 覆盖 chat completions 路径(默认 /v1/chat/completions). // // MiniMax native API 使用 /v1/text/chatcompletion_v2, // 其 SSE 格式与 OpenAI 相同,只需修改路径即可复用本客户端. func WithChatPath(path string) OpenAICompatOption { return func(c *OpenAICompatClient) { c.chatPath = path } } // WithExtraHeader 添加额外的 HTTP 请求头. // // 用途: // - OpenRouter: HTTP-Referer,X-Title // - 企业内网 API 网关的自定义鉴权头 func WithExtraHeader(key, value string) OpenAICompatOption { return func(c *OpenAICompatClient) { if c.extraHeaders == nil { c.extraHeaders = make(map[string]string) } c.extraHeaders[key] = value } } // NewOpenAICompatClient 创建 OpenAI 兼容客户端. func NewOpenAICompatClient(apiKey, baseURL string, opts ...OpenAICompatOption) *OpenAICompatClient { c := &OpenAICompatClient{ apiKey: apiKey, baseURL: strings.TrimRight(baseURL, "/"), chatPath: "/v1/chat/completions", httpClient: newDefaultHTTPClient(), } for _, opt := range opts { opt(c) } return c } // --- 请求 JSON 结构 --- // openaiRespFmt 是 response_format 请求字段. // // 精妙之处(CLEVER): OpenAI json_object 模式不需要 schema-- // 只声明"我要 JSON",模型自行保证输出可解析. // 与 Anthropic 的 json_schema 模式(需要完整 schema + Beta flag)不同, // 这是最小成本的结构化输出约束,适合探测和简单场景. type openaiRespFmt struct { Type string `json:"type"` // "json_object" | "json_schema" | "text" JSONSchema *openaiJSONSchema `json:"json_schema,omitempty"` // 仅 json_schema 类型时填充 } // openaiJSONSchema 是 OpenAI json_schema 模式的包装层. // // 升华改进(ELEVATED): OpenAI API 要求三层嵌套结构: // // response_format.json_schema.schema = <实际 JSON Schema> // // 而 flyto.ResponseFormat.JSONSchema 直接是原始 schema bytes. // 在 wire 层做包装,上层(flyto/engine)无需感知 OpenAI 的嵌套格式. // 替代方案:<在 flyto.ResponseFormat 里直接带上 name/strict 字段> - 否决: // 绑定了 OpenAI 格式细节,Anthropic/MiniMax 等不需要这些字段. type openaiJSONSchema struct { Name string `json:"name"` Strict bool `json:"strict"` Schema json.RawMessage `json:"schema"` } // openaiReq 是 Chat Completions 请求体. type openaiReq struct { Model string `json:"model"` Messages []openaiMsg `json:"messages"` MaxTokens int `json:"max_tokens,omitempty"` Tools []openaiTool `json:"tools,omitempty"` Stream bool `json:"stream"` StreamOptions *streamOptions `json:"stream_options,omitempty"` Reasoning *openaiReasoning `json:"reasoning,omitempty"` // OpenRouter / o1 系列 ResponseFormat *openaiRespFmt `json:"response_format,omitempty"` // Temperature / TopP: nil = omit field; passthrough policy. Upstream // validates range; OpenAI o-series / gpt-5 reasoning reject any non-1 // temperature with 4xx, which surfaces as ErrorEvent (no client-side // model-prefix list -- see ADR rule of two). // // Temperature / TopP: nil = wire 不传; passthrough 策略. 上游校验范围; // OpenAI o 系列 / gpt-5 reasoning 拒绝非 1 temperature 4xx 自然冒泡为 // ErrorEvent (不在客户端维护 model-prefix 列表 -- 见 ADR rule of two). Temperature *float64 `json:"temperature,omitempty"` TopP *float64 `json:"top_p,omitempty"` } type streamOptions struct { IncludeUsage bool `json:"include_usage"` } // openaiMsg 是 Chat Completions 请求中的消息. // // Content 使用 json.RawMessage 以支持两种格式: // - 纯文本:`"hello"` // - 多部分:`[{"type":"text","text":"hello"}]`(vision 场景) type openaiMsg struct { Role string `json:"role"` Content json.RawMessage `json:"content,omitempty"` ToolCalls []openaiToolCall `json:"tool_calls,omitempty"` ToolCallID string `json:"tool_call_id,omitempty"` Name string `json:"name,omitempty"` } type openaiTool struct { Type string `json:"type"` // "function" Function openaiToolFn `json:"function"` } type openaiToolFn struct { Name string `json:"name"` Description string `json:"description"` Parameters json.RawMessage `json:"parameters"` // JSON Schema } type openaiToolCall struct { Index int `json:"index"` ID string `json:"id,omitempty"` Type string `json:"type,omitempty"` // "function" Function struct { Name string `json:"name,omitempty"` Arguments string `json:"arguments,omitempty"` } `json:"function"` } // openaiReasoningDetail 是 OpenRouter 的 reasoning_details 数组元素. // // 关键修复(BUGFIX): OpenRouter 的 thinking 不是单个字符串字段,而是对象数组: // // "reasoning_details": [{"type": "reasoning.text", "text": "..."}, ...] // // 类型列表(OpenRouter 文档): // - "reasoning.text" - 思考过程文本片段(最常见) // - "reasoning.summary" - 思考摘要(某些模型开启 reasoning_summary 时出现) // - "thinking" - 部分本地模型(Qwen)使用此 type 名 // // 与 OpenAI o1/o3 的 reasoning_content(字符串字段)不同,两者不可混淆. type openaiReasoningDetail struct { Type string `json:"type"` // "reasoning.text" / "reasoning.summary" / "thinking" Text string `json:"text"` } // Reasoning 是 OpenRouter / o1 系列 / MiniMax 的 thinking 参数(公开类型,供 provider 层使用). type Reasoning struct { MaxTokens int `json:"max_tokens,omitempty"` // Anthropic/Gemini/Qwen/MiniMax 模型 Effort string `json:"effort,omitempty"` // OpenAI o1/o3: "high"/"medium"/"low" Enabled bool `json:"enabled,omitempty"` // 简单开关(使用默认配置) } // openaiReasoning 是序列化用的内部别名(保持 JSON 字段名一致). type openaiReasoning = Reasoning // --- 响应 JSON 结构 --- // openaiChunk 是流式响应的单个 SSE 数据块. type openaiChunk struct { ID string `json:"id"` Object string `json:"object"` Model string `json:"model"` Choices []struct { Index int `json:"index"` Delta struct { Role string `json:"role"` Content string `json:"content"` // 关键修复(BUGFIX): reasoning/thinking 字段各家不同,存在两种格式-- // 旧版代码用 `reasoning`(非标准字段名),两种格式都无法正确解析. // // ReasoningContent: OpenAI o1/o3 原生格式,字符串字段. // 文档:platform.openai.com/docs/guides/reasoning ReasoningContent string `json:"reasoning_content"` // ReasoningDetails: OpenRouter 格式,对象数组(见 openaiReasoningDetail). // 多个 provider 经由 OpenRouter 转发时统一使用此格式. // 文档:openrouter.ai/docs/use-cases/reasoning-tokens ReasoningDetails []openaiReasoningDetail `json:"reasoning_details"` ToolCalls []openaiToolCall `json:"tool_calls"` } `json:"delta"` FinishReason *string `json:"finish_reason"` } `json:"choices"` // 精妙之处(CLEVER): usage 只在最后一个 chunk 出现(需要 stream_options.include_usage=true). // 我们每次都请求 include_usage=true,这样可以在流结束时获取完整用量统计, // 不需要在客户端自行累计 token 数(容易出错且对缓存 token 无能为力). Usage *struct { PromptTokens int `json:"prompt_tokens"` CompletionTokens int `json:"completion_tokens"` PromptTokensDetails *struct { CachedTokens int `json:"cached_tokens"` } `json:"prompt_tokens_details"` // CompletionTokensDetails 包含 o1/o3 的 reasoning_tokens(思考消耗的 token 数). // 目前仅记录,不单独上报给调用层(合并进 OutputTokens). CompletionTokensDetails *struct { ReasoningTokens int `json:"reasoning_tokens"` } `json:"completion_tokens_details"` // OpenRouter / MiniMax 特有的缓存写入字段 CacheWriteTokens int `json:"cache_write_tokens"` } `json:"usage"` // mid-stream 错误(OpenRouter 格式) Error *struct { Code int `json:"code"` Message string `json:"message"` } `json:"error"` } // StreamRequest 是发给 OpenAI 兼容端点的请求参数(公开给 provider 层). type StreamRequest struct { Model string Messages []flyto.Message System string // 系统提示(转换为 system role 消息) MaxTokens int Tools []flyto.Tool Reasoning *Reasoning // 可选 thinking 参数(OpenRouter / o1 系列 / MiniMax) ResponseFormat *flyto.ResponseFormat // 结构化输出格式(nil = 文本) // ThinkingBudget Gemini 专用 per-request thinking budget(其他 provider 忽略此字段). // > 0 时在 generationConfig.thinkingConfig 中传递. ThinkingBudget int // EnableSystemCaching 为系统消息添加 cache_control: ephemeral 标记(数组格式). // 用于 OpenRouter → Anthropic 路径的 prompt caching: // OpenRouter 将 cache_control 透传给 Anthropic,命中时 usage.cached_tokens > 0. // 注意:仅对支持 Anthropic 协议的后端有效(OpenAI 原生 API 会忽略此字段). EnableSystemCaching bool // Temperature / TopP: per-request sampling knobs. Nil = omit on the // wire (upstream uses its default). OpenAICompatClient.buildRequest // passes these straight to the openaiReq JSON; Gemini's buildRequest // embeds them under generationConfig. // // Temperature / TopP: 本次请求的采样旋钮. nil = wire 不传 (上游默认). // OpenAICompatClient.buildRequest 直接写入 openaiReq JSON; Gemini // 的 buildRequest 嵌入到 generationConfig 下. Temperature *float64 TopP *float64 } // Stream 向 OpenAI 兼容端点发起流式请求,返回 flyto.Event channel. // // channel 关闭表示流结束.最后一个事件可能是 *flyto.ErrorEvent(出错时) // 或 *flyto.UsageEvent(正常结束时). func (c *OpenAICompatClient) Stream(ctx context.Context, req *StreamRequest) (<-chan flyto.Event, error) { body, err := c.buildRequest(req) if err != nil { return nil, fmt.Errorf("openai_compat: build request: %w", err) } httpReq, err := http.NewRequestWithContext(ctx, "POST", c.baseURL+c.chatPath, bytes.NewReader(body)) if err != nil { return nil, fmt.Errorf("openai_compat: create request: %w", err) } httpReq.Header.Set("Content-Type", "application/json") httpReq.Header.Set("Accept", "text/event-stream") httpReq.Header.Set("Authorization", "Bearer "+c.apiKey) for k, v := range c.extraHeaders { httpReq.Header.Set(k, v) } resp, err := c.httpClient.Do(httpReq) if err != nil { return nil, fmt.Errorf("openai_compat: http request: %w", err) } if resp.StatusCode != http.StatusOK { resp.Body.Close() return nil, fmt.Errorf("openai_compat: http %d", resp.StatusCode) } // 精妙之处(CLEVER): 部分 provider(如 MiniMax)在鉴权失败时返回 HTTP 200 + JSON 错误体, // Content-Type 为 application/json 而非 text/event-stream. // SSE parser 找不到 data: 行,channel 静默关闭,调用层无法区分"空响应"和"错误". // 通过 Content-Type 提前检测,将错误体解析为 error 返回,避免误报. // MiniMax 格式: {"base_resp":{"status_code":2049,"status_msg":"invalid api key"}} // OpenAI 格式: {"error":{"message":"..."}} if ct := resp.Header.Get("Content-Type"); ct != "" && !strings.Contains(ct, "text/event-stream") { body, _ := io.ReadAll(io.LimitReader(resp.Body, MaxErrorBodyBytes)) resp.Body.Close() return nil, parseNonSSEError(body, ct) } ch := make(chan flyto.Event, 32) go func() { defer close(ch) defer resp.Body.Close() c.consumeSSE(ctx, resp, ch) }() return ch, nil } // buildRequest 将 StreamRequest 序列化为 JSON 请求体. func (c *OpenAICompatClient) buildRequest(req *StreamRequest) ([]byte, error) { msgs := flytoMessagesToOpenAI(req.Messages, req.System, req.EnableSystemCaching) var tools []openaiTool for _, t := range req.Tools { tools = append(tools, openaiTool{ Type: "function", Function: openaiToolFn{ Name: t.Name, Description: t.Description, Parameters: t.InputSchema, }, }) } r := openaiReq{ Model: req.Model, Messages: msgs, MaxTokens: req.MaxTokens, Tools: tools, Stream: true, // 精妙之处(CLEVER): 始终请求 include_usage-- // 这样最后一个 chunk 携带完整 usage,省去在客户端累计 token 的麻烦. // 额外开销:最后一个 chunk 多几十字节,完全可以接受. StreamOptions: &streamOptions{IncludeUsage: true}, Reasoning: req.Reasoning, Temperature: req.Temperature, TopP: req.TopP, } if req.ResponseFormat != nil { rf := &openaiRespFmt{Type: req.ResponseFormat.Type} if req.ResponseFormat.Type == "json_schema" && len(req.ResponseFormat.JSONSchema) > 0 { rf.JSONSchema = &openaiJSONSchema{ Name: "response", Strict: true, Schema: req.ResponseFormat.JSONSchema, } } r.ResponseFormat = rf } return json.Marshal(r) } // consumeSSE 从 HTTP 响应中读取 SSE 流并转换为 flyto.Event. func (c *OpenAICompatClient) consumeSSE(ctx context.Context, resp *http.Response, ch chan<- flyto.Event) { // 精妙之处(CLEVER): 1MB Scanner buffer-- // 工具调用的 arguments 字段可能累积到几百 KB(大型 JSON 操作), // 默认 64KB buffer 在这种情况下会静默截断,导致 JSON 解析失败. // 与其在发现问题后修 bug,不如预先分配合理上限. // // 升华改进(ELEVATED): io.LimitReader 包裹 resp.Body-- // 1MB 是 per-chunk 的 scanner buffer,但 total 流可能无限大. // LimitReader 在 100MB 处触发 EOF,scanner.Scan() 返回 false,goroutine 正常退出. // 替代方案:<不限制 total> - 否决:OOM 风险,恶意服务端可无限推送数据. scanner := bufio.NewScanner(io.LimitReader(resp.Body, MaxStreamingBodyBytes)) scanner.Buffer(make([]byte, 1024*1024), 1024*1024) // 工具调用状态:按 index 追踪每个工具调用的累积数据 type pendingToolCall struct { id string name string argumentBuf strings.Builder } toolCalls := make(map[int]*pendingToolCall) var textBuf strings.Builder var reasoningBuf strings.Builder var finishReason string for scanner.Scan() { // 升华改进(ELEVATED): 每轮检查 ctx 是否已取消-- // HTTP 底层未必立即断开(如长连接),ctx 取消后 scanner 仍可能 block 在 Read(). // 显式检查确保 goroutine 在 ctx 取消后一个 chunk 内退出,而非依赖底层隐式传播. // 替代方案:<只依赖 resp.Body.Close() 隐式触发 EOF> - 否决:TCP 半关闭场景下 // Body.Close() 不保证立即解除 Read() 阻塞,goroutine 可能 hang 数秒. if ctx.Err() != nil { return } line := scanner.Text() // OpenRouter 发送 `: OPENROUTER PROCESSING` 类型的 keepalive 注释,直接跳过 if strings.HasPrefix(line, ":") || line == "" { continue } if !strings.HasPrefix(line, "data: ") { continue } data := line[6:] if data == "[DONE]" { break } var chunk openaiChunk if err := json.Unmarshal([]byte(data), &chunk); err != nil { // 非标准 JSON,跳过(部分 provider 发送 ping 等非数据行) continue } // mid-stream 错误(OpenRouter 格式) if chunk.Error != nil { ch <- &flyto.ErrorEvent{ Err: fmt.Errorf("openai_compat: provider error %d: %s", chunk.Error.Code, chunk.Error.Message), Code: "provider_error", Retryable: chunk.Error.Code == 429 || chunk.Error.Code == 529, } return } if len(chunk.Choices) == 0 { // 可能是只有 usage 字段的最终 chunk if chunk.Usage != nil { ch <- buildUsageEvent(chunk, finishReason) } continue } choice := chunk.Choices[0] // 文本增量 if choice.Delta.Content != "" { textBuf.WriteString(choice.Delta.Content) ch <- &flyto.TextDeltaEvent{Text: choice.Delta.Content} } // Thinking/Reasoning 增量 - 兼容两种格式: // // 格式1: OpenAI o1/o3 原生--单个字符串字段 reasoning_content if choice.Delta.ReasoningContent != "" { reasoningBuf.WriteString(choice.Delta.ReasoningContent) ch <- &flyto.ThinkingDeltaEvent{Text: choice.Delta.ReasoningContent} } // 格式2: OpenRouter--reasoning_details 对象数组(见 openaiReasoningDetail) // 精妙之处(CLEVER): 用白名单 type 过滤而非直接取 text, // 避免把 "redacted"(思考被截断的占位符)等非文本 type 输出给用户. for _, rd := range choice.Delta.ReasoningDetails { if rd.Text == "" { continue } switch rd.Type { case "reasoning.text", "reasoning.summary", "thinking": reasoningBuf.WriteString(rd.Text) ch <- &flyto.ThinkingDeltaEvent{Text: rd.Text} } } // 工具调用增量 for _, tc := range choice.Delta.ToolCalls { ptc, exists := toolCalls[tc.Index] if !exists { ptc = &pendingToolCall{} toolCalls[tc.Index] = ptc } if tc.ID != "" { ptc.id = tc.ID } if tc.Function.Name != "" { ptc.name = tc.Function.Name } ptc.argumentBuf.WriteString(tc.Function.Arguments) } // finish_reason 非 nil 表示本 choice 结束 if choice.FinishReason != nil { finishReason = *choice.FinishReason // 发出完整文本事件 if textBuf.Len() > 0 { ch <- &flyto.TextEvent{Text: textBuf.String()} } // 发出完整 thinking 事件 if reasoningBuf.Len() > 0 { ch <- &flyto.ThinkingEvent{Text: reasoningBuf.String()} } // 发出工具调用事件 // 精妙之处(CLEVER): OpenAI 格式没有 content_block_stop 概念-- // 所有工具调用的 arguments 都在 finish_reason 出现后才算完整. // 与 Anthropic 格式(每个 block 单独有 stop 事件)不同, // 这里统一在 finish_reason 时批量发出所有工具调用事件. for i := 0; i < len(toolCalls); i++ { ptc, ok := toolCalls[i] if !ok { continue } argsStr := ptc.argumentBuf.String() var toolInput map[string]any if argsStr != "" && json.Valid([]byte(argsStr)) { _ = json.Unmarshal([]byte(argsStr), &toolInput) } if toolInput == nil { toolInput = make(map[string]any) } ch <- &flyto.ToolUseEvent{ ID: ptc.id, ToolName: ptc.name, Input: toolInput, } } // 发出 usage 事件(如果此 chunk 携带了 usage) if chunk.Usage != nil { ch <- buildUsageEvent(chunk, finishReason) } } } if err := scanner.Err(); err != nil && ctx.Err() == nil { ch <- &flyto.ErrorEvent{ Err: fmt.Errorf("openai_compat: sse scan error: %w", err), Code: "stream_error", Retryable: true, } } } // buildUsageEvent 从 chunk 中提取 usage 信息构造 UsageEvent. func buildUsageEvent(chunk openaiChunk, stopReason string) *flyto.UsageEvent { evt := &flyto.UsageEvent{StopReason: stopReason} if chunk.Usage != nil { evt.InputTokens = chunk.Usage.PromptTokens evt.OutputTokens = chunk.Usage.CompletionTokens if chunk.Usage.PromptTokensDetails != nil { evt.CacheReadTokens = chunk.Usage.PromptTokensDetails.CachedTokens } evt.CacheCreationTokens = chunk.Usage.CacheWriteTokens } return evt } // --- 消息格式转换 --- // flytoMessagesToOpenAI 将 flyto.Message 列表转换为 OpenAI messages 格式. // // 精妙之处(CLEVER): flyto 和 OpenAI 的消息格式存在结构差异-- // Anthropic 格式中,工具结果以 "user" role 的 tool_result block 表示, // 且一个 user 消息可包含多个 tool_result block. // OpenAI 格式中,每个工具结果是独立的 "tool" role 消息,通过 tool_call_id 关联. // 转换规则:flyto BlockToolResult → 独立的 "tool" role OpenAI 消息. // 替代方案:<强制两种格式对齐,使 flyto.Message 无法表达多工具结果> - // 否决:限制了 flyto 格式的表达能力,其他 provider 无此限制. // // cacheSystem=true 时,系统消息使用数组格式并添加 cache_control: ephemeral-- // OpenRouter 将此标记透传给 Anthropic,命中缓存时 usage.cached_tokens > 0. // 普通字符串格式(cacheSystem=false)对 OpenAI 原生 API 透明,不产生副作用. func flytoMessagesToOpenAI(msgs []flyto.Message, systemPrompt string, cacheSystem bool) []openaiMsg { var result []openaiMsg if systemPrompt != "" { if cacheSystem { // 升华改进(ELEVATED): OpenRouter Anthropic caching 要求系统消息使用 content 数组格式, // 并在最后一个 block 上打 cache_control: ephemeral 断点. // 普通字符串格式("content": "...")无法携带 cache_control,缓存永远不会建立. // 替代方案:<在消息层之外通过 beta header 开启 caching> - // 否决:OpenRouter 不支持透传 Anthropic beta header,只能靠 content block 标记. type cacheBlock struct { Type string `json:"type"` Text string `json:"text"` CacheControl map[string]string `json:"cache_control"` } blocks := []cacheBlock{{ Type: "text", Text: systemPrompt, CacheControl: map[string]string{"type": "ephemeral"}, }} raw, _ := json.Marshal(blocks) result = append(result, openaiMsg{Role: "system", Content: raw}) } else { raw, _ := json.Marshal(systemPrompt) result = append(result, openaiMsg{Role: "system", Content: raw}) } } for _, msg := range msgs { switch msg.Role { case flyto.RoleUser: // 拆分:tool_result → "tool" 消息,text → "user" 消息 // 顺序:工具结果先于新的用户文本(符合 OpenAI 的对话流语义) for _, b := range msg.Blocks { if b.Type == flyto.BlockToolResult { raw, _ := json.Marshal(b.ResultText) result = append(result, openaiMsg{ Role: "tool", Content: raw, ToolCallID: b.ToolUseID, }) } } var textParts []string for _, b := range msg.Blocks { if b.Type == flyto.BlockText && b.Text != "" { textParts = append(textParts, b.Text) } } if len(textParts) > 0 { raw, _ := json.Marshal(strings.Join(textParts, "\n")) result = append(result, openaiMsg{Role: "user", Content: raw}) } case flyto.RoleAssistant: var textParts []string var toolCalls []openaiToolCall for i, b := range msg.Blocks { switch b.Type { case flyto.BlockText: textParts = append(textParts, b.Text) case flyto.BlockToolUse: argsJSON, _ := json.Marshal(b.ToolInput) toolCalls = append(toolCalls, openaiToolCall{ Index: i, ID: b.ToolUseID, Type: "function", Function: struct { Name string `json:"name,omitempty"` Arguments string `json:"arguments,omitempty"` }{ Name: b.ToolName, Arguments: string(argsJSON), }, }) } } m := openaiMsg{Role: "assistant"} if len(textParts) > 0 { raw, _ := json.Marshal(strings.Join(textParts, "\n")) m.Content = raw } if len(toolCalls) > 0 { m.ToolCalls = toolCalls } result = append(result, m) } } return result } // --- 模型列表获取 --- // FetchOpenAIModels 从 /v1/models 端点获取模型列表(OpenAI 格式). // // 适用于:OpenAI 官方 API,LM Studio. // Ollama 使用不同的 /api/tags 端点,见 FetchOllamaModels. func (c *OpenAICompatClient) FetchOpenAIModels(ctx context.Context) ([]flyto.ModelInfo, error) { httpReq, err := http.NewRequestWithContext(ctx, "GET", c.baseURL+"/v1/models", nil) if err != nil { return nil, err } httpReq.Header.Set("Authorization", "Bearer "+c.apiKey) for k, v := range c.extraHeaders { httpReq.Header.Set(k, v) } resp, err := c.httpClient.Do(httpReq) if err != nil { return nil, err } defer resp.Body.Close() var body struct { Data []struct { ID string `json:"id"` OwnedBy string `json:"owned_by"` } `json:"data"` } if err := json.NewDecoder(resp.Body).Decode(&body); err != nil { return nil, err } var models []flyto.ModelInfo for _, m := range body.Data { models = append(models, flyto.ModelInfo{ ID: m.ID, DisplayName: m.ID, Provider: m.OwnedBy, }) } return models, nil } // FetchOllamaModels 从 Ollama 的 /api/tags 端点获取本地模型列表. func (c *OpenAICompatClient) FetchOllamaModels(ctx context.Context) ([]flyto.ModelInfo, error) { httpReq, err := http.NewRequestWithContext(ctx, "GET", c.baseURL+"/api/tags", nil) if err != nil { return nil, err } resp, err := c.httpClient.Do(httpReq) if err != nil { return nil, err } defer resp.Body.Close() var body struct { Models []struct { Name string `json:"name"` Details struct { ParameterSize string `json:"parameter_size"` QuantizationLevel string `json:"quantization_level"` } `json:"details"` } `json:"models"` } if err := json.NewDecoder(resp.Body).Decode(&body); err != nil { return nil, err } var models []flyto.ModelInfo for _, m := range body.Models { displayName := m.Name if m.Details.ParameterSize != "" { displayName = fmt.Sprintf("%s (%s %s)", m.Name, m.Details.ParameterSize, m.Details.QuantizationLevel) } models = append(models, flyto.ModelInfo{ ID: m.Name, DisplayName: displayName, Provider: "ollama", SupportsVision: false, // Ollama 运行时无法可靠判断,保守返回 false }) } return models, nil } // FetchOpenRouterModels 从 OpenRouter 的 /api/v1/models 端点获取模型列表. func (c *OpenAICompatClient) FetchOpenRouterModels(ctx context.Context) ([]flyto.ModelInfo, error) { httpReq, err := http.NewRequestWithContext(ctx, "GET", c.baseURL+"/api/v1/models", nil) if err != nil { return nil, err } httpReq.Header.Set("Authorization", "Bearer "+c.apiKey) for k, v := range c.extraHeaders { httpReq.Header.Set(k, v) } resp, err := c.httpClient.Do(httpReq) if err != nil { return nil, err } defer resp.Body.Close() var body struct { Data []struct { ID string `json:"id"` Name string `json:"name"` ContextLength int `json:"context_length"` MaxCompletionTokens int `json:"max_completion_tokens"` Pricing struct { Prompt string `json:"prompt"` // per-token 价格(字符串) Completion string `json:"completion"` } `json:"pricing"` SupportedParameters []string `json:"supported_parameters"` } `json:"data"` } if err := json.NewDecoder(resp.Body).Decode(&body); err != nil { return nil, err } var models []flyto.ModelInfo for _, m := range body.Data { info := flyto.ModelInfo{ ID: m.ID, DisplayName: m.Name, Provider: "openrouter", ContextWindow: m.ContextLength, MaxOutputTokens: m.MaxCompletionTokens, // 精妙之处(CLEVER): OpenRouter 定价是 per-token(字符串),需转为 per-1M-- // 用 parseOpenRouterPrice 将 "0.000003" 转为 3.0(USD/1M tokens). InputPricePer1M: parseOpenRouterPrice(m.Pricing.Prompt), OutputPricePer1M: parseOpenRouterPrice(m.Pricing.Completion), } // 从 supported_parameters 推断能力 for _, p := range m.SupportedParameters { switch p { case "reasoning": info.SupportsThinking = true case "tools": // 大多数模型支持工具,不单独标记 } } models = append(models, info) } return models, nil } // parseNonSSEError 从非 SSE 响应体中提取人类可读的错误信息. // // 支持两种常见格式: // - OpenAI/OpenRouter 格式:{"error":{"message":"..."}} // - MiniMax 格式:{"base_resp":{"status_code":2049,"status_msg":"invalid api key"}} // // 如果无法解析,将 body 前 256 字节作为原始错误信息返回. func parseNonSSEError(body []byte, contentType string) error { var errJSON struct { Error *struct { Message string `json:"message"` } `json:"error"` BaseResp *struct { StatusCode int `json:"status_code"` StatusMsg string `json:"status_msg"` } `json:"base_resp"` } if json.Unmarshal(body, &errJSON) == nil { if errJSON.Error != nil && errJSON.Error.Message != "" { return fmt.Errorf("openai_compat: provider error: %s", errJSON.Error.Message) } if errJSON.BaseResp != nil && errJSON.BaseResp.StatusCode != 0 { return fmt.Errorf("openai_compat: base_resp %d: %s", errJSON.BaseResp.StatusCode, errJSON.BaseResp.StatusMsg) } } preview := body if len(preview) > 256 { preview = preview[:256] } return fmt.Errorf("openai_compat: unexpected non-SSE response (Content-Type: %s): %s", contentType, preview) } // parseOpenRouterPrice 将 OpenRouter 的 per-token 价格字符串转换为 per-1M USD. // // OpenRouter 价格格式:"0.000003"(每 token 的 USD 价格). // 我们统一用 per-1M tokens 的 USD 价格. func parseOpenRouterPrice(s string) float64 { if s == "" || s == "0" { return 0 } var v float64 // 精妙之处(CLEVER): 用 fmt.Sscanf 而非 strconv.ParseFloat, // 因为 OpenRouter 偶尔返回 "N/A" 或空字符串,ParseFloat 会返回 error, // Sscanf 在 0 个成功 scan 时直接返回 0,不需要 err 检查. fmt.Sscanf(s, "%f", &v) return v * 1_000_000 }