// MCP 客户端:JSON-RPC 请求/响应关联 + 服务器协议握手. // // Client 层负责: // - JSON-RPC 请求 ID 管理(nextID + pending map) // - 分发后台循环(dispatchLoop:从 Transport.Recv 读消息,路由到等待 goroutine) // - MCP 协议方法封装(Initialize / ListTools / CallTool / ListResources 等) // - 通知回调注册(服务端主动推送,如 ToolListChanged) // // Client 不感知传输细节(进程管理,HTTP,SSE),只通过 Transport 接口交互. // 这使不同传输方式下的协议逻辑完全共享,无需为 stdio / SSE / HTTP 分别实现. // // 升华改进(ELEVATED): 早期实现将进程管理,JSON-RPC 帧,协议方法混在一起, // 无法支持远程 MCP 服务器.我们拆分为 Transport(传输) + Client(协议), // 任何实现了 Transport 接口的传输层都能直接复用所有 MCP 协议方法. package mcp import ( "context" "encoding/json" "fmt" "strconv" "sync" "sync/atomic" "time" "git.flytoex.net/yuanwei/flyto-agent/pkg/config" "git.flytoex.net/yuanwei/flyto-agent/pkg/execenv" ) // 协议版本 const protocolVersion = "2024-11-05" // 客户端常量 const ( // defaultRequestTimeout 默认请求超时时间 defaultRequestTimeout = 30 * time.Second // maxResponseSize 最大响应大小(1MB),超过此大小会截断并警告 maxResponseSize = 1 * 1024 * 1024 // maxPendingRequests 是 pending map 的硬性上限,防止恶意 MCP 服务器 // 通过不响应已发送请求撑爆客户端内存. // // 历史包袱(LEGACY): 与 resource_cache 同一类问题--早期实现无容量意识. // 理想做法:LRU 超时驱逐(pending 超过 defaultRequestTimeout 自动清理). // 改进条件:同一 session 内并发请求数接近 10000 时. maxPendingRequests = 10000 // maxElicitationMessageLen 是 elicitation/create 请求中 Message 字段的最大长度. // 防御深度: Transport 层已有 maxResponseSize(1MB)兜底, // 此限制是 elicitation 特定的 defense-in-depth, 避免超长消息传递到 UI 层. maxElicitationMessageLen = 64 * 1024 // 64KB ) // Client 是 MCP 协议客户端. // // 线程安全:所有公开方法可并发调用. type Client struct { // 配置(服务器名称,传输配置等) config config.MCPServerConfig // tp 是底层传输层,不感知协议细节 tp Transport // JSON-RPC 请求/响应关联 // 精妙之处(CLEVER): nextID 用 atomic.Int64 而非 mutex+int64-- // ID 生成是高频操作(每个请求一次),原子操作比 mutex 快一个数量级. // pending map 仍需 mu 保护(map 并发读写不安全),但 ID 生成本身无锁. nextID atomic.Int64 mu sync.Mutex pending map[string]chan *Response // notificationHandler 接收服务端推送通知(如 ToolListChanged) // 需在 Initialize 前注册,否则会漏掉 initialize 过程中的通知 notificationHandler func(method string, params json.RawMessage) // elicitationHandler 处理服务器主动发送的 server-to-client 请求(elicitation/create). // 升华改进(ELEVATED): 早期实现 无此字段,遇到服务器请求直接挂起. // 此字段为可选:nil 时 dispatchLoop 自动回复 cancel,确保服务器不会死等. // 替代方案:<全局函数注册> - 否决:多 Client 实例可以有不同处理器(多服务器场景). elicitationHandler ElicitationHandler // onClose 在连接意外断开时由 dispatchLoop 调用(Transport.Recv 返回错误). // // 升华改进(ELEVATED): 早期实现通过 `client.onclose = callback` 实现, // 逻辑写在 React Hook(useManageMCPConnections)里,与 UI 层强耦合. // SDK 嵌入/HTTP API 模式没有 React,重连逻辑完全丢失. // 我们把回调定义在 Client 层,Manager 注册并驱动重连,UI 层通过 Observer 事件感知. // 替代方案: - 否决:轮询有延迟且浪费 CPU;回调零延迟感知. onClose func() // 服务器能力(Initialize 后填充) serverInfo ServerInfo capabilities ServerCapabilities initialized bool // initializing 是 TOCTOU 竞态保护标志. // // 精妙之处(CLEVER): doInitialize() 中间需要释放 mu 进行网络调用, // 这会让并发的 Initialize() 看到 initialized==false 并发起重复初始化. // initializing 在持锁状态下设为 true,后续并发调用直接返回 nil, // 避免重复的 initialize 握手(服务器收到两次 initialize 会报错). // 替代方案:--Once 无法处理初始化失败后需要重试的场景. initializing bool // 请求超时(可通过 SetRequestTimeout 修改) requestTimeout time.Duration // 生命周期 done chan struct{} // Close() 时关闭 closed bool } // ElicitationHandler 是 MCP client 层的 Elicitation 处理接口. // 与 engine.ElicitationHandler 平行定义(mcp 包不导入 engine 包,避免循环依赖). // Manager 注入时将 engine.ElicitationHandler 包装为此接口. // // 精妙之处(CLEVER): 接口在 mcp 包内独立定义,而非直接使用 engine 包的接口-- // 这打破了 internal/mcp → pkg/engine 的导入链(internal 包不应依赖 pkg/engine). // Manager 层(被 engine 包使用)负责适配器注入,接口语义完全相同. type ElicitationHandler interface { HandleElicitation(serverName, message string, schema *ElicitationSchema) ElicitationCreateResult } // NewClient 使用指定传输层创建 MCP 客户端,并启动后台分发循环. // // 返回后 tp 已在后台运行,可立即调用 Initialize. func NewClient(cfg config.MCPServerConfig, tp Transport) *Client { c := &Client{ config: cfg, tp: tp, pending: make(map[string]chan *Response), requestTimeout: defaultRequestTimeout, done: make(chan struct{}), } go c.dispatchLoop() return c } // Connect 根据配置自动选择传输层,创建并返回 MCP 客户端. // // exec 是子进程启动抽象, 仅 stdio transport 使用 (方案 β 严格 DI). // SSE / HTTP transport 忽略此参数, 因为它们不起子进程. 强制统一传入 // 是为了让 caller (manager.go / 未来 platform 层) 对所有 transport // 类型用同一种 API. // // 支持的传输类型(cfg.Transport): // - "" / "stdio":子进程 stdin/stdout(本地 MCP 服务器) // - "sse":HTTP SSE(远程 MCP 服务器,2024-11-05 规范) // - "http":HTTP Streamable(远程 MCP 服务器,2025-03-26 规范) func Connect(exec execenv.Executor, cfg config.MCPServerConfig) (*Client, error) { var tp Transport var err error switch cfg.Transport { case "", "stdio": tp, err = NewStdioTransport(exec, cfg) case "sse": tp, err = NewSSETransport(cfg.URL, nil) // NoopAuth;消费层可在 Manager 层注入 AuthProvider case "http": tp, err = NewHTTPTransport(cfg.URL, nil) default: return nil, fmt.Errorf("mcp: unsupported transport %q for server %q (supported: stdio, sse, http)", cfg.Transport, cfg.Name) } if err != nil { return nil, fmt.Errorf("mcp: connect server %q: %w", cfg.Name, err) } return NewClient(cfg, tp), nil } // SetRequestTimeout 设置后续请求的超时时间. func (c *Client) SetRequestTimeout(timeout time.Duration) { c.mu.Lock() defer c.mu.Unlock() c.requestTimeout = timeout } // SetNotificationHandler 注册服务端通知回调. // // handler 在独立 goroutine 中调用,不会阻塞分发循环. // 必须在 Initialize 前注册,否则 initialize 阶段的通知会被忽略. func (c *Client) SetNotificationHandler(handler func(method string, params json.RawMessage)) { c.mu.Lock() defer c.mu.Unlock() c.notificationHandler = handler } // SetOnClose 注册连接断开回调(Transport.Recv 返回错误时触发). // // 必须在 dispatchLoop 启动前注册(即 NewClient 之前),否则可能漏触发. // 实际上 NewClient 在创建 Client 后立即启动 dispatchLoop goroutine, // 但 dispatchLoop 首先阻塞在 Recv,注册窗口足够大,线程安全(mu 保护). // // 回调在独立 goroutine 中调用,不阻塞 dispatchLoop 的清理流程. func (c *Client) SetOnClose(fn func()) { c.mu.Lock() defer c.mu.Unlock() c.onClose = fn } // SetElicitationHandler 注册 elicitation/create server-to-client 请求处理器. // // 升华改进(ELEVATED): MCP 2025-03-26 规范新增服务器向客户端主动请求用户输入. // 必须在 Initialize 前注册,否则 initialize 阶段的 elicitation 请求会被自动 cancel. // nil 处理器不会 panic--dispatchLoop 自动回复 cancel(NoopElicitation 语义). func (c *Client) SetElicitationHandler(handler ElicitationHandler) { c.mu.Lock() defer c.mu.Unlock() c.elicitationHandler = handler } // dispatchLoop 后台循环:从 Transport.Recv 读消息,按 ID 路由到等待的请求. // // 此 goroutine 在 NewClient 时启动,在 transport 关闭或 Close() 时退出. func (c *Client) dispatchLoop() { // 使用 Background context:Recv 的取消由 done channel 控制, // 不依赖 context 取消(避免 context cancel 后 dispatchLoop 提前退出, // 导致正在等待的请求永远得不到响应). ctx := context.Background() for { msg, err := c.tp.Recv(ctx) if err != nil { // Transport 关闭或网络错误:先 drain pending,再触发 onClose 回调. c.drainPending(err) // 精妙之处(CLEVER): onClose 在独立 goroutine 中调用-- // dispatchLoop 退出后才触发,确保 drain 完成(pending 请求已收到错误响应). // 若 onClose 内部启动重连(需要一定时间),不阻塞调用方. // 若 Close() 是主动调用的(而非断连),Manager 通过 context cancel 阻止重连, // onClose 本身被调用但立即返回--不影响正常关闭流程. c.mu.Lock() fn := c.onClose c.mu.Unlock() if fn != nil { go fn() } return } // 先判断是否为 batch 响应(JSON 数组) if len(msg) > 0 && msg[0] == '[' { var batch []json.RawMessage if err := json.Unmarshal(msg, &batch); err != nil { // 解析失败跳过 continue } for _, item := range batch { c.dispatchSingle(item) } continue } c.dispatchSingle(msg) } } // dispatchSingle 处理单条 JSON-RPC 消息(响应,通知或服务器请求). // // 由 dispatchLoop 调用,也供 batch 响应中的每条消息复用. func (c *Client) dispatchSingle(msg []byte) { var raw rawMessage if err := json.Unmarshal(msg, &raw); err != nil { // 历史包袱(LEGACY): 静默跳过解析失败的行-- // 某些 MCP 服务器实现会把调试日志输出到 stdout, // 宽容处理防止整个连接崩溃.未来改进:记录警告日志. return } // 提取字符串 key(去掉 JSON 字符串引号,数字 ID 原样保留) idKey := string(raw.ID) if len(idKey) >= 2 && idKey[0] == '"' && idKey[len(idKey)-1] == '"' { idKey = idKey[1 : len(idKey)-1] } if raw.ID != nil && raw.Method != "" { // 精妙之处(CLEVER): server-to-client 请求同时有 ID 和 Method-- // 这是 MCP 2025-03-26 新增的"服务器主动请求"模式,区别于: // - 响应(有 ID,无 Method):对我们之前发送请求的回答 // - 通知(无 ID,有 Method):服务器单向推送,无需回复 // 必须先判断此分支,否则有 ID 的服务器请求会被误当作响应处理, // pending map 里没有对应 ID 导致消息被静默丢弃. // 替代方案:<只处理响应和通知,忽略服务器请求> - 否决:服务器会无限等待响应死锁. id, _ := strconv.ParseInt(idKey, 10, 64) // 数字 ID 用于协议返回(尽量解析) go c.handleServerRequest(id, raw.Method, raw.Params) } else if raw.ID != nil { // JSON-RPC 响应(有 ID,无 Method):路由到等待该 ID 的 goroutine resp := &Response{ JSONRPC: raw.JSONRPC, ID: raw.ID, Result: raw.Result, Error: raw.Error, } c.mu.Lock() if ch, ok := c.pending[idKey]; ok { ch <- resp delete(c.pending, idKey) } c.mu.Unlock() } else if raw.Method != "" { // JSON-RPC 通知(有 Method,无 ID):调用通知回调 c.mu.Lock() handler := c.notificationHandler c.mu.Unlock() if handler != nil { // 在独立 goroutine 调用,不阻塞分发循环 go handler(raw.Method, raw.Params) } } } // handleServerRequest 处理服务器主动发送的 JSON-RPC 请求(有 ID + 有 Method). // // 目前支持的服务器请求方法: // - elicitation/create:服务器请求客户端收集用户输入(MCP 2025-03-26) // // 对于未知方法,回复 JSON-RPC error code -32601 (Method not found). // // 精妙之处(CLEVER): 在独立 goroutine 中调用(由 dispatchLoop 启动)-- // handler 可能阻塞等待用户输入,不能在 dispatchLoop 主循环中执行, // 否则整个 dispatchLoop 挂起,所有后续 MCP 消息都会堆积. func (c *Client) handleServerRequest(id int64, method string, params json.RawMessage) { switch method { case "elicitation/create": c.handleElicitationCreate(id, params) default: // 历史包袱(LEGACY): 未知方法一律返回 MethodNotFound,而不是忽略-- // 忽略会导致服务器死等,连接最终超时断开;显式拒绝让服务器能快速降级. c.sendServerResponse(id, nil, &RPCError{ Code: -32601, Message: fmt.Sprintf("Method not found: %s", method), }) } } // handleElicitationCreate 处理 elicitation/create 服务器请求. // // 流程: // 1. 解析 params 为 ElicitationCreateParams // 2. 调用 elicitationHandler.HandleElicitation(可能阻塞等待用户输入) // 3. 将结果序列化为 ElicitationCreateResult 发回服务器 // // 若 elicitationHandler 未设置,自动回复 cancel(不阻塞,不 panic). func (c *Client) handleElicitationCreate(id int64, params json.RawMessage) { var elicitParams ElicitationCreateParams if err := json.Unmarshal(params, &elicitParams); err != nil { c.sendServerResponse(id, nil, &RPCError{ Code: -32600, Message: fmt.Sprintf("elicitation/create: invalid params: %v", err), }) return } // Defense-in-depth: 限制 message 长度, 防止恶意 MCP 服务器发送超长 elicitation. // Transport 层有 maxResponseSize(1MB)兜底, 此处是 elicitation 特定的二次校验. if len(elicitParams.Message) > maxElicitationMessageLen { c.sendServerResponse(id, nil, &RPCError{ Code: -32600, Message: fmt.Sprintf("elicitation/create: message too long (%d bytes, max %d)", len(elicitParams.Message), maxElicitationMessageLen), }) return } c.mu.Lock() handler := c.elicitationHandler serverName := c.config.Name c.mu.Unlock() var result ElicitationCreateResult if handler != nil { result = handler.HandleElicitation(serverName, elicitParams.Message, elicitParams.RequestedSchema) } else { // 无 handler:自动 cancel(NoopElicitation 语义) result = ElicitationCreateResult{Action: "cancel"} } c.sendServerResponse(id, result, nil) } // sendServerResponse 向服务器发送对 server-to-client 请求的 JSON-RPC 响应. // // 这是客户端作为"服务端"角色时的回复--响应格式与普通 JSON-RPC Response 相同, // 但含义反转:我们是 responder,MCP Server 是 requester. func (c *Client) sendServerResponse(id int64, result any, rpcErr *RPCError) { type serverResp struct { JSONRPC string `json:"jsonrpc"` ID int64 `json:"id"` Result any `json:"result,omitempty"` Error *RPCError `json:"error,omitempty"` } resp := serverResp{ JSONRPC: "2.0", ID: id, Result: result, Error: rpcErr, } data, err := json.Marshal(resp) if err != nil { return // marshal 失败静默:服务器会超时,但不会 panic } ctx, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout) defer cancel() _ = c.tp.Send(ctx, data) // 发送失败静默:传输层已断开时忽略 } // drainPending 向所有等待中的请求发送错误响应,通常在连接断开时调用. func (c *Client) drainPending(err error) { c.mu.Lock() defer c.mu.Unlock() for id, ch := range c.pending { ch <- &Response{ ID: json.RawMessage(id), // 重建 string ID Error: &RPCError{Code: -1, Message: err.Error()}, } delete(c.pending, id) } } // send 发送一条 JSON-RPC 请求并等待响应(context 超时控制). func (c *Client) send(ctx context.Context, method string, params any) (*Response, error) { c.mu.Lock() if c.closed { c.mu.Unlock() return nil, fmt.Errorf("mcp: client for server %q is closed", c.config.Name) } c.mu.Unlock() id := c.nextID.Add(1) idKey := strconv.FormatInt(id, 10) idJSON := json.RawMessage(idKey) // JSON 数字格式 req := Request{ JSONRPC: "2.0", ID: idJSON, Method: method, Params: params, } data, err := json.Marshal(req) if err != nil { return nil, fmt.Errorf("mcp: marshal %s request: %w", method, err) } ch := make(chan *Response, 1) c.mu.Lock() // 容量检查:超限时拒绝新请求,防止 OOM. // 精妙之处(CLEVER): 在加锁后检查 len(pending)-- // 若在锁外检查,并发写入可能同时通过检查导致超限. if len(c.pending) >= maxPendingRequests { c.mu.Unlock() return nil, fmt.Errorf("mcp: server %q: too many pending requests (%d), server may be unresponsive", c.config.Name, maxPendingRequests) } c.pending[idKey] = ch c.mu.Unlock() if err := c.tp.Send(ctx, data); err != nil { c.mu.Lock() delete(c.pending, idKey) c.mu.Unlock() return nil, fmt.Errorf("mcp: send %s to server %q: %w", method, c.config.Name, err) } select { case resp := <-ch: if resp.Error != nil { return nil, resp.Error } return resp, nil case <-ctx.Done(): c.mu.Lock() delete(c.pending, idKey) c.mu.Unlock() return nil, fmt.Errorf("mcp: %s request to server %q: %w", method, c.config.Name, ctx.Err()) } } // sendWithTimeout 使用默认超时发送请求(无 context 的公共 API 内部使用). func (c *Client) sendWithTimeout(method string, params any) (*Response, error) { c.mu.Lock() timeout := c.requestTimeout c.mu.Unlock() ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() return c.send(ctx, method, params) } // notify 发送 JSON-RPC 通知(无需响应). func (c *Client) notify(ctx context.Context, method string, params any) error { notif := struct { JSONRPC string `json:"jsonrpc"` Method string `json:"method"` Params any `json:"params,omitempty"` }{ JSONRPC: "2.0", Method: method, Params: params, } data, err := json.Marshal(notif) if err != nil { return fmt.Errorf("mcp: marshal notification %s: %w", method, err) } return c.tp.Send(ctx, data) } // ── MCP 协议方法 ───────────────────────────────────────────────────────────── // Initialize 执行 MCP 协议握手. // // 发送 initialize 请求获取服务器能力,然后发送 initialized 通知. // 幂等:多次调用安全,已初始化时直接返回. func (c *Client) Initialize() error { c.mu.Lock() defer c.mu.Unlock() if c.initialized || c.initializing { return nil } c.initializing = true err := c.doInitialize() c.initializing = false return err } // doInitialize 内部初始化方法(调用者须持有 c.mu). func (c *Client) doInitialize() error { params := map[string]any{ "protocolVersion": protocolVersion, "capabilities": map[string]any{}, "clientInfo": map[string]any{ "name": "git.flytoex.net/yuanwei/flyto-agent", "version": "1.0.0", }, } // 释放锁,避免 send 内部操作与锁死锁 c.mu.Unlock() resp, err := c.sendWithTimeout("initialize", params) c.mu.Lock() if err != nil { return fmt.Errorf("mcp: initialize server %q: %w", c.config.Name, err) } var result InitializeResult if err := json.Unmarshal(resp.Result, &result); err != nil { return fmt.Errorf("mcp: parse initialize result from server %q: %w", c.config.Name, err) } c.serverInfo = result.ServerInfo c.capabilities = result.Capabilities c.initialized = true // 发送 initialized 通知 c.mu.Unlock() ctx, cancel := context.WithTimeout(context.Background(), c.requestTimeout) defer cancel() notifyErr := c.notify(ctx, "notifications/initialized", nil) c.mu.Lock() if notifyErr != nil { return fmt.Errorf("mcp: send initialized notification to server %q: %w", c.config.Name, notifyErr) } return nil } // ListTools 列出服务器提供的所有工具. func (c *Client) ListTools() ([]MCPTool, error) { c.mu.Lock() if !c.initialized { c.mu.Unlock() return nil, fmt.Errorf("mcp: client for server %q not initialized", c.config.Name) } c.mu.Unlock() resp, err := c.sendWithTimeout("tools/list", nil) if err != nil { return nil, err } var result ListToolsResult if err := json.Unmarshal(resp.Result, &result); err != nil { return nil, fmt.Errorf("mcp: parse tools list from server %q: %w", c.config.Name, err) } return result.Tools, nil } // CallTool 调用指定的工具. func (c *Client) CallTool(name string, args map[string]any) (*ToolCallResult, error) { c.mu.Lock() if !c.initialized { c.mu.Unlock() return nil, fmt.Errorf("mcp: client for server %q not initialized", c.config.Name) } c.mu.Unlock() params := map[string]any{ "name": name, } if args != nil { params["arguments"] = args } resp, err := c.sendWithTimeout("tools/call", params) if err != nil { return nil, fmt.Errorf("mcp: call tool %q on server %q: %w", name, c.config.Name, err) } // 响应过大时:先解析再截断文本内容,保证 JSON 合法 if len(resp.Result) > maxResponseSize { var result ToolCallResult if err := json.Unmarshal(resp.Result, &result); err != nil { return &ToolCallResult{ Content: []ContentItem{{ Type: "text", Text: fmt.Sprintf("[Warning: response too large (%d bytes) from server %q tool %q]", len(resp.Result), c.config.Name, name), }}, }, nil } for i := range result.Content { if result.Content[i].Type == "text" && len(result.Content[i].Text) > maxResponseSize { result.Content[i].Text = result.Content[i].Text[:maxResponseSize] + fmt.Sprintf("\n... [truncated from server %q tool %q]", c.config.Name, name) } } return &result, nil } var result ToolCallResult if err := json.Unmarshal(resp.Result, &result); err != nil { return nil, fmt.Errorf("mcp: parse tool result from server %q tool %q: %w", c.config.Name, name, err) } return &result, nil } // ListResources 列出服务器提供的所有资源. func (c *Client) ListResources() ([]MCPResource, error) { c.mu.Lock() if !c.initialized { c.mu.Unlock() return nil, fmt.Errorf("mcp: client for server %q not initialized", c.config.Name) } c.mu.Unlock() resp, err := c.sendWithTimeout("resources/list", nil) if err != nil { return nil, err } var result ListResourcesResult if err := json.Unmarshal(resp.Result, &result); err != nil { return nil, fmt.Errorf("mcp: parse resources list from server %q: %w", c.config.Name, err) } return result.Resources, nil } // ReadResource 读取指定资源的内容. func (c *Client) ReadResource(uri string) (*ReadResourceResult, error) { c.mu.Lock() if !c.initialized { c.mu.Unlock() return nil, fmt.Errorf("mcp: client for server %q not initialized", c.config.Name) } c.mu.Unlock() resp, err := c.sendWithTimeout("resources/read", map[string]any{"uri": uri}) if err != nil { return nil, fmt.Errorf("mcp: read resource %q from server %q: %w", uri, c.config.Name, err) } var result ReadResourceResult if err := json.Unmarshal(resp.Result, &result); err != nil { return nil, fmt.Errorf("mcp: parse resource content from server %q: %w", c.config.Name, err) } return &result, nil } // ListPrompts 列出服务器提供的所有提示模板. func (c *Client) ListPrompts() ([]MCPPrompt, error) { c.mu.Lock() if !c.initialized { c.mu.Unlock() return nil, fmt.Errorf("mcp: client for server %q not initialized", c.config.Name) } c.mu.Unlock() resp, err := c.sendWithTimeout("prompts/list", nil) if err != nil { return nil, err } var result ListPromptsResult if err := json.Unmarshal(resp.Result, &result); err != nil { return nil, fmt.Errorf("mcp: parse prompts list from server %q: %w", c.config.Name, err) } return result.Prompts, nil } // GetPrompt 获取指定提示模板的内容. func (c *Client) GetPrompt(name string, args map[string]string) (*GetPromptResult, error) { c.mu.Lock() if !c.initialized { c.mu.Unlock() return nil, fmt.Errorf("mcp: client for server %q not initialized", c.config.Name) } c.mu.Unlock() params := map[string]any{"name": name} if args != nil { params["arguments"] = args } resp, err := c.sendWithTimeout("prompts/get", params) if err != nil { return nil, fmt.Errorf("mcp: get prompt %q from server %q: %w", name, c.config.Name, err) } var result GetPromptResult if err := json.Unmarshal(resp.Result, &result); err != nil { return nil, fmt.Errorf("mcp: parse prompt result from server %q prompt %q: %w", c.config.Name, name, err) } return &result, nil } // ── 状态查询方法 ────────────────────────────────────────────────────────────── // ServerName 返回服务器配置名称. func (c *Client) ServerName() string { return c.config.Name } // GetServerInfo 返回服务器信息(Initialize 后可用). func (c *Client) GetServerInfo() ServerInfo { c.mu.Lock() defer c.mu.Unlock() return c.serverInfo } // Capabilities 返回服务器能力(Initialize 后可用). func (c *Client) Capabilities() ServerCapabilities { c.mu.Lock() defer c.mu.Unlock() return c.capabilities } // IsInitialized 返回客户端是否已完成初始化握手. func (c *Client) IsInitialized() bool { c.mu.Lock() defer c.mu.Unlock() return c.initialized } // IsAlive 返回底层连接是否仍然活跃. // // 对于 stdio 传输:检查服务器进程是否存活. // 对于 SSE / HTTP 传输:假设活跃(网络状态通过错误返回检测). func (c *Client) IsAlive() bool { if stp, ok := c.tp.(*StdioTransport); ok { return stp.IsAlive() } return !c.isClosed() } // ProcessError 返回服务器进程退出错误(仅 stdio 传输有效,其他传输返回 nil). func (c *Client) ProcessError() error { if stp, ok := c.tp.(*StdioTransport); ok { return stp.ProcessError() } return nil } func (c *Client) isClosed() bool { c.mu.Lock() defer c.mu.Unlock() return c.closed } // Close 关闭客户端并释放所有资源. func (c *Client) Close() error { c.mu.Lock() if c.closed { c.mu.Unlock() return nil } c.closed = true c.mu.Unlock() close(c.done) return c.tp.Close() }