// 多 MCP 服务器管理器. // // Manager 职责: // - 并发连接多个 MCP 服务器(stdio ≤2,remote ≤5) // - 工具名统一采用 mcp__serverName__toolName 格式(无条件前缀) // - 缓存工具列表,当服务端推送 ToolListChanged 通知时标记脏位,懒惰刷新 // - 统一关闭所有连接 // // 工具名格式(升华改进(ELEVATED)): // 早期实现 在无冲突时使用短名称(如 "search"),有冲突时才加前缀("server/search"). // 问题:同一工具名在不同配置下行为不一致,调用方无法通过名称可靠地定位服务器. // 我们始终使用 mcp__serverName__toolName 格式,优势: // 1. 工具名 ↔ 服务器的映射关系一目了然,无需额外查找表 // 2. 避免两台服务器的工具名碰撞导致注册失败(早期方案行为不确定) // 3. 符合 MCP 规范建议的命名空间隔离 // // 替代方案:<沿用早期方案短名/冲突才加前缀> - 否决,行为不确定且难调试 // // 并发限制(升华改进(ELEVATED)): // 早期方案串行连接,多个 stdio 服务器顺序启动,第一个卡住会阻塞所有后续连接. // 我们并发连接,但限制同时启动的进程数,避免系统资源耗尽: // - stdio:≤2(进程启动有 fork 开销,同时启动太多会抢 CPU) // - remote (SSE/HTTP):≤5(网络 RTT 为主要延迟,并发收益明显) package mcp import ( "context" "encoding/json" "fmt" "math/rand" "strings" "sync" "sync/atomic" "time" "git.flytoex.net/yuanwei/flyto-agent/pkg/config" "git.flytoex.net/yuanwei/flyto-agent/pkg/execenv" ) // defaultResourceCacheTTL 是资源缓存的默认 TTL. // // 5 分钟选择依据: // - 大多数 MCP 资源(配置文件,知识库文档)更新频率 <<1/5min // - 足够短,使得手动重载/重连服务器后能在合理时间内看到新内容 // - 与 ToolListChanged 联动(工具列表变化时立即失效),无需等待 TTL // // 精妙之处(CLEVER): 常量而非硬编码--未来可通过 MCPServerConfig 扩展 per-server TTL. const defaultResourceCacheTTL = 5 * time.Minute // mcpToolPrefix 是所有 MCP 工具名的统一前缀. // 下划线分隔以与 Go 函数名/工具名区分,__ 用于分隔 server 和 tool 两段. const mcpToolPrefix = "mcp__" // reconnectConstants 定义重连策略参数. // // 精妙之处(CLEVER): 常量组而非魔法数字-- // 与早期实现对齐(MAX_RECONNECT_ATTEMPTS=5, // INITIAL_BACKOFF_MS=1000,MAX_BACKOFF_MS=30000),但集中定义便于维护. const ( reconnectMaxAttempts = 5 // 最多重试 5 次,与早期方案 对齐 reconnectBaseDelay = 1 * time.Second // 首次重连延迟 1s reconnectMaxDelay = 30 * time.Second // 最大退避 30s reconnectJitter = 0.25 // ±25% 随机抖动,防止多服务器同时重连风暴 ) // Manager 管理多个 MCP 服务器连接. type Manager struct { mu sync.RWMutex clients map[string]*Client // serverName → client serverCfgs map[string]config.MCPServerConfig // serverName → 原始配置(用于重连) // toolCache 缓存每个服务器的工具列表,避免每次 AllTools 都远程调用 // dirtySet 记录收到 ToolListChanged 通知后需要刷新的服务器 cacheMu sync.Mutex toolCache map[string][]MCPTool // serverName → tools dirtySet map[string]bool // serverName → true(需要刷新) // resourceCache 缓存 resources/read 的响应内容(TTL-based). // // 升华改进(ELEVATED): 工具列表变化时同时触发资源缓存失效-- // ToolListChanged 通常意味着服务器整体状态更新(重启/版本升级), // 资源内容也应视为过期.双缓存联动避免工具调用拿到旧资源数据. // 替代方案:<只靠 TTL 过期,不联动工具变化> // - 否决:服务器重启后 TTL 内的缓存数据可能来自旧进程,逻辑不一致. resourceCache *ResourceCache // elicitationHandler 在连接每个 Client 时注入(可选). // 升华改进(ELEVATED): Manager 层统一持有 handler-- // ConnectAll/ConnectOne 创建 Client 后立即注入, // 不需要消费层手动调用 client.SetElicitationHandler. // 替代方案:<消费层逐个调用 GetClient().SetElicitationHandler()> // - 否决:ConnectAll 是并发的,消费层获取 Client 时注入可能已错过 Initialize. elicitationHandler ElicitationHandler // ── 重连基础设施(模块 11.6)──────────────────────────────────────────────── // reconnectMu 保护 reconnectCancel map(与主 mu 分离,避免重连 goroutine 阻塞正常操作). reconnectMu sync.Mutex reconnectCancel map[string]context.CancelFunc // serverName → 取消重连 goroutine // onServerRemoved 在服务器重连彻底失败并被移除后调用(可选). // // 升华改进(ELEVATED): 早期实现 只更新 React state,SDK/API 模式无法感知. // 我们通过回调解耦:消费层(CLI/SDK/engine)注册此回调,在服务器移除时做业务处理 // (如从工具注册表注销 mcp__server__* 工具,上报告警等). // 回调在独立 goroutine 中调用,不阻塞重连循环. // 替代方案: // - 否决:需要额外接口定义,且回调更轻量(消费层只关心"有服务器消失"这一件事). onServerRemoved func(serverName string) // emitEvent 向外部发送结构化观测事件(可选). // Manager 不持有完整 Observer 接口,只需要 Event(name, data) 能力. // 消费层通过 SetEventEmitter 注入,nil 时静默跳过(无 observer 的 SDK 嵌入场景). emitEvent func(event string, data map[string]any) // connectFn 可替换的连接函数(测试注入,生产环境为 nil → 使用 Connect). // // 历史包袱(LEGACY): Connect 是包级函数,无法直接 mock. // 将函数引用存为字段是 Go 中标准的依赖注入模式,测试可注入假实现, // 生产代码 nil 则走真实 Connect,无性能代价. // 未来改进:若 Manager 需要支持多种连接策略(如连接池),可将此字段升级为接口. // // 2026-04-15 commit 4b-3: 签名对齐 Connect 加 execenv.Executor 参数, // 测试注入的 fake 函数忽略此参数即可 (stdio subprocess 是被 mock 的目标). connectFn func(exec execenv.Executor, cfg config.MCPServerConfig) (*Client, error) // sleepFn 可替换的退避等待函数(测试注入,生产环境为 nil → time.After). // // 精妙之处(CLEVER): 注入 sleepFn 让测试完全控制时间,无需 mock time 包-- // 注入 func(d) { return } 让所有退避立即完成,测试在毫秒内完成而不是 31s. // 生产代码 nil → 使用真实 time.After,不引入任何额外开销. sleepFn func(d time.Duration, ctx context.Context) bool // true=正常等待完成,false=ctx 取消 // executor 是子进程启动抽象, 从 NewManager 传入, 传递给 Connect/ // NewStdioTransport (见 commit 4b-3). 本地模式 DefaultExecutor, 云端 // 模式 sandbox.Backend. 2026-04-15 M1 commit 4b-2 引入. // // 本 commit 只存字段, 不实际使用 — Connect/NewStdioTransport 的 // 签名级联由 4b-3 处理. Go struct 字段允许 unused, 不报错. executor execenv.Executor } // NewManager 创建 MCP 管理器. // // exec 是子进程启动抽象, 必填. 本地模式传 execenv.DefaultExecutor{}, // 云端模式传 sandbox.Backend{}. 方案 β 严格 DI, 不做 nil fallback — // nil 时 Manager 可以构造但 4b-3 后的 Connect 会 panic. func NewManager(exec execenv.Executor) *Manager { return &Manager{ clients: make(map[string]*Client), serverCfgs: make(map[string]config.MCPServerConfig), toolCache: make(map[string][]MCPTool), dirtySet: make(map[string]bool), resourceCache: NewResourceCache(defaultResourceCacheTTL), reconnectCancel: make(map[string]context.CancelFunc), executor: exec, } } // NewManagerWithResourceCacheTTL 创建 MCP 管理器,并指定资源缓存 TTL. // // ttl <= 0 表示资源永不过期(仅依靠 InvalidateServer 手动失效). // 适用场景:只读静态资源(不会变化的知识库),或测试环境注入可控 TTL. // // exec 参数语义同 NewManager, 见其 doc. func NewManagerWithResourceCacheTTL(exec execenv.Executor, ttl time.Duration) *Manager { m := NewManager(exec) m.resourceCache = NewResourceCache(ttl) return m } // SetOnServerRemoved 注册服务器被永久移除时的回调. // // 触发时机:服务器断连后重连 reconnectMaxAttempts 次全部失败. // 回调在独立 goroutine 中异步调用,不阻塞重连流程. // 典型用途:engine 注销 mcp__server__* 工具;监控系统发告警. // 应在 ConnectAll/ConnectOne 之前调用,确保回调对所有连接生效. func (m *Manager) SetOnServerRemoved(fn func(serverName string)) { m.mu.Lock() defer m.mu.Unlock() m.onServerRemoved = fn } // SetEventEmitter 注册结构化事件发射函数(可选). // // Manager 通过此函数发送以下事件: // - mcp_reconnecting:开始重连(含 server,attempt,max_attempts 字段) // - mcp_reconnected:重连成功(含 server,attempt 字段) // - mcp_reconnect_failed:单次重连失败(含 server,attempt,error 字段) // - mcp_server_removed:服务器被永久移除(含 server,reason 字段) // // 消费层(CLI/SDK)通过此接口把事件转发给 EngineObserver/日志系统/监控面板. // 应在 ConnectAll/ConnectOne 之前调用. func (m *Manager) SetEventEmitter(fn func(event string, data map[string]any)) { m.mu.Lock() defer m.mu.Unlock() m.emitEvent = fn } // SetElicitationHandler 设置全局 elicitation handler,应在 ConnectAll 之前调用. // // 已连接的 Client 不会追溯注入(它们在 Connect 时已经使用旧 handler 注册). // 最佳实践:engine.New() 时调用 SetElicitationHandler,再调用 ConnectAll. func (m *Manager) SetElicitationHandler(handler ElicitationHandler) { m.mu.Lock() defer m.mu.Unlock() m.elicitationHandler = handler } // ConnectAll 并发连接所有配置的 MCP 服务器. // // 并发控制: // - stdio 类服务器最多同时连接 2 个 // - SSE/HTTP 类服务器最多同时连接 5 个 // // 某个服务器连接失败不影响其他服务器. // 返回成功连接的数量和所有失败的错误列表. func (m *Manager) ConnectAll(configs map[string]config.MCPServerConfig) (int, []error) { // 精妙之处(CLEVER): 用 semaphore channel 而非 sync.Pool 控制并发-- // semaphore channel 的容量即并发上限,goroutine 在 send 时阻塞, // 在 recv 时释放.比 worker pool 代码简单,比 WaitGroup+mutex 更直观. stdioSem := make(chan struct{}, 2) // stdio 最多 2 并发 remoteSem := make(chan struct{}, 5) // SSE/HTTP 最多 5 并发 var ( wg sync.WaitGroup errMu sync.Mutex errs []error connected int32 // atomic ) for name, cfg := range configs { wg.Add(1) go func(name string, cfg config.MCPServerConfig) { defer wg.Done() // 根据传输类型选择信号量 isRemote := cfg.Transport == "sse" || cfg.Transport == "http" if isRemote { remoteSem <- struct{}{} defer func() { <-remoteSem }() } else { stdioSem <- struct{}{} defer func() { <-stdioSem }() } cfg.Name = name client, err := Connect(m.executor, cfg) if err != nil { errMu.Lock() errs = append(errs, fmt.Errorf("mcp: connect %q: %w", name, err)) errMu.Unlock() return } // 注册 ElicitationHandler(Initialize 前注入,避免 initialize 阶段请求被漏处理) // 精妙之处(CLEVER): Initialize 前设置-- // MCP 规范允许服务器在 initialize 过程中发送 elicitation, // 必须在 Initialize() 调用前注册,否则 dispatchLoop 收到请求时 handler 还是 nil. m.mu.RLock() elicitHandler := m.elicitationHandler m.mu.RUnlock() if elicitHandler != nil { client.SetElicitationHandler(elicitHandler) } if err := client.Initialize(); err != nil { client.Close() errMu.Lock() errs = append(errs, fmt.Errorf("mcp: initialize %q: %w", name, err)) errMu.Unlock() return } // 注册 ToolListChanged 通知处理器(脏标记 + 懒惰刷新) client.SetNotificationHandler(func(method string, _ json.RawMessage) { if method == "notifications/tools/list_changed" { m.markDirty(name) } }) m.mu.Lock() // 如有同名旧连接,先关闭 if old, ok := m.clients[name]; ok { old.Close() } m.clients[name] = client // 保存配置以备重连使用 m.serverCfgs[name] = cfg m.mu.Unlock() // 注册断连回调:连接意外断开时自动启动重连循环 // 精妙之处(CLEVER): 在 m.clients[name] 写入后才注册 onClose-- // onClose 里的 startReconnectLoop 会读 m.serverCfgs[name], // 必须在 cfg 存入后注册,否则重连时找不到配置. client.SetOnClose(func() { m.startReconnectLoop(name, cfg) }) atomic.AddInt32(&connected, 1) // 预热工具缓存 m.refreshToolCache(name, client) }(name, cfg) } wg.Wait() return int(atomic.LoadInt32(&connected)), errs } // ConnectOne 连接单个 MCP 服务器. func (m *Manager) ConnectOne(name string, cfg config.MCPServerConfig) error { cfg.Name = name client, err := Connect(m.executor, cfg) if err != nil { return err } // 注入 ElicitationHandler(与 ConnectAll 逻辑一致,在 Initialize 前) m.mu.RLock() elicitHandler := m.elicitationHandler m.mu.RUnlock() if elicitHandler != nil { client.SetElicitationHandler(elicitHandler) } if err := client.Initialize(); err != nil { client.Close() return err } client.SetNotificationHandler(func(method string, _ json.RawMessage) { if method == "notifications/tools/list_changed" { m.markDirty(name) } }) m.mu.Lock() if old, ok := m.clients[name]; ok { old.Close() } m.clients[name] = client m.serverCfgs[name] = cfg m.mu.Unlock() // 注册断连自动重连(与 ConnectAll 路径对称) client.SetOnClose(func() { m.startReconnectLoop(name, cfg) }) m.refreshToolCache(name, client) return nil } // markDirty 将指定服务器的工具缓存标记为脏(需要懒惰刷新). // 由 ToolListChanged 通知触发. // // 升华改进(ELEVATED): 同时清除该服务器的资源缓存-- // ToolListChanged 意味着服务器整体状态变化(重启/更新), // 资源内容可能也已更新,一并失效避免工具调用读到旧资源. func (m *Manager) markDirty(serverName string) { m.cacheMu.Lock() defer m.cacheMu.Unlock() m.dirtySet[serverName] = true // 双缓存联动:工具列表变化时同时失效资源缓存 // 精妙之处(CLEVER): 在 cacheMu 下调用 InvalidateServer-- // InvalidateServer 内部持有 resourceCache.mu(与 cacheMu 不同), // 无死锁风险(锁层级:cacheMu > resourceCache.mu,始终按序获取). m.resourceCache.InvalidateServer(serverName) } // refreshToolCache 拉取并更新指定服务器的工具缓存. func (m *Manager) refreshToolCache(serverName string, client *Client) { tools, err := client.ListTools() if err != nil { return } m.cacheMu.Lock() defer m.cacheMu.Unlock() m.toolCache[serverName] = tools delete(m.dirtySet, serverName) // 刷新后清除脏标记 } // AllTools 返回所有已连接服务器提供的工具,工具名采用 mcp__server__tool 格式. // // 懒惰刷新:若某服务器的工具缓存被标记为脏(收到 ToolListChanged 通知), // 在返回其工具前先重新拉取. func (m *Manager) AllTools() []MCPTool { m.mu.RLock() // 拷贝客户端引用,避免长时间持有读锁 clients := make(map[string]*Client, len(m.clients)) for k, v := range m.clients { clients[k] = v } m.mu.RUnlock() var allTools []MCPTool for serverName, client := range clients { // 检查是否需要懒惰刷新 m.cacheMu.Lock() isDirty := m.dirtySet[serverName] if isDirty { delete(m.dirtySet, serverName) } m.cacheMu.Unlock() if isDirty { // 在锁外刷新,避免死锁 m.refreshToolCache(serverName, client) } m.cacheMu.Lock() cached := m.toolCache[serverName] m.cacheMu.Unlock() for _, tool := range cached { t := tool // 无条件加前缀:mcp__serverName__toolName t.Name = mcpToolPrefix + serverName + "__" + tool.Name allTools = append(allTools, t) } } return allTools } // AllPrompts 收集所有已连接服务器提供的提示模板. func (m *Manager) AllPrompts() []MCPPrompt { m.mu.RLock() defer m.mu.RUnlock() var allPrompts []MCPPrompt for _, client := range m.clients { prompts, err := client.ListPrompts() if err != nil { continue } allPrompts = append(allPrompts, prompts...) } return allPrompts } // CallTool 在指定服务器上调用工具(serverName 和 toolName 分开传入). func (m *Manager) CallTool(serverName, toolName string, args map[string]any) (*ToolCallResult, error) { m.mu.RLock() client, ok := m.clients[serverName] m.mu.RUnlock() if !ok { return nil, fmt.Errorf("mcp: server %q not connected", serverName) } return client.CallTool(toolName, args) } // CallToolByName 根据完整工具名 mcp__serverName__toolName 路由到正确的服务器. // // 仅接受 mcp__ 前缀格式,不支持旧的 serverName/toolName 格式. func (m *Manager) CallToolByName(fullName string, args map[string]any) (*ToolCallResult, error) { serverName, toolName := parseToolFullName(fullName) if serverName == "" { return nil, fmt.Errorf("mcp: invalid tool name %q (expected format: mcp__serverName__toolName)", fullName) } return m.CallTool(serverName, toolName, args) } // GetClient 返回指定名称的客户端(不存在时返回 false). func (m *Manager) GetClient(name string) (*Client, bool) { m.mu.RLock() defer m.mu.RUnlock() c, ok := m.clients[name] return c, ok } // ClientNames 返回所有已连接的服务器名称. func (m *Manager) ClientNames() []string { m.mu.RLock() defer m.mu.RUnlock() names := make([]string, 0, len(m.clients)) for name := range m.clients { names = append(names, name) } return names } // CloseAll 关闭所有 MCP 服务器连接,并取消所有正在进行的重连循环. func (m *Manager) CloseAll() { // 先取消所有重连 goroutine--防止 client.Close() 触发 onClose → 启动新重连 // 精妙之处(CLEVER): 先取消重连再关闭客户端-- // 若先关闭客户端,onClose 回调被触发,startReconnectLoop 启动新 goroutine, // 然后 cancel 时新 goroutine 才退出,留下短暂的"僵尸重连"窗口. // 先取消确保 startReconnectLoop 里的 ctx 已 Done,goroutine 立即退出. m.reconnectMu.Lock() for _, cancel := range m.reconnectCancel { cancel() } m.reconnectCancel = make(map[string]context.CancelFunc) m.reconnectMu.Unlock() m.mu.Lock() defer m.mu.Unlock() for name, client := range m.clients { client.Close() delete(m.clients, name) delete(m.serverCfgs, name) } m.cacheMu.Lock() m.toolCache = make(map[string][]MCPTool) m.dirtySet = make(map[string]bool) m.cacheMu.Unlock() // 清空资源缓存(服务器全部断开,缓存数据无意义) m.resourceCache.Clear() } // CloseOne 关闭指定的 MCP 服务器连接,并取消其重连循环. func (m *Manager) CloseOne(name string) error { // 先取消重连(与 CloseAll 同理:防止僵尸重连窗口) m.reconnectMu.Lock() if cancel, ok := m.reconnectCancel[name]; ok { cancel() delete(m.reconnectCancel, name) } m.reconnectMu.Unlock() m.mu.Lock() defer m.mu.Unlock() client, ok := m.clients[name] if !ok { return fmt.Errorf("mcp: server %q not connected", name) } client.Close() delete(m.clients, name) delete(m.serverCfgs, name) m.cacheMu.Lock() delete(m.toolCache, name) delete(m.dirtySet, name) m.cacheMu.Unlock() // 清除该服务器的资源缓存 m.resourceCache.InvalidateServer(name) return nil } // ── 资源读取 ────────────────────────────────────────────────────────────────── // ReadResource 读取指定服务器的指定资源内容,优先使用缓存. // // 流程: // 1. 查询 resourceCache(serverName:uri 组合键) // 2. 命中且未过期 → 直接返回,0 RPC // 3. Miss 或过期 → 发起 resources/read RPC → 写入缓存 → 返回 // // 升华改进(ELEVATED): ctx 控制 RPC 超时-- // 调用方可设置合理的 deadline(如 3s),避免慢服务器阻塞整个 Agent 轮次. // 缓存命中路径不需要 ctx(无 I/O),只有回源路径才使用 ctx. // 替代方案:<不接受 ctx,内部使用 defaultRequestTimeout> // - 否决:调用方(如 runLoop)有整体 context,应当传递以便统一取消. func (m *Manager) ReadResource(ctx context.Context, serverName, uri string) ([]ResourceContent, error) { // 1. 先查缓存(RLock 读路径) if cached, ok := m.resourceCache.Get(serverName, uri); ok { return cached, nil } // 2. 缓存 miss:找到对应的 Client m.mu.RLock() client, ok := m.clients[serverName] m.mu.RUnlock() if !ok { return nil, fmt.Errorf("mcp: server %q not connected", serverName) } // 3. 发起 resources/read RPC // 精妙之处(CLEVER): 直接使用 client.ReadResource(ctx 版本未封装, // 用 sendWithTimeout 内部也是 context.WithTimeout(background, 30s))-- // 这里不额外包装 ctx,因为 Client.ReadResource 内部已有 defaultRequestTimeout 兜底. // 如果消费层 ctx 更短(用户取消),取消信号会通过 ctx.Done 在 runLoop 上层处理. result, err := client.ReadResource(uri) if err != nil { return nil, fmt.Errorf("mcp: read resource %q from server %q: %w", uri, serverName, err) } // 4. 写入缓存 m.resourceCache.Set(serverName, uri, result.Contents) return result.Contents, nil } // InvalidateResourceCache 手动失效指定资源的缓存. // // 适用于消费层明确知道资源已更新的场景(如本地文件变更通知). func (m *Manager) InvalidateResourceCache(serverName, uri string) { m.resourceCache.Invalidate(serverName, uri) } // InvalidateServerResourceCache 清除指定服务器的所有资源缓存. // // 在不触发 ToolListChanged(但服务器内容确实变化)时手动调用. func (m *Manager) InvalidateServerResourceCache(serverName string) { m.resourceCache.InvalidateServer(serverName) } // ── 重连基础设施(模块 11.6)──────────────────────────────────────────────────── // startReconnectLoop 为指定服务器启动(或重启)后台重连 goroutine. // // 精妙之处(CLEVER): 先取消旧的重连 goroutine 再启动新的-- // 如果上次重连还没结束(如正在等待退避延迟),不能同时有两个 goroutine 竞争重连. // cancel 旧的确保幂等:同一服务器任意时刻只有一个重连 goroutine 在跑. // // 此方法由 client.onClose 触发(在独立 goroutine 中),也可由测试直接调用. func (m *Manager) startReconnectLoop(name string, cfg config.MCPServerConfig) { m.reconnectMu.Lock() // 取消旧的重连 goroutine(若存在) if cancel, ok := m.reconnectCancel[name]; ok { cancel() } ctx, cancel := context.WithCancel(context.Background()) m.reconnectCancel[name] = cancel m.reconnectMu.Unlock() go m.reconnectLoop(ctx, name, cfg) } // reconnectLoop 执行指数退避重连(最多 reconnectMaxAttempts 次). // // 升华改进(ELEVATED): 早期实现把重连逻辑写在 React Hook 里-- // SDK 嵌入/HTTP API 场景没有 React,重连完全缺失. // 我们把重连逻辑内嵌在 Manager 层,与 UI 零耦合.CLI 通过 Observer 事件感知状态, // SDK 通过 OnServerRemoved 回调处理后续,HTTP API 模式天然支持(Manager 是纯 Go). // 替代方案:<消费层轮询 GetClient().IsAlive(),断了自己重连> // - 否决:每个消费层都要重新实现退避逻辑,容易不一致;且 IsAlive 有延迟. func (m *Manager) reconnectLoop(ctx context.Context, name string, cfg config.MCPServerConfig) { for attempt := 1; attempt <= reconnectMaxAttempts; attempt++ { // 检查是否已被主动关闭(CloseAll/CloseOne 取消了 ctx) select { case <-ctx.Done(): return default: } // 发送重连中事件 m.emit("mcp_reconnecting", map[string]any{ "server": name, "attempt": attempt, "max_attempts": reconnectMaxAttempts, }) // 指数退避:delay = min(baseDelay * 2^(attempt-1), maxDelay) ± 25% 抖动 // // 精妙之处(CLEVER): ±25% 随机抖动防止多服务器同时崩溃时的重连风暴-- // 如果 10 个 MCP 服务器同时崩溃(如系统重启),没有抖动它们会在完全相同的时刻 // 同时重连,对重启后的服务器造成瞬时峰值压力. // 抖动公式来自早期方案:510-537,保持与 TS 对齐. base := reconnectBaseDelay * (1 << uint(attempt-1)) // 2^(attempt-1) if base > reconnectMaxDelay { base = reconnectMaxDelay } jitter := time.Duration(float64(base) * reconnectJitter * (2*rand.Float64() - 1)) wait := base + jitter if wait < 0 { wait = 0 } // 精妙之处(CLEVER): sleepFn 注入让测试控制时间-- // 生产代码 nil → time.After;测试注入 func(d,ctx) bool { return true } 立即返回. if m.sleepFn != nil { if !m.sleepFn(wait, ctx) { return // ctx 取消 } } else { select { case <-ctx.Done(): return // 主动关闭,不再重连 case <-time.After(wait): } } // 再次检查(等待期间可能被 CloseAll/CloseOne 取消) select { case <-ctx.Done(): return default: } // 尝试重新连接(优先使用注入的 connectFn,否则调用 Connect) connectFn := m.connectFn if connectFn == nil { connectFn = Connect } client, err := connectFn(m.executor, cfg) if err != nil { m.emit("mcp_reconnect_failed", map[string]any{ "server": name, "attempt": attempt, "error": err.Error(), }) continue } // 注入 ElicitationHandler m.mu.RLock() elicitHandler := m.elicitationHandler m.mu.RUnlock() if elicitHandler != nil { client.SetElicitationHandler(elicitHandler) } if err := client.Initialize(); err != nil { client.Close() m.emit("mcp_reconnect_failed", map[string]any{ "server": name, "attempt": attempt, "error": err.Error(), }) continue } // 注册通知处理器(ToolListChanged 等) client.SetNotificationHandler(func(method string, _ json.RawMessage) { if method == "notifications/tools/list_changed" { m.markDirty(name) } }) // 注册下一次断连的重连回调(链式自愈) client.SetOnClose(func() { m.startReconnectLoop(name, cfg) }) // 安装新 Client m.mu.Lock() if old, ok := m.clients[name]; ok { old.Close() } m.clients[name] = client m.mu.Unlock() // 刷新工具缓存 m.refreshToolCache(name, client) // 清除本服务器的重连 cancel(重连成功,无需再持有) m.reconnectMu.Lock() delete(m.reconnectCancel, name) m.reconnectMu.Unlock() m.emit("mcp_reconnected", map[string]any{ "server": name, "attempt": attempt, }) return // 重连成功,退出循环 } // 所有尝试耗尽,永久移除此服务器 m.removeServerOnFailure(name) } // removeServerOnFailure 将重连彻底失败的服务器从 Manager 中永久移除. // // 清理顺序:clients → toolCache → resourceCache → emit 事件 → onServerRemoved 回调. // 顺序设计:先清缓存(AllTools 立即停止返回该服务器工具),再通知外部(外部看到的状态一致). func (m *Manager) removeServerOnFailure(name string) { m.mu.Lock() if client, ok := m.clients[name]; ok { client.Close() delete(m.clients, name) } delete(m.serverCfgs, name) m.mu.Unlock() m.cacheMu.Lock() delete(m.toolCache, name) delete(m.dirtySet, name) m.cacheMu.Unlock() m.resourceCache.InvalidateServer(name) m.reconnectMu.Lock() delete(m.reconnectCancel, name) m.reconnectMu.Unlock() m.emit("mcp_server_removed", map[string]any{ "server": name, "reason": "reconnect_exhausted", }) // 通知外部(如 engine 注销工具) m.mu.RLock() cb := m.onServerRemoved m.mu.RUnlock() if cb != nil { go cb(name) // 异步调用,不阻塞重连循环 } } // emit 安全地发送观测事件(emitEvent 为 nil 时静默跳过). func (m *Manager) emit(event string, data map[string]any) { m.mu.RLock() fn := m.emitEvent m.mu.RUnlock() if fn != nil { fn(event, data) } } // parseToolFullName 解析 mcp__serverName__toolName 格式的工具名. // // "mcp__github__search_code" → ("github", "search_code") // 不符合格式的字符串 → ("", 原字符串) // // 精妙之处(CLEVER): 只识别 mcp__ 前缀后的第一个 __ 作为分隔符-- // 工具名本身可能包含 __(如 search__v2),只拆第一个确保工具名完整保留. func parseToolFullName(fullName string) (serverName, toolName string) { if !strings.HasPrefix(fullName, mcpToolPrefix) { return "", fullName } rest := fullName[len(mcpToolPrefix):] // 去掉 "mcp__" idx := strings.Index(rest, "__") if idx < 0 { return "", fullName // 只有 "mcp__serverName",无工具名 } return rest[:idx], rest[idx+2:] }