Documentation
¶
Overview ¶
Package wire 提供独立于 provider 身份的 SSE 协议解析器.
anthropic.go - Anthropic Messages API SSE 格式解析器.
输入:io.ReadCloser(HTTP 响应体) 输出:<-chan flyto.Event(直接产出公共事件,无中间类型)
升华改进(ELEVATED): 早期方案将 SSE 解析嵌在 internal/api/client.go 中, 和 HTTP transport 混杂,也产出 Anthropic 专有的 StreamEvent 中间类型. 现在解析器独立为本文件,client.go 仅做 HTTP transport. 好处:
- 可独立测试(传入任意 io.Reader 即可,无需 mock HTTP)
- 多 provider 共享同一个 Anthropic 解析器(MiniMax Anthropic-compat 端点也用此)
- flyto.Event 直接产出,provider 层无需二次转换
关键修复(BUGFIX): cache token 字段只从 message_start 读取. Anthropic 文档确认:message_delta 中的 usage 也含 cache 相关字段, 但那是累计值,与 message_start 中的值相加会导致重复计数. 参考:https://github.com/langchain-ai/langchainjs/issues/10249
Package wire provides internal shared protocol adapters.
gemini.go - Google Gemini (Generative Language API) SSE client.
Gemini 的流式协议与 Anthropic / OpenAI 均不同,是独立的第三种格式:
每个 SSE chunk 是完整的 GenerateContentResponse(非 delta 结构), 文本字段是增量(delta),但 functionCall / usageMetadata 是完整数据.
思考内容(Thinking)通过 part.thought = true 标记, 而非独立的 "thinking" 类型事件,与正文文本共用同一 parts 数组.
工具调用(functionCall)在一个 chunk 内完整到达, 无需像 OpenAI 那样在 finish_reason 时批量"拼接".
用量统计(usageMetadata)在最后一个 chunk 的顶层字段, 不在 candidates 内部.
支持的认证方式:
- Google AI Studio:API Key 通过查询参数传递(?key=...)
- Vertex AI:Bearer Token 通过 Authorization header 传递
升华改进(ELEVATED): 早期实现 没有 Gemini provider-- 我们是首次实现,参考 Google 官方文档设计. 与 OpenAI 兼容层的对比:函数调用完整到达(无 arguments 拼接逻辑), 代码路径更简洁;但消息格式转换更复杂(functionResponse 需要函数名, 而 flyto.BlockToolResult 只存 ToolUseID).
limits.go - HTTP 响应体大小限制常量 (wire 包导出).
L1330 (2026-04-13): 原先 internal/wire/openai.go:50-51 和 internal/transport/client.go:41-42 分别定义了同名同值的 maxErrorBodyBytes / maxStreamingBodyBytes, 两份独立声明意味着 未来 bump 时只改一边会产生 "wire 接受 100MB 但 transport 拒绝 50MB" 之类的静默漂移. 提取到 wire 包单一源头: wire 不依赖 transport, 而 transport 已依赖 wire (ParseAnthropicStream), 所以放 wire 是唯一不破坏包依赖方向的选择.
升华改进(ELEVATED): 早期方案无任何响应体大小限制--恶意 API 服务端可发送 GB 级响应导致 OOM. error body 限 1MB (足以承载任何 API 错误消息); streaming body 限 100MB (足以承载正常对话轮次). 超限后 io.LimitReader 触发 EOF, consumeSSE goroutine 正常退出, 不会死锁或泄漏. 替代方案: 不限制 - 否决, OOM 风险, 单次恶意请求可崩溃进程.
Package wire 提供内部共享协议适配器.
openai.go - OpenAI Chat Completions SSE 客户端.
多家 provider 使用相同的 OpenAI 流式协议:
- OpenAI 官方 API
- OpenRouter(聚合网关,完全兼容 OpenAI)
- Ollama(本地部署,OpenAI 兼容模式)
- LM Studio(本地部署,OpenAI 兼容)
- MiniMax native API(SSE 格式与 OpenAI 相同,仅端点路径不同)
设计:
- 本包仅被 pkg/providers/* 调用,不对外暴露(internal 包规则)
- 接受 flyto.Request 输入,输出 flyto.Event channel
- provider 层只需配置 baseURL / headers / modelTable,不需要重复 SSE 解析逻辑
关键修复(BUGFIX): reasoning/thinking 字段各家不同,存在三种格式:
- OpenAI o1/o3: `choices[0].delta.reasoning_content`(字符串)
- OpenRouter: `choices[0].delta.reasoning_details`(对象数组,含 type 字段) 旧版代码用 `reasoning` 字段(非标准),两种格式都无法正确解析.
升华改进(ELEVATED): 早期实现 没有统一 OpenAI 兼容层--每个 provider 各自实现完整的 SSE 解析,存在大量重复代码. 我们将公共部分提取到此文件,各 provider 保持薄封装(<150行). 替代方案:<各 provider 自行实现完整 SSE 解析> - 否决:4个 provider 各500行, 任何 bug 修复都要改4处,维护成本线性增长.
Package wire/schema.go - Tool Schema 预处理工具
精妙之处(CLEVER): 在 wire 层统一处理 schema 兼容性问题, 而非在每个 provider 各自实现-- $ref 展开逻辑复杂,集中实现一次,所有有 bug 的 provider 共用.
背景(BUGFIX): MiniMax 直连和 OpenRouter→Gemini 有 $ref 双重序列化 bug: 工具 schema 里用 $ref 引用 $defs 时,返回的 tool input 中被引用字段是 JSON 字符串而非 object,导致静默数据损坏. 在发送前展开 $ref,彻底规避此类 provider 端的序列化缺陷.
替代方案:<各 provider 各自处理 $ref 展开> - 否决: $ref 递归展开有循环引用检测等复杂逻辑,各自实现容易出现不一致 bug, 且每次新增有问题的 provider 都需重复实现.
AdaptSchema: 各 provider 对 JSON Schema 约束支持不一, 消费者写标准 schema,引擎自动裁剪不支持的约束.
ValidateSchemaComplexity: OpenAI strict 有隐藏嵌套/属性数/enum 上限, 超出时仅返回 400 + 晦涩错误;引擎提前检测,给出可读错误.
Package wire/tools.go - Tool 数量上限保护
背景(BUGFIX): OpenAI Chat API ≤ 128 个工具(OpenAPI spec 明确), Anthropic strict ≤ 20,超出直接 400 且错误信息晦涩. 在发送前做前置检查,把"provider 500/400 含义不明"变成"引擎侧明确错误".
精妙之处(CLEVER): 不截断,让消费者自己决定裁剪哪些工具-- 截断会改变 agent 的能力语义,引擎无法判断哪些工具重要,哪些可丢弃. 报错让消费者在正确的抽象层级做决策.
替代方案:<截断到 max,多余工具静默丢弃> - 否决: Agent 场景下工具缺失会导致静默功能降级,比 400 错误更难 debug.
Index ¶
- Constants
- Variables
- func AdaptSchema(schema json.RawMessage, provider string) json.RawMessage
- func CheckToolCount(tools []flyto.Tool, max int) error
- func DereferenceSchema(schema json.RawMessage) (json.RawMessage, error)
- func ParseAnthropicStream(ctx context.Context, body io.ReadCloser) <-chan flyto.Event
- func ValidateSchemaComplexity(schema json.RawMessage, provider string) error
- type GeminiClient
- type GeminiOption
- type OpenAICompatClient
- func (c *OpenAICompatClient) FetchOllamaModels(ctx context.Context) ([]flyto.ModelInfo, error)
- func (c *OpenAICompatClient) FetchOpenAIModels(ctx context.Context) ([]flyto.ModelInfo, error)
- func (c *OpenAICompatClient) FetchOpenRouterModels(ctx context.Context) ([]flyto.ModelInfo, error)
- func (c *OpenAICompatClient) HTTPClient() *http.Client
- func (c *OpenAICompatClient) Stream(ctx context.Context, req *StreamRequest) (<-chan flyto.Event, error)
- type OpenAICompatOption
- type Reasoning
- type SchemaComplexityError
- type StreamRequest
Constants ¶
const ( // MaxErrorBodyBytes 是 HTTP 错误响应体 (非 2xx) 的最大读取字节数. // 1MB 足以承载 Anthropic/OpenAI/Gemini 任意 API 错误 JSON (实际通常 < 10KB). MaxErrorBodyBytes = 1 << 20 // 1 MiB // MaxStreamingBodyBytes 是 SSE 流式响应体的最大读取字节数. // 100MB 足以承载正常对话轮次 (包括 prompt cache 大上下文的 chunk 回放). // 触发上限后 io.LimitReader 返回 EOF, 流式解析器优雅终止. MaxStreamingBodyBytes = 100 << 20 // 100 MiB )
Variables ¶
var ErrCircularRef = errors.New("wire/schema: circular $ref detected")
ErrCircularRef 表示 schema 中存在循环引用(A → B → A). 循环引用无法内联展开,调用方应选择保留原始 schema 或报错.
var ErrExternalRef = errors.New("wire/schema: external $ref not supported")
ErrExternalRef 表示 schema 中包含外部 URL $ref(非 #/$defs/ 前缀). 外部引用需要网络请求解析,超出本函数职责范围.
Functions ¶
func AdaptSchema ¶
func AdaptSchema(schema json.RawMessage, provider string) json.RawMessage
AdaptSchema 裁剪 schema 中指定 provider 不支持的 JSON Schema 约束.
provider 参数:
- "anthropic" - 移除数值/字符串约束关键字,将约束信息写入 description
- "openai-strict" - 移除 allOf/not/if/then/else 等条件组合关键字
- 其他("minimax"/"openrouter"/"openai"/"gemini" 等)- 原样返回(probe 确认全支持)
精妙之处(CLEVER): 裁剪 Anthropic 约束时将其写入 description,而非静默丢弃-- 例如 minimum=1, maximum=100 变成 description 追加 "(range: 1-100)". 模型在没有 schema validation 时依然能"看到"约束,这比静默裁剪更安全. 原方案:<静默丢弃约束> - 否决:模型可能生成不合法的值,且无法从 description 中感知约束存在.
替代方案:<返回 error 让消费者处理> - 否决: 约束裁剪是引擎自动化兼容的核心职责,报错会破坏"消费者写标准 schema"的体验. 无法裁剪的情况(JSON 解析失败)静默 fallback 到原值,不影响请求发出.
func CheckToolCount ¶
CheckToolCount 验证工具数量不超过 provider 上限.
max <= 0 时直接返回 nil(表示未知/无限制,跳过检查). len(tools) > max 时返回格式化错误,消费者应在此错误前裁剪工具列表.
已知上限(2026-04):
- OpenAI Chat API : 128(OpenAPI spec 明确记录)
- Anthropic strict: 20(官方文档;非 strict 无明确上限)
- MiniMax : 0(probe 实测 @256 未发现上限,待验证)
- OpenRouter : 0(透传底层模型,自身无额外限制)
func DereferenceSchema ¶
func DereferenceSchema(schema json.RawMessage) (json.RawMessage, error)
DereferenceSchema 展开 schema 中所有内部 $ref 引用.
精妙之处(CLEVER): 两阶段处理--
- 收集 $defs 中所有定义(名称 → JSON 对象)
- 递归遍历 schema,遇到 $ref: "#/$defs/Foo" 时内联替换
展开后删除顶层 $defs 和所有残余 $ref 字段, 保证输出对不支持 $ref 的 provider 完全透明.
不含 $ref 的 schema 原样返回(零开销路径). 只处理内部引用(#/$defs/),外部 URL $ref 返回 ErrExternalRef. 检测循环引用(A → B → A),返回 ErrCircularRef.
替代方案:<先 Unmarshal 到 any 再处理> - 否决: any 路径会丢失 JSON 数字精度(int64 大数变 float64), 直接操作 map[string]json.RawMessage 保留原始字节.
func ParseAnthropicStream ¶
ParseAnthropicStream 解析 Anthropic Messages API 的 SSE 流.
调用方负责在不需要时 cancel ctx 以停止解析. channel 关闭表示解析结束;最后一个事件可能是 *flyto.ErrorEvent(出错时) 或 *flyto.UsageEvent(正常结束时).
精妙之处(CLEVER): 接受 io.ReadCloser 而非 *http.Response-- 解析器只关心字节流,不关心 HTTP 细节. 这使得单元测试可以直接传入 strings.NewReader 而无需 mock HTTP 客户端.
func ValidateSchemaComplexity ¶
func ValidateSchemaComplexity(schema json.RawMessage, provider string) error
ValidateSchemaComplexity 验证 schema 是否满足指定 provider 的复杂度限制.
provider 参数:
- "openai-strict" - 检查嵌套深度(≤10),总属性数(≤5000),enum 值数(≤1000)
- 其他 - 当前无已知硬性限制,始终返回 nil
返回 *SchemaComplexityError(可 type-assert)或 nil.
精妙之处(CLEVER): 三项检查独立运行,返回第一个超限的维度-- 实践中嵌套深度是最常触发的限制,优先报告;总属性数次之;enum 值数最后. 消费者修复第一个问题后重新调用,逐一解决,避免一次 error 掩盖多个问题. 替代方案:<一次返回所有超限维度> - 否决:接口复杂([]error),消费者处理成本高.
Types ¶
type GeminiClient ¶
type GeminiClient struct {
// contains filtered or unexported fields
}
GeminiClient 是 Gemini (Google AI / Vertex AI) SSE 客户端.
func NewGeminiClient ¶
func NewGeminiClient(apiKey, baseURL string, opts ...GeminiOption) *GeminiClient
NewGeminiClient 创建 Gemini SSE 客户端.
baseURL 示例:
func (*GeminiClient) HTTPClient ¶
func (c *GeminiClient) HTTPClient() *http.Client
HTTPClient 返回底层 *http.Client, 用于测试,introspection 或直接发请求的场景. 与 OpenAICompatClient.HTTPClient() 对称.
func (*GeminiClient) Stream ¶
func (c *GeminiClient) Stream(ctx context.Context, req *StreamRequest) (<-chan flyto.Event, error)
StreamRequest 的 Gemini Stream 实现.
type GeminiOption ¶
type GeminiOption func(*GeminiClient)
GeminiOption 是 GeminiClient 的配置选项.
func GeminiWithBearerToken ¶
func GeminiWithBearerToken(token string) GeminiOption
GeminiWithBearerToken 配置 Vertex AI Bearer 认证.
Vertex AI 不使用 API Key,而是使用 OAuth2 Bearer token. 设置此项后,apiKey 查询参数将不再附加到请求 URL.
func GeminiWithHTTPClient ¶
func GeminiWithHTTPClient(hc *http.Client) GeminiOption
GeminiWithHTTPClient 注入自定义 HTTP 客户端(代理,超时等).
注意: GeminiWithHTTPClient 会替换整个 httpClient 包括 Transport.与 GeminiWithResponseHeaderTimeout 推荐二选一使用, 见 provider.New() 实现.
func GeminiWithResponseHeaderTimeout ¶
func GeminiWithResponseHeaderTimeout(d time.Duration) GeminiOption
GeminiWithResponseHeaderTimeout 覆盖默认 http.Client 的 ResponseHeaderTimeout.
约束"请求发出到收到响应首字节"的时间, **不影响** SSE 流式 body 后续读取. 精妙之处(CLEVER): 绝对不要用 http.Client.Timeout 代替此函数 - 那会砍死长流式调用. 与 openai.go 的 WithResponseHeaderTimeout 对称; wire 包 defaultResponseHeaderTimeout 常量为两个 client 共用.
安全兜底: 如果 httpClient.Transport 不是 *http.Transport, 此 option silent no-op 不 panic.
func GeminiWithThinkingBudget ¶
func GeminiWithThinkingBudget(budget int) GeminiOption
GeminiWithThinkingBudget 启用扩展思考并设置预算 token 数.
建议值 1024–16384,0 表示禁用. 思考内容通过 part.thought = true 在 SSE 流中传递, 我们将其映射为 flyto.ThinkingDeltaEvent / flyto.ThinkingEvent.
type OpenAICompatClient ¶
type OpenAICompatClient struct {
// contains filtered or unexported fields
}
OpenAICompatClient 是 OpenAI Chat Completions SSE 客户端.
支持:
- 流式文本输出(choices[].delta.content)
- 工具调用(choices[].delta.tool_calls)
- Thinking/Reasoning(choices[].delta.reasoning,OpenRouter 及部分 provider)
- mid-stream error 格式(OpenRouter 的错误推送方式)
- usage 字段(最后一个 chunk 中,通过 stream_options.include_usage 开启)
func NewOpenAICompatClient ¶
func NewOpenAICompatClient(apiKey, baseURL string, opts ...OpenAICompatOption) *OpenAICompatClient
NewOpenAICompatClient 创建 OpenAI 兼容客户端.
func (*OpenAICompatClient) FetchOllamaModels ¶
FetchOllamaModels 从 Ollama 的 /api/tags 端点获取本地模型列表.
func (*OpenAICompatClient) FetchOpenAIModels ¶
FetchOpenAIModels 从 /v1/models 端点获取模型列表(OpenAI 格式).
适用于:OpenAI 官方 API,LM Studio. Ollama 使用不同的 /api/tags 端点,见 FetchOllamaModels.
func (*OpenAICompatClient) FetchOpenRouterModels ¶
FetchOpenRouterModels 从 OpenRouter 的 /api/v1/models 端点获取模型列表.
func (*OpenAICompatClient) HTTPClient ¶
func (c *OpenAICompatClient) HTTPClient() *http.Client
HTTPClient 返回底层 *http.Client, 用于测试,introspection 或需要直接发请求的场景. 升华改进(ELEVATED): 暴露此 getter 让 provider 包单元测试能断言超时配置是否正确传递. 与 internal/transport/client.go 的同名 getter 对称.
func (*OpenAICompatClient) Stream ¶
func (c *OpenAICompatClient) Stream(ctx context.Context, req *StreamRequest) (<-chan flyto.Event, error)
Stream 向 OpenAI 兼容端点发起流式请求,返回 flyto.Event channel.
channel 关闭表示流结束.最后一个事件可能是 *flyto.ErrorEvent(出错时) 或 *flyto.UsageEvent(正常结束时).
type OpenAICompatOption ¶
type OpenAICompatOption func(*OpenAICompatClient)
OpenAICompatOption 是 OpenAICompatClient 的配置选项.
func WithChatPath ¶
func WithChatPath(path string) OpenAICompatOption
WithChatPath 覆盖 chat completions 路径(默认 /v1/chat/completions).
MiniMax native API 使用 /v1/text/chatcompletion_v2, 其 SSE 格式与 OpenAI 相同,只需修改路径即可复用本客户端.
func WithExtraHeader ¶
func WithExtraHeader(key, value string) OpenAICompatOption
WithExtraHeader 添加额外的 HTTP 请求头.
用途:
- OpenRouter: HTTP-Referer,X-Title
- 企业内网 API 网关的自定义鉴权头
func WithHTTPClient ¶
func WithHTTPClient(hc *http.Client) OpenAICompatOption
WithHTTPClient 注入自定义 HTTP 客户端(代理,超时等).
注意: WithHTTPClient 会替换整个 httpClient 包括 Transport.与 WithResponseHeaderTimeout 推荐二选一使用, 见 provider.New() 实现.
func WithResponseHeaderTimeout ¶
func WithResponseHeaderTimeout(d time.Duration) OpenAICompatOption
WithResponseHeaderTimeout 覆盖默认 http.Client 的 ResponseHeaderTimeout.
这是"请求发出到收到响应首字节"的时间上限, **不影响** SSE 流式 body 后续读取. LLM provider 的正确超时语义: 捕捉服务端死等, 放行长流式输出.
精妙之处(CLEVER): 绝对不要用 http.Client.Timeout 代替此函数 - 那会砍死流式调用. 详见 defaultResponseHeaderTimeout 常量注释.
安全兜底: 如果 httpClient.Transport 不是 *http.Transport (消费者自定义 RoundTripper 或先调用了 WithHTTPClient 替换整个 client), 此 option silent no-op 不 panic. provider.New() 约定二选一使用, no-op 只在 wire 包被其他路径滥用时触发.
type Reasoning ¶
type Reasoning struct {
MaxTokens int `json:"max_tokens,omitempty"` // Anthropic/Gemini/Qwen/MiniMax 模型
Effort string `json:"effort,omitempty"` // OpenAI o1/o3: "high"/"medium"/"low"
Enabled bool `json:"enabled,omitempty"` // 简单开关(使用默认配置)
}
Reasoning 是 OpenRouter / o1 系列 / MiniMax 的 thinking 参数(公开类型,供 provider 层使用).
type SchemaComplexityError ¶
type SchemaComplexityError struct {
// Dimension 是超限维度("nesting_depth" / "total_properties" / "enum_values").
Dimension string
// Actual 是实际测量值.
Actual int
// Limit 是允许的上限.
Limit int
}
SchemaComplexityError 是 schema 复杂度超限错误,包含超限维度和实际值.
精妙之处(CLEVER): 自定义错误类型而非 fmt.Errorf-- 调用方可以 type-assert 到 *SchemaComplexityError, 获取结构化的 Dimension/Actual/Limit 用于监控打点或自动裁剪. 替代方案:<直接 fmt.Errorf> - 否决:调用方只能解析错误字符串,脆弱.
func (*SchemaComplexityError) Error ¶
func (e *SchemaComplexityError) Error() string
type StreamRequest ¶
type StreamRequest struct {
Model string
Messages []flyto.Message
System string // 系统提示(转换为 system role 消息)
MaxTokens int
Tools []flyto.Tool
Reasoning *Reasoning // 可选 thinking 参数(OpenRouter / o1 系列 / MiniMax)
ResponseFormat *flyto.ResponseFormat // 结构化输出格式(nil = 文本)
// ThinkingBudget Gemini 专用 per-request thinking budget(其他 provider 忽略此字段).
// > 0 时在 generationConfig.thinkingConfig 中传递.
ThinkingBudget int
// EnableSystemCaching 为系统消息添加 cache_control: ephemeral 标记(数组格式).
// 用于 OpenRouter → Anthropic 路径的 prompt caching:
// OpenRouter 将 cache_control 透传给 Anthropic,命中时 usage.cached_tokens > 0.
// 注意:仅对支持 Anthropic 协议的后端有效(OpenAI 原生 API 会忽略此字段).
EnableSystemCaching bool
// Temperature / TopP: per-request sampling knobs. Nil = omit on the
// wire (upstream uses its default). OpenAICompatClient.buildRequest
// passes these straight to the openaiReq JSON; Gemini's buildRequest
// embeds them under generationConfig.
//
// Temperature / TopP: 本次请求的采样旋钮. nil = wire 不传 (上游默认).
// OpenAICompatClient.buildRequest 直接写入 openaiReq JSON; Gemini
// 的 buildRequest 嵌入到 generationConfig 下.
Temperature *float64
TopP *float64
}
StreamRequest 是发给 OpenAI 兼容端点的请求参数(公开给 provider 层).