// reconnect_test.go - module 11.6: MCP disconnect/reconnect tests.
//
// Coverage:
// - Client.onClose is triggered when the Transport disconnects
// - Manager.reconnectLoop: after all attempts fail, the server is removed + callback + Observer event
// - Manager.reconnectLoop: eventual success after several failures
// - Manager.CloseAll/CloseOne cancel a running reconnect goroutine
// - Manager.emit: silently skipped when emitEvent is nil
//
// Design notes:
// - No real processes or network: fakeTransport fakes the byte layer, fakeMCPServer fakes the protocol handshake.
// - The real Connect is replaced via connectFn injection to isolate external dependencies.
// - Time-sensitive tests assert via time.After + channels instead of fixed sleeps.
package mcp

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"git.flytoex.net/yuanwei/flyto-agent/pkg/config"
	"git.flytoex.net/yuanwei/flyto-agent/pkg/execenv"
)

// ── fakeTransport ─────────────────────────────────────────────────────────────

// fakeTransport is a controllable Transport implementation for tests.
//
// CLEVER: two independent channels (recvCh + done) instead of a single close signal --
// recvCh pushes messages on demand, while closing done makes Recv return io.EOF immediately.
// This models the two ways a stdio/SSE transport disconnects: the message stream ending
// vs. the connection being forcibly closed.
type fakeTransport struct {
	recvCh chan []byte
	done   chan struct{}
	once   sync.Once
}

func newFakeTransport() *fakeTransport {
	return &fakeTransport{
		recvCh: make(chan []byte, 16),
		done:   make(chan struct{}),
	}
}

func (f *fakeTransport) Send(_ context.Context, _ []byte) error { return nil }

func (f *fakeTransport) Recv(_ context.Context) ([]byte, error) {
	select {
	case msg, ok := <-f.recvCh:
		if !ok {
			return nil, io.EOF
		}
		return msg, nil
	case <-f.done:
		return nil, io.EOF
	}
}

func (f *fakeTransport) Close() error {
	f.once.Do(func() { close(f.done) })
	return nil
}

// push delivers one message on recvCh.
func (f *fakeTransport) push(msg []byte) { f.recvCh <- msg }

// closeRecv ends the message stream (simulates EOF).
func (f *fakeTransport) closeRecv() { f.once.Do(func() { close(f.done) }) }

// ── fakeMCPServer ─────────────────────────────────────────────────────────────

// fakeMCPServerTransport is one end of an in-process pair of transports that can talk to
// each other. clientTP is used by the Client, serverTP by the fakeMCPServer goroutine.
// Each end sends on out and receives on in; newFakeMCPServerPair wires one end's out to
// the other end's in.
type fakeMCPServerTransport struct {
	out  chan []byte // messages this end sends
	in   chan []byte // messages this end receives
	done chan struct{}
	once sync.Once
}

func newFakeMCPServerPair() (clientTP, serverTP *fakeMCPServerTransport) {
	c2s := make(chan []byte, 16) // client → server
	s2c := make(chan []byte, 16) // server → client
	doneC := make(chan struct{})
	doneS := make(chan struct{})
	return &fakeMCPServerTransport{out: c2s, in: s2c, done: doneC},
		&fakeMCPServerTransport{out: s2c, in: c2s, done: doneS}
}

func (f *fakeMCPServerTransport) Send(_ context.Context, msg []byte) error {
	select {
	case f.out <- msg:
		return nil
	case <-f.done:
		return io.EOF
	}
}

func (f *fakeMCPServerTransport) Recv(_ context.Context) ([]byte, error) {
	select {
	case msg, ok := <-f.in:
		if !ok {
			return nil, io.EOF
		}
		return msg, nil
	case <-f.done:
		return nil, io.EOF
	}
}

func (f *fakeMCPServerTransport) Close() error {
	f.once.Do(func() { close(f.done) })
	return nil
}

// newAutoInitClient creates a Client that automatically completes the MCP initialize handshake.
//
// A goroutine on the serverTP end answers initialize + initialized + tools/list, so the
// returned Client is ready to use (Initialize() can be called immediately).
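//
// Illustrative usage (a sketch; Client.Initialize's exact signature is not shown in this file):
//
//	c := newAutoInitClient(t, config.MCPServerConfig{Name: "example"})
//	// c.Initialize(...) now completes against the in-process fake server,
//	// and tools/list returns an empty tool set.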
func newAutoInitClient(t *testing.T, cfg config.MCPServerConfig) *Client {
	t.Helper()
	clientTP, serverTP := newFakeMCPServerPair()

	// Server goroutine: handles the initialize + initialized + tools/list handshake.
	go func() {
		ctx := context.Background()
		handleMsg := func() (int64, string, bool) {
			raw, err := serverTP.Recv(ctx)
			if err != nil {
				return 0, "", false
			}
			var req struct {
				ID     *int64 `json:"id"`
				Method string `json:"method"`
			}
			_ = json.Unmarshal(raw, &req)
			id := int64(0)
			if req.ID != nil {
				id = *req.ID
			}
			return id, req.Method, true
		}
		for {
			id, method, ok := handleMsg()
			if !ok {
				return
			}
			switch method {
			case "initialize":
				resp := fmt.Sprintf(
					`{"jsonrpc":"2.0","id":%d,"result":{"protocolVersion":"2024-11-05","capabilities":{},"serverInfo":{"name":"test","version":"1.0"}}}`, id)
				_ = serverTP.Send(ctx, []byte(resp))
			case "notifications/initialized":
				// Notifications need no response.
			case "tools/list":
				resp := fmt.Sprintf(`{"jsonrpc":"2.0","id":%d,"result":{"tools":[]}}`, id)
				_ = serverTP.Send(ctx, []byte(resp))
				return // handshake complete, exit the server goroutine
			}
		}
	}()

	return NewClient(cfg, clientTP)
}

// ── Client.onClose tests ──────────────────────────────────────────────────────

// TestClient_OnClose_CalledOnTransportEOF verifies that Client.onClose fires when the
// Transport returns EOF.
func TestClient_OnClose_CalledOnTransportEOF(t *testing.T) {
	tp := newFakeTransport()
	client := NewClient(config.MCPServerConfig{Name: "test"}, tp)

	called := make(chan struct{}, 1)
	client.SetOnClose(func() { called <- struct{}{} })

	// Trigger EOF.
	tp.closeRecv()

	select {
	case <-called:
		// OK: onClose was called.
	case <-time.After(2 * time.Second):
		t.Fatal("onClose should be called when transport returns EOF, but timed out")
	}
}

// TestClient_OnClose_CalledOnExplicitClose verifies that onClose is also triggered by an
// explicit Close() (Close() closes the done channel, so Recv returns EOF).
// This is intended behavior: Manager avoids unnecessary reconnects by cancelling the
// reconnect ctx first in CloseAll/CloseOne.
func TestClient_OnClose_CalledOnExplicitClose(t *testing.T) {
	tp := newFakeTransport()
	client := NewClient(config.MCPServerConfig{Name: "test"}, tp)

	called := make(chan struct{}, 1)
	client.SetOnClose(func() { called <- struct{}{} })

	client.Close()

	select {
	case <-called:
		// OK: Close() shuts the transport, dispatchLoop sees EOF and triggers onClose.
	case <-time.After(2 * time.Second):
		t.Fatal("onClose should be called after Close(), timed out")
	}
}

// TestClient_NoOnClose_DoesNotPanic verifies that a disconnect does not panic when no
// onClose callback is set.
func TestClient_NoOnClose_DoesNotPanic(t *testing.T) {
	tp := newFakeTransport()
	_ = NewClient(config.MCPServerConfig{Name: "test"}, tp)

	// No onClose set; disconnect directly.
	tp.closeRecv()
	time.Sleep(50 * time.Millisecond) // give dispatchLoop time to process

	// Passing means no panic.
}

// ── Manager.reconnectLoop tests ───────────────────────────────────────────────

// TestManager_Reconnect_AllFail_ServerRemoved verifies that after all 5 reconnect attempts fail:
//  1. the server is removed from m.clients and toolCache
//  2. the onServerRemoved callback is invoked
//  3. an mcp_server_removed event is emitted
func TestManager_Reconnect_AllFail_ServerRemoved(t *testing.T) {
	m := NewManager(execenv.DefaultExecutor{})

	// CLEVER: inject a zero-delay sleepFn so the test completes in milliseconds instead of ~31s --
	// the production path is unchanged (nil → time.After); the fast path exists only via this field.
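	// (Assumption: the backoff schedule is not shown in this file; the ~31s figure is
	// consistent with an exponential backoff of 1+2+4+8+16s over reconnectMaxAttempts = 5
	// attempts, with the 1s first delay noted in TestManager_CloseAll_CancelsReconnect.)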
	m.sleepFn = func(_ time.Duration, ctx context.Context) bool {
		select {
		case <-ctx.Done():
			return false
		default:
			return true // return immediately, skipping the actual wait
		}
	}

	var connectCalls int32
	m.connectFn = func(_ execenv.Executor, _ config.MCPServerConfig) (*Client, error) {
		atomic.AddInt32(&connectCalls, 1)
		return nil, fmt.Errorf("connection refused")
	}

	// Record emitted events.
	var events []string
	var eventMu sync.Mutex
	m.SetEventEmitter(func(event string, _ map[string]any) {
		eventMu.Lock()
		events = append(events, event)
		eventMu.Unlock()
	})

	// Record the onServerRemoved callback.
	removedCh := make(chan string, 1)
	m.SetOnServerRemoved(func(serverName string) { removedCh <- serverName })

	// Manually install connected state (simulating a disconnect after normal operation).
	tp := newFakeTransport()
	client := NewClient(config.MCPServerConfig{Name: "alpha"}, tp)
	m.mu.Lock()
	m.clients["alpha"] = client
	m.serverCfgs["alpha"] = config.MCPServerConfig{Name: "alpha"}
	m.mu.Unlock()
	m.cacheMu.Lock()
	m.toolCache["alpha"] = []MCPTool{{Name: "tool1"}}
	m.cacheMu.Unlock()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	done := make(chan struct{})
	go func() {
		m.reconnectLoop(ctx, "alpha", config.MCPServerConfig{Name: "alpha"})
		close(done)
	}()

	select {
	case <-done:
	case <-time.After(5 * time.Second):
		t.Fatal("reconnect loop should finish quickly with zero-delay sleepFn")
	}

	// Verify: alpha has been removed from clients and toolCache.
	m.mu.RLock()
	_, stillConnected := m.clients["alpha"]
	m.mu.RUnlock()
	if stillConnected {
		t.Error("server should be removed from clients after all reconnect attempts fail")
	}
	m.cacheMu.Lock()
	_, hasCache := m.toolCache["alpha"]
	m.cacheMu.Unlock()
	if hasCache {
		t.Error("tool cache should be cleared after server removed")
	}

	// Verify the onServerRemoved callback.
	select {
	case name := <-removedCh:
		if name != "alpha" {
			t.Errorf("onServerRemoved: got %q, want %q", name, "alpha")
		}
	case <-time.After(2 * time.Second):
		t.Error("onServerRemoved callback should have been called")
	}

	// Verify the mcp_server_removed event.
	eventMu.Lock()
	found := false
	for _, e := range events {
		if e == "mcp_server_removed" {
			found = true
		}
	}
	eventMu.Unlock()
	if !found {
		t.Errorf("expected mcp_server_removed event, got: %v", events)
	}

	// Verify connectFn was called 5 times.
	if n := atomic.LoadInt32(&connectCalls); n != reconnectMaxAttempts {
		t.Errorf("connectFn should be called %d times, called %d", reconnectMaxAttempts, n)
	}
}

// TestManager_Reconnect_EventualSuccess verifies a reconnect that eventually succeeds after
// several failures:
//  1. the new client is installed into m.clients
//  2. an mcp_reconnected event is emitted
func TestManager_Reconnect_EventualSuccess(t *testing.T) {
	m := NewManager(execenv.DefaultExecutor{})

	// Zero-delay sleepFn so the test completes quickly.
	m.sleepFn = func(_ time.Duration, ctx context.Context) bool {
		select {
		case <-ctx.Done():
			return false
		default:
			return true
		}
	}

	// CLEVER: an atomic counter implements "fail the first N attempts, succeed on attempt N+1" --
	// no global state required.
	var callCount int32
	const failFirst = 2 // first 2 attempts fail, the 3rd succeeds
	m.connectFn = func(_ execenv.Executor, cfg config.MCPServerConfig) (*Client, error) {
		n := atomic.AddInt32(&callCount, 1)
		if int(n) <= failFirst {
			return nil, fmt.Errorf("temporary error (attempt %d)", n)
		}
		return newAutoInitClient(t, cfg), nil
	}

	var reconnectedServer string
	var eventMu sync.Mutex
	m.SetEventEmitter(func(event string, data map[string]any) {
		eventMu.Lock()
		defer eventMu.Unlock()
		if event == "mcp_reconnected" {
			if s, ok := data["server"].(string); ok {
				reconnectedServer = s
			}
		}
	})

	cfg := config.MCPServerConfig{Name: "beta"}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	done := make(chan struct{})
	go func() {
		m.reconnectLoop(ctx, "beta", cfg)
		close(done)
	}()

	select {
	case <-done:
	case <-time.After(5 * time.Second):
		t.Fatal("reconnect loop should finish quickly with zero-delay sleepFn")
	}

	// Verify: beta is connected again.
	m.mu.RLock()
	_, ok := m.clients["beta"]
	m.mu.RUnlock()
	if !ok {
		t.Error("server should be in clients after successful reconnect")
	}

	eventMu.Lock()
	gotServer := reconnectedServer
	eventMu.Unlock()
	if gotServer != "beta" {
		t.Errorf("mcp_reconnected event: server = %q, want %q", gotServer, "beta")
	}
}

// TestManager_CloseAll_CancelsReconnect verifies that cancelling the reconnect context
// (as CloseAll does) stops a running reconnect goroutine.
func TestManager_CloseAll_CancelsReconnect(t *testing.T) {
	m := NewManager(execenv.DefaultExecutor{})

	// connectFn blocks until released, simulating a reconnect in progress.
	waitCh := make(chan struct{})
	m.connectFn = func(_ execenv.Executor, _ config.MCPServerConfig) (*Client, error) {
		<-waitCh
		return nil, fmt.Errorf("aborted")
	}

	cfg := config.MCPServerConfig{Name: "gamma"}
	m.serverCfgs["gamma"] = cfg

	// Start the reconnect loop (it would block inside connectFn).
	// CLEVER: reconnectLoop waits the backoff delay (1s) before calling connectFn; to avoid
	// that 1s wait, call reconnectLoop directly and cancel its ctx immediately.
	// CloseAll cancels reconnectCancel[name], and reconnectLoop exits at ctx.Done().
	ctx, cancel := context.WithCancel(context.Background())
	m.reconnectMu.Lock()
	m.reconnectCancel["gamma"] = cancel
	m.reconnectMu.Unlock()

	loopDone := make(chan struct{})
	go func() {
		m.reconnectLoop(ctx, "gamma", cfg)
		close(loopDone)
	}()

	// Cancel (simulating what CloseAll does).
	cancel()
	close(waitCh) // release connectFn if it was already blocking

	select {
	case <-loopDone:
		// OK: the goroutine has exited.
	case <-time.After(3 * time.Second):
		t.Fatal("reconnect loop should have stopped after context cancellation")
	}
}

// TestManager_RemoveServerOnFailure_ClearsAllState verifies that removeServerOnFailure cleans up
// everything: clients, serverCfgs, toolCache, dirtySet and resourceCache.
func TestManager_RemoveServerOnFailure_ClearsAllState(t *testing.T) {
	m := NewManager(execenv.DefaultExecutor{})

	tp := newFakeTransport()
	client := NewClient(config.MCPServerConfig{Name: "delta"}, tp)

	m.mu.Lock()
	m.clients["delta"] = client
	m.serverCfgs["delta"] = config.MCPServerConfig{Name: "delta"}
	m.mu.Unlock()

	m.cacheMu.Lock()
	m.toolCache["delta"] = []MCPTool{{Name: "t1"}}
	m.dirtySet["delta"] = true
	m.cacheMu.Unlock()

	m.resourceCache.Set("delta", "uri://r1", []ResourceContent{{URI: "uri://r1", Text: "data"}})

	// Perform the removal.
	m.removeServerOnFailure("delta")

	// Verify everything was cleaned up.
	m.mu.RLock()
	_, hasClient := m.clients["delta"]
	_, hasCfg := m.serverCfgs["delta"]
	m.mu.RUnlock()
	if hasClient {
		t.Error("client should be removed")
	}
	if hasCfg {
		t.Error("serverCfg should be removed")
	}

	m.cacheMu.Lock()
	_, hasCache := m.toolCache["delta"]
	_, hasDirty := m.dirtySet["delta"]
	m.cacheMu.Unlock()
	if hasCache {
		t.Error("toolCache should be cleared")
	}
	if hasDirty {
		t.Error("dirtySet should be cleared")
	}

	if _, ok := m.resourceCache.Get("delta", "uri://r1"); ok {
		t.Error("resourceCache should be cleared for removed server")
	}
}

// TestManager_Emit_NilEmitEvent_Silent verifies that emit does not panic when emitEvent is nil.
func TestManager_Emit_NilEmitEvent_Silent(t *testing.T) {
	m := NewManager(execenv.DefaultExecutor{})
	// emitEvent is unset; emit should silently skip.
	m.emit("some_event", map[string]any{"key": "value"})
	// Passing means no panic.
}

// TestManager_SetOnServerRemoved_CalledAsync verifies that onServerRemoved is invoked
// asynchronously (removeServerOnFailure does not block waiting for the callback to finish).
func TestManager_SetOnServerRemoved_CalledAsync(t *testing.T) {
	m := NewManager(execenv.DefaultExecutor{})

	calledCh := make(chan string, 1)
	m.SetOnServerRemoved(func(serverName string) {
		time.Sleep(10 * time.Millisecond) // simulate a slow callback
		calledCh <- serverName
	})

	// removeServerOnFailure should return immediately, not wait out the 10ms callback.
	start := time.Now()
	m.removeServerOnFailure("epsilon")
	elapsed := time.Since(start)
	if elapsed > 5*time.Millisecond {
		t.Errorf("removeServerOnFailure should not block on callback, took %v", elapsed)
	}

	// But the callback is still called eventually.
	select {
	case name := <-calledCh:
		if name != "epsilon" {
			t.Errorf("callback got %q, want %q", name, "epsilon")
		}
	case <-time.After(2 * time.Second):
		t.Error("onServerRemoved callback should be called eventually")
	}
}

// TestManager_ConnectAll_StoresServerCfg verifies the serverCfgs bookkeeping that
// reconnectLoop relies on as its config source.
func TestManager_ConnectAll_StoresServerCfg(t *testing.T) {
	m := NewManager(execenv.DefaultExecutor{})

	// Inject connectFn to avoid any real network. Note that ConnectAll itself does not use
	// connectFn (only reconnectLoop does), and ConnectOne goes through the real Connect, so
	// the ConnectAll success path cannot be exercised here; instead this test checks the
	// serverCfgs bookkeeping directly.
	m.connectFn = func(_ execenv.Executor, cfg config.MCPServerConfig) (*Client, error) {
		return newAutoInitClient(t, cfg), nil
	}

	m.mu.Lock()
	m.serverCfgs["zeta"] = config.MCPServerConfig{Name: "zeta", Transport: "stdio"}
	m.mu.Unlock()

	m.mu.RLock()
	cfg, ok := m.serverCfgs["zeta"]
	m.mu.RUnlock()
	if !ok {
		t.Fatal("serverCfgs should contain zeta")
	}
	if cfg.Name != "zeta" {
		t.Errorf("serverCfgs[zeta].Name = %q, want %q", cfg.Name, "zeta")
	}
}
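
// TestFakeTransport_PushAndIdempotentClose is a small sanity check of the fakeTransport
// helper itself (it touches only helpers defined in this file): push delivers a message,
// closeRecv followed by Close is safe because the sync.Once guard closes done exactly once,
// and Recv reports io.EOF once the transport is closed.
func TestFakeTransport_PushAndIdempotentClose(t *testing.T) {
	tp := newFakeTransport()

	// A pushed message is delivered by Recv while the transport is still open.
	tp.push([]byte("hello"))
	msg, err := tp.Recv(context.Background())
	if err != nil || string(msg) != "hello" {
		t.Fatalf("Recv = (%q, %v), want (%q, nil)", msg, err, "hello")
	}

	// closeRecv then Close must not panic: done is closed exactly once.
	tp.closeRecv()
	if err := tp.Close(); err != nil {
		t.Fatalf("Close after closeRecv should be a no-op, got %v", err)
	}

	// After closing, Recv reports EOF immediately.
	if _, err := tp.Recv(context.Background()); err != io.EOF {
		t.Errorf("Recv after close = %v, want io.EOF", err)
	}
}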