mcp

package
v0.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 26, 2026 License: None detected not legal advice Imports: 0 Imported by: 0

Documentation

Overview

MCP 工具到 Engine Tool 接口的桥接.

MCPToolBridge 将 MCP 服务器提供的工具包装为 Engine 的 Tool 接口, 使 MCP 工具可以像内置工具一样被 Engine 调度和执行.

工具名格式(升华改进(ELEVATED)): 早期实现 无冲突时用短名 "search",有冲突才用 "server/search",行为不一致. 我们始终用 mcp__serverName__toolName 格式(由 Manager.AllTools 统一生成), Bridge 层不再做碰撞检测,职责更纯粹:类型转换 + 调用路由.

转换流程:

  1. MCPTool.InputSchema → Tool.InputSchema()(支持 oneOf / anyOf / default)
  2. Tool.Execute() → Manager.CallTool() → ToolCallResult
  3. ToolCallResult.Content(text/image/resource)→ tools.Result.Output

MCP 客户端:JSON-RPC 请求/响应关联 + 服务器协议握手.

Client 层负责:

  • JSON-RPC 请求 ID 管理(nextID + pending map)
  • 分发后台循环(dispatchLoop:从 Transport.Recv 读消息,路由到等待 goroutine)
  • MCP 协议方法封装(Initialize / ListTools / CallTool / ListResources 等)
  • 通知回调注册(服务端主动推送,如 ToolListChanged)

Client 不感知传输细节(进程管理,HTTP,SSE),只通过 Transport 接口交互. 这使不同传输方式下的协议逻辑完全共享,无需为 stdio / SSE / HTTP 分别实现.

升华改进(ELEVATED): 早期实现将进程管理,JSON-RPC 帧,协议方法混在一起, 无法支持远程 MCP 服务器.我们拆分为 Transport(传输) + Client(协议), 任何实现了 Transport 接口的传输层都能直接复用所有 MCP 协议方法.

HTTPTransport:通过 HTTP Streamable 与 MCP 服务器通信.

对应 MCP 2025-03-26 规范的 Streamable HTTP 传输模式:

客户端                         服务端
  |  POST /mcp                  |
  |  Content-Type: application/json
  |  Accept: application/json, text/event-stream
  | ─────────────────────────> |
  |                             |
  |  响应 A(单条):           |
  |  200 OK                     |
  |  Content-Type: application/json
  |  {"jsonrpc":"2.0",...}      |
  | <───────────────────────── |
  |                             |
  |  响应 B(流式):           |
  |  200 OK                     |
  |  Content-Type: text/event-stream
  |  data: {...}                |  ← 多条消息
  |  data: {...}                |
  | <───────────────────────── |
  |                             |
  |  202 Accepted(通知 fire-and-forget)
  | <───────────────────────── |

Session 管理:

  • 服务端可在响应头中返回 Mcp-Session-Id
  • 后续请求需在请求头携带该 Session-Id

升华改进(ELEVATED): 相较于 SSE 传输(两条持久连接),HTTP Streamable 更简洁-- 每次请求/响应是独立 HTTP 事务,无需维持长连接,对 CDN / 负载均衡更友好. SSE 传输兼容旧版 MCP 服务器;HTTP Streamable 面向 2025-03-26 及以后的新服务器. 替代方案:<复用 SSE 传输+扩展 POST 端点> - 否决,因为两个规范的会话管理模型不同.

JSON-RPC 2.0 协议类型定义.

本文件只保留数据类型;传输逻辑(帧格式,连接管理)由各 Transport 实现负责, 请求/响应关联(pending map,nextID)由 Client 负责.

早期实现 将 JSON-RPC 类型和传输逻辑混在同一个类里. 我们将类型单独抽出,使 StdioTransport / SSETransport / HTTPTransport 都能复用相同的类型定义而不引入循环依赖.

多 MCP 服务器管理器.

Manager 职责:

  • 并发连接多个 MCP 服务器(stdio ≤2,remote ≤5)
  • 工具名统一采用 mcp__serverName__toolName 格式(无条件前缀)
  • 缓存工具列表,当服务端推送 ToolListChanged 通知时标记脏位,懒惰刷新
  • 统一关闭所有连接

工具名格式(升华改进(ELEVATED)): 早期实现 在无冲突时使用短名称(如 "search"),有冲突时才加前缀("server/search"). 问题:同一工具名在不同配置下行为不一致,调用方无法通过名称可靠地定位服务器. 我们始终使用 mcp__serverName__toolName 格式,优势:

  1. 工具名 ↔ 服务器的映射关系一目了然,无需额外查找表
  2. 避免两台服务器的工具名碰撞导致注册失败(早期方案行为不确定)
  3. 符合 MCP 规范建议的命名空间隔离

替代方案:<沿用早期方案短名/冲突才加前缀> - 否决,行为不确定且难调试

并发限制(升华改进(ELEVATED)): 早期方案串行连接,多个 stdio 服务器顺序启动,第一个卡住会阻塞所有后续连接. 我们并发连接,但限制同时启动的进程数,避免系统资源耗尽:

  • stdio:≤2(进程启动有 fork 开销,同时启动太多会抢 CPU)
  • remote (SSE/HTTP):≤5(网络 RTT 为主要延迟,并发收益明显)

ResourceCache - MCP 资源 TTL + LRU 缓存.

