// stdio_env_test.go - StdioTransport 环境变量隔离和 ${VAR} 展开的测试. // // 这一组测试是 L511 安全修复的核心验证: 没有这些测试, 整个 env 隔离机制 // 就是"声称的保护"而非"验证过的保护". 命名和 pkg/plugin/plugin_tool_test.go // 的 TestPluginShellTool_NoOsEnvLeak 对齐, 方便 grep "NoOsEnvLeak" 定位全部 // env 隔离测试. package mcp import ( "context" "runtime" "strings" "testing" "time" "git.flytoex.net/yuanwei/flyto-agent/pkg/config" "git.flytoex.net/yuanwei/flyto-agent/pkg/execenv" ) // skipOnWindows 所有基于 /bin/sh + printf 的 fake stdio server 都不跑 Windows. // Flyto 的 Windows 支持本就是 best-effort (见 plugin_tool_windows.go 注释). func skipOnWindowsStdio(t *testing.T) { if runtime.GOOS == "windows" { t.Skip("skip on windows: test relies on /bin/sh + printf") } } // recvOneLine 给测试提供一个带超时的单行读取助手. func recvOneLine(t *testing.T, tp *StdioTransport, timeout time.Duration) string { t.Helper() ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() got, err := tp.Recv(ctx) if err != nil { t.Fatalf("Recv: %v", err) } return string(got) } // TestStdioTransport_NoOsEnvLeak 验证 Flyto 主进程的敏感 env **不会**泄漏 // 到 MCP stdio subprocess. 这是 L511 的安全核心测试. // // 场景: Flyto 主进程持有 ANTHROPIC_API_KEY 等模型厂商密钥; plugin 下发的 // MCP server 不可信, 必须看不到这些密钥. // // 做法: 注入一个模拟 "主进程密钥" 到当前测试进程的 env (t.Setenv 自动清理), // 启动一个 fake stdio server (echo env var 到 stdout 作为第一行, 然后降级 // 为 cat), 读第一行断言变量值为空. func TestStdioTransport_NoOsEnvLeak(t *testing.T) { skipOnWindowsStdio(t) // 模拟"主进程敏感 env" - 命名避免和真实密钥冲突 t.Setenv("FLYTO_STDIO_CANARY_XYZ", "supersecret-should-never-leak-to-mcp-subprocess") tp, err := NewStdioTransport(execenv.DefaultExecutor{}, config.MCPServerConfig{ Name: "env-leak-canary", Command: "/bin/sh", // 精妙之处 (CLEVER): 第一行 printf 变量值 (空则 printf 空串 + 换行), // 随后 exec cat 变成正常 echo 服务器. 这样既能测 env 隔离又不影响 // 后续 Send/Recv 基础设施 (虽然本测试不用). Args: []string{"-c", `printf "%s\n" "${FLYTO_STDIO_CANARY_XYZ:-__EMPTY__}"; exec cat`}, }) if err != nil { t.Fatalf("NewStdioTransport: %v", err) } defer tp.Close() line := recvOneLine(t, tp, 2*time.Second) if line != "__EMPTY__" { t.Errorf("SECURITY: Flyto 主进程 env 泄漏到 MCP subprocess: got %q (expected __EMPTY__)", line) } } // TestStdioTransport_PathLookupStillWorks 验证 PATH 在白名单里, 非绝对路径 // 命令仍能工作. 如果误把 PATH 一起过滤掉, NewStdioTransport 会 "command // not found" 失败. func TestStdioTransport_PathLookupStillWorks(t *testing.T) { skipOnWindowsStdio(t) // 使用 sh (非绝对路径) 依赖 PATH 查找 tp, err := NewStdioTransport(execenv.DefaultExecutor{}, config.MCPServerConfig{ Name: "path-lookup", Command: "sh", Args: []string{"-c", `printf "ok\n"; exec cat`}, }) if err != nil { t.Fatalf("PATH 查找失败, 白名单可能漏了 PATH: %v", err) } defer tp.Close() if got := recvOneLine(t, tp, 2*time.Second); got != "ok" { t.Errorf("expected 'ok', got %q", got) } } // TestStdioTransport_EnvVarExpansion 验证 ${VAR} 展开: 配置里写 // "${HOST_VAR}", 子进程看到的是宿主 env 里 HOST_VAR 的值. // // 这是 L511 另一半: 配合零信任白名单, 用户想让子进程看到 BRAVE_API_KEY // 必须在配置里显式写 "BRAVE_API_KEY": "${HOST_BRAVE_KEY}", 而不是隐式 // 继承. ${VAR} 是对宿主 env 的显式引用. func TestStdioTransport_EnvVarExpansion(t *testing.T) { skipOnWindowsStdio(t) t.Setenv("FLYTO_STDIO_TEST_HOST_BRAVE", "brave-abc-123") tp, err := NewStdioTransport(execenv.DefaultExecutor{}, config.MCPServerConfig{ Name: "env-expansion", Command: "/bin/sh", Env: map[string]string{ // 配置的值是字面 "${FLYTO_STDIO_TEST_HOST_BRAVE}", 展开后 // 子进程的 BRAVE_API_KEY 应当等于 "brave-abc-123". "BRAVE_API_KEY": "${FLYTO_STDIO_TEST_HOST_BRAVE}", }, Args: []string{"-c", `printf "%s\n" "$BRAVE_API_KEY"; exec cat`}, }) if err != nil { t.Fatalf("NewStdioTransport: %v", err) } defer tp.Close() if got := recvOneLine(t, tp, 2*time.Second); got != "brave-abc-123" { t.Errorf("${VAR} 展开失败: got %q, want brave-abc-123", got) } } // TestStdioTransport_MissingEnvVarFailsStartup 验证引用未设置的宿主 env 时, // NewStdioTransport 直接返回 error, 不启动子进程. 显式失败 > 静默降级. func TestStdioTransport_MissingEnvVarFailsStartup(t *testing.T) { // 故意不 Setenv - 让 ${FLYTO_DEFINITELY_NOT_SET_XYZ} 未定义 _, err := NewStdioTransport(execenv.DefaultExecutor{}, config.MCPServerConfig{ Name: "missing-env", Command: "/bin/sh", Args: []string{"-c", "cat"}, Env: map[string]string{ "WILL_FAIL": "${FLYTO_DEFINITELY_NOT_SET_XYZ_12345}", }, }) if err == nil { t.Fatal("expected error when env var is missing, got nil") } if !strings.Contains(err.Error(), "FLYTO_DEFINITELY_NOT_SET_XYZ_12345") { t.Errorf("error should name the missing var, got: %v", err) } if !strings.Contains(err.Error(), "missing-env") { t.Errorf("error should name the server, got: %v", err) } } // TestStdioTransport_ExplicitEmptyEnvVarIsOK 验证显式 set 为空 (export FOO=) // 不算未设置, 是合法值. 区分依赖 os.LookupEnv, os.Getenv 不能区分. func TestStdioTransport_ExplicitEmptyEnvVarIsOK(t *testing.T) { skipOnWindowsStdio(t) t.Setenv("FLYTO_STDIO_EXPLICIT_EMPTY", "") tp, err := NewStdioTransport(execenv.DefaultExecutor{}, config.MCPServerConfig{ Name: "explicit-empty", Command: "/bin/sh", Env: map[string]string{ "OPTIONAL_FLAG": "${FLYTO_STDIO_EXPLICIT_EMPTY}", }, Args: []string{"-c", `printf "%s\n" "[${OPTIONAL_FLAG}]"; exec cat`}, }) if err != nil { t.Fatalf("explicit empty should not fail: %v", err) } defer tp.Close() if got := recvOneLine(t, tp, 2*time.Second); got != "[]" { t.Errorf("expected '[]', got %q", got) } }