Documentation
¶
Overview ¶
MCP 工具到 Engine Tool 接口的桥接.
MCPToolBridge 将 MCP 服务器提供的工具包装为 Engine 的 Tool 接口, 使 MCP 工具可以像内置工具一样被 Engine 调度和执行.
工具名格式(升华改进(ELEVATED)): 早期实现 无冲突时用短名 "search",有冲突才用 "server/search",行为不一致. 我们始终用 mcp__serverName__toolName 格式(由 Manager.AllTools 统一生成), Bridge 层不再做碰撞检测,职责更纯粹:类型转换 + 调用路由.
转换流程:
- MCPTool.InputSchema → Tool.InputSchema()(支持 oneOf / anyOf / default)
- Tool.Execute() → Manager.CallTool() → ToolCallResult
- ToolCallResult.Content(text/image/resource)→ tools.Result.Output
MCP 客户端:JSON-RPC 请求/响应关联 + 服务器协议握手.
Client 层负责:
- JSON-RPC 请求 ID 管理(nextID + pending map)
- 分发后台循环(dispatchLoop:从 Transport.Recv 读消息,路由到等待 goroutine)
- MCP 协议方法封装(Initialize / ListTools / CallTool / ListResources 等)
- 通知回调注册(服务端主动推送,如 ToolListChanged)
Client 不感知传输细节(进程管理,HTTP,SSE),只通过 Transport 接口交互. 这使不同传输方式下的协议逻辑完全共享,无需为 stdio / SSE / HTTP 分别实现.
升华改进(ELEVATED): 早期实现将进程管理,JSON-RPC 帧,协议方法混在一起, 无法支持远程 MCP 服务器.我们拆分为 Transport(传输) + Client(协议), 任何实现了 Transport 接口的传输层都能直接复用所有 MCP 协议方法.
HTTPTransport:通过 HTTP Streamable 与 MCP 服务器通信.
对应 MCP 2025-03-26 规范的 Streamable HTTP 传输模式:
客户端 服务端
| POST /mcp |
| Content-Type: application/json
| Accept: application/json, text/event-stream
| ─────────────────────────> |
| |
| 响应 A(单条): |
| 200 OK |
| Content-Type: application/json
| {"jsonrpc":"2.0",...} |
| <───────────────────────── |
| |
| 响应 B(流式): |
| 200 OK |
| Content-Type: text/event-stream
| data: {...} | ← 多条消息
| data: {...} |
| <───────────────────────── |
| |
| 202 Accepted(通知 fire-and-forget)
| <───────────────────────── |
Session 管理:
- 服务端可在响应头中返回 Mcp-Session-Id
- 后续请求需在请求头携带该 Session-Id
升华改进(ELEVATED): 相较于 SSE 传输(两条持久连接),HTTP Streamable 更简洁-- 每次请求/响应是独立 HTTP 事务,无需维持长连接,对 CDN / 负载均衡更友好. SSE 传输兼容旧版 MCP 服务器;HTTP Streamable 面向 2025-03-26 及以后的新服务器. 替代方案:<复用 SSE 传输+扩展 POST 端点> - 否决,因为两个规范的会话管理模型不同.
JSON-RPC 2.0 协议类型定义.
本文件只保留数据类型;传输逻辑(帧格式,连接管理)由各 Transport 实现负责, 请求/响应关联(pending map,nextID)由 Client 负责.
早期实现 将 JSON-RPC 类型和传输逻辑混在同一个类里. 我们将类型单独抽出,使 StdioTransport / SSETransport / HTTPTransport 都能复用相同的类型定义而不引入循环依赖.
多 MCP 服务器管理器.
Manager 职责:
- 并发连接多个 MCP 服务器(stdio ≤2,remote ≤5)
- 工具名统一采用 mcp__serverName__toolName 格式(无条件前缀)
- 缓存工具列表,当服务端推送 ToolListChanged 通知时标记脏位,懒惰刷新
- 统一关闭所有连接
工具名格式(升华改进(ELEVATED)): 早期实现 在无冲突时使用短名称(如 "search"),有冲突时才加前缀("server/search"). 问题:同一工具名在不同配置下行为不一致,调用方无法通过名称可靠地定位服务器. 我们始终使用 mcp__serverName__toolName 格式,优势:
- 工具名 ↔ 服务器的映射关系一目了然,无需额外查找表
- 避免两台服务器的工具名碰撞导致注册失败(早期方案行为不确定)
- 符合 MCP 规范建议的命名空间隔离
替代方案:<沿用早期方案短名/冲突才加前缀> - 否决,行为不确定且难调试
并发限制(升华改进(ELEVATED)): 早期方案串行连接,多个 stdio 服务器顺序启动,第一个卡住会阻塞所有后续连接. 我们并发连接,但限制同时启动的进程数,避免系统资源耗尽:
- stdio:≤2(进程启动有 fork 开销,同时启动太多会抢 CPU)
- remote (SSE/HTTP):≤5(网络 RTT 为主要延迟,并发收益明显)
ResourceCache - MCP 资源 TTL + LRU 缓存.
背景:MCP 服务器可以暴露「资源」(resources),Agent 通过 resources/read 读取. 资源内容通常变化较慢(如配置文件,知识库文档,静态数据集), 每次工具调用前都发起 RPC 既浪费 latency 又消耗服务器资源.
设计要点:
- TTL-based 缓存:过期后下次 Get 触发 miss,调用方负责回源
- LRU 驱逐:容量满时驱逐最久未访问的条目(Get 和 Set 均更新访问顺序)
- 缓存键格式:serverName + ":" + resourceURI(命名空间隔离,避免不同服务器同名 URI 碰撞)
- InvalidateServer:ToolListChanged 通知时批量清除某服务器的资源缓存-- 工具列表变化通常意味着服务器重启或更新,资源也应视为失效
- 读多写少:RWMutex 分离读/写路径,高并发 Get 无锁争用
升华改进(ELEVATED): 早期实现 无 MCP 资源缓存,每次 resources/read 都是 RPC-- 在高频工具调用场景(如检索增强生成)下,同一资源可能被读取数十次, TTL 缓存将 RPC 次数降低到 1(TTL 内),latency 从 ~100ms 降到 ~0.1μs. 替代方案:<不缓存,每次 RPC> - 否决:资源读取在工具链中可高度重复,无缓存等于把 MCP 服务器当慢速函数调用.
替代方案2:<基于 TTLCache[string, []ResourceContent] 泛型缓存> - 否决:此处不需要 stale-while-revalidate(资源过期直接 miss,让调用方决定回源时机), 专用结构更直观,测试也更清晰.
SSETransport:通过 HTTP SSE 与远程 MCP 服务器通信.
连接模型(MCP 2024-11-05 规范 SSE 传输):
客户端 服务端
| GET /sse |
| Accept: text/event-stream |
| ─────────────────────────> |
| 200 OK (SSE 流开始) |
| <───────────────────────── |
| event: endpoint |
| data: /messages?sessionId=x| ← 服务端告知请求端点
| <───────────────────────── |
| |
| POST /messages?sessionId=x | ← 客户端发 JSON-RPC 请求
| ─────────────────────────> |
| event: message |
| data: {"jsonrpc":"2.0"...} | ← 服务端通过 SSE 返回响应
| <───────────────────────── |
升华改进(ELEVATED): 早期实现 只支持 stdio,完全依赖本地服务器进程. SSE 传输使 MCP 服务器可以:
- 部署为远程服务(SaaS MCP,容器化工具服务器)
- 穿越防火墙(仅需标准 HTTPS 出站)
- 多客户端共享(一个 MCP 服务器进程服务多个 Agent)
CLI / SDK 消费层按需注入 AuthProvider(如 OAuth token),engine 不感知认证细节.
StdioTransport:通过子进程 stdin/stdout 与 MCP 服务器通信.
这是 MCP 规范要求的基础传输方式:
- 每条 JSON-RPC 消息占一行(newline-delimited JSON)
- 客户端向 stdin 写,从 stdout 读
- stderr 仅用于服务器日志,引擎不解析
早期实现将进程管理和 JSON-RPC 帧混在一个类里. 我们的设计将进程管理单独封装为 StdioTransport,Client 层只看 Transport 接口, 这使 StdioTransport 可以独立测试,也使 SSE/HTTP 传输可以无缝替换.
生命周期:
- NewStdioTransport(cfg) 启动进程,启动后台读取/监控 goroutine
- Send/Recv 正常使用
- Close() 优雅关闭(先 SIGINT,3 秒后 SIGKILL)
MCP 传输层核心抽象.
本文件定义了三个核心接口:
- Transport:字节级 JSON-RPC 消息传输(Send/Recv/Close)
- AuthProvider:按请求提供 HTTP 鉴权头(支持 token 动态刷新)
以及两个开箱即用的 AuthProvider 实现:
- StaticTokenAuth:静态 Bearer token
- NoopAuth:无认证
设计精妙(CLEVER): Transport 操作粒度选在「raw JSON 字节层」-- 不是 io.Reader/Writer(太低层),也不是 JSON-RPC 方法调用(太高层). 原因:
- stdio 传输:字节层契合 newline-framed JSON over pipes.
- SSE 传输:Send 走独立 HTTP POST(完整 JSON body), Recv 从 SSE event 流读完整 JSON 事件--两条连接,无法建模为单个 io.ReadWriter.
- HTTP Streamable:POST 响应本身可以是 SSE 流,同样无法用 io.ReadWriter 表达.
如果选 io.Reader/Writer 层:无法建模 SSE / HTTP Streamable 的双连接模型. 如果选 JSON-RPC 方法层:请求/响应关联(pending map)被埋进 Transport, Client 无法统一管理,重连时 pending 会丢失.
Package mcp 实现 MCP (Model Context Protocol) 客户端.
MCP 是 Flyto 的核心扩展机制,允许通过独立的服务器进程 提供额外的工具,资源和提示.
通信协议:JSON-RPC 2.0 over stdio(stdin/stdout)
Index ¶
- func BridgeAllTools(manager *Manager, registry *tools.Registry) int
- type AuthProvider
- type CapabilityConfig
- type Client
- func (c *Client) CallTool(name string, args map[string]any) (*ToolCallResult, error)
- func (c *Client) Capabilities() ServerCapabilities
- func (c *Client) Close() error
- func (c *Client) GetPrompt(name string, args map[string]string) (*GetPromptResult, error)
- func (c *Client) GetServerInfo() ServerInfo
- func (c *Client) Initialize() error
- func (c *Client) IsAlive() bool
- func (c *Client) IsInitialized() bool
- func (c *Client) ListPrompts() ([]MCPPrompt, error)
- func (c *Client) ListResources() ([]MCPResource, error)
- func (c *Client) ListTools() ([]MCPTool, error)
- func (c *Client) ProcessError() error
- func (c *Client) ReadResource(uri string) (*ReadResourceResult, error)
- func (c *Client) ServerName() string
- func (c *Client) SetElicitationHandler(handler ElicitationHandler)
- func (c *Client) SetNotificationHandler(handler func(method string, params json.RawMessage))
- func (c *Client) SetOnClose(fn func())
- func (c *Client) SetRequestTimeout(timeout time.Duration)
- type ContentItem
- type ElicitationCreateParams
- type ElicitationCreateResult
- type ElicitationHandler
- type ElicitationProperty
- type ElicitationSchema
- type GetPromptResult
- type HTTPTransport
- type InitializeResult
- type ListPromptsResult
- type ListResourcesResult
- type ListToolsResult
- type MCPPrompt
- type MCPPromptArgument
- type MCPPromptMessage
- type MCPResource
- type MCPTool
- type MCPToolBridge
- func (b *MCPToolBridge) Description(_ context.Context) string
- func (b *MCPToolBridge) Execute(ctx context.Context, input json.RawMessage, _ tools.ProgressFunc) (*tools.Result, error)
- func (b *MCPToolBridge) InputSchema() json.RawMessage
- func (b *MCPToolBridge) Metadata() tools.Metadata
- func (b *MCPToolBridge) Name() string
- type Manager
- func (m *Manager) AllPrompts() []MCPPrompt
- func (m *Manager) AllTools() []MCPTool
- func (m *Manager) CallTool(serverName, toolName string, args map[string]any) (*ToolCallResult, error)
- func (m *Manager) CallToolByName(fullName string, args map[string]any) (*ToolCallResult, error)
- func (m *Manager) ClientNames() []string
- func (m *Manager) CloseAll()
- func (m *Manager) CloseOne(name string) error
- func (m *Manager) ConnectAll(configs map[string]config.MCPServerConfig) (int, []error)
- func (m *Manager) ConnectOne(name string, cfg config.MCPServerConfig) error
- func (m *Manager) GetClient(name string) (*Client, bool)
- func (m *Manager) InvalidateResourceCache(serverName, uri string)
- func (m *Manager) InvalidateServerResourceCache(serverName string)
- func (m *Manager) ReadResource(ctx context.Context, serverName, uri string) ([]ResourceContent, error)
- func (m *Manager) SetElicitationHandler(handler ElicitationHandler)
- func (m *Manager) SetEventEmitter(fn func(event string, data map[string]any))
- func (m *Manager) SetOnServerRemoved(fn func(serverName string))
- type NoopAuth
- type Notification
- type RPCError
- type ReadResourceResult
- type Request
- type ResourceCache
- func (c *ResourceCache) Clear()
- func (c *ResourceCache) Get(serverName, uri string) ([]ResourceContent, bool)
- func (c *ResourceCache) Invalidate(serverName, uri string)
- func (c *ResourceCache) InvalidateServer(serverName string)
- func (c *ResourceCache) Len() int
- func (c *ResourceCache) Set(serverName, uri string, content []ResourceContent)
- type ResourceContent
- type Response
- type SSETransport
- type ServerCapabilities
- type ServerInfo
- type StaticTokenAuth
- type StdioTransport
- type ToolCallResult
- type Transport
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type AuthProvider ¶
type AuthProvider interface {
// Headers 返回要追加到出站 HTTP 请求的 header 键值对.
// 返回 nil 表示无需额外头部.
Headers(ctx context.Context) (map[string]string, error)
}
AuthProvider 为每条出站 HTTP 请求提供鉴权头部.
升华改进(ELEVATED): 按请求调用(而非建连时一次性注入)-- 早期实现 MCP client 的认证是建连时静态注入,不支持 token 过期后透明刷新. 我们将 Headers() 设计为 per-request 调用,消费层(CLI / SaaS 后端)可以在此:
- 检查 token 是否即将过期,必要时发起 OAuth2 token refresh
- 从 secret manager 动态读取最新凭证
- 实现请求级签名(AWS SigV4 等)
Engine 核心不感知任何 OAuth 协议细节,符合「零硬编码」原则. OAuth 实现由 CLI 消费层自行注入;engine 只定义扩展点.
type CapabilityConfig ¶
type CapabilityConfig struct {
// ListChanged 是否支持列表变更通知
ListChanged bool `json:"listChanged,omitempty"`
}
CapabilityConfig 是能力配置.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client 是 MCP 协议客户端.
线程安全:所有公开方法可并发调用.
func Connect ¶
Connect 根据配置自动选择传输层,创建并返回 MCP 客户端.
exec 是子进程启动抽象, 仅 stdio transport 使用 (方案 β 严格 DI). SSE / HTTP transport 忽略此参数, 因为它们不起子进程. 强制统一传入 是为了让 caller (manager.go / 未来 platform 层) 对所有 transport 类型用同一种 API.
支持的传输类型(cfg.Transport):
- "" / "stdio":子进程 stdin/stdout(本地 MCP 服务器)
- "sse":HTTP SSE(远程 MCP 服务器,2024-11-05 规范)
- "http":HTTP Streamable(远程 MCP 服务器,2025-03-26 规范)
func NewClient ¶
func NewClient(cfg config.MCPServerConfig, tp Transport) *Client
NewClient 使用指定传输层创建 MCP 客户端,并启动后台分发循环.
返回后 tp 已在后台运行,可立即调用 Initialize.
func (*Client) Capabilities ¶
func (c *Client) Capabilities() ServerCapabilities
Capabilities 返回服务器能力(Initialize 后可用).
func (*Client) GetServerInfo ¶
func (c *Client) GetServerInfo() ServerInfo
GetServerInfo 返回服务器信息(Initialize 后可用).
func (*Client) Initialize ¶
Initialize 执行 MCP 协议握手.
发送 initialize 请求获取服务器能力,然后发送 initialized 通知. 幂等:多次调用安全,已初始化时直接返回.
func (*Client) IsAlive ¶
IsAlive 返回底层连接是否仍然活跃.
对于 stdio 传输:检查服务器进程是否存活. 对于 SSE / HTTP 传输:假设活跃(网络状态通过错误返回检测).
func (*Client) ListPrompts ¶
ListPrompts 列出服务器提供的所有提示模板.
func (*Client) ListResources ¶
func (c *Client) ListResources() ([]MCPResource, error)
ListResources 列出服务器提供的所有资源.
func (*Client) ProcessError ¶
ProcessError 返回服务器进程退出错误(仅 stdio 传输有效,其他传输返回 nil).
func (*Client) ReadResource ¶
func (c *Client) ReadResource(uri string) (*ReadResourceResult, error)
ReadResource 读取指定资源的内容.
func (*Client) SetElicitationHandler ¶
func (c *Client) SetElicitationHandler(handler ElicitationHandler)
SetElicitationHandler 注册 elicitation/create server-to-client 请求处理器.
升华改进(ELEVATED): MCP 2025-03-26 规范新增服务器向客户端主动请求用户输入. 必须在 Initialize 前注册,否则 initialize 阶段的 elicitation 请求会被自动 cancel. nil 处理器不会 panic--dispatchLoop 自动回复 cancel(NoopElicitation 语义).
func (*Client) SetNotificationHandler ¶
func (c *Client) SetNotificationHandler(handler func(method string, params json.RawMessage))
SetNotificationHandler 注册服务端通知回调.
handler 在独立 goroutine 中调用,不会阻塞分发循环. 必须在 Initialize 前注册,否则 initialize 阶段的通知会被忽略.
func (*Client) SetOnClose ¶
func (c *Client) SetOnClose(fn func())
SetOnClose 注册连接断开回调(Transport.Recv 返回错误时触发).
必须在 dispatchLoop 启动前注册(即 NewClient 之前),否则可能漏触发. 实际上 NewClient 在创建 Client 后立即启动 dispatchLoop goroutine, 但 dispatchLoop 首先阻塞在 Recv,注册窗口足够大,线程安全(mu 保护).
回调在独立 goroutine 中调用,不阻塞 dispatchLoop 的清理流程.
func (*Client) SetRequestTimeout ¶
SetRequestTimeout 设置后续请求的超时时间.
type ContentItem ¶
type ContentItem struct {
// Type 内容类型(text, image, resource)
Type string `json:"type"`
// Text 文本内容(type=text 时)
Text string `json:"text,omitempty"`
// MimeType MIME 类型(type=resource 时)
MimeType string `json:"mimeType,omitempty"`
// URI 资源 URI(type=resource 时)
URI string `json:"uri,omitempty"`
// Data base64 编码的数据(type=image 时)
Data string `json:"data,omitempty"`
}
ContentItem 是工具结果的内容项.
type ElicitationCreateParams ¶
type ElicitationCreateParams struct {
// Message 展示给用户的说明文字(Markdown)
Message string `json:"message"`
// RequestedSchema 用户输入的 JSON Schema 约束(简化版,仅支持 object type)
RequestedSchema *ElicitationSchema `json:"requestedSchema,omitempty"`
}
ElicitationCreateParams 是 elicitation/create server-to-client 请求的参数.
升华改进(ELEVATED): 早期实现 无此类型,遇到服务器请求时直接挂起. 我们完整定义 spec 数据结构,消费层可以根据 Schema 渲染合适的 UI.
type ElicitationCreateResult ¶
type ElicitationCreateResult struct {
// Action 用户操作:"accept" / "decline" / "cancel"
Action string `json:"action"`
// Content 用户填写的字段值(仅 action=="accept" 时有意义)
// 精妙之处(CLEVER): 用 map[string]any 而非强类型--
// 服务器 schema 声明了类型,客户端只需传字符串,服务器负责类型转换.
// 这样避免客户端需要反射每个字段的 Go 类型,代码简单.
Content map[string]any `json:"content,omitempty"`
}
ElicitationCreateResult 是 elicitation/create 请求的响应(client → server).
type ElicitationHandler ¶
type ElicitationHandler interface {
HandleElicitation(serverName, message string, schema *ElicitationSchema) ElicitationCreateResult
}
ElicitationHandler 是 MCP client 层的 Elicitation 处理接口. 与 engine.ElicitationHandler 平行定义(mcp 包不导入 engine 包,避免循环依赖). Manager 注入时将 engine.ElicitationHandler 包装为此接口.
精妙之处(CLEVER): 接口在 mcp 包内独立定义,而非直接使用 engine 包的接口-- 这打破了 internal/mcp → pkg/engine 的导入链(internal 包不应依赖 pkg/engine). Manager 层(被 engine 包使用)负责适配器注入,接口语义完全相同.
type ElicitationProperty ¶
type ElicitationProperty struct {
// Type 字段类型:"string" / "number" / "boolean"
Type string `json:"type"`
// Title 显示给用户的标题(可选)
Title string `json:"title,omitempty"`
// Description 字段描述(可选)
Description string `json:"description,omitempty"`
// Default 默认值(any,通过 JSON 编码传输)
Default any `json:"default,omitempty"`
}
ElicitationProperty 是 elicitation schema 中单个字段的定义.
type ElicitationSchema ¶
type ElicitationSchema struct {
// Type 固定为 "object"
Type string `json:"type"`
// Properties 字段定义(key 为字段名)
Properties map[string]ElicitationProperty `json:"properties,omitempty"`
// Required 必填字段列表
Required []string `json:"required,omitempty"`
}
ElicitationSchema 是 elicitation 请求中描述用户输入结构的简化 JSON Schema. MCP 规范只支持 object type + 基本属性类型(string/number/boolean).
type GetPromptResult ¶
type GetPromptResult struct {
// Description 提示描述
Description string `json:"description,omitempty"`
// Messages 提示消息列表
Messages []MCPPromptMessage `json:"messages"`
}
GetPromptResult 是 prompts/get 请求的响应.
type HTTPTransport ¶
type HTTPTransport struct {
// contains filtered or unexported fields
}
HTTPTransport 通过 HTTP Streamable 实现 Transport 接口.
func NewHTTPTransport ¶
func NewHTTPTransport(url string, auth AuthProvider) (*HTTPTransport, error)
NewHTTPTransport 创建 HTTP Streamable 传输实例.
url 是服务端端点,例如 "https://mcp.example.com/mcp". auth 是鉴权提供者;传 nil 则使用 NoopAuth(匿名访问).
与 SSETransport.NewSSETransport 不同,本构造函数不建立持久连接, 连接在第一次 Send() 时才发生,是惰性的.
type InitializeResult ¶
type InitializeResult struct {
// ProtocolVersion 协议版本
ProtocolVersion string `json:"protocolVersion"`
// Capabilities 服务器能力
Capabilities ServerCapabilities `json:"capabilities"`
// ServerInfo 服务器信息
ServerInfo ServerInfo `json:"serverInfo"`
}
InitializeResult 是 initialize 请求的响应.
type ListPromptsResult ¶
type ListPromptsResult struct {
Prompts []MCPPrompt `json:"prompts"`
}
ListPromptsResult 是 prompts/list 请求的响应.
type ListResourcesResult ¶
type ListResourcesResult struct {
Resources []MCPResource `json:"resources"`
}
ListResourcesResult 是 resources/list 请求的响应.
type ListToolsResult ¶
type ListToolsResult struct {
Tools []MCPTool `json:"tools"`
}
ListToolsResult 是 tools/list 请求的响应.
type MCPPrompt ¶
type MCPPrompt struct {
// Name 提示名称
Name string `json:"name"`
// Description 提示描述
Description string `json:"description,omitempty"`
// Arguments 提示模板的参数定义
Arguments []MCPPromptArgument `json:"arguments,omitempty"`
}
MCPPrompt 表示一个 MCP 服务器提供的提示模板.
type MCPPromptArgument ¶
type MCPPromptArgument struct {
// Name 参数名称
Name string `json:"name"`
// Description 参数描述
Description string `json:"description,omitempty"`
// Required 是否必填
Required bool `json:"required,omitempty"`
}
MCPPromptArgument 是提示模板的参数定义.
type MCPPromptMessage ¶
type MCPPromptMessage struct {
// Role 消息角色(user / assistant)
Role string `json:"role"`
// Content 消息内容
Content any `json:"content"`
}
MCPPromptMessage 是提示模板返回的消息.
type MCPResource ¶
type MCPResource struct {
// URI 资源唯一标识符
URI string `json:"uri"`
// Name 资源名称
Name string `json:"name"`
// Description 资源描述
Description string `json:"description,omitempty"`
// MimeType 资源的 MIME 类型
MimeType string `json:"mimeType,omitempty"`
}
MCPResource 表示一个 MCP 服务器提供的资源.
type MCPTool ¶
type MCPTool struct {
// Name 工具名称
Name string `json:"name"`
// Description 工具描述
Description string `json:"description,omitempty"`
// InputSchema 工具的 JSON Schema 输入定义
InputSchema json.RawMessage `json:"inputSchema,omitempty"`
}
MCPTool 表示一个 MCP 服务器提供的工具.
type MCPToolBridge ¶
type MCPToolBridge struct {
// contains filtered or unexported fields
}
MCPToolBridge 将一个 MCP 工具桥接为 Engine 的 Tool 接口.
func NewMCPToolBridge ¶
func NewMCPToolBridge(tool MCPTool, serverName string, manager *Manager) *MCPToolBridge
NewMCPToolBridge 创建一个使用 mcp__serverName__toolName 格式的桥接实例.
相较于早期方案的两个构造函数(无前缀 / 有前缀),我们统一为一个, 因为现在所有 MCP 工具都必须使用前缀格式.
func (*MCPToolBridge) Description ¶
func (b *MCPToolBridge) Description(_ context.Context) string
Description 返回工具描述,追加 MCP 服务器来源信息.
func (*MCPToolBridge) Execute ¶
func (b *MCPToolBridge) Execute(ctx context.Context, input json.RawMessage, _ tools.ProgressFunc) (*tools.Result, error)
Execute 执行 MCP 工具.
func (*MCPToolBridge) InputSchema ¶
func (b *MCPToolBridge) InputSchema() json.RawMessage
InputSchema 返回工具的 JSON Schema 输入定义.
升华改进(ELEVATED): 早期方案仅透传 MCP 工具的 inputSchema,不做任何转换. 我们补充了 oneOf/anyOf/default 字段的处理-- 部分 MCP 服务器(如 filesystem,search 服务器)会使用这些关键字, 而 engine 的 JSON Schema 验证器需要它们才能正确推断参数类型. 替代方案:<在验证器层忽略 oneOf/anyOf> - 否决,会导致必填参数校验失败.
func (*MCPToolBridge) Metadata ¶
func (b *MCPToolBridge) Metadata() tools.Metadata
Metadata 返回工具元数据. MCP 工具保守设置:不可并发(服务端状态未知),非只读(可能有副作用).
func (*MCPToolBridge) Name ¶
func (b *MCPToolBridge) Name() string
Name 返回工具完整名称(mcp__serverName__toolName).
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager 管理多个 MCP 服务器连接.
func NewManager ¶
NewManager 创建 MCP 管理器.
exec 是子进程启动抽象, 必填. 本地模式传 execenv.DefaultExecutor{}, 云端模式传 sandbox.Backend{}. 方案 β 严格 DI, 不做 nil fallback — nil 时 Manager 可以构造但 4b-3 后的 Connect 会 panic.
func NewManagerWithResourceCacheTTL ¶
NewManagerWithResourceCacheTTL 创建 MCP 管理器,并指定资源缓存 TTL.
ttl <= 0 表示资源永不过期(仅依靠 InvalidateServer 手动失效). 适用场景:只读静态资源(不会变化的知识库),或测试环境注入可控 TTL.
exec 参数语义同 NewManager, 见其 doc.
func (*Manager) AllPrompts ¶
AllPrompts 收集所有已连接服务器提供的提示模板.
func (*Manager) AllTools ¶
AllTools 返回所有已连接服务器提供的工具,工具名采用 mcp__server__tool 格式.
懒惰刷新:若某服务器的工具缓存被标记为脏(收到 ToolListChanged 通知), 在返回其工具前先重新拉取.
func (*Manager) CallTool ¶
func (m *Manager) CallTool(serverName, toolName string, args map[string]any) (*ToolCallResult, error)
CallTool 在指定服务器上调用工具(serverName 和 toolName 分开传入).
func (*Manager) CallToolByName ¶
CallToolByName 根据完整工具名 mcp__serverName__toolName 路由到正确的服务器.
仅接受 mcp__ 前缀格式,不支持旧的 serverName/toolName 格式.
func (*Manager) ConnectAll ¶
ConnectAll 并发连接所有配置的 MCP 服务器.
并发控制:
- stdio 类服务器最多同时连接 2 个
- SSE/HTTP 类服务器最多同时连接 5 个
某个服务器连接失败不影响其他服务器. 返回成功连接的数量和所有失败的错误列表.
func (*Manager) ConnectOne ¶
func (m *Manager) ConnectOne(name string, cfg config.MCPServerConfig) error
ConnectOne 连接单个 MCP 服务器.
func (*Manager) InvalidateResourceCache ¶
InvalidateResourceCache 手动失效指定资源的缓存.
适用于消费层明确知道资源已更新的场景(如本地文件变更通知).
func (*Manager) InvalidateServerResourceCache ¶
InvalidateServerResourceCache 清除指定服务器的所有资源缓存.
在不触发 ToolListChanged(但服务器内容确实变化)时手动调用.
func (*Manager) ReadResource ¶
func (m *Manager) ReadResource(ctx context.Context, serverName, uri string) ([]ResourceContent, error)
ReadResource 读取指定服务器的指定资源内容,优先使用缓存.
流程:
- 查询 resourceCache(serverName:uri 组合键)
- 命中且未过期 → 直接返回,0 RPC
- Miss 或过期 → 发起 resources/read RPC → 写入缓存 → 返回
升华改进(ELEVATED): ctx 控制 RPC 超时-- 调用方可设置合理的 deadline(如 3s),避免慢服务器阻塞整个 Agent 轮次. 缓存命中路径不需要 ctx(无 I/O),只有回源路径才使用 ctx. 替代方案:<不接受 ctx,内部使用 defaultRequestTimeout> - 否决:调用方(如 runLoop)有整体 context,应当传递以便统一取消.
func (*Manager) SetElicitationHandler ¶
func (m *Manager) SetElicitationHandler(handler ElicitationHandler)
SetElicitationHandler 设置全局 elicitation handler,应在 ConnectAll 之前调用.
已连接的 Client 不会追溯注入(它们在 Connect 时已经使用旧 handler 注册). 最佳实践:engine.New() 时调用 SetElicitationHandler,再调用 ConnectAll.
func (*Manager) SetEventEmitter ¶
SetEventEmitter 注册结构化事件发射函数(可选).
Manager 通过此函数发送以下事件:
- mcp_reconnecting:开始重连(含 server,attempt,max_attempts 字段)
- mcp_reconnected:重连成功(含 server,attempt 字段)
- mcp_reconnect_failed:单次重连失败(含 server,attempt,error 字段)
- mcp_server_removed:服务器被永久移除(含 server,reason 字段)
消费层(CLI/SDK)通过此接口把事件转发给 EngineObserver/日志系统/监控面板. 应在 ConnectAll/ConnectOne 之前调用.
func (*Manager) SetOnServerRemoved ¶
SetOnServerRemoved 注册服务器被永久移除时的回调.
触发时机:服务器断连后重连 reconnectMaxAttempts 次全部失败. 回调在独立 goroutine 中异步调用,不阻塞重连流程. 典型用途:engine 注销 mcp__server__* 工具;监控系统发告警. 应在 ConnectAll/ConnectOne 之前调用,确保回调对所有连接生效.
type NoopAuth ¶
type NoopAuth struct{}
NoopAuth 不提供任何认证信息.
适用场景:匿名访问的 MCP 服务器,或凭证已内嵌于 URL(如 ?token=...)的场景.
type Notification ¶
type Notification struct {
JSONRPC string `json:"jsonrpc"`
Method string `json:"method"`
Params json.RawMessage `json:"params,omitempty"`
}
Notification 是 JSON-RPC 2.0 通知(无 ID,不需要响应).
type RPCError ¶
type RPCError struct {
Code int `json:"code"`
Message string `json:"message"`
Data json.RawMessage `json:"data,omitempty"`
}
RPCError 是 JSON-RPC 2.0 错误对象.
type ReadResourceResult ¶
type ReadResourceResult struct {
Contents []ResourceContent `json:"contents"`
}
ReadResourceResult 是 resources/read 请求的响应.
type Request ¶
type Request struct {
JSONRPC string `json:"jsonrpc"`
ID json.RawMessage `json:"id"`
Method string `json:"method"`
Params any `json:"params,omitempty"`
}
Request 是 JSON-RPC 2.0 请求(有 ID,需要响应).
type ResourceCache ¶
type ResourceCache struct {
// contains filtered or unexported fields
}
ResourceCache 是 MCP 资源内容的 TTL + LRU 缓存.
线程安全:所有方法可从任意 goroutine 并发调用.
精妙之处(CLEVER): 用 container/list 双向链表维护访问顺序-- list.Front() 是最近访问的条目,list.Back() 是最久未访问的驱逐候选. Get 命中时把链表节点移到 Front,Set 新 key 满容量时驱逐 Back. map[key]*list.Element 提供 O(1) 查找,list 提供 O(1) 移动/删除. 经典 LRU 实现:空间 O(n),时间 O(1) per operation.
func NewResourceCache ¶
func NewResourceCache(ttl time.Duration) *ResourceCache
NewResourceCache 创建一个新的资源缓存.
ttl 是每个条目的生存时间.ttl <= 0 表示永不过期(适用于只读静态资源).
升华改进(ELEVATED): ttl=0 永不过期,而非 ttl=0 立即过期-- 永不过期更符合"资源不变"的业务语义(对比 TTLCache 语义不同). 调用方可通过 Invalidate/InvalidateServer 手动触发失效.
func (*ResourceCache) Get ¶
func (c *ResourceCache) Get(serverName, uri string) ([]ResourceContent, bool)
Get 查询缓存,命中时将条目移到 LRU 链表头部(最近访问).
返回 (content, true):命中且未过期. 返回 (nil, false):未命中(key 不存在或已过期).
过期条目在 Get 时不自动删除-- 懒清理策略:避免每次 Get 都做删除(需要更多锁操作), 过期条目占一个链表位置,下次 Set 时被驱逐或被覆盖写替换.
func (*ResourceCache) Invalidate ¶
func (c *ResourceCache) Invalidate(serverName, uri string)
Invalidate 使单个资源条目立即失效(删除),同步维护 LRU 链表.
下次 Get 返回 miss,调用方触发重新拉取.
func (*ResourceCache) InvalidateServer ¶
func (c *ResourceCache) InvalidateServer(serverName string)
InvalidateServer 清除指定服务器的所有资源缓存,同步维护 LRU 链表.
典型触发场景:
- 收到 notifications/tools/list_changed 通知(服务器可能已更新/重启)
- 用户手动重连服务器
- 服务器断线重连后
升华改进(ELEVATED): 批量失效而非逐条失效-- ToolListChanged 意味着服务器整体状态变化,资源列表也可能变化, 批量清除比逐条 Invalidate 更安全(不会遗漏未知 URI 的条目).
func (*ResourceCache) Set ¶
func (c *ResourceCache) Set(serverName, uri string, content []ResourceContent)
Set 写入或更新缓存条目,刷新过期时间,并将条目移到 LRU 链表头部.
如果缓存已满(>= maxResourceCacheEntries)且目标 key 不存在, 驱逐 LRU 链表尾部(最久未访问)的条目后再写入.
type ResourceContent ¶
type ResourceContent struct {
// URI 资源 URI
URI string `json:"uri"`
// MimeType MIME 类型
MimeType string `json:"mimeType,omitempty"`
// Text 文本内容
Text string `json:"text,omitempty"`
// Blob base64 编码的二进制数据
Blob string `json:"blob,omitempty"`
}
ResourceContent 是读取资源的响应内容.
type Response ¶
type Response struct {
JSONRPC string `json:"jsonrpc"`
ID json.RawMessage `json:"id"`
Result json.RawMessage `json:"result,omitempty"`
Error *RPCError `json:"error,omitempty"`
}
Response 是 JSON-RPC 2.0 响应.
type SSETransport ¶
type SSETransport struct {
// contains filtered or unexported fields
}
SSETransport 通过 HTTP SSE 实现 Transport 接口.
func NewSSETransport ¶
func NewSSETransport(sseURL string, auth AuthProvider) (*SSETransport, error)
NewSSETransport 连接到 SSE MCP 服务器,等待 endpoint URL 就绪后返回.
sseURL 是服务端 SSE 端点,例如 "https://mcp.example.com/sse". auth 是鉴权提供者;传 nil 则使用 NoopAuth(匿名访问).
建连超时:30 秒(等待服务端发送 endpoint 事件).
type ServerCapabilities ¶
type ServerCapabilities struct {
// Tools 是否支持工具能力
Tools *CapabilityConfig `json:"tools,omitempty"`
// Resources 是否支持资源能力
Resources *CapabilityConfig `json:"resources,omitempty"`
// Prompts 是否支持提示能力
Prompts *CapabilityConfig `json:"prompts,omitempty"`
}
ServerCapabilities 表示 MCP 服务器声明的能力.
type ServerInfo ¶
type ServerInfo struct {
// Name 服务器名称
Name string `json:"name"`
// Version 服务器版本
Version string `json:"version"`
}
ServerInfo 是 MCP 服务器的基本信息.
type StaticTokenAuth ¶
type StaticTokenAuth struct {
// Token 是 Bearer token 值,不含 "Bearer " 前缀.
Token string
}
StaticTokenAuth 提供静态 Bearer token 认证.
适用场景:API key,服务账号令牌等不过期的长效凭证. 对于需要 token 刷新的场景,消费层应自行实现 AuthProvider 接口.
type StdioTransport ¶
type StdioTransport struct {
// contains filtered or unexported fields
}
StdioTransport 通过子进程 stdin/stdout 实现 Transport 接口.
func NewStdioTransport ¶
func NewStdioTransport(exec execenv.Executor, cfg config.MCPServerConfig) (*StdioTransport, error)
NewStdioTransport 启动 MCP 服务器子进程并返回就绪的传输实例.
exec 是子进程启动抽象, 必填 (方案 β 严格 DI). 本地模式传 execenv.DefaultExecutor{}, 云端模式传 sandbox.Backend{}.
返回后可立即调用 Send/Recv,无需额外的连接步骤.
注: 内部用 context.Background() 调 executor.Command - transport 生命周期 由 Close() 和 done channel 管理, 不依赖 ctx 取消. 未来 commit 4b-4 可能 升级为接受 ctx 参数做 lifecycle 统一, 届时改本签名.
func (*StdioTransport) Close ¶
func (t *StdioTransport) Close() error
Close 优雅关闭进程(先 SIGINT,3 秒后 SIGKILL)并释放所有资源.
func (*StdioTransport) ProcessError ¶
func (t *StdioTransport) ProcessError() error
ProcessError 返回进程异常退出时的错误信息(进程正常运行时返回 nil).
type ToolCallResult ¶
type ToolCallResult struct {
// Content 结果内容列表
Content []ContentItem `json:"content"`
// IsError 是否为错误
IsError bool `json:"isError,omitempty"`
}
ToolCallResult 是工具调用的结果.
type Transport ¶
type Transport interface {
// Send 向服务器发送一条 JSON-encoded 消息(JSON-RPC 请求或通知).
// msg 不含换行符;传输实现自行决定帧格式(stdio 追加 '\n',HTTP 用 body).
Send(ctx context.Context, msg []byte) error
// Recv 阻塞等待来自服务器的一条 JSON-encoded 消息.
// 正常关闭时返回 (nil, io.EOF),ctx 超时时返回 ctx.Err().
Recv(ctx context.Context) ([]byte, error)
// Close 终止连接并释放资源.可安全地重复调用.
Close() error
}
Transport 是 MCP 传输层的核心接口.
实现:
- StdioTransport:子进程 stdin/stdout(MCP 2024-11-05 规范必须支持)
- SSETransport:HTTP GET SSE + POST(远程服务器,2024-11-05 规范)
- HTTPTransport:HTTP Streamable(2025-03-26 规范,新服务器优先)