背景:MCP 服务器可以暴露「资源」(resources),Agent 通过 resources/read 读取. 资源内容通常变化较慢(如配置文件,知识库文档,静态数据集), 每次工具调用前都发起 RPC 既浪费 latency 又消耗服务器资源.

设计要点:

  • TTL-based 缓存:过期后下次 Get 触发 miss,调用方负责回源
  • LRU 驱逐:容量满时驱逐最久未访问的条目(Get 和 Set 均更新访问顺序)
  • 缓存键格式:serverName + ":" + resourceURI(命名空间隔离,避免不同服务器同名 URI 碰撞)
  • InvalidateServer:ToolListChanged 通知时批量清除某服务器的资源缓存-- 工具列表变化通常意味着服务器重启或更新,资源也应视为失效
  • 读多写少:RWMutex 分离读/写路径,高并发 Get 无锁争用

升华改进(ELEVATED): 早期实现 无 MCP 资源缓存,每次 resources/read 都是 RPC-- 在高频工具调用场景(如检索增强生成)下,同一资源可能被读取数十次, TTL 缓存将 RPC 次数降低到 1(TTL 内),latency 从 ~100ms 降到 ~0.1μs. 替代方案:<不缓存,每次 RPC> - 否决:资源读取在工具链中可高度重复,无缓存等于把 MCP 服务器当慢速函数调用.

替代方案2:<基于 TTLCache[string, []ResourceContent] 泛型缓存> - 否决:此处不需要 stale-while-revalidate(资源过期直接 miss,让调用方决定回源时机), 专用结构更直观,测试也更清晰.

SSETransport:通过 HTTP SSE 与远程 MCP 服务器通信.

连接模型(MCP 2024-11-05 规范 SSE 传输):

客户端                        服务端
  |  GET /sse                   |
  |  Accept: text/event-stream  |
  | ─────────────────────────> |
  |  200 OK (SSE 流开始)        |
  | <───────────────────────── |
  |  event: endpoint            |
  |  data: /messages?sessionId=x|  ← 服务端告知请求端点
  | <───────────────────────── |
  |                             |
  |  POST /messages?sessionId=x |  ← 客户端发 JSON-RPC 请求
  | ─────────────────────────> |
  |  event: message             |
  |  data: {"jsonrpc":"2.0"...} |  ← 服务端通过 SSE 返回响应
  | <───────────────────────── |

升华改进(ELEVATED): 早期实现 只支持 stdio,完全依赖本地服务器进程. SSE 传输使 MCP 服务器可以:

  • 部署为远程服务(SaaS MCP,容器化工具服务器)
  • 穿越防火墙(仅需标准 HTTPS 出站)
  • 多客户端共享(一个 MCP 服务器进程服务多个 Agent)

CLI / SDK 消费层按需注入 AuthProvider(如 OAuth token),engine 不感知认证细节.

StdioTransport:通过子进程 stdin/stdout 与 MCP 服务器通信.

这是 MCP 规范要求的基础传输方式:

  • 每条 JSON-RPC 消息占一行(newline-delimited JSON)
  • 客户端向 stdin 写,从 stdout 读
  • stderr 仅用于服务器日志,引擎不解析

早期实现将进程管理和 JSON-RPC 帧混在一个类里. 我们的设计将进程管理单独封装为 StdioTransport,Client 层只看 Transport 接口, 这使 StdioTransport 可以独立测试,也使 SSE/HTTP 传输可以无缝替换.

生命周期:

  1. NewStdioTransport(cfg) 启动进程,启动后台读取/监控 goroutine
  2. Send/Recv 正常使用
  3. Close() 优雅关闭(先 SIGINT,3 秒后 SIGKILL)

MCP 传输层核心抽象.

本文件定义了三个核心接口:

  • Transport:字节级 JSON-RPC 消息传输(Send/Recv/Close)
  • AuthProvider:按请求提供 HTTP 鉴权头(支持 token 动态刷新)

以及两个开箱即用的 AuthProvider 实现:

  • StaticTokenAuth:静态 Bearer token
  • NoopAuth:无认证

设计精妙(CLEVER): Transport 操作粒度选在「raw JSON 字节层」-- 不是 io.Reader/Writer(太低层),也不是 JSON-RPC 方法调用(太高层). 原因:

  1. stdio 传输:字节层契合 newline-framed JSON over pipes.
  2. SSE 传输:Send 走独立 HTTP POST(完整 JSON body), Recv 从 SSE event 流读完整 JSON 事件--两条连接,无法建模为单个 io.ReadWriter.
  3. HTTP Streamable:POST 响应本身可以是 SSE 流,同样无法用 io.ReadWriter 表达.

如果选 io.Reader/Writer 层:无法建模 SSE / HTTP Streamable 的双连接模型. 如果选 JSON-RPC 方法层:请求/响应关联(pending map)被埋进 Transport, Client 无法统一管理,重连时 pending 会丢失.

Package mcp 实现 MCP (Model Context Protocol) 客户端.

MCP 是 Flyto 的核心扩展机制,允许通过独立的服务器进程 提供额外的工具,资源和提示.

通信协议:JSON-RPC 2.0 over stdio(stdin/stdout)

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func BridgeAllTools

func BridgeAllTools(manager *Manager, registry *tools.Registry) int

BridgeAllTools 将管理器中所有 MCP 工具桥接为 Engine Tool 并注册到注册表.

工具名冲突:若注册表中已有同名工具,跳过并记录(不强制覆盖). 返回成功注册的工具数量.

Types

type AuthProvider

