// Package wire provides internal shared protocol adapters. // // gemini.go - Google Gemini (Generative Language API) SSE client. // // Gemini 的流式协议与 Anthropic / OpenAI 均不同,是独立的第三种格式: // // 1. 每个 SSE chunk 是完整的 GenerateContentResponse(非 delta 结构), // 文本字段是增量(delta),但 functionCall / usageMetadata 是完整数据. // // 2. 思考内容(Thinking)通过 part.thought = true 标记, // 而非独立的 "thinking" 类型事件,与正文文本共用同一 parts 数组. // // 3. 工具调用(functionCall)在一个 chunk 内完整到达, // 无需像 OpenAI 那样在 finish_reason 时批量"拼接". // // 4. 用量统计(usageMetadata)在最后一个 chunk 的顶层字段, // 不在 candidates 内部. // // 支持的认证方式: // - Google AI Studio:API Key 通过查询参数传递(?key=...) // - Vertex AI:Bearer Token 通过 Authorization header 传递 // // 升华改进(ELEVATED): 早期实现 没有 Gemini provider-- // 我们是首次实现,参考 Google 官方文档设计. // 与 OpenAI 兼容层的对比:函数调用完整到达(无 arguments 拼接逻辑), // 代码路径更简洁;但消息格式转换更复杂(functionResponse 需要函数名, // 而 flyto.BlockToolResult 只存 ToolUseID). package wire import ( "bufio" "bytes" "context" "crypto/rand" "encoding/hex" "encoding/json" "fmt" "io" "net/http" "strings" "time" "git.flytoex.net/yuanwei/flyto-agent/pkg/flyto" ) // GeminiClient 是 Gemini (Google AI / Vertex AI) SSE 客户端. type GeminiClient struct { apiKey string // Google AI Studio API key(空串表示使用 Vertex AI) bearerToken string // Vertex AI Bearer token(apiKey 为空时使用) baseURL string // 默认 https://generativelanguage.googleapis.com httpClient *http.Client // thinkingBudget 启用扩展思考(Gemini 2.0+ 支持),0 = 禁用. // 通过 generationConfig.thinkingConfig 传递. thinkingBudget int } // GeminiOption 是 GeminiClient 的配置选项. type GeminiOption func(*GeminiClient) // GeminiWithHTTPClient 注入自定义 HTTP 客户端(代理,超时等). // // 注意: GeminiWithHTTPClient 会替换整个 httpClient 包括 Transport.与 // GeminiWithResponseHeaderTimeout 推荐二选一使用, 见 provider.New() 实现. func GeminiWithHTTPClient(hc *http.Client) GeminiOption { return func(c *GeminiClient) { c.httpClient = hc } } // GeminiWithResponseHeaderTimeout 覆盖默认 http.Client 的 ResponseHeaderTimeout. // // 约束"请求发出到收到响应首字节"的时间, **不影响** SSE 流式 body 后续读取. // 精妙之处(CLEVER): 绝对不要用 http.Client.Timeout 代替此函数 - 那会砍死长流式调用. // 与 openai.go 的 WithResponseHeaderTimeout 对称; wire 包 defaultResponseHeaderTimeout // 常量为两个 client 共用. // // 安全兜底: 如果 httpClient.Transport 不是 *http.Transport, 此 option silent no-op 不 panic. func GeminiWithResponseHeaderTimeout(d time.Duration) GeminiOption { return func(c *GeminiClient) { if t, ok := c.httpClient.Transport.(*http.Transport); ok && t != nil { t.ResponseHeaderTimeout = d } } } // HTTPClient 返回底层 *http.Client, 用于测试,introspection 或直接发请求的场景. // 与 OpenAICompatClient.HTTPClient() 对称. func (c *GeminiClient) HTTPClient() *http.Client { return c.httpClient } // GeminiWithThinkingBudget 启用扩展思考并设置预算 token 数. // // 建议值 1024–16384,0 表示禁用. // 思考内容通过 part.thought = true 在 SSE 流中传递, // 我们将其映射为 flyto.ThinkingDeltaEvent / flyto.ThinkingEvent. func GeminiWithThinkingBudget(budget int) GeminiOption { return func(c *GeminiClient) { c.thinkingBudget = budget } } // GeminiWithBearerToken 配置 Vertex AI Bearer 认证. // // Vertex AI 不使用 API Key,而是使用 OAuth2 Bearer token. // 设置此项后,apiKey 查询参数将不再附加到请求 URL. func GeminiWithBearerToken(token string) GeminiOption { return func(c *GeminiClient) { c.bearerToken = token } } // NewGeminiClient 创建 Gemini SSE 客户端. // // baseURL 示例: // - Google AI: "https://generativelanguage.googleapis.com"(默认) // - Vertex AI: "https://us-central1-aiplatform.googleapis.com/v1/projects/my-project/locations/us-central1/publishers/google" func NewGeminiClient(apiKey, baseURL string, opts ...GeminiOption) *GeminiClient { if baseURL == "" { baseURL = "https://generativelanguage.googleapis.com" } c := &GeminiClient{ apiKey: apiKey, baseURL: strings.TrimRight(baseURL, "/"), httpClient: newDefaultHTTPClient(), } for _, opt := range opts { opt(c) } return c } // --- 请求 JSON 结构(Gemini GenerateContentRequest)--- type geminiReq struct { Contents []geminiContent `json:"contents"` SystemInstruction *geminiSystemInst `json:"systemInstruction,omitempty"` Tools []geminiTool `json:"tools,omitempty"` GenerationConfig geminiGenConfig `json:"generationConfig"` } type geminiSystemInst struct { Parts []geminiPart `json:"parts"` } type geminiContent struct { Role string `json:"role"` Parts []geminiPart `json:"parts"` } // geminiPart 是 Gemini 消息的内容单元,三种用途共用一个结构体: // - 文本:Text 非空,Thought 标记是否为思考内容 // - 函数调用(模型输出):FunctionCall 非 nil // - 函数结果(用户输入):FunctionResp 非 nil type geminiPart struct { Text string `json:"text,omitempty"` Thought bool `json:"thought,omitempty"` FunctionCall *geminiFunctionCall `json:"functionCall,omitempty"` FunctionResp *geminiFunctionResp `json:"functionResponse,omitempty"` } type geminiFunctionCall struct { Name string `json:"name"` Args map[string]any `json:"args"` } type geminiFunctionResp struct { Name string `json:"name"` Response map[string]any `json:"response"` } type geminiTool struct { FunctionDeclarations []geminiFuncDecl `json:"functionDeclarations"` } type geminiFuncDecl struct { Name string `json:"name"` Description string `json:"description"` Parameters json.RawMessage `json:"parameters"` // JSON Schema(与 OpenAI 格式相同) } type geminiGenConfig struct { MaxOutputTokens int `json:"maxOutputTokens,omitempty"` ThinkingConfig *geminiThinkingCfg `json:"thinkingConfig,omitempty"` ResponseMimeType string `json:"responseMimeType,omitempty"` // 结构化输出:json_object/json_schema ResponseSchema json.RawMessage `json:"responseSchema,omitempty"` // json_schema 模式下的 schema 定义 // Temperature / TopP: nil = omit; Gemini accepts [0, 2] for both. // Gemini 3 reasoning docs recommend not setting temperature below 1.0 // to avoid loop/degradation, but we passthrough -- caller decides. // // Temperature / TopP: nil = 不传; Gemini 接受 [0, 2]. Gemini 3 reasoning // 文档建议 thinking 模式不设 < 1.0 防 loop/退化, 但我们直接 passthrough, // 由调用方决定. Temperature *float64 `json:"temperature,omitempty"` TopP *float64 `json:"topP,omitempty"` } // geminiThinkingCfg 启用 Gemini 2.0+ 的扩展思考(Extended Thinking). // // IncludeThoughts=true 让模型在 SSE 流中输出 thought=true 的 parts, // 否则思考过程会被内部消耗而不传递给客户端. type geminiThinkingCfg struct { ThinkingBudget int `json:"thinkingBudget"` IncludeThoughts bool `json:"includeThoughts"` } // --- 响应 JSON 结构(Gemini GenerateContentResponse)--- // geminiRespChunk 是单个 SSE 数据块(完整的 GenerateContentResponse). // // 精妙之处(CLEVER): Gemini 每个 chunk 都是完整对象,不是 delta struct-- // 这意味着我们需要用缓冲器累积文本增量(多个 chunk 的 text 拼接), // 但 functionCall 无需拼接(单个 chunk 内完整到达). // 此设计的好处:解析更简单(无需跨 chunk 状态),代价是每 chunk JSON 稍大. type geminiRespChunk struct { Candidates []struct { Content struct { Role string `json:"role"` Parts []geminiRespPart `json:"parts"` } `json:"content"` // FinishReason 非空时表示该 candidate 已完成. // 常见值:"STOP"(正常),"MAX_TOKENS"(截断), // "SAFETY"(安全过滤),"RECITATION"(版权检测). FinishReason string `json:"finishReason"` } `json:"candidates"` UsageMetadata *struct { PromptTokenCount int `json:"promptTokenCount"` CandidatesTokenCount int `json:"candidatesTokenCount"` CachedContentTokenCount int `json:"cachedContentTokenCount"` // ThoughtsTokenCount 是思考内容消耗的 token 数(已包含在 CandidatesTokenCount 中). ThoughtsTokenCount int `json:"thoughtsTokenCount"` TotalTokenCount int `json:"totalTokenCount"` } `json:"usageMetadata"` // 顶层错误(非 2xx 时 body 也可能是此格式) Error *struct { Code int `json:"code"` Message string `json:"message"` Status string `json:"status"` } `json:"error"` } // geminiRespPart 是响应中的内容单元(对应请求中的 geminiPart,但字段略有不同). type geminiRespPart struct { Text string `json:"text,omitempty"` Thought bool `json:"thought,omitempty"` FunctionCall *geminiFunctionCall `json:"functionCall,omitempty"` } // StreamRequest 的 Gemini Stream 实现. func (c *GeminiClient) Stream(ctx context.Context, req *StreamRequest) (<-chan flyto.Event, error) { // 精妙之处(CLEVER): Google AI Studio 通过查询参数传 API key(?key=...), // 而非 Authorization header--这是 Google REST API 的历史习惯. // Vertex AI 使用 Bearer token(GCP 标准认证),两种路径通过 bearerToken 字段区分. endpoint := fmt.Sprintf("%s/v1beta/models/%s:streamGenerateContent?alt=sse", c.baseURL, req.Model) if c.apiKey != "" { endpoint += "&key=" + c.apiKey } body, err := c.buildRequest(req) if err != nil { return nil, fmt.Errorf("gemini: build request: %w", err) } httpReq, err := http.NewRequestWithContext(ctx, "POST", endpoint, bytes.NewReader(body)) if err != nil { return nil, fmt.Errorf("gemini: create request: %w", err) } httpReq.Header.Set("Content-Type", "application/json") httpReq.Header.Set("Accept", "text/event-stream") if c.bearerToken != "" { httpReq.Header.Set("Authorization", "Bearer "+c.bearerToken) } resp, err := c.httpClient.Do(httpReq) if err != nil { return nil, fmt.Errorf("gemini: http request: %w", err) } if resp.StatusCode != http.StatusOK { // 升华改进(ELEVATED): 早期方案只记录状态码,排查问题时信息不足-- // 现在读取 error body(上限 1MB)附加到消息中,Gemini 错误 JSON 可见. errBody, _ := io.ReadAll(io.LimitReader(resp.Body, MaxErrorBodyBytes)) resp.Body.Close() if len(errBody) > 0 { return nil, fmt.Errorf("gemini: http %d: %s", resp.StatusCode, strings.TrimSpace(string(errBody))) } return nil, fmt.Errorf("gemini: http %d", resp.StatusCode) } ch := make(chan flyto.Event, 32) go func() { defer close(ch) defer resp.Body.Close() c.consumeSSE(ctx, resp, ch) }() return ch, nil } // buildRequest 将 StreamRequest 序列化为 Gemini GenerateContentRequest JSON. func (c *GeminiClient) buildRequest(req *StreamRequest) ([]byte, error) { contents := flytoMessagesToGemini(req.Messages) var sysInst *geminiSystemInst if req.System != "" { sysInst = &geminiSystemInst{Parts: []geminiPart{{Text: req.System}}} } var tools []geminiTool if len(req.Tools) > 0 { var decls []geminiFuncDecl for _, t := range req.Tools { // CAP-5: $ref 展开--Gemini 不处理 JSON Schema $ref 引用 schema := t.InputSchema if expanded, err := DereferenceSchema(schema); err == nil { schema = expanded } // err != nil: silently skip,保留原始 schema decls = append(decls, geminiFuncDecl{ Name: t.Name, Description: t.Description, Parameters: schema, }) } tools = []geminiTool{{FunctionDeclarations: decls}} } genCfg := geminiGenConfig{ MaxOutputTokens: req.MaxTokens, Temperature: req.Temperature, TopP: req.TopP, } budget := req.ThinkingBudget if budget == 0 { budget = c.thinkingBudget } if budget > 0 { genCfg.ThinkingConfig = &geminiThinkingCfg{ ThinkingBudget: budget, IncludeThoughts: true, } } // ResponseFormat 处理:Gemini 通过 responseMimeType + responseSchema 支持结构化输出 if req.ResponseFormat != nil { switch req.ResponseFormat.Type { case "json_object": genCfg.ResponseMimeType = "application/json" case "json_schema": genCfg.ResponseMimeType = "application/json" if len(req.ResponseFormat.JSONSchema) > 0 { genCfg.ResponseSchema = req.ResponseFormat.JSONSchema } } } r := geminiReq{ Contents: contents, SystemInstruction: sysInst, Tools: tools, GenerationConfig: genCfg, } return json.Marshal(r) } // consumeSSE 从 HTTP 响应中读取 Gemini SSE 流并转换为 flyto.Event. func (c *GeminiClient) consumeSSE(ctx context.Context, resp *http.Response, ch chan<- flyto.Event) { // 历史包袱(LEGACY): 1MB scanner buffer--原因同 openai.go, // 函数调用 args 可能很大;Gemini 的 functionCall.args 在单个 chunk 内完整到达, // 比 OpenAI 的增量更容易触发 buffer 上限. // // 升华改进(ELEVATED): io.LimitReader 包裹 resp.Body(同 openai.go)-- // 100MB 上限防止恶意服务端无限推送导致 OOM,超限后 EOF 触发正常退出. scanner := bufio.NewScanner(io.LimitReader(resp.Body, MaxStreamingBodyBytes)) scanner.Buffer(make([]byte, 1024*1024), 1024*1024) var textBuf strings.Builder var reasoningBuf strings.Builder var finishReason string for scanner.Scan() { // 升华改进(ELEVATED): 与 openai.go 保持一致--每轮检查 ctx, // 确保 ctx 取消后 goroutine 在一个 chunk 内退出(同 openai consumeSSE). if ctx.Err() != nil { return } line := scanner.Text() if strings.HasPrefix(line, ":") || line == "" { continue } if !strings.HasPrefix(line, "data: ") { continue } data := line[6:] var chunk geminiRespChunk if err := json.Unmarshal([]byte(data), &chunk); err != nil { continue } // 顶层错误(通常是认证失败,配额超限等) if chunk.Error != nil { ch <- &flyto.ErrorEvent{ Err: fmt.Errorf("gemini: api error %d (%s): %s", chunk.Error.Code, chunk.Error.Status, chunk.Error.Message), Code: "provider_error", Retryable: chunk.Error.Code == 429, } return } if len(chunk.Candidates) == 0 { continue } candidate := chunk.Candidates[0] // 处理当前 chunk 的 parts for _, part := range candidate.Content.Parts { switch { case part.FunctionCall != nil: // 精妙之处(CLEVER): Gemini 函数调用在单个 chunk 内完整到达, // 不需要跨 chunk 拼接 arguments(与 OpenAI 格式的核心区别). // 因此可以在遇到 functionCall 时立即发出 ToolUseEvent, // 无需等待 finishReason. // // Gemini 不分配工具调用 ID,我们用 crypto/rand 生成唯一 ID, // 以确保多轮对话中工具结果能正确关联. args := part.FunctionCall.Args if args == nil { args = make(map[string]any) } ch <- &flyto.ToolUseEvent{ ID: geminiGenerateID(), ToolName: part.FunctionCall.Name, Input: args, } case part.Thought: // 思考增量:thought=true 的 text parts 是扩展思考内容 if part.Text != "" { reasoningBuf.WriteString(part.Text) ch <- &flyto.ThinkingDeltaEvent{Text: part.Text} } case part.Text != "": // 普通文本增量 textBuf.WriteString(part.Text) ch <- &flyto.TextDeltaEvent{Text: part.Text} } } // FinishReason 非空且有意义时,发出汇总事件 if candidate.FinishReason != "" && candidate.FinishReason != "FINISH_REASON_UNSPECIFIED" { // 精妙之处(CLEVER): Gemini finish reason 是大写字符串("STOP"/"MAX_TOKENS"), // 我们统一转为小写再传递,使下游处理器与 OpenAI("stop")行为一致. finishReason = strings.ToLower(candidate.FinishReason) if textBuf.Len() > 0 { ch <- &flyto.TextEvent{Text: textBuf.String()} } if reasoningBuf.Len() > 0 { ch <- &flyto.ThinkingEvent{Text: reasoningBuf.String()} } // usage 在最后一个 chunk 的 usageMetadata,可能与 finishReason 同 chunk if chunk.UsageMetadata != nil { ch <- &flyto.UsageEvent{ StopReason: finishReason, InputTokens: chunk.UsageMetadata.PromptTokenCount, OutputTokens: chunk.UsageMetadata.CandidatesTokenCount, CacheReadTokens: chunk.UsageMetadata.CachedContentTokenCount, // CacheCreationTokens: Gemini 暂无对应字段 } } } } if err := scanner.Err(); err != nil && ctx.Err() == nil { ch <- &flyto.ErrorEvent{ Err: fmt.Errorf("gemini: sse scan error: %w", err), Code: "stream_error", Retryable: true, } } } // flytoMessagesToGemini 将 flyto.Message 列表转换为 Gemini contents 格式. // // 关键差异(与 OpenAI 格式对比): // 1. assistant → "model" role(Gemini 用 "model") // 2. 工具结果用 functionResponse part(而非 "tool" role 独立消息) // 3. functionResponse 需要函数名,但 flyto.BlockToolResult 只存 ToolUseID-- // 通过提前扫描建立 ToolUseID → ToolName 映射解决 func flytoMessagesToGemini(msgs []flyto.Message) []geminiContent { // 精妙之处(CLEVER): 提前一趟扫描建立 ID → Name 映射,O(n) 时间, // 避免在转换工具结果时对每个 ID 做 O(n) 的线性搜索. toolNameByID := make(map[string]string) for _, msg := range msgs { for _, b := range msg.Blocks { if b.Type == flyto.BlockToolUse { toolNameByID[b.ToolUseID] = b.ToolName } } } var result []geminiContent for _, msg := range msgs { var parts []geminiPart switch msg.Role { case flyto.RoleUser: for _, b := range msg.Blocks { switch b.Type { case flyto.BlockToolResult: // functionResponse 需要函数名:从映射中查找 name := toolNameByID[b.ToolUseID] if name == "" { // 兜底:用 ToolUseID 作为名称(理论上不应出现) name = b.ToolUseID } response := map[string]any{"result": b.ResultText} if b.IsError { response["error"] = b.ResultText response["result"] = "" } parts = append(parts, geminiPart{ FunctionResp: &geminiFunctionResp{ Name: name, Response: response, }, }) case flyto.BlockText: if b.Text != "" { parts = append(parts, geminiPart{Text: b.Text}) } } } case flyto.RoleAssistant: for _, b := range msg.Blocks { switch b.Type { case flyto.BlockText: if b.Text != "" { parts = append(parts, geminiPart{Text: b.Text}) } case flyto.BlockToolUse: input := b.ToolInput if input == nil { input = make(map[string]any) } parts = append(parts, geminiPart{ FunctionCall: &geminiFunctionCall{ Name: b.ToolName, Args: input, }, }) } } } if len(parts) > 0 { role := "user" if msg.Role == flyto.RoleAssistant { role = "model" } result = append(result, geminiContent{Role: role, Parts: parts}) } } return result } // geminiGenerateID 生成唯一的工具调用 ID. // // Gemini API 不为函数调用分配 ID,但 flyto.ToolUseEvent 和多轮对话管理 // 依赖 ID 来关联工具请求与工具结果. // 我们用 crypto/rand 生成 12 字节(96 位)的随机 ID,碰撞概率可忽略. func geminiGenerateID() string { b := make([]byte, 12) _, _ = rand.Read(b) return "gtool_" + hex.EncodeToString(b) }