// transport_test.go - Transport 接口及实现的单元测试. // // 覆盖: // - StaticTokenAuth / NoopAuth // - StdioTransport(通过 cat/sleep 命令) // - SSETransport 辅助函数(resolveRelativeURL) // - HTTPTransport 构造函数 // - parseToolFullName(manager.go) package mcp import ( "context" "io" "testing" "time" "git.flytoex.net/yuanwei/flyto-agent/pkg/config" "git.flytoex.net/yuanwei/flyto-agent/pkg/execenv" ) // ── AuthProvider 测试 ───────────────────────────────────────────────────────── func TestStaticTokenAuth_Headers(t *testing.T) { auth := &StaticTokenAuth{Token: "my-secret-token"} headers, err := auth.Headers(context.Background()) if err != nil { t.Fatalf("Headers: %v", err) } if got := headers["Authorization"]; got != "Bearer my-secret-token" { t.Errorf("Authorization = %q, want %q", got, "Bearer my-secret-token") } } func TestStaticTokenAuth_EmptyToken(t *testing.T) { auth := &StaticTokenAuth{Token: ""} headers, err := auth.Headers(context.Background()) if err != nil { t.Fatalf("Headers: %v", err) } if got := headers["Authorization"]; got != "Bearer " { t.Errorf("Authorization = %q, want \"Bearer \"", got) } } func TestNoopAuth_Headers(t *testing.T) { auth := &NoopAuth{} headers, err := auth.Headers(context.Background()) if err != nil { t.Fatalf("Headers: %v", err) } if headers != nil { t.Errorf("NoopAuth.Headers should return nil, got %v", headers) } } // ── StdioTransport 测试 ──────────────────────────────────────────────────────── // TestStdioTransport_SendRecv 通过 cat 命令验证 Send/Recv 端到端. // // 精妙之处(CLEVER): cat 将 stdin 原样转发到 stdout,天然实现 echo 语义, // 无需编写真实的 MCP 服务器进程即可测试传输层的完整数据路径. func TestStdioTransport_SendRecv(t *testing.T) { tp, err := NewStdioTransport(execenv.DefaultExecutor{}, config.MCPServerConfig{ Name: "cat-test", Command: "cat", }) if err != nil { t.Fatalf("NewStdioTransport: %v", err) } defer tp.Close() msg := []byte(`{"jsonrpc":"2.0","id":1,"method":"test"}`) if err := tp.Send(context.Background(), msg); err != nil { t.Fatalf("Send: %v", err) } ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() got, err := tp.Recv(ctx) if err != nil { t.Fatalf("Recv: %v", err) } if string(got) != string(msg) { t.Errorf("Recv = %q, want %q", got, msg) } } // TestStdioTransport_MultipleSendRecv 验证多条消息顺序收发. func TestStdioTransport_MultipleSendRecv(t *testing.T) { tp, err := NewStdioTransport(execenv.DefaultExecutor{}, config.MCPServerConfig{ Name: "multi-test", Command: "cat", }) if err != nil { t.Fatalf("NewStdioTransport: %v", err) } defer tp.Close() messages := []string{ `{"id":1,"method":"a"}`, `{"id":2,"method":"b"}`, `{"id":3,"method":"c"}`, } for _, m := range messages { if err := tp.Send(context.Background(), []byte(m)); err != nil { t.Fatalf("Send %q: %v", m, err) } } ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() for _, want := range messages { got, err := tp.Recv(ctx) if err != nil { t.Fatalf("Recv: %v", err) } if string(got) != want { t.Errorf("Recv = %q, want %q", got, want) } } } // TestStdioTransport_Close 测试 Close 后 Send 返回错误,Recv 返回 EOF. func TestStdioTransport_Close(t *testing.T) { tp, err := NewStdioTransport(execenv.DefaultExecutor{}, config.MCPServerConfig{ Name: "close-test", Command: "cat", }) if err != nil { t.Fatalf("NewStdioTransport: %v", err) } tp.Close() // Send 应返回错误 if err := tp.Send(context.Background(), []byte(`{}`)); err == nil { t.Error("Send after Close should return error") } // Recv 应立即返回错误(io.EOF 或进程退出错误,取决于 select 的调度顺序) // Close() 先关闭 done channel,再 SIGINT 进程;两种错误都表示连接已终止,均合法. ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond) defer cancel() _, err = tp.Recv(ctx) if err == nil { t.Error("Recv after Close should return a non-nil error") } } // TestStdioTransport_DoubleClose 测试重复关闭安全(不 panic). func TestStdioTransport_DoubleClose(t *testing.T) { tp, err := NewStdioTransport(execenv.DefaultExecutor{}, config.MCPServerConfig{ Name: "double-close", Command: "cat", }) if err != nil { t.Fatalf("NewStdioTransport: %v", err) } tp.Close() tp.Close() // 不应 panic } // TestStdioTransport_EmptyCommand 测试空命令返回错误. func TestStdioTransport_EmptyCommand(t *testing.T) { _, err := NewStdioTransport(execenv.DefaultExecutor{}, config.MCPServerConfig{ Name: "empty-cmd", Command: "", }) if err == nil { t.Error("NewStdioTransport with empty command should fail") } } // TestStdioTransport_InvalidCommand 测试无效命令返回错误. func TestStdioTransport_InvalidCommand(t *testing.T) { _, err := NewStdioTransport(execenv.DefaultExecutor{}, config.MCPServerConfig{ Name: "invalid-cmd", Command: "/nonexistent/command/xyz", }) if err == nil { t.Error("NewStdioTransport with invalid command should fail") } } // TestStdioTransport_IsAlive 测试进程启动后 IsAlive 返回 true. func TestStdioTransport_IsAlive(t *testing.T) { tp, err := NewStdioTransport(execenv.DefaultExecutor{}, config.MCPServerConfig{ Name: "alive-test", Command: "cat", }) if err != nil { t.Fatalf("NewStdioTransport: %v", err) } defer tp.Close() if !tp.IsAlive() { t.Error("IsAlive should return true after start") } } // TestStdioTransport_Recv_ContextCancel 测试 ctx 取消后 Recv 立即返回. func TestStdioTransport_Recv_ContextCancel(t *testing.T) { // sleep 命令:永不输出,Recv 会一直阻塞直到 ctx 取消 tp, err := NewStdioTransport(execenv.DefaultExecutor{}, config.MCPServerConfig{ Name: "ctx-cancel", Command: "sleep", Args: []string{"10"}, }) if err != nil { t.Fatalf("NewStdioTransport: %v", err) } defer tp.Close() ctx, cancel := context.WithCancel(context.Background()) go func() { time.Sleep(100 * time.Millisecond) cancel() }() _, err = tp.Recv(ctx) if err != context.Canceled { t.Errorf("Recv should return context.Canceled, got %v", err) } } // ── SSETransport 辅助函数测试 ───────────────────────────────────────────────── func TestResolveRelativeURL_AbsolutePassthrough(t *testing.T) { got := resolveRelativeURL("https://base.com/sse", "https://other.com/endpoint") if got != "https://other.com/endpoint" { t.Errorf("absolute URL should pass through: got %q", got) } } func TestResolveRelativeURL_RootRelative(t *testing.T) { got := resolveRelativeURL("https://example.com/sse", "/messages") if got != "https://example.com/messages" { t.Errorf("got %q, want %q", got, "https://example.com/messages") } } func TestResolveRelativeURL_WithPort(t *testing.T) { got := resolveRelativeURL("http://localhost:8080/sse", "/api/messages") if got != "http://localhost:8080/api/messages" { t.Errorf("got %q, want %q", got, "http://localhost:8080/api/messages") } } func TestResolveRelativeURL_RelativePath(t *testing.T) { got := resolveRelativeURL("https://example.com/sse", "messages") if got != "https://example.com/messages" { t.Errorf("got %q, want %q", got, "https://example.com/messages") } } func TestSSETransport_EmptyURL(t *testing.T) { _, err := NewSSETransport("", nil) if err == nil { t.Error("NewSSETransport with empty URL should fail") } } // ── HTTPTransport 测试 ──────────────────────────────────────────────────────── func TestHTTPTransport_EmptyURL(t *testing.T) { _, err := NewHTTPTransport("", nil) if err == nil { t.Error("NewHTTPTransport with empty URL should fail") } } func TestHTTPTransport_NilAuth_UsesNoopAuth(t *testing.T) { tp, err := NewHTTPTransport("https://example.com/mcp", nil) if err != nil { t.Fatalf("NewHTTPTransport: %v", err) } defer tp.Close() headers, err := tp.auth.Headers(context.Background()) if err != nil { t.Fatalf("auth.Headers: %v", err) } if headers != nil { t.Errorf("NoopAuth.Headers should return nil, got %v", headers) } } func TestHTTPTransport_DoubleClose(t *testing.T) { tp, err := NewHTTPTransport("https://example.com/mcp", nil) if err != nil { t.Fatalf("NewHTTPTransport: %v", err) } tp.Close() tp.Close() // 不应 panic } func TestHTTPTransport_Recv_AfterClose(t *testing.T) { tp, err := NewHTTPTransport("https://example.com/mcp", nil) if err != nil { t.Fatalf("NewHTTPTransport: %v", err) } tp.Close() ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) defer cancel() _, err = tp.Recv(ctx) if err != io.EOF && err != context.DeadlineExceeded { t.Errorf("Recv after Close should return EOF or DeadlineExceeded, got %v", err) } } // ── parseToolFullName 测试 ──────────────────────────────────────────────────── func TestParseToolFullName_Valid(t *testing.T) { tests := []struct { input string wantServer string wantTool string }{ {"mcp__github__search_code", "github", "search_code"}, {"mcp__filesystem__read_file", "filesystem", "read_file"}, // 工具名本身含 __:只拆第一个 {"mcp__my_server__tool__with__underscores", "my_server", "tool__with__underscores"}, } for _, tc := range tests { srv, tool := parseToolFullName(tc.input) if srv != tc.wantServer || tool != tc.wantTool { t.Errorf("parseToolFullName(%q) = (%q, %q), want (%q, %q)", tc.input, srv, tool, tc.wantServer, tc.wantTool) } } } func TestParseToolFullName_NoPrefix(t *testing.T) { srv, tool := parseToolFullName("github/search") if srv != "" || tool != "github/search" { t.Errorf("parseToolFullName(no-prefix) = (%q, %q), want ('', 'github/search')", srv, tool) } } func TestParseToolFullName_OnlyPrefix(t *testing.T) { // mcp__github 没有工具名段 srv, tool := parseToolFullName("mcp__github") if srv != "" { t.Errorf("parseToolFullName(only-prefix) serverName = %q, want ''", srv) } _ = tool } func TestParseToolFullName_Empty(t *testing.T) { srv, tool := parseToolFullName("") if srv != "" || tool != "" { t.Errorf("parseToolFullName('') = (%q, %q), want ('', '')", srv, tool) } } // ── mcpToolPrefix 常量测试 ──────────────────────────────────────────────────── func TestMcpToolPrefix_Value(t *testing.T) { if mcpToolPrefix != "mcp__" { t.Errorf("mcpToolPrefix = %q, want %q", mcpToolPrefix, "mcp__") } }