type AuthProvider interface {
	// Headers 返回要追加到出站 HTTP 请求的 header 键值对.
	// 返回 nil 表示无需额外头部.
	Headers(ctx context.Context) (map[string]string, error)
}

AuthProvider 为每条出站 HTTP 请求提供鉴权头部.

升华改进(ELEVATED): 按请求调用(而非建连时一次性注入)-- 早期实现 MCP client 的认证是建连时静态注入,不支持 token 过期后透明刷新. 我们将 Headers() 设计为 per-request 调用,消费层(CLI / SaaS 后端)可以在此:

  • 检查 token 是否即将过期,必要时发起 OAuth2 token refresh
  • 从 secret manager 动态读取最新凭证
  • 实现请求级签名(AWS SigV4 等)

Engine 核心不感知任何 OAuth 协议细节,符合「零硬编码」原则. OAuth 实现由 CLI 消费层自行注入;engine 只定义扩展点.

type CapabilityConfig

type CapabilityConfig struct {
	// ListChanged 是否支持列表变更通知
	ListChanged bool `json:"listChanged,omitempty"`
}

CapabilityConfig 是能力配置.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client 是 MCP 协议客户端.

线程安全:所有公开方法可并发调用.

func Connect

func Connect(exec execenv.Executor, cfg config.MCPServerConfig) (*Client, error)

Connect 根据配置自动选择传输层,创建并返回 MCP 客户端.

exec 是子进程启动抽象, 仅 stdio transport 使用 (方案 β 严格 DI). SSE / HTTP transport 忽略此参数, 因为它们不起子进程. 强制统一传入 是为了让 caller (manager.go / 未来 platform 层) 对所有 transport 类型用同一种 API.

支持的传输类型(cfg.Transport):

  • "" / "stdio":子进程 stdin/stdout(本地 MCP 服务器)
  • "sse":HTTP SSE(远程 MCP 服务器,2024-11-05 规范)
  • "http":HTTP Streamable(远程 MCP 服务器,2025-03-26 规范)

func NewClient

func NewClient(cfg config.MCPServerConfig, tp Transport) *Client

NewClient 使用指定传输层创建 MCP 客户端,并启动后台分发循环.

返回后 tp 已在后台运行,可立即调用 Initialize.

func (*Client) CallTool

func (c *Client) CallTool(name string, args map[string]any) (*ToolCallResult, error)

CallTool 调用指定的工具.

func (*Client) Capabilities

func (c *Client) Capabilities() ServerCapabilities

Capabilities 返回服务器能力(Initialize 后可用).

func (*Client) Close

func (c *Client) Close() error

Close 关闭客户端并释放所有资源.

func (*Client) GetPrompt

func (c *Client) GetPrompt(name string, args map[string]string) (*GetPromptResult, error)

GetPrompt 获取指定提示模板的内容.

func (*Client) GetServerInfo

func (c *Client) GetServerInfo() ServerInfo

GetServerInfo 返回服务器信息(Initialize 后可用).

func (*Client) Initialize

func (c *Client) Initialize() error

Initialize 执行 MCP 协议握手.

发送 initialize 请求获取服务器能力,然后发送 initialized 通知. 幂等:多次调用安全,已初始化时直接返回.

func (*Client) IsAlive

func (c *Client) IsAlive() bool

IsAlive 返回底层连接是否仍然活跃.

对于 stdio 传输:检查服务器进程是否存活. 对于 SSE / HTTP 传输:假设活跃(网络状态通过错误返回检测).

func (*Client) IsInitialized

func (c *Client) IsInitialized() bool

IsInitialized 返回客户端是否已完成初始化握手.

func (*Client) ListPrompts

func (c *Client) ListPrompts() ([]MCPPrompt, error)

ListPrompts 列出服务器提供的所有提示模板.

func (*Client) ListResources

func (c *Client) ListResources() ([]MCPResource, error)

ListResources 列出服务器提供的所有资源.

func (*Client) ListTools

func (c *Client) ListTools() ([]MCPTool, error)

ListTools 列出服务器提供的所有工具.

func (*Client) ProcessError

func (c *Client) ProcessError() error

ProcessError 返回服务器进程退出错误(仅 stdio 传输有效,其他传输返回 nil).

func (*Client) ReadResource

func (c *Client) ReadResource(uri string) (*ReadResourceResult, error)

ReadResource 读取指定资源的内容.

func (*Client) ServerName

func (c *Client) ServerName() string

ServerName 返回服务器配置名称.

func (*Client) SetElicitationHandler

func (c *Client) SetElicitationHandler(handler ElicitationHandler)

SetElicitationHandler 注册 elicitation/create server-to-client 请求处理器.

升华改进(ELEVATED): MCP 2025-03-26 规范新增服务器向客户端主动请求用户输入. 必须在 Initialize 前注册,否则 initialize 阶段的 elicitation 请求会被自动 cancel. nil 处理器不会 panic--dispatchLoop 自动回复 cancel(NoopElicitation 语义).

func (*Client) SetNotificationHandler

func (c *Client) SetNotificationHandler(handler func(method string, params json.RawMessage))

SetNotificationHandler 注册服务端通知回调.

handler 在独立 goroutine 中调用,不会阻塞分发循环. 必须在 Initialize 前注册,否则 initialize 阶段的通知会被忽略.

func (*Client) SetOnClose

func (c *Client) SetOnClose(fn func())

SetOnClose 注册连接断开回调(Transport.Recv 返回错误时触发).

