// stdio_integration_test.go - MCP stdio 真子进程集成测试. // // 这一组测试是 L512 的核心交付: 验证 StdioTransport -> Client -> Manager // 全链路在真实子进程上的 JSON-RPC 通信. 此前仅有 wiring 单元测试 (无真 stdio), // 本文件补齐了端到端验证. // // 技术方案: Go self-exec 模式 -- TestMain 检测 __FAKE_MCP_SERVER__ env var, // 将测试二进制分叉为 fake MCP server. 这是 Go 标准库 os/exec 包自身测试 // 使用的模式, 无外部依赖. // // 精妙之处 (CLEVER): self-exec 模式的 env 传递本身就验证了 execenv 隔离路径: // __FAKE_MCP_SERVER__ 必须通过 MCPServerConfig.Env + ${VAR} 展开才能到达子进程, // 如果 MinimalEnv 白名单或 ExpandEnvMap 有 bug, 测试直接启动不了. // // Fake server 提供三个工具: // - echo: 返回 "echo: ", 验证参数序列化和结果解析 // - env_check: 返回指定 env var 值 (或 __NOT_SET__), 验证 env 隔离 // - exit_server: 进程立即退出 code 42, 验证 crash 检测 package mcp import ( "bufio" "encoding/json" "fmt" "io" "os" "runtime" "testing" "time" "git.flytoex.net/yuanwei/flyto-agent/pkg/config" "git.flytoex.net/yuanwei/flyto-agent/pkg/execenv" ) // TestMain 双重角色: 正常运行时是测试入口, 子进程模式下变身 fake MCP server. // // 精妙之处 (CLEVER): 同一个二进制既是测试 runner 也是被测的 MCP server -- // 这是 Go os/exec 包自身测试使用的 self-exec 模式. 好处: (1) 无需编译额外 // binary, (2) fake server 代码与测试代码同文件, 修改原子性高, (3) JSON-RPC // 协议处理用 Go encoding/json, 远比 shell 脚本 sed 提取可靠. // // NOTE: 本包只能有一个 TestMain. 如果未来需要扩展, 在此函数内部加分支即可. func TestMain(m *testing.M) { if os.Getenv("__FAKE_MCP_SERVER__") == "1" { serveFakeMCP(os.Stdin, os.Stdout) os.Exit(0) } // 设置到宿主进程 env, 让子进程的 ${__FAKE_MCP_SERVER__} 展开能找到值. os.Setenv("__FAKE_MCP_SERVER__", "1") os.Exit(m.Run()) } // ── Fake MCP Server ──────────────────────────────────────────────────────── // serveFakeMCP 实现最小化 MCP server: JSON-RPC 2.0 over stdio. // // 协议流程: // // client -> initialize request // server <- initialize response (capabilities + serverInfo) // client -> notifications/initialized (no response needed) // client -> tools/list // server <- tools list // client -> tools/call // server <- tool result func serveFakeMCP(in io.Reader, out io.Writer) { scanner := bufio.NewScanner(in) scanner.Buffer(make([]byte, 64*1024), 1*1024*1024) for scanner.Scan() { line := scanner.Bytes() if len(line) == 0 { continue } var req struct { JSONRPC string `json:"jsonrpc"` ID json.RawMessage `json:"id,omitempty"` Method string `json:"method"` Params json.RawMessage `json:"params,omitempty"` } if err := json.Unmarshal(line, &req); err != nil { continue // skip non-JSON lines } switch req.Method { case "initialize": writeFakeResponse(out, req.ID, map[string]any{ "protocolVersion": "2024-11-05", "capabilities": map[string]any{"tools": map[string]any{}}, "serverInfo": map[string]any{"name": "fake-mcp-server", "version": "0.1.0"}, }) case "notifications/initialized": // Notification -- no response needed. case "tools/list": writeFakeResponse(out, req.ID, map[string]any{ "tools": []any{ map[string]any{ "name": "echo", "description": "Echoes the message back", "inputSchema": map[string]any{ "type": "object", "properties": map[string]any{ "message": map[string]any{"type": "string"}, }, "required": []string{"message"}, }, }, map[string]any{ "name": "env_check", "description": "Reports env var value", "inputSchema": map[string]any{ "type": "object", "properties": map[string]any{ "var_name": map[string]any{"type": "string"}, }, "required": []string{"var_name"}, }, }, map[string]any{ "name": "exit_server", "description": "Exits the server process (crash simulation)", "inputSchema": map[string]any{ "type": "object", "properties": map[string]any{}, }, }, }, }) case "tools/call": handleFakeToolCall(out, req.ID, req.Params) default: if req.ID != nil { writeFakeError(out, req.ID, -32601, "method not found: "+req.Method) } } } } func handleFakeToolCall(out io.Writer, id json.RawMessage, rawParams json.RawMessage) { var params struct { Name string `json:"name"` Arguments json.RawMessage `json:"arguments"` } if err := json.Unmarshal(rawParams, ¶ms); err != nil { writeFakeError(out, id, -32600, "invalid params: "+err.Error()) return } switch params.Name { case "echo": var args struct { Message string `json:"message"` } json.Unmarshal(params.Arguments, &args) writeFakeResponse(out, id, map[string]any{ "content": []any{ map[string]any{"type": "text", "text": "echo: " + args.Message}, }, }) case "env_check": var args struct { VarName string `json:"var_name"` } json.Unmarshal(params.Arguments, &args) val, ok := os.LookupEnv(args.VarName) if !ok { val = "__NOT_SET__" } writeFakeResponse(out, id, map[string]any{ "content": []any{ map[string]any{"type": "text", "text": val}, }, }) case "exit_server": // Exit without responding -- simulates unexpected crash. os.Exit(42) default: writeFakeError(out, id, -32601, "unknown tool: "+params.Name) } } func writeFakeResponse(w io.Writer, id json.RawMessage, result any) { resp := struct { JSONRPC string `json:"jsonrpc"` ID json.RawMessage `json:"id"` Result any `json:"result"` }{"2.0", id, result} data, _ := json.Marshal(resp) fmt.Fprintf(w, "%s\n", data) } func writeFakeError(w io.Writer, id json.RawMessage, code int, message string) { resp := struct { JSONRPC string `json:"jsonrpc"` ID json.RawMessage `json:"id"` Error any `json:"error"` }{"2.0", id, map[string]any{"code": code, "message": message}} data, _ := json.Marshal(resp) fmt.Fprintf(w, "%s\n", data) } // ── Helpers ──────────────────────────────────────────────────────────────── func skipOnWindowsIntegration(t *testing.T) { if runtime.GOOS == "windows" { t.Skip("skip on windows: self-exec integration test not validated on windows") } } // newFakeServerConfig 返回以测试二进制自身为 MCP server 的配置. // // 精妙之处 (CLEVER): __FAKE_MCP_SERVER__ 必须通过 Env + ${VAR} 展开传递 -- // MinimalEnv 白名单不含它, 如果 ExpandEnvMap 或 MinimalEnvMap 有 bug, // 测试直接 fail (子进程不会进入 fake server 模式), 相当于用集成测试 // 同时验证了 env 隔离基础设施本身. func newFakeServerConfig(name string) config.MCPServerConfig { return config.MCPServerConfig{ Name: name, Command: os.Args[0], Args: []string{"-test.run=^$"}, Env: map[string]string{ "__FAKE_MCP_SERVER__": "${__FAKE_MCP_SERVER__}", }, } } // ── Integration Tests ────────────────────────────────────────────────────── // TestStdioIntegration_InitializeAndListTools 验证完整 MCP 握手 + 工具发现. // // 覆盖: NewStdioTransport -> Client.Initialize (initialize + initialized // notification) -> Client.ListTools -> JSON Schema 解析. func TestStdioIntegration_InitializeAndListTools(t *testing.T) { skipOnWindowsIntegration(t) cfg := newFakeServerConfig("test-init") client, err := Connect(execenv.DefaultExecutor{}, cfg) if err != nil { t.Fatalf("Connect: %v", err) } defer client.Close() if err := client.Initialize(); err != nil { t.Fatalf("Initialize: %v", err) } // Verify server info from initialize response. info := client.GetServerInfo() if info.Name != "fake-mcp-server" { t.Errorf("ServerInfo.Name = %q, want %q", info.Name, "fake-mcp-server") } if info.Version != "0.1.0" { t.Errorf("ServerInfo.Version = %q, want %q", info.Version, "0.1.0") } // List tools and verify all three are present. tools, err := client.ListTools() if err != nil { t.Fatalf("ListTools: %v", err) } wantTools := map[string]bool{"echo": false, "env_check": false, "exit_server": false} for _, tool := range tools { if _, ok := wantTools[tool.Name]; ok { wantTools[tool.Name] = true } } for name, found := range wantTools { if !found { t.Errorf("tool %q not found in tools/list response", name) } } } // TestStdioIntegration_CallTool 验证工具调用端到端: // 参数 JSON 序列化 -> 子进程解析 -> 执行 -> 结果 JSON -> 客户端解析. func TestStdioIntegration_CallTool(t *testing.T) { skipOnWindowsIntegration(t) cfg := newFakeServerConfig("test-call") client, err := Connect(execenv.DefaultExecutor{}, cfg) if err != nil { t.Fatalf("Connect: %v", err) } defer client.Close() if err := client.Initialize(); err != nil { t.Fatalf("Initialize: %v", err) } result, err := client.CallTool("echo", map[string]any{ "message": "hello from flyto", }) if err != nil { t.Fatalf("CallTool(echo): %v", err) } if len(result.Content) != 1 { t.Fatalf("echo: got %d content items, want 1", len(result.Content)) } if result.Content[0].Type != "text" { t.Errorf("echo: content type = %q, want %q", result.Content[0].Type, "text") } want := "echo: hello from flyto" if result.Content[0].Text != want { t.Errorf("echo: text = %q, want %q", result.Content[0].Text, want) } } // TestStdioIntegration_EnvIsolation 验证 execenv 白名单在真实子进程中的隔离效果. // // 三项检查: // 1. 宿主敏感 env (模拟 ANTHROPIC_API_KEY) 不泄漏到子进程 // 2. 显式声明的 env (MCPServerConfig.Env) 正确传递 // 3. 白名单 env (PATH) 正常可用 // // 与 stdio_env_test.go 的 TestStdioTransport_NoOsEnvLeak 互补: // 那个测试用 shell printf 验证, 本测试通过 MCP 协议验证 (更接近真实使用路径). func TestStdioIntegration_EnvIsolation(t *testing.T) { skipOnWindowsIntegration(t) // Simulate a host secret that must not leak. t.Setenv("FLYTO_INTEGRATION_CANARY_SECRET", "super-secret-should-never-leak") cfg := newFakeServerConfig("test-env") client, err := Connect(execenv.DefaultExecutor{}, cfg) if err != nil { t.Fatalf("Connect: %v", err) } defer client.Close() if err := client.Initialize(); err != nil { t.Fatalf("Initialize: %v", err) } // 1. Canary secret must NOT be visible in subprocess. result, err := client.CallTool("env_check", map[string]any{ "var_name": "FLYTO_INTEGRATION_CANARY_SECRET", }) if err != nil { t.Fatalf("CallTool(env_check canary): %v", err) } if result.Content[0].Text != "__NOT_SET__" { t.Errorf("SECURITY: host env leaked to subprocess: got %q, want __NOT_SET__", result.Content[0].Text) } // 2. Explicitly declared env must be passed through. result2, err := client.CallTool("env_check", map[string]any{ "var_name": "__FAKE_MCP_SERVER__", }) if err != nil { t.Fatalf("CallTool(env_check explicit): %v", err) } if result2.Content[0].Text != "1" { t.Errorf("explicit env not passed: got %q, want %q", result2.Content[0].Text, "1") } // 3. Whitelisted env (PATH) must be available. result3, err := client.CallTool("env_check", map[string]any{ "var_name": "PATH", }) if err != nil { t.Fatalf("CallTool(env_check PATH): %v", err) } if result3.Content[0].Text == "__NOT_SET__" { t.Error("whitelisted PATH should be set in subprocess but was __NOT_SET__") } } // TestStdioIntegration_ManagerConnectAndCallTool 验证 Manager 层全链路: // ConnectOne -> Initialize -> ListTools -> toolCache -> AllTools // (mcp__server__tool 格式) -> CallTool + CallToolByName 路由. func TestStdioIntegration_ManagerConnectAndCallTool(t *testing.T) { skipOnWindowsIntegration(t) mgr := NewManager(execenv.DefaultExecutor{}) defer mgr.CloseAll() cfg := newFakeServerConfig("") if err := mgr.ConnectOne("testsvr", cfg); err != nil { t.Fatalf("ConnectOne: %v", err) } // Verify mcp__server__tool naming in AllTools. tools := mgr.AllTools() found := false for _, tool := range tools { if tool.Name == "mcp__testsvr__echo" { found = true break } } if !found { names := make([]string, len(tools)) for i, tool := range tools { names[i] = tool.Name } t.Fatalf("mcp__testsvr__echo not in AllTools, got: %v", names) } // CallTool: route by server name + tool name. result, err := mgr.CallTool("testsvr", "echo", map[string]any{ "message": "via manager", }) if err != nil { t.Fatalf("Manager.CallTool: %v", err) } if result.Content[0].Text != "echo: via manager" { t.Errorf("Manager.CallTool: got %q, want %q", result.Content[0].Text, "echo: via manager") } // CallToolByName: route by full mcp__server__tool name. result2, err := mgr.CallToolByName("mcp__testsvr__echo", map[string]any{ "message": "by full name", }) if err != nil { t.Fatalf("Manager.CallToolByName: %v", err) } if result2.Content[0].Text != "echo: by full name" { t.Errorf("CallToolByName: got %q, want %q", result2.Content[0].Text, "echo: by full name") } } // TestStdioIntegration_SubprocessExit 验证子进程异常退出的检测和错误传播. // // 覆盖: exit_server 触发 os.Exit(42) -> StdioTransport 检测进程退出 -> // Client.CallTool 返回错误 -> IsAlive() false -> ProcessError() non-nil. func TestStdioIntegration_SubprocessExit(t *testing.T) { skipOnWindowsIntegration(t) cfg := newFakeServerConfig("test-exit") client, err := Connect(execenv.DefaultExecutor{}, cfg) if err != nil { t.Fatalf("Connect: %v", err) } defer client.Close() if err := client.Initialize(); err != nil { t.Fatalf("Initialize: %v", err) } if !client.IsAlive() { t.Fatal("client should be alive before crash") } // exit_server causes os.Exit(42) without responding. // CallTool should return error when the connection drops. client.SetRequestTimeout(3 * time.Second) _, err = client.CallTool("exit_server", nil) if err == nil { t.Fatal("CallTool(exit_server) should return error after subprocess crash") } // Poll for process exit detection (monitorProcess goroutine may lag slightly). deadline := time.Now().Add(2 * time.Second) for time.Now().Before(deadline) { if !client.IsAlive() { break } time.Sleep(10 * time.Millisecond) } if client.IsAlive() { t.Error("client should not be alive after subprocess exit") } if procErr := client.ProcessError(); procErr == nil { t.Error("ProcessError should be non-nil after subprocess crash") } }