// SSETransport:通过 HTTP SSE 与远程 MCP 服务器通信. // // 连接模型(MCP 2024-11-05 规范 SSE 传输): // // 客户端 服务端 // | GET /sse | // | Accept: text/event-stream | // | ─────────────────────────> | // | 200 OK (SSE 流开始) | // | <───────────────────────── | // | event: endpoint | // | data: /messages?sessionId=x| ← 服务端告知请求端点 // | <───────────────────────── | // | | // | POST /messages?sessionId=x | ← 客户端发 JSON-RPC 请求 // | ─────────────────────────> | // | event: message | // | data: {"jsonrpc":"2.0"...} | ← 服务端通过 SSE 返回响应 // | <───────────────────────── | // // 升华改进(ELEVATED): 早期实现 只支持 stdio,完全依赖本地服务器进程. // SSE 传输使 MCP 服务器可以: // - 部署为远程服务(SaaS MCP,容器化工具服务器) // - 穿越防火墙(仅需标准 HTTPS 出站) // - 多客户端共享(一个 MCP 服务器进程服务多个 Agent) // // CLI / SDK 消费层按需注入 AuthProvider(如 OAuth token),engine 不感知认证细节. package mcp import ( "bufio" "bytes" "context" "fmt" "io" "net/http" "strings" "sync" "time" ) // SSETransport 通过 HTTP SSE 实现 Transport 接口. type SSETransport struct { sseURL string auth AuthProvider // endpointURL 由服务端通过 SSE "endpoint" 事件下发 // 精妙之处(CLEVER): endpointReady channel 而非 sync.Cond-- // 关闭 channel 是 Go 惯用的一次性广播信号,可以被多个 goroutine // 同时等待(select <-endpointReady),且不会因 race 而漏信号. endpointURL string endpointReady chan struct{} endpointOnce sync.Once recvCh chan []byte // SSE 消息 → Recv() recvErr chan error // SSE 读取错误(容量 1,不阻塞写入方) done chan struct{} // Close() 时关闭 once sync.Once // sseCancel 取消 sseReadLoop 使用的 context. // 精妙之处(CLEVER): 双重退出信号--sseBody.Close() 解除 scanner.Scan() 阻塞, // sseCancel() 则令 ctx.Err() 检查生效,两路互为兜底. // 在 connect() 中创建,在 Close() 中调用. sseCancel context.CancelFunc // sseBody 是当前 SSE 长连接的响应 body. // 精妙之处(CLEVER): Close() 主动关闭 sseBody 而非只关闭 done channel-- // sseReadLoop goroutine 在 scanner.Scan() 处阻塞时不会检查 done channel, // 只有底层 io.ReadCloser 被关闭,Scan() 才会立即返回 error,goroutine 才能退出. // 若只关闭 done,goroutine 要等到服务端发来下一个字节才能感知关闭,造成连接泄漏. // 替代方案:--需重构 connect(),复杂度高. sseBodyMu sync.Mutex sseBody io.ReadCloser httpClient *http.Client } // NewSSETransport 连接到 SSE MCP 服务器,等待 endpoint URL 就绪后返回. // // sseURL 是服务端 SSE 端点,例如 "https://mcp.example.com/sse". // auth 是鉴权提供者;传 nil 则使用 NoopAuth(匿名访问). // // 建连超时:30 秒(等待服务端发送 endpoint 事件). func NewSSETransport(sseURL string, auth AuthProvider) (*SSETransport, error) { if sseURL == "" { return nil, fmt.Errorf("mcp: SSE transport: url is empty") } if auth == nil { auth = &NoopAuth{} } t := &SSETransport{ sseURL: sseURL, auth: auth, endpointReady: make(chan struct{}), recvCh: make(chan []byte, 32), recvErr: make(chan error, 1), done: make(chan struct{}), httpClient: &http.Client{ // 精妙之处(CLEVER): SSE 连接不设 Timeout-- // http.Client.Timeout 会在整个连接期间计时, // 而 SSE 是长连接(可能持续数小时). // 单次 POST 请求的超时由 Send() 传入的 ctx 控制. Timeout: 0, }, } if err := t.connect(); err != nil { return nil, err } return t, nil } // connect 建立 SSE 长连接,并等待服务端发送 endpoint URL. func (t *SSETransport) connect() error { // 建连超时 30 秒:只等待第一个 endpoint 事件,不影响后续长连接 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() req, err := http.NewRequestWithContext(ctx, http.MethodGet, t.sseURL, nil) if err != nil { return fmt.Errorf("mcp: SSE connect: create request: %w", err) } req.Header.Set("Accept", "text/event-stream") req.Header.Set("Cache-Control", "no-cache") req.Header.Set("Connection", "keep-alive") resp, err := t.httpClient.Do(req) if err != nil { return fmt.Errorf("mcp: SSE connect to %q: %w", t.sseURL, err) } if resp.StatusCode != http.StatusOK { resp.Body.Close() return fmt.Errorf("mcp: SSE connect to %q: status %d", t.sseURL, resp.StatusCode) } // 记录 body 供 Close() 主动关闭,防止 sseReadLoop 长期阻塞导致连接泄漏 t.sseBodyMu.Lock() t.sseBody = resp.Body t.sseBodyMu.Unlock() // 后台读取 SSE 流(长连接). // 升华改进(ELEVATED): 早期方案以 context.Background() 调用,令 ctx.Err() 检查永为死代码. // 修复:创建独立可取消 ctx,Close() 中调用 sseCancel() 可令检查点立即响应关闭信号. // 原方案: - 否决:ctx.Err() 永远 nil,P1-D 检查点完全失效. sseCtx, sseCancel := context.WithCancel(context.Background()) t.sseCancel = sseCancel go t.sseReadLoop(sseCtx, resp.Body) // 等待服务端推送 endpoint 事件 select { case <-t.endpointReady: return nil case err := <-t.recvErr: return fmt.Errorf("mcp: SSE waiting for endpoint: %w", err) case <-ctx.Done(): t.Close() return fmt.Errorf("mcp: SSE connect: timeout waiting for endpoint URL from %q", t.sseURL) } } // Send 通过 HTTP POST 向 endpoint URL 发送 JSON-RPC 消息. // // 响应通过 SSE 流异步返回,不在此处等待. func (t *SSETransport) Send(ctx context.Context, msg []byte) error { // 确保 endpoint URL 已就绪(connect 成功后立即可用) select { case <-t.endpointReady: case <-t.done: return fmt.Errorf("mcp: SSE transport closed") } req, err := http.NewRequestWithContext(ctx, http.MethodPost, t.endpointURL, bytes.NewReader(msg)) if err != nil { return fmt.Errorf("mcp: SSE post: create request: %w", err) } req.Header.Set("Content-Type", "application/json") // 附加鉴权头(每次请求调用,支持 token 动态刷新) headers, err := t.auth.Headers(ctx) if err != nil { return fmt.Errorf("mcp: SSE auth headers: %w", err) } for k, v := range headers { req.Header.Set(k, v) } resp, err := t.httpClient.Do(req) if err != nil { return fmt.Errorf("mcp: SSE post to %q: %w", t.endpointURL, err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted { body, _ := io.ReadAll(io.LimitReader(resp.Body, 1024)) return fmt.Errorf("mcp: SSE post: status %d: %s", resp.StatusCode, string(body)) } return nil } // Recv 阻塞等待来自 SSE 流的一条消息. func (t *SSETransport) Recv(ctx context.Context) ([]byte, error) { select { case msg, ok := <-t.recvCh: if !ok { return nil, io.EOF } return msg, nil case err := <-t.recvErr: return nil, err case <-t.done: return nil, io.EOF case <-ctx.Done(): return nil, ctx.Err() } } // Close 关闭 SSE 连接. func (t *SSETransport) Close() error { t.once.Do(func() { close(t.done) // sseCancel 取消 sseReadLoop 的 ctx,令 ctx.Err() 检查点立即感知关闭. if t.sseCancel != nil { t.sseCancel() } // 主动关闭 SSE body,解除 sseReadLoop 中 scanner.Scan() 的阻塞. // 若不关闭,goroutine 需等到服务端发来数据才能感知 done,造成连接泄漏. t.sseBodyMu.Lock() if t.sseBody != nil { t.sseBody.Close() } t.sseBodyMu.Unlock() }) return nil } // sseReadLoop 解析 SSE 事件流(在后台 goroutine 中运行). // // SSE 格式(RFC 8895): // // event: <类型>\n // data: <内容>\n // \n // // 多行 data 字段:每行一个 "data:" 前缀,合并后换行连接. func (t *SSETransport) sseReadLoop(ctx context.Context, body io.ReadCloser) { defer body.Close() scanner := bufio.NewScanner(body) // SSE 消息通常较小(JSON-RPC 响应),64KB 已经足够; // 极大响应由 Client 层的 maxResponseSize 截断. scanner.Buffer(make([]byte, 64*1024), 1*1024*1024) var eventType string var dataLines []string for scanner.Scan() { // P1-D: 检查 ctx 是否已取消,防止 dead connection 永久阻塞 goroutine if ctx.Err() != nil { return } select { case <-t.done: return default: } line := scanner.Text() if line == "" { // 空行:分发当前累积的事件 if len(dataLines) > 0 { data := strings.Join(dataLines, "\n") t.dispatchEvent(eventType, data) } eventType = "" dataLines = nil continue } if after, ok := strings.CutPrefix(line, "event:"); ok { eventType = strings.TrimSpace(after) } else if after, ok := strings.CutPrefix(line, "data:"); ok { dataLines = append(dataLines, strings.TrimSpace(after)) } // id: retry: 字段暂不处理(不影响正确性) } if err := scanner.Err(); err != nil { select { case t.recvErr <- fmt.Errorf("mcp: SSE read error from %q: %w", t.sseURL, err): default: } } } // dispatchEvent 处理一个完整的 SSE 事件. func (t *SSETransport) dispatchEvent(eventType, data string) { switch eventType { case "endpoint": // 服务端告知请求端点 URL(通常只发送一次) url := strings.TrimSpace(data) if url != "" { t.endpointURL = resolveRelativeURL(t.sseURL, url) t.endpointOnce.Do(func() { close(t.endpointReady) }) } case "message", "": // 无 event 类型默认为 message(RFC 8895 §9.2.6) if data != "" { select { case t.recvCh <- []byte(data): case <-t.done: } } } } // resolveRelativeURL 将相对路径的 URL 解析为绝对 URL. // // 例如:sseURL="https://host/sse",relURL="/messages" → "https://host/messages" func resolveRelativeURL(sseURL, relURL string) string { if strings.HasPrefix(relURL, "http://") || strings.HasPrefix(relURL, "https://") { return relURL } // 提取 origin(scheme + host,不含路径) idx := strings.Index(sseURL, "//") if idx < 0 { return relURL } rest := sseURL[idx+2:] slash := strings.IndexByte(rest, '/') if slash < 0 { // sseURL 没有路径部分(如 "https://host") origin := sseURL if strings.HasPrefix(relURL, "/") { return origin + relURL } return origin + "/" + relURL } origin := sseURL[:idx+2+slash] if strings.HasPrefix(relURL, "/") { return origin + relURL } return origin + "/" + relURL }