必须在 dispatchLoop 启动前注册(即 NewClient 之前),否则可能漏触发. 实际上 NewClient 在创建 Client 后立即启动 dispatchLoop goroutine, 但 dispatchLoop 首先阻塞在 Recv,注册窗口足够大,线程安全(mu 保护).

回调在独立 goroutine 中调用,不阻塞 dispatchLoop 的清理流程.

func (*Client) SetRequestTimeout

func (c *Client) SetRequestTimeout(timeout time.Duration)

SetRequestTimeout 设置后续请求的超时时间.

type ContentItem

type ContentItem struct {
	// Type 内容类型(text, image, resource)
	Type string `json:"type"`
	// Text 文本内容(type=text 时)
	Text string `json:"text,omitempty"`
	// MimeType MIME 类型(type=resource 时)
	MimeType string `json:"mimeType,omitempty"`
	// URI 资源 URI(type=resource 时)
	URI string `json:"uri,omitempty"`
	// Data base64 编码的数据(type=image 时)
	Data string `json:"data,omitempty"`
}

ContentItem 是工具结果的内容项.

type ElicitationCreateParams

type ElicitationCreateParams struct {
	// Message 展示给用户的说明文字(Markdown)
	Message string `json:"message"`

	// RequestedSchema 用户输入的 JSON Schema 约束(简化版,仅支持 object type)
	RequestedSchema *ElicitationSchema `json:"requestedSchema,omitempty"`
}

ElicitationCreateParams 是 elicitation/create server-to-client 请求的参数.

升华改进(ELEVATED): 早期实现 无此类型,遇到服务器请求时直接挂起. 我们完整定义 spec 数据结构,消费层可以根据 Schema 渲染合适的 UI.

type ElicitationCreateResult

type ElicitationCreateResult struct {
	// Action 用户操作:"accept" / "decline" / "cancel"
	Action string `json:"action"`

	// Content 用户填写的字段值(仅 action=="accept" 时有意义)
	// 精妙之处(CLEVER): 用 map[string]any 而非强类型--
	// 服务器 schema 声明了类型,客户端只需传字符串,服务器负责类型转换.
	// 这样避免客户端需要反射每个字段的 Go 类型,代码简单.
	Content map[string]any `json:"content,omitempty"`
}

ElicitationCreateResult 是 elicitation/create 请求的响应(client → server).

type ElicitationHandler

type ElicitationHandler interface {
	HandleElicitation(serverName, message string, schema *ElicitationSchema) ElicitationCreateResult
}

ElicitationHandler 是 MCP client 层的 Elicitation 处理接口. 与 engine.ElicitationHandler 平行定义(mcp 包不导入 engine 包,避免循环依赖). Manager 注入时将 engine.ElicitationHandler 包装为此接口.

精妙之处(CLEVER): 接口在 mcp 包内独立定义,而非直接使用 engine 包的接口-- 这打破了 internal/mcp → pkg/engine 的导入链(internal 包不应依赖 pkg/engine). Manager 层(被 engine 包使用)负责适配器注入,接口语义完全相同.

type ElicitationProperty

type ElicitationProperty struct {
	// Type 字段类型:"string" / "number" / "boolean"
	Type string `json:"type"`

	// Title 显示给用户的标题(可选)
	Title string `json:"title,omitempty"`

	// Description 字段描述(可选)
	Description string `json:"description,omitempty"`

	// Default 默认值(any,通过 JSON 编码传输)
	Default any `json:"default,omitempty"`
}

ElicitationProperty 是 elicitation schema 中单个字段的定义.

type ElicitationSchema

type ElicitationSchema struct {
	// Type 固定为 "object"
	Type string `json:"type"`

	// Properties 字段定义(key 为字段名)
	Properties map[string]ElicitationProperty `json:"properties,omitempty"`

	// Required 必填字段列表
	Required []string `json:"required,omitempty"`
}

ElicitationSchema 是 elicitation 请求中描述用户输入结构的简化 JSON Schema. MCP 规范只支持 object type + 基本属性类型(string/number/boolean).

type GetPromptResult

type GetPromptResult struct {
	// Description 提示描述
	Description string `json:"description,omitempty"`
	// Messages 提示消息列表
	Messages []MCPPromptMessage `json:"messages"`
}

GetPromptResult 是 prompts/get 请求的响应.

type HTTPTransport

type HTTPTransport struct {
	// contains filtered or unexported fields
}

HTTPTransport 通过 HTTP Streamable 实现 Transport 接口.

func NewHTTPTransport

func NewHTTPTransport(url string, auth AuthProvider) (*HTTPTransport, error)

NewHTTPTransport 创建 HTTP Streamable 传输实例.

url 是服务端端点,例如 "https://mcp.example.com/mcp". auth 是鉴权提供者;传 nil 则使用 NoopAuth(匿名访问).

与 SSETransport.NewSSETransport 不同,本构造函数不建立持久连接, 连接在第一次 Send() 时才发生,是惰性的.

func (*HTTPTransport) Close

func (t *HTTPTransport) Close() error

Close 关闭 HTTP 传输.

func (*HTTPTransport) Recv

func (t *HTTPTransport) Recv(ctx context.Context) ([]byte, error)

Recv 阻塞等待一条来自服务器的消息(单条响应或流中的一条事件).

func (*HTTPTransport) Send

func (t *HTTPTransport) Send(ctx context.Context, msg []byte) error

