package engine // plan_command_server.go -- 计划命令服务器(模块 20.3 组件). // // 模块定位: // // PlanCommandServer 是 PlanQueue 的 UDS 网络入口. // 任何进程(Agent,CLI 客户端,远程脚本)通过 Unix Domain Socket 连接, // 发送 JSON 命令,获取 JSON 响应,无需与 Engine 共享内存空间. // // 与 UDSServer(inbox)的区别: // // UDSServer (inbox) = fire-and-forget,工具推送进度,不等响应. // PlanCommandServer = 请求-响应,客户端发命令,服务端必须回复,然后关闭连接. // 两者使用不同的 socket 文件(flyto-{sid}.sock vs flyto-plan-{sid}.sock). // // 协议: // // 请求:单条 JSON 行,字段 "command" 决定分发逻辑. // 响应:单条 JSON 行,必含 "ok"(bool) 字段;失败时附 "error"(string). // // submit_plan → {"ok": true, "plan_id": "plan-..."} // plan_status → {"ok": true, "plan": {...QueuedPlan...}} // plan_cancel → {"ok": true} // plan_list → {"ok": true, "plans": [...QueuedPlan...]} // // 安全: // // socket 文件权限 0600,只有同 UID 的进程可连接. // 每个命令最大读取 1MB 请求(防止超大 Steps 列表 OOM). // // 精妙之处(CLEVER): 每条命令 = 一次连接(无复用),与 UDSServer 保持一致-- // // 服务端无需维护"哪个连接在做哪个命令"的状态机. // 但不同于 UDSServer fire-and-forget--这里服务端在关闭连接前必须写回响应. // 替代方案:长连接+流水线(实现复杂,客户端需要匹配 request/response 序号). // // 升华改进(ELEVATED): sessionID 隔离-- // // socket 文件名含 sessionID,多 daemon 并行时互不干扰. // 替代方案:全局固定路径(/tmp/flyto-plan.sock,多实例冲突). import ( "context" "encoding/json" "errors" "fmt" "io" "net" "os" "path/filepath" "sync" "sync/atomic" ) // planCmdMaxRequestBytes 是单条请求的最大字节数(1MB). // // 精妙之处(CLEVER): 1MB 覆盖有上百个步骤的超大计划(每步 ~5KB × 200 = 1MB), // 同时防止恶意客户端发送 GB 级请求导致 OOM. // 替代方案:无限制(生产中已有工具误发超大 JSON 导致 OOM 的案例). const planCmdMaxRequestBytes = 1 << 20 // 1 MiB // ───────────────────────────────────────────────────────────────────────────── // 请求/响应结构体 // ───────────────────────────────────────────────────────────────────────────── // planCmdRequest 是客户端发送的命令请求. // // command 字段决定分发逻辑,其余字段按 command 按需填写: // // submit_plan : Steps + TimeoutSecs // plan_status : PlanID // plan_cancel : PlanID // plan_list : (无额外字段) type planCmdRequest struct { Command string `json:"command"` Steps []PlanStep `json:"steps,omitempty"` TimeoutSecs int `json:"timeout_secs,omitempty"` PlanID string `json:"plan_id,omitempty"` } // planCmdResponse 是服务端发回的响应. // 始终包含 Ok 字段;失败时附 Error;成功时附命令相关的数据字段. type planCmdResponse struct { Ok bool `json:"ok"` Error string `json:"error,omitempty"` PlanID string `json:"plan_id,omitempty"` Plan *QueuedPlan `json:"plan,omitempty"` Plans []*QueuedPlan `json:"plans,omitempty"` } // ───────────────────────────────────────────────────────────────────────────── // PlanCommandServer // ───────────────────────────────────────────────────────────────────────────── // PlanCommandServer 是 PlanQueue 的 Unix Domain Socket 请求-响应服务端. // // 生命周期: // 1. NewPlanCommandServer(sessionID, queue) → 创建实例 // 2. Start() → 开始监听(后台 acceptLoop goroutine) // 3. SockPath() → 获取 socket 路径(注入到客户端环境变量) // 4. Close() → 停止监听,清理 socket 文件 type PlanCommandServer struct { sockPath string queue PlanQueue listener net.Listener mu sync.Mutex // 保护 listener 字段 started atomic.Bool // 防重复 Start closed atomic.Bool // 防重复 Close once sync.Once // Close 幂等 done chan struct{} // acceptLoop 退出信号 } // planMaxSockPathLen 是 UDS socket 路径长度上限, 对齐 macOS 最严值(104 字节). // Linux 是 108, 按 macOS 为准. 超过时 net.Listen("unix", path) 返回 // "invalid argument", 所以主动检测并 fallback 到 os.TempDir(). // 与 inbox/uds_server.go maxSockPathLen 保持一致. const planMaxSockPathLen = 104 // resolvePlanSockPath 选择 socket 文件路径. // // 精妙之处(CLEVER): 两阶段策略, 与 inbox/uds_server.go resolveSockPath 对齐-- // // 阶段 1: os.UserCacheDir()/flyto/flyto-plan-.sock // 用户私有 cache 目录, 只有当前用户可访问, 符合最小暴露原则. // 阶段 2: os.TempDir()/flyto-plan-.sock // 触发条件: UserCacheDir 不可用或路径超 planMaxSockPathLen. // os.TempDir 尊重 $TMPDIR, 比硬编码 /tmp 安全(运维可重定向到 // $XDG_RUNTIME_DIR 等 user-private tmpfs). // // 替代方案: 硬编码 /tmp(多用户机器 sessionID 元数据对所有用户可见, // 且存在 TOCTOU symlink 攻击面). 已于 2026-04-15 修复. func resolvePlanSockPath(safeSuffix string) string { if cacheDir, err := os.UserCacheDir(); err == nil { flytoDir := filepath.Join(cacheDir, "flyto") if err := os.MkdirAll(flytoDir, 0700); err == nil { candidate := filepath.Join(flytoDir, "flyto-plan-"+safeSuffix+".sock") if len(candidate) <= planMaxSockPathLen { return candidate } } } // Fallback: os.TempDir() respects $TMPDIR, unlike hardcoded /tmp. return filepath.Join(os.TempDir(), fmt.Sprintf("flyto-plan-%s.sock", safeSuffix)) } // NewPlanCommandServer 创建计划命令服务器. // // sessionID 用于生成唯一的 socket 文件名, 路径由 resolvePlanSockPath 决定 // (优先用户私有 cache 目录, fallback 到 os.TempDir). // 与 UDSServer 使用不同前缀(flyto-plan- vs flyto-), 避免路径冲突. // queue 是底层计划队列, nil 会导致所有命令返回 "queue not available". func NewPlanCommandServer(sessionID string, queue PlanQueue) (*PlanCommandServer, error) { if sessionID == "" { return nil, errors.New("plan_command_server: sessionID must not be empty") } // 历史包袱(LEGACY): UDS socket 路径限制 104 字节(macOS sun_path). // sessionID 截取前 maxSessionIDSuffix 字节, 与 UDSServer 保持一致的安全策略. const maxSessionIDSuffix = 40 safeSuffix := sessionID if len(safeSuffix) > maxSessionIDSuffix { safeSuffix = safeSuffix[:maxSessionIDSuffix] } sockPath := resolvePlanSockPath(safeSuffix) return &PlanCommandServer{ sockPath: sockPath, queue: queue, done: make(chan struct{}), }, nil } // Start 开始监听 UDS.最多调用一次,必须在 Close() 之前. func (s *PlanCommandServer) Start() error { if s.closed.Load() { return errors.New("plan_command_server: already closed") } if !s.started.CompareAndSwap(false, true) { return errors.New("plan_command_server: already started") } // 清理上次崩溃遗留的 socket 文件,避免 "address already in use". _ = os.Remove(s.sockPath) ln, err := net.Listen("unix", s.sockPath) if err != nil { s.started.Store(false) return fmt.Errorf("plan_command_server: listen %s: %w", s.sockPath, err) } // 精妙之处(CLEVER): 设置 socket 文件权限为 0600-- // net.Listen 创建的 socket 默认是 0777(受 umask 过滤后通常是 0755). // 显式 Chmod 确保只有同 UID 的进程可读写,防止同机其他用户发命令. // 替代方案:依赖 umask(行为不可预测,不同系统默认值不同). if err := os.Chmod(s.sockPath, 0o600); err != nil { _ = ln.Close() s.started.Store(false) return fmt.Errorf("plan_command_server: chmod %s: %w", s.sockPath, err) } s.mu.Lock() s.listener = ln s.mu.Unlock() go s.acceptLoop() return nil } // SockPath 返回 socket 文件路径. // 客户端通过 FLYTO_PLAN_SOCK 环境变量获取此路径并连接. func (s *PlanCommandServer) SockPath() string { return s.sockPath } // Close 停止服务器,清理 socket 文件.幂等,多次调用安全. func (s *PlanCommandServer) Close() error { var closeErr error s.once.Do(func() { s.closed.Store(true) s.mu.Lock() ln := s.listener s.mu.Unlock() if ln != nil { if err := ln.Close(); err != nil { closeErr = fmt.Errorf("plan_command_server: close listener: %w", err) } } // 等待 acceptLoop 退出(它会在 Accept 返回错误后退出) <-s.done _ = os.Remove(s.sockPath) }) return closeErr } // ───────────────────────────────────────────────────────────────────────────── // 内部方法 // ───────────────────────────────────────────────────────────────────────────── // acceptLoop 是主接受循环. // 每个连接分配独立 goroutine(连接生命周期很短:一次命令就关闭). func (s *PlanCommandServer) acceptLoop() { defer close(s.done) s.mu.Lock() ln := s.listener s.mu.Unlock() for { conn, err := ln.Accept() if err != nil { if s.closed.Load() { return // 正常关闭 } // 非致命错误:继续接受下一个连接 continue } go s.handleConn(conn) } } // handleConn 处理单个连接:读请求 → 分发 → 写响应 → 关闭. // // 精妙之处(CLEVER): 请求-响应完全同步-- // // 服务端在 writeResponse 之后关闭连接,客户端只需读到 EOF 就知道响应完整. // 无需 Content-Length 或 newline 分帧:每条连接只有一次请求和一次响应. // 替代方案:长连接 + JSON 行分帧(客户端需处理半包/多包,实现复杂). func (s *PlanCommandServer) handleConn(conn net.Conn) { defer conn.Close() // 读请求(限制大小防 OOM) limited := io.LimitReader(conn, planCmdMaxRequestBytes) var req planCmdRequest if err := json.NewDecoder(limited).Decode(&req); err != nil { s.writeResponse(conn, planCmdResponse{ Ok: false, Error: fmt.Sprintf("invalid request: %v", err), }) return } resp := s.dispatch(req) s.writeResponse(conn, resp) } // dispatch 分发命令到 PlanQueue 对应方法. func (s *PlanCommandServer) dispatch(req planCmdRequest) planCmdResponse { if s.queue == nil { return planCmdResponse{Ok: false, Error: "plan queue not available"} } switch req.Command { case "submit_plan": return s.cmdSubmitPlan(req) case "plan_status": return s.cmdPlanStatus(req) case "plan_cancel": return s.cmdPlanCancel(req) case "plan_list": return s.cmdPlanList() default: return planCmdResponse{ Ok: false, Error: fmt.Sprintf("unknown command: %q", req.Command), } } } // cmdSubmitPlan 处理 submit_plan 命令. func (s *PlanCommandServer) cmdSubmitPlan(req planCmdRequest) planCmdResponse { if len(req.Steps) == 0 { return planCmdResponse{Ok: false, Error: "steps must not be empty"} } planID, err := s.queue.Submit(req.Steps, PlanSubmitOptions{ TimeoutSecs: req.TimeoutSecs, }) if err != nil { // 精妙之处(CLEVER): 区分"队列满"和"其他错误"-- // 队列满是正常的流量控制,客户端可以稍后重试. // 其他错误(磁盘满,权限等)是系统性问题,需要人工介入. // 错误信息直接透传,让客户端日志可区分. return planCmdResponse{Ok: false, Error: err.Error()} } return planCmdResponse{Ok: true, PlanID: planID} } // cmdPlanStatus 处理 plan_status 命令. func (s *PlanCommandServer) cmdPlanStatus(req planCmdRequest) planCmdResponse { if req.PlanID == "" { return planCmdResponse{Ok: false, Error: "plan_id is required"} } plan, err := s.queue.Status(req.PlanID) if err != nil { if errors.Is(err, ErrPlanNotFound) { return planCmdResponse{Ok: false, Error: "plan not found"} } return planCmdResponse{Ok: false, Error: err.Error()} } return planCmdResponse{Ok: true, Plan: plan} } // cmdPlanCancel 处理 plan_cancel 命令. func (s *PlanCommandServer) cmdPlanCancel(req planCmdRequest) planCmdResponse { if req.PlanID == "" { return planCmdResponse{Ok: false, Error: "plan_id is required"} } if err := s.queue.Cancel(req.PlanID); err != nil { if errors.Is(err, ErrPlanNotFound) { return planCmdResponse{Ok: false, Error: "plan not found"} } if errors.Is(err, ErrPlanTerminal) { return planCmdResponse{Ok: false, Error: "plan already in terminal state"} } return planCmdResponse{Ok: false, Error: err.Error()} } return planCmdResponse{Ok: true} } // cmdPlanList 处理 plan_list 命令. func (s *PlanCommandServer) cmdPlanList() planCmdResponse { plans, err := s.queue.List() if err != nil { return planCmdResponse{Ok: false, Error: err.Error()} } if plans == nil { plans = []*QueuedPlan{} // 返回空数组而非 null } return planCmdResponse{Ok: true, Plans: plans} } // writeResponse 将响应 JSON 写入连接. // 写失败静默忽略(连接可能已被对方关闭). func (s *PlanCommandServer) writeResponse(conn net.Conn, resp planCmdResponse) { data, err := json.Marshal(resp) if err != nil { // marshal 失败(理论上不可能,所有字段类型都可序列化) // 兜底:写一个最小错误响应 data = []byte(`{"ok":false,"error":"internal marshal error"}`) } // 写完整的 JSON 后加换行,方便客户端用 bufio.Scanner 按行读取 _, _ = conn.Write(append(data, '\n')) } // ───────────────────────────────────────────────────────────────────────────── // Client 辅助(供测试和 CLI 客户端使用) // ───────────────────────────────────────────────────────────────────────────── // SendPlanCommand 是连接到 PlanCommandServer 并发送单条命令的辅助函数. // // 升华改进(ELEVATED): 将 "建连→写请求→读响应→断开" 封装为一步, // CLI 脚本和测试可以直接调用,无需手动管理连接生命周期. // 替代方案:让调用方自己 net.Dial + json.Encode + json.Decode(模板代码过多). func SendPlanCommand(ctx context.Context, sockPath string, req planCmdRequest) (*planCmdResponse, error) { var d net.Dialer conn, err := d.DialContext(ctx, "unix", sockPath) if err != nil { return nil, fmt.Errorf("plan_command: dial %s: %w", sockPath, err) } defer conn.Close() if err := json.NewEncoder(conn).Encode(req); err != nil { return nil, fmt.Errorf("plan_command: write request: %w", err) } // 通知服务端请求已结束(单工:我们写完就关写端) // 精妙之处(CLEVER): CloseWrite 让服务端的 json.Decoder 读到 EOF, // 而连接的读方向仍然打开以接收响应. // 替代方案:整条连接 Close(导致服务端 write 失败,读不到响应). if tc, ok := conn.(*net.UnixConn); ok { _ = tc.CloseWrite() } var resp planCmdResponse if err := json.NewDecoder(conn).Decode(&resp); err != nil { return nil, fmt.Errorf("plan_command: read response: %w", err) } return &resp, nil }