// plugin_tool.go - Plugin 声明式 tool 注册 (manifest.tools 字段 → tools.Tool 实现). // // 模块定位: // 给 plugin 作者提供**除 MCP server 之外的第二条扩展 tool 路径**: // 在 plugin.json 的 tools 字段里直接声明一个 tool + 它的执行 command, // plugin loader 在加载时把每个声明翻译成实现 tools.Tool interface 的 // pluginShellTool 实例, 存放在 Plugin.Tools 里, 消费层 (Host / engine) // 再把它们注册到全局 tools.Registry. // // # 为什么同时需要 MCP server 和 declarative tool 两条路径? // // **MCP server** 适合: 多 tool / 需要状态 / 复杂协议 / 已有 MCP SDK 的场景. // 例如一个 "GitHub 集成" plugin 会有 list_issues / create_issue / add_comment // 等十几个 tool, 还要维护 OAuth 会话, 用 MCP server 最合适. // // **Declarative tool** 适合: 单 tool / 无状态 / 一个 shell 脚本搞定的场景. // 例如 "读取当前 git 分支名" 这种 tool, 写个 `git branch --show-current` // 就是全部逻辑, 再去起一个 MCP server 是过度设计. // // 两条路径**可以共存**: 一个 plugin 的 plugin.json 可以同时声明 mcpServers // 和 tools, 最终都通过 tools.Registry 暴露给 agent. // // # 执行语义 (Unix filter 范式) // // pluginShellTool.Execute 的行为: // 1. 创建 timeout-bounded context (默认 30s, 可在 manifest 覆盖) // 2. exec.CommandContext(Command, Args...), cmd.Dir = pluginDir (或 WorkDir) // 3. cmd.Env = 白名单 OS env (PATH/HOME/LANG/LC_ALL) ++ plugin 声明的 Env // (plugin 值覆盖白名单同名变量, ${VAR} 展开宿主 env, 见 pkg/execenv) // 4. cmd.Stdin = JSON 输入 (字节流, 让子进程自己 parse) // 5. cmd.Stdout → 捕获为 Result.Output // 6. cmd.Stderr → 失败时 append 到 Result.Output 方便 debug // 7. exit code == 0 → Result{IsError: false} // 8. exit code != 0 → Result{IsError: true}, 包含 stderr 内容 // 9. context 超时 → Result{IsError: true, Output: "timed out after ..."} // 10. exec 启动本身失败 (command not found) → return error (而非 Result) // // 精妙之处 (CLEVER): stdin=JSON / stdout=结果 / exit code=状态 是 Unix // filter 经典范式, 任何语言都能用最少 10 行代码实现一个合法的 Flyto // plugin tool. 替代方案: <环境变量传 input> - 否决, env var 在复杂嵌套 // JSON 场景下丑陋且有长度限制. <命令行参数传 input> - 否决, shell 转义地狱. // // # 命名空间 // // Tool 的最终 Name 是 `pluginName:toolName` (冒号分隔), 和 skills 的 // 命名空间规则一致, 避免不同 plugin 之间的同名冲突. tools.Registry 的 // Register 方法遇到同名直接返回错误, 是严格模式不是后覆盖先的宽松模式. // // # 工作目录 // // WorkDir 默认为 pluginDir (plugin 目录), 这样 plugin 作者写 "./script.py" // 之类的相对路径会 just work. 可以在 manifest 里覆盖为相对于 pluginDir 的 // 其他目录, 绝对路径也支持. // // # 未来扩展 (不在本次实现) // // - Streaming output: 目前 stdout 全部捕获后再返回, 不支持流式. // 需要时改 cmd.Stdout = io.MultiWriter(buf, progressAdapter). // - Async tool (长 running): 目前 30s 默认超时是同步模型. 可加一个 // AsyncMode 字段让 tool 启动后立即返回 job handle, 之后轮询. // - Input validation: 目前 InputSchema 只是数据字段, 不做 JSON Schema // validation. 消费层自己校验或信任 LLM 生成的参数合法. package plugin import ( "bytes" "context" "encoding/json" "errors" "fmt" "io" "path/filepath" "time" "git.flytoex.net/yuanwei/flyto-agent/pkg/execenv" "git.flytoex.net/yuanwei/flyto-agent/pkg/tools" ) // PluginToolDef 是 plugin.json 的 tools 字段里的一个 tool 声明. // // 每个声明在 plugin 加载时被 loadPluginTools 翻译成一个 pluginShellTool // 并加入 Plugin.Tools 列表, 供消费层 (Host 或 engine 层) 注册到全局 // tools.Registry. // // JSON 示例: // // { // "name": "git_branch", // "description": "Return the current git branch name", // "command": "git", // "args": ["branch", "--show-current"], // "timeout_seconds": 5, // "read_only": true, // "permission_class": "readonly" // } type PluginToolDef struct { // Name 是 tool 的本地名称 (不含 plugin 前缀). // 最终注册到 Registry 的名字是 "pluginName:Name". // 必填, 空值会被 loadPluginTools 跳过. Name string `json:"name"` // Description 是给 LLM 的 tool 描述. 必填 (否则 LLM 不知道该不该调). Description string `json:"description"` // InputSchema 是 tool 参数的 JSON Schema. 可选, 缺省为空对象 schema // ({"type":"object","properties":{},"additionalProperties":true}), // 即"接受任意 JSON 对象"- 适合无参数或参数非常灵活的 tool. InputSchema json.RawMessage `json:"input_schema,omitempty"` // Command 是要执行的命令 (可以是 $PATH 里的名字如 "python", 或绝对/ // 相对路径如 "/usr/bin/env" / "./script.sh"). 必填. // 相对路径会相对于 WorkDir (默认为 pluginDir) 解析, 由 exec.Command 处理. Command string `json:"command"` // Args 是传给 Command 的命令行参数 (不含 command 本身). Args []string `json:"args,omitempty"` // Env 是要注入子进程环境变量的 key/value. 会叠加到白名单 OS env // (PATH/HOME/LANG/LC_ALL) 之上, 同名变量以 Env 里的为准. Value 支持 // ${HOST_VAR} / $HOST_VAR 展开宿主 env, 未设置的引用会导致 tool error. // 常用于传 API Key / endpoint / plugin config. 详见 pkg/execenv 包. Env map[string]string `json:"env,omitempty"` // WorkDir 是子进程的工作目录. 可选: // - 空字符串 (默认): 使用 pluginDir // - 相对路径: 相对于 pluginDir 解析 // - 绝对路径: 直接使用 WorkDir string `json:"work_dir,omitempty"` // TimeoutSeconds 是子进程执行的超时时间 (秒). <=0 或未指定 → 30s 默认. // 超时后进程被 SIGKILL, Execute 返回 IsError=true 的 Result. TimeoutSeconds int `json:"timeout_seconds,omitempty"` // PermissionClass 是权限检查类型 (参见 tools.Metadata.PermissionClass). // 空字符串默认 "bash" (命令执行类, 子命令分割 + 危险命令检测). PermissionClass string `json:"permission_class,omitempty"` // ConcurrencySafe 声明本 tool 是否可以与其他 tool 并发执行. // 保守默认 false (和 tools.GetMetadata 的 fallback 一致). ConcurrencySafe bool `json:"concurrency_safe,omitempty"` // ReadOnly 声明本 tool 是否只读 (无副作用). // 保守默认 false. ReadOnly bool `json:"read_only,omitempty"` // Destructive 声明本 tool 是否会造成不可逆的破坏 // (rm / DROP TABLE / kubectl delete 等). // 保守默认 false. Destructive bool `json:"destructive,omitempty"` } // DefaultPluginToolTimeout 是 PluginToolDef.TimeoutSeconds 未指定时的默认超时. // 30 秒对"快执行的 shell 命令"场景足够, 长 running 的 tool 应显式声明更长值 // (或使用 MCP server 路径, 它支持更丰富的生命周期管理). const DefaultPluginToolTimeout = 30 * time.Second // emptyObjectSchema 是 PluginToolDef.InputSchema 未指定时的默认 schema. // 语义是"接受任意 object", 适合无参数 tool. var emptyObjectSchema = json.RawMessage(`{"type":"object","properties":{},"additionalProperties":true}`) // pluginShellTool 实现 tools.Tool interface, 通过 Executor.Command 执行声明的 // shell 命令. 每次 Execute 启动一个新子进程 (无状态), 走 Unix filter 范式 // (stdin JSON / stdout 结果 / exit code 状态). // // executor 字段是方案 β 严格 DI 的体现 (M1 commit 7b): 构造时必须传非 nil // Executor, 本地模式传 execenv.DefaultExecutor{}, 云端模式传 // platform/common/internal/sandbox.Backend. nil 会在 NewPluginShellTool 里 panic, // 不做静默降级. type pluginShellTool struct { name string description string schema json.RawMessage command string args []string env map[string]string workDir string timeout time.Duration metadata tools.Metadata executor execenv.Executor } // Name 返回命名空间化后的 tool 名 ("pluginName:toolName"). func (t *pluginShellTool) Name() string { return t.name } // Description 返回静态描述 (不依赖 ctx, 声明式 tool 没有动态描述场景). func (t *pluginShellTool) Description(ctx context.Context) string { return t.description } // InputSchema 返回 manifest 声明的 schema (或默认空对象 schema). func (t *pluginShellTool) InputSchema() json.RawMessage { return t.schema } // Metadata 实现 tools.MetadataProvider interface, 让编排器和权限引擎识别 // tool 的并发/只读/破坏性/权限类等元数据. func (t *pluginShellTool) Metadata() tools.Metadata { return t.metadata } // Execute 执行 tool: 启动子进程, 传 JSON stdin, 收 stdout 作为结果. // // 精妙之处 (CLEVER): proc.Run() 的返回值和 ctx.Err() 必须同时检查. Run // 返回非 nil 时进程可能是正常退出 (非零 exit code) 或被 SIGKILL, 只有 // 对比 ctx.Err() == DeadlineExceeded 才能区分"命令失败"和"超时取消". // 替代方案: <用 Start + Wait 手动管理进程> - 否决, 代码更长且易漏清理. // // M1 commit 7b 起走 executor.Command 统一抽象, 进程组隔离 + ctx-cancel // 杀整棵树的生命周期由 Spec.IsolateProcessGroup 字段声明, 不再手工写 // SysProcAttr / cmd.Cancel / WaitDelay — 那些已经在 DefaultExecutor 里 // 封装. 云端 backend 将把 IsolateProcessGroup 映射到 microVM 天然边界. // // exit code 通过 duck-type `interface{ ExitCode() int }` 提取, 不假设 // 底层一定是 *exec.ExitError — 云端 backend 返回的 error 类型可能不同, // 但都会实现 ExitCode() int 方法. 这是 commit 10/11 以来稳定的套路. func (t *pluginShellTool) Execute(ctx context.Context, input json.RawMessage, progress tools.ProgressFunc) (*tools.Result, error) { // 为本次执行建一个带超时的子 context if t.timeout > 0 { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, t.timeout) defer cancel() } // Env 隔离 (见 pkg/execenv 文档): 不继承完整 os.Environ(), 只透传 // PATH / HOME / LANG / LC_ALL 四个必要变量 + plugin 声明的 Env. // 目的是防止 Flyto 主进程的 ANTHROPIC_API_KEY / OPENAI_API_KEY / // AWS_SECRET_ACCESS_KEY 等敏感 env 泄漏到不可信的 plugin tool 子进程. // // ${VAR} 展开: plugin 声明的 env 值支持 ${HOST_VAR} 语法引用宿主 env, // 未 set 的引用返回 tool error (启动期失败 > 静默降级为空字符串). // 详见 execenv.ExpandEnvMap 文档. expandedEnv, err := execenv.ExpandEnvMap(t.env) if err != nil { return &tools.Result{ Output: fmt.Sprintf("plugin tool %q: resolve env failed: %s", t.name, err), IsError: true, }, nil } // stdin: JSON 输入 (空输入时为 nil, 子进程读 stdin 会立即 EOF) var stdin io.Reader if len(input) > 0 { stdin = bytes.NewReader(input) } var stdout, stderr bytes.Buffer proc := t.executor.Command(ctx, execenv.Spec{ Class: execenv.ClassPluginTool, Path: t.command, Args: t.args, Env: execenv.MinimalEnvMap(expandedEnv), Stdin: stdin, Stdout: &stdout, Stderr: &stderr, WorkDir: t.workDir, IsolateProcessGroup: true, }) err = proc.Run() // 优先检查超时 (ctx.Err() 先于 exit error 判断, 否则 exit error 会掩盖超时原因) if errors.Is(ctx.Err(), context.DeadlineExceeded) { return &tools.Result{ Output: fmt.Sprintf("plugin tool %q timed out after %s\nstderr: %s", t.name, t.timeout, stderr.String()), IsError: true, }, nil } if err != nil { if ec, ok := err.(interface{ ExitCode() int }); ok { // 子进程正常启动但以非零 exit code 退出: 返回 IsError Result return &tools.Result{ Output: fmt.Sprintf("exit code %d\nstdout: %s\nstderr: %s", ec.ExitCode(), stdout.String(), stderr.String()), IsError: true, }, nil } // 启动失败 (command not found / permission denied): 返回 Go error // 而非 Result, 让上层区分"tool 执行出错" vs "tool 环境不可用" return nil, fmt.Errorf("plugin tool %q exec failed: %w", t.name, err) } return &tools.Result{ Output: stdout.String(), IsError: false, }, nil } // NewPluginShellTool 从 PluginToolDef 声明构造一个实现 tools.Tool interface 的实例. // // 公开 factory, 允许消费层在 SDK 嵌入场景里手动构造 tool (而不是走 plugin // loader 流程). 参数 pluginName 用于 tool 名命名空间化, 空字符串则不加前缀. // // 参数: // - def: PluginToolDef 声明 (来自 plugin.json 或手动构造) // - pluginDir: plugin 目录, 用于默认 WorkDir 和解析相对路径 // - pluginName: plugin 名, 用于命名空间化 (空则裸 tool 名) // - executor: 子进程启动抽象 (本地 = execenv.DefaultExecutor{}, 云端 = // platform sandbox.Backend). **必填, nil 直接 panic** — // 方案 β 严格 DI, 不做静默降级. // // 默认值填充: // - InputSchema 空 → emptyObjectSchema ({"type":"object",...}) // - WorkDir 空 → pluginDir // - WorkDir 相对 → filepath.Join(pluginDir, WorkDir) // - TimeoutSeconds <=0 → DefaultPluginToolTimeout (30s) // - PermissionClass 空 → "bash" (shell 命令是天然的 bash class) func NewPluginShellTool(def PluginToolDef, pluginDir, pluginName string, executor execenv.Executor) tools.Tool { if executor == nil { panic("plugin.NewPluginShellTool: executor must not be nil (方案 β 严格 DI, 本地模式传 execenv.DefaultExecutor{})") } name := def.Name if pluginName != "" { name = pluginName + ":" + def.Name } schema := def.InputSchema if len(schema) == 0 { schema = emptyObjectSchema } workDir := def.WorkDir switch { case workDir == "": workDir = pluginDir case !filepath.IsAbs(workDir): workDir = filepath.Join(pluginDir, workDir) } timeout := time.Duration(def.TimeoutSeconds) * time.Second if timeout <= 0 { timeout = DefaultPluginToolTimeout } permClass := def.PermissionClass if permClass == "" { permClass = "bash" } return &pluginShellTool{ name: name, description: def.Description, schema: schema, command: def.Command, args: def.Args, env: def.Env, workDir: workDir, timeout: timeout, metadata: tools.Metadata{ ConcurrencySafe: def.ConcurrencySafe, ReadOnly: def.ReadOnly, Destructive: def.Destructive, PermissionClass: permClass, }, executor: executor, } } // 说明: 原本定义在此的 minimalExecEnv 已迁移到 pkg/execenv 包, 由 // plugin tool 和 MCP stdio transport 共享. 迁移理由: 两者对子进程的 env // 策略必须一致, 否则会在 Flyto 目标场景 (B2B SaaS, 飞驼云仓等) 下形成 // "一个受控一个泄漏"的双轨事故. 详见 pkg/execenv/env.go 文件头. // loadPluginTools 把 manifest 里的 PluginToolDef 声明列表翻译成 tools.Tool // 运行时实例列表. 跳过非法声明 (Name 或 Command 为空), 不返回错误避免单个 // tool 错误阻断整个 plugin 加载 (和 MCPServers 的降级策略一致). // // executor 透传给 NewPluginShellTool, 由 loadPluginWithErrors → Host → // NewHost 链路注入. nil 在 NewPluginShellTool 里 panic, 这里不重复检查. // // 返回 nil slice (而非空 slice) 是 Go 惯例, 表示"没有 tool", 消费层用 len() 判断. func loadPluginTools(defs []PluginToolDef, pluginDir, pluginName string, executor execenv.Executor) []tools.Tool { if len(defs) == 0 { return nil } out := make([]tools.Tool, 0, len(defs)) for _, def := range defs { if def.Name == "" || def.Command == "" { continue } out = append(out, NewPluginShellTool(def, pluginDir, pluginName, executor)) } return out }