Send 发送 JSON-RPC 消息并处理响应.

根据服务端返回的 Content-Type 分两种处理路径:

  • application/json:同步读取单条响应,投递到 recvCh
  • text/event-stream:异步解析 SSE 流,每条事件投递到 recvCh

type InitializeResult

type InitializeResult struct {
	// ProtocolVersion 协议版本
	ProtocolVersion string `json:"protocolVersion"`
	// Capabilities 服务器能力
	Capabilities ServerCapabilities `json:"capabilities"`
	// ServerInfo 服务器信息
	ServerInfo ServerInfo `json:"serverInfo"`
}

InitializeResult 是 initialize 请求的响应.

type ListPromptsResult

type ListPromptsResult struct {
	Prompts []MCPPrompt `json:"prompts"`
}

ListPromptsResult 是 prompts/list 请求的响应.

type ListResourcesResult

type ListResourcesResult struct {
	Resources []MCPResource `json:"resources"`
}

ListResourcesResult 是 resources/list 请求的响应.

type ListToolsResult

type ListToolsResult struct {
	Tools []MCPTool `json:"tools"`
}

ListToolsResult 是 tools/list 请求的响应.

type MCPPrompt

type MCPPrompt struct {
	// Name 提示名称
	Name string `json:"name"`
	// Description 提示描述
	Description string `json:"description,omitempty"`
	// Arguments 提示模板的参数定义
	Arguments []MCPPromptArgument `json:"arguments,omitempty"`
}

MCPPrompt 表示一个 MCP 服务器提供的提示模板.

type MCPPromptArgument

type MCPPromptArgument struct {
	// Name 参数名称
	Name string `json:"name"`
	// Description 参数描述
	Description string `json:"description,omitempty"`
	// Required 是否必填
	Required bool `json:"required,omitempty"`
}

MCPPromptArgument 是提示模板的参数定义.

type MCPPromptMessage

type MCPPromptMessage struct {
	// Role 消息角色(user / assistant)
	Role string `json:"role"`
	// Content 消息内容
	Content any `json:"content"`
}

MCPPromptMessage 是提示模板返回的消息.

type MCPResource

type MCPResource struct {
	// URI 资源唯一标识符
	URI string `json:"uri"`
	// Name 资源名称
	Name string `json:"name"`
	// Description 资源描述
	Description string `json:"description,omitempty"`
	// MimeType 资源的 MIME 类型
	MimeType string `json:"mimeType,omitempty"`
}

MCPResource 表示一个 MCP 服务器提供的资源.

type MCPTool

type MCPTool struct {
	// Name 工具名称
	Name string `json:"name"`
	// Description 工具描述
	Description string `json:"description,omitempty"`
	// InputSchema 工具的 JSON Schema 输入定义
	InputSchema json.RawMessage `json:"inputSchema,omitempty"`
}

MCPTool 表示一个 MCP 服务器提供的工具.

type MCPToolBridge

type MCPToolBridge struct {
	// contains filtered or unexported fields
}

MCPToolBridge 将一个 MCP 工具桥接为 Engine 的 Tool 接口.

func NewMCPToolBridge

func NewMCPToolBridge(tool MCPTool, serverName string, manager *Manager) *MCPToolBridge

NewMCPToolBridge 创建一个使用 mcp__serverName__toolName 格式的桥接实例.

相较于早期方案的两个构造函数(无前缀 / 有前缀),我们统一为一个, 因为现在所有 MCP 工具都必须使用前缀格式.

func (*MCPToolBridge) Description

func (b *MCPToolBridge) Description(_ context.Context) string

Description 返回工具描述,追加 MCP 服务器来源信息.

func (*MCPToolBridge) Execute

Execute 执行 MCP 工具.

func (*MCPToolBridge) InputSchema

func (b *MCPToolBridge) InputSchema() json.RawMessage

InputSchema 返回工具的 JSON Schema 输入定义.

升华改进(ELEVATED): 早期方案仅透传 MCP 工具的 inputSchema,不做任何转换. 我们补充了 oneOf/anyOf/default 字段的处理-- 部分 MCP 服务器(如 filesystem,search 服务器)会使用这些关键字, 而 engine 的 JSON Schema 验证器需要它们才能正确推断参数类型. 替代方案:<在验证器层忽略 oneOf/anyOf> - 否决,会导致必填参数校验失败.

func (*MCPToolBridge) Metadata

func (b *MCPToolBridge) Metadata() tools.Metadata

Metadata 返回工具元数据. MCP 工具保守设置:不可并发(服务端状态未知),非只读(可能有副作用).

func (*MCPToolBridge) Name

func (b *MCPToolBridge) Name() string

Name 返回工具完整名称(mcp__serverName__toolName).

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager 管理多个 MCP 服务器连接.

func NewManager

func NewManager(exec execenv.Executor) *Manager

NewManager 创建 MCP 管理器.

exec 是子进程启动抽象, 必填. 本地模式传 execenv.DefaultExecutor{}, 云端模式传 sandbox.Backend{}. 方案 β 严格 DI, 不做 nil fallback — nil 时 Manager 可以构造但 4b-3 后的 Connect 会 panic.

func NewManagerWithResourceCacheTTL

func NewManagerWithResourceCacheTTL(exec execenv.Executor, ttl time.Duration) *Manager

NewManagerWithResourceCacheTTL 创建 MCP 管理器,并指定资源缓存 TTL.

