// StdioTransport:通过子进程 stdin/stdout 与 MCP 服务器通信. // // 这是 MCP 规范要求的基础传输方式: // - 每条 JSON-RPC 消息占一行(newline-delimited JSON) // - 客户端向 stdin 写,从 stdout 读 // - stderr 仅用于服务器日志,引擎不解析 // // 早期实现将进程管理和 JSON-RPC 帧混在一个类里. // 我们的设计将进程管理单独封装为 StdioTransport,Client 层只看 Transport 接口, // 这使 StdioTransport 可以独立测试,也使 SSE/HTTP 传输可以无缝替换. // // 生命周期: // 1. NewStdioTransport(cfg) 启动进程,启动后台读取/监控 goroutine // 2. Send/Recv 正常使用 // 3. Close() 优雅关闭(先 SIGINT,3 秒后 SIGKILL) package mcp import ( "bufio" "context" "fmt" "io" "os" "sync" "time" "git.flytoex.net/yuanwei/flyto-agent/pkg/config" "git.flytoex.net/yuanwei/flyto-agent/pkg/execenv" "git.flytoex.net/yuanwei/flyto-agent/pkg/logger" ) // StdioTransport 通过子进程 stdin/stdout 实现 Transport 接口. type StdioTransport struct { cfg config.MCPServerConfig // executor 是子进程启动器, 从 NewStdioTransport 注入 (方案 β 严格 DI). // 本地模式 DefaultExecutor (零开销包装 os/exec), 云端模式 sandbox.Backend // (E2B microVM). 2026-04-15 M1 commit 4b-3 引入. executor execenv.Executor // 进程资源. process 替代原 *exec.Cmd, Process interface 封装了 Start 前后 // 的状态检查, 调用方无需手动 nil-guard Process 字段. 详见 execenv.Process // interface 定义. process execenv.Process stdin io.WriteCloser stdout io.ReadCloser stderr io.ReadCloser // 精妙之处(CLEVER): recvCh 使用有界缓冲(32 条)而非无界-- // 背压天然传导:若 Client 消费慢,readLoop 阻塞在 <-recvCh 而非积压内存. // 32 条足以吸收突发,99% 的 MCP 服务器不会在一个 RTT 内推送 32 条消息. recvCh chan []byte // readLoop → Recv() 的消息管道 // 进程退出通知 processExited chan struct{} // 进程退出时关闭 processErr error // 进程退出错误(procMu 保护) procMu sync.Mutex // 生命周期 done chan struct{} // Close() 时关闭,通知所有 goroutine 退出 once sync.Once // 精妙之处(CLEVER): Process.Wait() 只能调用一次--底层 *exec.Cmd.Wait() 的 // DATA RACE 约束通过 Process interface 透传. monitorProcess 和 cleanupProcess // 都需要 Wait, 用 waitOnce 保证只执行一次. // 替代方案:<只在 monitorProcess 中 Wait> - 否决, cleanupProcess 的超时路径 // 也需要 Wait 确认进程已回收. waitOnce sync.Once waitErr error // process.Wait() 的返回值(waitOnce 后安全读取) writeMu sync.Mutex // 保护 stdin 并发写 // stderrLog 接收服务器 stderr 输出,每行转发为日志. // 升华改进(ELEVATED): 早期方案丢弃 stderr,服务端报错信息全丢失. // stderr 输出通常包含服务器启动警告,连接错误等关键诊断信息. // 替代方案:<将 stderr 合并到 stdout> - 否决:MCP stdout 是 JSON-RPC 流,混入日志破坏格式. stderrLog logger.Logger } // NewStdioTransport 启动 MCP 服务器子进程并返回就绪的传输实例. // // exec 是子进程启动抽象, 必填 (方案 β 严格 DI). 本地模式传 // execenv.DefaultExecutor{}, 云端模式传 sandbox.Backend{}. // // 返回后可立即调用 Send/Recv,无需额外的连接步骤. // // 注: 内部用 context.Background() 调 executor.Command - transport 生命周期 // 由 Close() 和 done channel 管理, 不依赖 ctx 取消. 未来 commit 4b-4 可能 // 升级为接受 ctx 参数做 lifecycle 统一, 届时改本签名. func NewStdioTransport(exec execenv.Executor, cfg config.MCPServerConfig) (*StdioTransport, error) { if cfg.Command == "" { return nil, fmt.Errorf("mcp: stdio transport: command is empty for server %q", cfg.Name) } t := &StdioTransport{ cfg: cfg, executor: exec, recvCh: make(chan []byte, 32), processExited: make(chan struct{}), done: make(chan struct{}), stderrLog: logger.Default(), } if err := t.startProcess(); err != nil { return nil, err } return t, nil } // startProcess 启动子进程并注册后台 goroutine. func (t *StdioTransport) startProcess() error { // Env 隔离 (见 pkg/execenv 文档): 不继承完整 os.Environ(), 只透传 // PATH / HOME / LANG / LC_ALL 白名单 + cfg.Env 显式声明的变量. 目的是 // 防止 Flyto 主进程的 ANTHROPIC_API_KEY / OPENAI_API_KEY / // AWS_SECRET_ACCESS_KEY 等敏感 env 泄漏到不可信的 MCP server 子进程. // // ${VAR} 展开: cfg.Env 值支持 ${HOST_VAR} 引用宿主 env, 让用户能显式 // 声明 "这个 API key 从宿主环境取" 而不是把密钥写死到 settings.json // (后者会被 dotfile 同步工具暴露). 未 set 的引用 → NewStdioTransport // 启动失败, 不静默降级. // // 关于 plugin-owned vs user-configured: 本策略对两者**一视同仁**. // 反向讨论后结论: Flyto 目标场景 (飞驼云仓等 B2B SaaS) 下"受信 user // 手配 MCP server"是假想客户, 双轨制的宽松轨是结构性死代码, 反而引 // 入跨场景行为不一致. 前缀区分 (parsePluginMCPServerKey) 只服务 // lifecycle 隔离, 和 env 策略正交. 详见 engine.go:1998 和 execenv 包 // 文档. // // 替代方案 (已否决): <按 server key 前缀区分, plugin-owned 走严格, // user-configured 继承 os.Environ> - 否决, 云仓等目标场景里 // user-configured 根本不存在, 维护双轨等于为不存在的客户支付成本. expandedEnv, err := execenv.ExpandEnvMap(t.cfg.Env) if err != nil { return fmt.Errorf("mcp: stdio: resolve env for server %q: %w", t.cfg.Name, err) } // 构造 Spec 并通过 Executor 起子进程. ClassPluginMCP 标记此进程为 // 第三方 plugin 的 MCP stdio subprocess (L511 威胁模型), 云端 backend // 据此应用最强隔离策略 (no-net, no-fs, 独立 microVM). // // context.Background() 的选择: transport 生命周期由 Close() 和 done // channel 管理, 不依赖 ctx 取消 - 原代码用 exec.Command (无 ctx) 是 // 语义等价. commit 4b-4 会统一重构 mcp 包的 ctx 参数, 那时改此处. spec := execenv.Spec{ Class: execenv.ClassPluginMCP, Path: t.cfg.Command, Args: t.cfg.Args, Env: execenv.MinimalEnvMap(expandedEnv), } process := t.executor.Command(context.Background(), spec) stdin, err := process.StdinPipe() if err != nil { return fmt.Errorf("mcp: stdio: create stdin pipe for server %q: %w", t.cfg.Name, err) } stdout, err := process.StdoutPipe() if err != nil { stdin.Close() return fmt.Errorf("mcp: stdio: create stdout pipe for server %q: %w", t.cfg.Name, err) } stderr, err := process.StderrPipe() if err != nil { stdin.Close() stdout.Close() return fmt.Errorf("mcp: stdio: create stderr pipe for server %q: %w", t.cfg.Name, err) } if err := process.Start(); err != nil { stdin.Close() stdout.Close() stderr.Close() return fmt.Errorf("mcp: stdio: start server %q (%s %v): %w", t.cfg.Name, t.cfg.Command, t.cfg.Args, err) } t.process = process t.stdin = stdin t.stdout = stdout t.stderr = stderr go t.readLoop() go t.monitorProcess() go t.handleStderr() // P1-E: 转发 stderr 到日志 return nil } // Send 向服务器进程的 stdin 写入一条消息(追加换行符). // // MCP stdio 传输规范:每条 JSON-RPC 消息以换行符终止. func (t *StdioTransport) Send(_ context.Context, msg []byte) error { select { case <-t.done: return fmt.Errorf("mcp: stdio transport closed (server: %q)", t.cfg.Name) default: } // 精妙之处(CLEVER): 在锁外做 done 检查(select default),锁内只做 Write-- // 避免锁内阻塞(stdin Write 在管道满时会阻塞),减少锁持有时间. // 竞争窗口(done 检查与 Write 之间)可接受:此时 transport 已在关闭流程中, // Write 会立即返回 broken pipe 错误. line := make([]byte, len(msg)+1) copy(line, msg) line[len(msg)] = '\n' t.writeMu.Lock() defer t.writeMu.Unlock() if _, err := t.stdin.Write(line); err != nil { return fmt.Errorf("mcp: stdio write to server %q: %w", t.cfg.Name, err) } return nil } // Recv 阻塞等待来自服务器 stdout 的一条消息. // // 四路 select:正常消息,进程退出(排空缓冲后报错),transport 关闭,ctx 取消. func (t *StdioTransport) Recv(ctx context.Context) ([]byte, error) { select { case msg, ok := <-t.recvCh: if !ok { // recvCh 被 readLoop 关闭,说明 stdout 已关闭(进程退出) return nil, io.EOF } return msg, nil case <-t.processExited: // 精妙之处(CLEVER): 进程退出时先尝试排空 recvCh 中已缓冲的消息, // 再返回错误.这样进程崩溃时最后几条响应不会丢失. select { case msg, ok := <-t.recvCh: if ok { return msg, nil } default: } t.procMu.Lock() err := t.processErr t.procMu.Unlock() if err != nil { return nil, err } return nil, io.EOF case <-t.done: return nil, io.EOF case <-ctx.Done(): return nil, ctx.Err() } } // Close 优雅关闭进程(先 SIGINT,3 秒后 SIGKILL)并释放所有资源. func (t *StdioTransport) Close() error { t.once.Do(func() { close(t.done) t.cleanupProcess() }) return nil } // IsAlive 返回服务器进程是否仍在运行. func (t *StdioTransport) IsAlive() bool { select { case <-t.processExited: return false default: return true } } // ProcessError 返回进程异常退出时的错误信息(进程正常运行时返回 nil). func (t *StdioTransport) ProcessError() error { t.procMu.Lock() defer t.procMu.Unlock() return t.processErr } // readLoop 后台循环:从 stdout 按行读取 JSON,投递到 recvCh. func (t *StdioTransport) readLoop() { scanner := bufio.NewScanner(t.stdout) // 历史包袱(LEGACY): 固定的 10MB scanner buffer-- // bufio.Scanner 默认仅 64KB,但 MCP 工具结果(代码搜索,文件读取)可达数 MB. // 正确做法是 bufio.Reader.ReadBytes('\n')(无固定上限),但 Scanner 的实现更简洁. // 10MB 足够日常使用;极端大响应应由 Client 层截断,Transport 层不做内容感知处理. // 如遇生产问题,将此 const 改为 cfg 中的可配置项. const maxLine = 10 * 1024 * 1024 scanner.Buffer(make([]byte, 64*1024), maxLine) for scanner.Scan() { line := scanner.Bytes() if len(line) == 0 { continue } // 精妙之处(CLEVER): 必须 copy--scanner.Bytes() 返回的 slice 指向 // scanner 内部 buffer,下次 Scan() 调用会覆盖它. msg := make([]byte, len(line)) copy(msg, line) select { case t.recvCh <- msg: case <-t.done: return } } // stdout 关闭(进程正常退出或 Close() 关闭了管道) close(t.recvCh) } // processWait 调用 process.Wait(),确保只执行一次,结果保存在 t.waitErr. // Process interface 内部封装了 Start 前后状态检查, 无需外层 nil guard. func (t *StdioTransport) processWait() error { t.waitOnce.Do(func() { t.waitErr = t.process.Wait() }) return t.waitErr } // monitorProcess 后台监控进程退出状态. func (t *StdioTransport) monitorProcess() { if t.process == nil { return } err := t.processWait() t.procMu.Lock() if err != nil { t.processErr = fmt.Errorf("mcp: server %q exited unexpectedly: %w", t.cfg.Name, err) } else { t.processErr = fmt.Errorf("mcp: server %q exited with status 0", t.cfg.Name) } t.procMu.Unlock() // 关闭信号:只关闭一次(进程可能在 Close() 触发后自然退出) select { case <-t.processExited: default: close(t.processExited) } } // handleStderr 从 stderr 读取并转发到日志. // // 每行作为一条 [INFO] 日志输出,前缀 [mcp-stdio stderr]. // // 精妙之处(CLEVER): scanner.Scan() 是阻塞调用,进程异常崩溃时 stderr pipe // 未必立即关闭(取决于内核缓冲区与 OS 回收时序),goroutine 会无限挂起. // 修复:将 scan 放入内部 goroutine,外层 select 同时监听 done channel-- // 任一先触发(pipe 自然关闭 或 Close() 主动关闭)则整个 handleStderr 退出. // // 原方案:<只靠 scanner.Scan() 返回 false 退出> // - 否决:进程崩溃时 stderr pipe 可能永不关闭,goroutine 永久泄漏. func (t *StdioTransport) handleStderr() { if t.stderr == nil || t.stderrLog == nil { return } // scanDone 在 scanner 自然结束(pipe 关闭/EOF)时关闭 scanDone := make(chan struct{}) go func() { defer close(scanDone) scanner := bufio.NewScanner(t.stderr) for scanner.Scan() { line := scanner.Text() if line != "" { t.stderrLog.Info("[mcp-stdio stderr] " + line) } } }() // 等待 stderr 自然结束 或 transport 关闭,两者都可退出 select { case <-scanDone: case <-t.done: } } // cleanupProcess 关闭所有 I/O 管道并强杀残留进程. func (t *StdioTransport) cleanupProcess() { // 先关闭 stdin:服务器检测到 EOF 后会自行退出 if t.stdin != nil { t.stdin.Close() } if t.stdout != nil { t.stdout.Close() } if t.stderr != nil { t.stderr.Close() } if t.process == nil { return } // 优雅关闭:先 SIGINT,给服务器 3 秒保存状态 t.process.Signal(os.Interrupt) done := make(chan struct{}) go func() { t.processWait() // 通过 waitOnce 保证与 monitorProcess 不竞争 close(done) }() select { case <-done: case <-time.After(3 * time.Second): // 超时强杀 t.process.Kill() } }