ttl <= 0 表示资源永不过期(仅依靠 InvalidateServer 手动失效). 适用场景:只读静态资源(不会变化的知识库),或测试环境注入可控 TTL.

exec 参数语义同 NewManager, 见其 doc.

func (*Manager) AllPrompts

func (m *Manager) AllPrompts() []MCPPrompt

AllPrompts 收集所有已连接服务器提供的提示模板.

func (*Manager) AllTools

func (m *Manager) AllTools() []MCPTool

AllTools 返回所有已连接服务器提供的工具,工具名采用 mcp__server__tool 格式.

懒惰刷新:若某服务器的工具缓存被标记为脏(收到 ToolListChanged 通知), 在返回其工具前先重新拉取.

func (*Manager) CallTool

func (m *Manager) CallTool(serverName, toolName string, args map[string]any) (*ToolCallResult, error)

CallTool 在指定服务器上调用工具(serverName 和 toolName 分开传入).

func (*Manager) CallToolByName

func (m *Manager) CallToolByName(fullName string, args map[string]any) (*ToolCallResult, error)

CallToolByName 根据完整工具名 mcp__serverName__toolName 路由到正确的服务器.

仅接受 mcp__ 前缀格式,不支持旧的 serverName/toolName 格式.

func (*Manager) ClientNames

func (m *Manager) ClientNames() []string

ClientNames 返回所有已连接的服务器名称.

func (*Manager) CloseAll

func (m *Manager) CloseAll()

CloseAll 关闭所有 MCP 服务器连接,并取消所有正在进行的重连循环.

func (*Manager) CloseOne

func (m *Manager) CloseOne(name string) error

CloseOne 关闭指定的 MCP 服务器连接,并取消其重连循环.

func (*Manager) ConnectAll

func (m *Manager) ConnectAll(configs map[string]config.MCPServerConfig) (int, []error)

ConnectAll 并发连接所有配置的 MCP 服务器.

并发控制:

  • stdio 类服务器最多同时连接 2 个
  • SSE/HTTP 类服务器最多同时连接 5 个

某个服务器连接失败不影响其他服务器. 返回成功连接的数量和所有失败的错误列表.

func (*Manager) ConnectOne

func (m *Manager) ConnectOne(name string, cfg config.MCPServerConfig) error

ConnectOne 连接单个 MCP 服务器.

func (*Manager) GetClient

func (m *Manager) GetClient(name string) (*Client, bool)

GetClient 返回指定名称的客户端(不存在时返回 false).

func (*Manager) InvalidateResourceCache

func (m *Manager) InvalidateResourceCache(serverName, uri string)

InvalidateResourceCache 手动失效指定资源的缓存.

适用于消费层明确知道资源已更新的场景(如本地文件变更通知).

func (*Manager) InvalidateServerResourceCache

func (m *Manager) InvalidateServerResourceCache(serverName string)

InvalidateServerResourceCache 清除指定服务器的所有资源缓存.

在不触发 ToolListChanged(但服务器内容确实变化)时手动调用.

func (*Manager) ReadResource

func (m *Manager) ReadResource(ctx context.Context, serverName, uri string) ([]ResourceContent, error)

ReadResource 读取指定服务器的指定资源内容,优先使用缓存.

流程:

  1. 查询 resourceCache(serverName:uri 组合键)
  2. 命中且未过期 → 直接返回,0 RPC
  3. Miss 或过期 → 发起 resources/read RPC → 写入缓存 → 返回

升华改进(ELEVATED): ctx 控制 RPC 超时-- 调用方可设置合理的 deadline(如 3s),避免慢服务器阻塞整个 Agent 轮次. 缓存命中路径不需要 ctx(无 I/O),只有回源路径才使用 ctx. 替代方案:<不接受 ctx,内部使用 defaultRequestTimeout> - 否决:调用方(如 runLoop)有整体 context,应当传递以便统一取消.

func (*Manager) SetElicitationHandler

func (m *Manager) SetElicitationHandler(handler ElicitationHandler)

SetElicitationHandler 设置全局 elicitation handler,应在 ConnectAll 之前调用.

已连接的 Client 不会追溯注入(它们在 Connect 时已经使用旧 handler 注册). 最佳实践:engine.New() 时调用 SetElicitationHandler,再调用 ConnectAll.

func (*Manager) SetEventEmitter

func (m *Manager) SetEventEmitter(fn func(event string, data map[string]any))

SetEventEmitter 注册结构化事件发射函数(可选).

Manager 通过此函数发送以下事件:

  • mcp_reconnecting:开始重连(含 server,attempt,max_attempts 字段)
  • mcp_reconnected:重连成功(含 server,attempt 字段)
  • mcp_reconnect_failed:单次重连失败(含 server,attempt,error 字段)
  • mcp_server_removed:服务器被永久移除(含 server,reason 字段)

消费层(CLI/SDK)通过此接口把事件转发给 EngineObserver/日志系统/监控面板. 应在 ConnectAll/ConnectOne 之前调用.

func (*Manager) SetOnServerRemoved

func (m *Manager) SetOnServerRemoved(fn func(serverName string))

SetOnServerRemoved 注册服务器被永久移除时的回调.

触发时机:服务器断连后重连 reconnectMaxAttempts 次全部失败. 回调在独立 goroutine 中异步调用,不阻塞重连流程. 典型用途:engine 注销 mcp__server__* 工具;监控系统发告警. 应在 ConnectAll/ConnectOne 之前调用,确保回调对所有连接生效.

type NoopAuth

type NoopAuth struct{}

NoopAuth 不提供任何认证信息.

适用场景:匿名访问的 MCP 服务器,或凭证已内嵌于 URL(如 ?token=...)的场景.

func (*NoopAuth) Headers

func (n *NoopAuth) Headers(_ context.Context) (map[string]string, error)

Headers 返回 nil(无额外头部).

type Notification

type Notification struct {
	JSONRPC string          `json:"jsonrpc"`
	Method  string          `json:"method"`
	Params  json.RawMessage `json:"params,omitempty"`
}

Notification 是 JSON-RPC 2.0 通知(无 ID,不需要响应).

type RPCError

type RPCError struct {
	Code    int             `json:"code"`
	Message string          `json:"message"`
	Data    json.RawMessage `json:"data,omitempty"`
}

RPCError 是 JSON-RPC 2.0 错误对象.

func (*RPCError) Error

func (e *RPCError) Error() string

Error 实现 error 接口.

type ReadResourceResult

type ReadResourceResult struct {
	Contents []ResourceContent `json:"contents"`
}

ReadResourceResult 是 resources/read 请求的响应.

type Request

type Request struct {
	JSONRPC string          `json:"jsonrpc"`
	ID      json.RawMessage `json:"id"`
	Method  string          `json:"method"`
	Params  any             `json:"params,omitempty"`
}

Request 是 JSON-RPC 2.0 请求(有 ID,需要响应).

type ResourceCache

type ResourceCache struct {
	// contains filtered or unexported fields
}

ResourceCache 是 MCP 资源内容的 TTL + LRU 缓存.

线程安全:所有方法可从任意 goroutine 并发调用.

精妙之处(CLEVER): 用 container/list 双向链表维护访问顺序-- list.Front() 是最近访问的条目,list.Back() 是最久未访问的驱逐候选. Get 命中时把链表节点移到 Front,Set 新 key 满容量时驱逐 Back. map[key]*list.Element 提供 O(1) 查找,list 提供 O(1) 移动/删除. 经典 LRU 实现:空间 O(n),时间 O(1) per operation.

func NewResourceCache

func NewResourceCache(ttl time.Duration) *ResourceCache

NewResourceCache 创建一个新的资源缓存.

ttl 是每个条目的生存时间.ttl <= 0 表示永不过期(适用于只读静态资源).

升华改进(ELEVATED): ttl=0 永不过期,而非 ttl=0 立即过期-- 永不过期更符合"资源不变"的业务语义(对比 TTLCache 语义不同). 调用方可通过 Invalidate/InvalidateServer 手动触发失效.

func (*ResourceCache) Clear

func (c *ResourceCache) Clear()

Clear 清空所有缓存条目,重置 LRU 链表.

func (*ResourceCache) Get

func (c *ResourceCache) Get(serverName, uri string) ([]ResourceContent, bool)

Get 查询缓存,命中时将条目移到 LRU 链表头部(最近访问).

返回 (content, true):命中且未过期. 返回 (nil, false):未命中(key 不存在或已过期).

过期条目在 Get 时不自动删除-- 懒清理策略:避免每次 Get 都做删除(需要更多锁操作), 过期条目占一个链表位置,下次 Set 时被驱逐或被覆盖写替换.

func (*ResourceCache) Invalidate

func (c *ResourceCache) Invalidate(serverName, uri string)

Invalidate 使单个资源条目立即失效(删除),同步维护 LRU 链表.

下次 Get 返回 miss,调用方触发重新拉取.

func (*ResourceCache) InvalidateServer

func (c *ResourceCache) InvalidateServer(serverName string)

InvalidateServer 清除指定服务器的所有资源缓存,同步维护 LRU 链表.

典型触发场景:

  • 收到 notifications/tools/list_changed 通知(服务器可能已更新/重启)
  • 用户手动重连服务器
  • 服务器断线重连后

升华改进(ELEVATED): 批量失效而非逐条失效-- ToolListChanged 意味着服务器整体状态变化,资源列表也可能变化, 批量清除比逐条 Invalidate 更安全(不会遗漏未知 URI 的条目).

func (*ResourceCache) Len

func (c *ResourceCache) Len() int

Len 返回缓存中的条目数量(包括已过期未清理的). 主要用于测试和监控.

func (*ResourceCache) Set

func (c *ResourceCache) Set(serverName, uri string, content []ResourceContent)

Set 写入或更新缓存条目,刷新过期时间,并将条目移到 LRU 链表头部.

如果缓存已满(>= maxResourceCacheEntries)且目标 key 不存在, 驱逐 LRU 链表尾部(最久未访问)的条目后再写入.

type ResourceContent

type ResourceContent struct {
	// URI 资源 URI
	URI string `json:"uri"`
	// MimeType MIME 类型
	MimeType string `json:"mimeType,omitempty"`
	// Text 文本内容
	Text string `json:"text,omitempty"`
	// Blob base64 编码的二进制数据
	Blob string `json:"blob,omitempty"`
}

ResourceContent 是读取资源的响应内容.

type Response

type Response struct {
	JSONRPC string          `json:"jsonrpc"`
	ID      json.RawMessage `json:"id"`
	Result  json.RawMessage `json:"result,omitempty"`
	Error   *RPCError       `json:"error,omitempty"`
}

Response 是 JSON-RPC 2.0 响应.

type SSETransport

type SSETransport struct {
	// contains filtered or unexported fields
}

SSETransport 通过 HTTP SSE 实现 Transport 接口.

func NewSSETransport

func NewSSETransport(sseURL string, auth AuthProvider) (*SSETransport, error)

NewSSETransport 连接到 SSE MCP 服务器,等待 endpoint URL 就绪后返回.

sseURL 是服务端 SSE 端点,例如 "https://mcp.example.com/sse". auth 是鉴权提供者;传 nil 则使用 NoopAuth(匿名访问).

建连超时:30 秒(等待服务端发送 endpoint 事件).

func (*SSETransport) Close

func (t *SSETransport) Close() error

Close 关闭 SSE 连接.

func (*SSETransport) Recv

func (t *SSETransport) Recv(ctx context.Context) ([]byte, error)

Recv 阻塞等待来自 SSE 流的一条消息.

func (*SSETransport) Send

func (t *SSETransport) Send(ctx context.Context, msg []byte) error

Send 通过 HTTP POST 向 endpoint URL 发送 JSON-RPC 消息.

响应通过 SSE 流异步返回,不在此处等待.

type ServerCapabilities

type ServerCapabilities struct {
	// Tools 是否支持工具能力
	Tools *CapabilityConfig `json:"tools,omitempty"`
	// Resources 是否支持资源能力
	Resources *CapabilityConfig `json:"resources,omitempty"`
	// Prompts 是否支持提示能力
	Prompts *CapabilityConfig `json:"prompts,omitempty"`
}

ServerCapabilities 表示 MCP 服务器声明的能力.

type ServerInfo

type ServerInfo struct {
	// Name 服务器名称
	Name string `json:"name"`
	// Version 服务器版本
	Version string `json:"version"`
}

ServerInfo 是 MCP 服务器的基本信息.

type StaticTokenAuth

type StaticTokenAuth struct {
	// Token 是 Bearer token 值,不含 "Bearer " 前缀.
	Token string
}

StaticTokenAuth 提供静态 Bearer token 认证.

适用场景:API key,服务账号令牌等不过期的长效凭证. 对于需要 token 刷新的场景,消费层应自行实现 AuthProvider 接口.

func (*StaticTokenAuth) Headers

func (s *StaticTokenAuth) Headers(_ context.Context) (map[string]string, error)

Headers 返回 Authorization: Bearer <token> 头部.

type StdioTransport

type StdioTransport struct {
	// contains filtered or unexported fields
}

StdioTransport 通过子进程 stdin/stdout 实现 Transport 接口.

func NewStdioTransport

func NewStdioTransport(exec execenv.Executor, cfg config.MCPServerConfig) (*StdioTransport, error)

NewStdioTransport 启动 MCP 服务器子进程并返回就绪的传输实例.

exec 是子进程启动抽象, 必填 (方案 β 严格 DI). 本地模式传 execenv.DefaultExecutor{}, 云端模式传 sandbox.Backend{}.

返回后可立即调用 Send/Recv,无需额外的连接步骤.

注: 内部用 context.Background() 调 executor.Command - transport 生命周期 由 Close() 和 done channel 管理, 不依赖 ctx 取消. 未来 commit 4b-4 可能 升级为接受 ctx 参数做 lifecycle 统一, 届时改本签名.

func (*StdioTransport) Close

func (t *StdioTransport) Close() error

Close 优雅关闭进程(先 SIGINT,3 秒后 SIGKILL)并释放所有资源.

func (*StdioTransport) IsAlive

func (t *StdioTransport) IsAlive() bool

IsAlive 返回服务器进程是否仍在运行.

func (*StdioTransport) ProcessError

func (t *StdioTransport) ProcessError() error

ProcessError 返回进程异常退出时的错误信息(进程正常运行时返回 nil).

func (*StdioTransport) Recv

func (t *StdioTransport) Recv(ctx context.Context) ([]byte, error)

Recv 阻塞等待来自服务器 stdout 的一条消息.

四路 select:正常消息,进程退出(排空缓冲后报错),transport 关闭,ctx 取消.

func (*StdioTransport) Send

func (t *StdioTransport) Send(_ context.Context, msg []byte) error

Send 向服务器进程的 stdin 写入一条消息(追加换行符).

MCP stdio 传输规范:每条 JSON-RPC 消息以换行符终止.

type ToolCallResult

type ToolCallResult struct {
	// Content 结果内容列表
	Content []ContentItem `json:"content"`
	// IsError 是否为错误
	IsError bool `json:"isError,omitempty"`
}

ToolCallResult 是工具调用的结果.

type Transport

type Transport interface {
	// Send 向服务器发送一条 JSON-encoded 消息(JSON-RPC 请求或通知).
	// msg 不含换行符;传输实现自行决定帧格式(stdio 追加 '\n',HTTP 用 body).
	Send(ctx context.Context, msg []byte) error

	// Recv 阻塞等待来自服务器的一条 JSON-encoded 消息.
	// 正常关闭时返回 (nil, io.EOF),ctx 超时时返回 ctx.Err().
	Recv(ctx context.Context) ([]byte, error)

	// Close 终止连接并释放资源.可安全地重复调用.
	Close() error
}

Transport 是 MCP 传输层的核心接口.

实现:

  • StdioTransport:子进程 stdin/stdout(MCP 2024-11-05 规范必须支持)
  • SSETransport:HTTP GET SSE + POST(远程服务器,2024-11-05 规范)
  • HTTPTransport:HTTP Streamable(2025-03-26 规范,新服务器优先)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL