// plan_queue.go -- 异步计划队列(模块 20.3). // // 解决的问题: // 交互式 PlanMode(模块 17)要求用户在场审批并等待执行完成. // 对于"极大的计划要干很久"的场景(重构 100 个文件,批量数据迁移), // 发起方(Agent 或人)不能被阻塞:提交后应继续做其他事,异步轮询结果. // // 这与 SubAgent 的本质区别: // SubAgent = 同步 fork,Orchestrator goroutine 阻塞等待子 Agent 完成. // PlanQueue = 异步队列,提交立即返回,daemon 后台执行,任何时候都能查状态. // // 核心设计决策: // // 1. 状态文件持久化(~/.flyto/plans/{id}.json) // Daemon 崩溃重启后,pending/running 状态可恢复(RecoverPending). // 客户端不需要维持连接--直接读文件轮询,或通过 PlanCommandServer 查询. // 反向思考:内存队列更简单,但一次崩溃就丢失所有待执行计划. // // 2. 原子写入(write-then-rename) // 防止崩溃产生截断的 JSON(复用 compact_persist.go 的模式). // // 3. PlanExecFunc 注入(依赖反转) // FilePlanQueue 不导入 Engine,只持有一个 func. // Engine 在初始化时注入自己的执行逻辑,测试可用 mock. // 替代方案:FilePlanQueue 直接持有 *Engine 指针(循环依赖,测试困难). // // 4. per-plan 超时 + context 取消 // 执行超时后 context 被取消,execFunc 负责响应取消信号. // 默认 30 分钟(1800 秒),可通过 PlanSubmitOptions 覆盖. // // 5. TTL 清理(24 小时) // done/failed/cancelled 的计划文件 24 小时后自动删除. // 后台 goroutine 每小时扫描一次,防止 ~/.flyto/plans/ 无限膨胀. // // 对应早期方案:早期实现 无此功能,这是纯升华设计(见讨论 20.3). // 业界参考:Celery AsyncResult,Temporal WorkflowRun,GitHub Actions run_id 模式. package engine import ( "context" "crypto/rand" "encoding/hex" "encoding/json" "fmt" "os" "path/filepath" "strings" "sync" "time" ) // ───────────────────────────────────────────────────────────────────── // PlanStatus 状态机 // ───────────────────────────────────────────────────────────────────── // PlanStatus 是异步计划的生命周期状态. // // 状态转换: // // pending → running → done // pending → cancelled // running → done // running → failed // running → cancelled // // 精妙之处(CLEVER): 状态机只有单向转换--done/failed/cancelled 是终态, // 不能回退.RecoverPending 将 running 重置为 pending(唯一的"逆向"操作), // 仅在 daemon 启动时执行,代表"崩溃恢复"而非正常状态转换. type PlanStatus string const ( PlanStatusPending PlanStatus = "pending" PlanStatusRunning PlanStatus = "running" PlanStatusDone PlanStatus = "done" PlanStatusFailed PlanStatus = "failed" PlanStatusCancelled PlanStatus = "cancelled" ) // planResultTTL 是终态计划文件的保留时长.超过后后台清理 goroutine 删除文件. // 精妙之处(CLEVER): 24 小时足够用户隔夜查看结果,又不让 ~/.flyto/plans/ 无限膨胀. const planResultTTL = 24 * time.Hour // planDefaultTimeoutSecs 是计划执行的默认超时(秒). // 30 分钟足以覆盖大多数重构/迁移任务,同时防止挂起的计划永久占用 daemon. const planDefaultTimeoutSecs = 1800 // planQueueCapacity 是内存队列的缓冲容量. // 超过此数量的并发提交会被拒绝(返回 ErrQueueFull). // 64 是经验值:覆盖批量自动化场景,同时防止无限接受. const planQueueCapacity = 64 // ───────────────────────────────────────────────────────────────────── // 核心数据结构 // ───────────────────────────────────────────────────────────────────── // StepExecStatus 是单个步骤的执行状态. type StepExecStatus string const ( StepExecPending StepExecStatus = "pending" StepExecRunning StepExecStatus = "running" StepExecDone StepExecStatus = "done" StepExecSkipped StepExecStatus = "skipped" // 因依赖失败而跳过 StepExecFailed StepExecStatus = "failed" ) // QueuedPlan 是一个已提交等待异步执行的计划. // 此结构体序列化为 JSON 文件(~/.flyto/plans/{id}.json),是跨进程通信的唯一载体. type QueuedPlan struct { // ID 全局唯一标识符,格式 "plan-{timestamp_ns}-{random_hex}". // 精妙之处(CLEVER): 时间戳前缀保证按提交顺序排序(ls 即可看到顺序). ID string `json:"id"` // Steps 计划的结构化步骤列表(来自 PlanStep,复用模块 17 的类型). Steps []PlanStep `json:"steps"` // Status 当前状态(状态机见上方注释). Status PlanStatus `json:"status"` // SubmittedAt 提交时间(UTC). SubmittedAt time.Time `json:"submitted_at"` // StartedAt 开始执行时间,nil 表示尚未开始. StartedAt *time.Time `json:"started_at,omitempty"` // FinishedAt 完成时间(成功/失败/取消),nil 表示尚未完成. FinishedAt *time.Time `json:"finished_at,omitempty"` // ErrorMsg 失败原因(仅 failed 状态有值). ErrorMsg string `json:"error,omitempty"` // TimeoutSecs 单次执行超时秒数(默认 planDefaultTimeoutSecs). TimeoutSecs int `json:"timeout_secs"` // StepStatuses 各步骤的执行状态,key = PlanStep.ID. // 升华改进(ELEVATED): 早期实现 PlanMode 没有 per-step 状态追踪, // 只有整体进度.我们按步骤粒度记录,客户端可以显示精确进度条. StepStatuses map[string]StepExecStatus `json:"step_statuses,omitempty"` // CurrentStepID 当前正在执行的步骤 ID. CurrentStepID string `json:"current_step_id,omitempty"` } // IsTerminal 返回该计划是否处于终态(不可再变更). func (p *QueuedPlan) IsTerminal() bool { switch p.Status { case PlanStatusDone, PlanStatusFailed, PlanStatusCancelled: return true default: return false } } // StepsDone 返回已完成的步骤数. func (p *QueuedPlan) StepsDone() int { n := 0 for _, s := range p.StepStatuses { if s == StepExecDone || s == StepExecSkipped { n++ } } return n } // ───────────────────────────────────────────────────────────────────── // PlanQueue 接口 // ───────────────────────────────────────────────────────────────────── // PlanSubmitOptions 是计划提交的可选参数. type PlanSubmitOptions struct { // TimeoutSecs 单次执行超时(秒),0 使用默认值 planDefaultTimeoutSecs. TimeoutSecs int } // PlanExecFunc 是计划执行函数的类型. // // 调用方(通常是 Engine)注入此函数,实现具体的步骤执行逻辑. // FilePlanQueue 不感知执行细节(只管队列和状态). // // 参数: // - ctx:带超时的 context,execFunc 必须响应取消信号 // - plan:待执行的计划(含 Steps) // - onStepDone:每个步骤完成时回调,stepID 是完成的步骤 ID,err 非 nil 表示步骤失败 // // 升华改进(ELEVATED): onStepDone 回调让 FilePlanQueue 实时更新状态文件, // 客户端轮询时能看到逐步进度,而非只有最终结果. // 替代方案:execFunc 只返回整体 error(无中间状态,轮询看不到进度). type PlanExecFunc func(ctx context.Context, plan *QueuedPlan, onStepDone func(stepID string, err error)) error // PlanQueue 是异步计划队列接口. // // 实现:FilePlanQueue(文件持久化). // 扩展:可实现 RedisPlanQueue(多副本共享),MemoryPlanQueue(测试). type PlanQueue interface { // Submit 提交一个计划进入队列,立即返回 planID. // 如果队列已满返回 ErrPlanQueueFull. Submit(steps []PlanStep, opts PlanSubmitOptions) (planID string, err error) // Status 查询计划当前状态.planID 不存在返回 ErrPlanNotFound. Status(planID string) (*QueuedPlan, error) // Cancel 取消一个 pending 或 running 的计划. // 已是终态的计划返回 ErrPlanTerminal. Cancel(planID string) error // List 返回所有计划的列表(按提交时间倒序). List() ([]*QueuedPlan, error) // RecoverPending 在 daemon 启动时调用:扫描文件目录, // 将状态为 running 的计划(daemon 崩溃前的遗留)重置为 pending 并重新入队. RecoverPending() error // Close 关闭队列,等待当前执行中的计划完成(或超时). Close() error } // ErrPlanNotFound 计划 ID 不存在. var ErrPlanNotFound = fmt.Errorf("plan queue: plan not found") // ErrPlanTerminal 计划已处于终态,无法取消. var ErrPlanTerminal = fmt.Errorf("plan queue: plan already in terminal state") // ErrPlanQueueFull 队列已满,拒绝新提交. var ErrPlanQueueFull = fmt.Errorf("plan queue: queue is full") // ───────────────────────────────────────────────────────────────────── // FilePlanQueue 实现 // ───────────────────────────────────────────────────────────────────── // FilePlanQueue 将计划状态持久化为 JSON 文件,后台 goroutine 串行执行计划. // // 文件布局: // // {dir}/ // plan-{ts}-{rand}.json ← 每个计划一个文件 // // 并发安全:mu 保护内存中的 cancelFuncs 映射;文件操作使用原子写入. type FilePlanQueue struct { dir string // 计划文件目录(默认 ~/.flyto/plans/) execFunc PlanExecFunc // 注入的执行函数(nil = 仅做状态管理,不执行) pending chan string // plan ID 的内存信号队列(容量 planQueueCapacity) mu sync.Mutex cancelFuncs map[string]context.CancelFunc // planID → cancel(仅 running 状态有) closeOnce sync.Once closeCh chan struct{} wg sync.WaitGroup } // NewFilePlanQueue 创建 FilePlanQueue. // // dir:计划文件目录,空字符串使用默认路径 ~/.flyto/plans/. // execFunc:执行函数,nil 表示仅管理状态(适合测试或只做状态服务的 replica). func NewFilePlanQueue(dir string, execFunc PlanExecFunc) (*FilePlanQueue, error) { if dir == "" { home, err := os.UserHomeDir() if err != nil { return nil, fmt.Errorf("plan_queue: get home dir: %w", err) } dir = filepath.Join(home, ".flyto", "plans") } if err := os.MkdirAll(dir, 0o700); err != nil { return nil, fmt.Errorf("plan_queue: mkdir %s: %w", dir, err) } q := &FilePlanQueue{ dir: dir, execFunc: execFunc, pending: make(chan string, planQueueCapacity), cancelFuncs: make(map[string]context.CancelFunc), closeCh: make(chan struct{}), } // 启动后台执行循环 q.wg.Add(1) go q.runLoop() // 启动 TTL 清理循环 q.wg.Add(1) go q.cleanupLoop() return q, nil } // Submit 提交计划,立即返回 planID. func (q *FilePlanQueue) Submit(steps []PlanStep, opts PlanSubmitOptions) (string, error) { if opts.TimeoutSecs <= 0 { opts.TimeoutSecs = planDefaultTimeoutSecs } id, err := generatePlanID() if err != nil { return "", fmt.Errorf("plan_queue: generate id: %w", err) } // 初始化步骤状态 statuses := make(map[string]StepExecStatus, len(steps)) for _, s := range steps { statuses[s.ID] = StepExecPending } plan := &QueuedPlan{ ID: id, Steps: steps, Status: PlanStatusPending, SubmittedAt: time.Now().UTC(), TimeoutSecs: opts.TimeoutSecs, StepStatuses: statuses, } if err := q.saveAtomic(plan); err != nil { return "", fmt.Errorf("plan_queue: save plan: %w", err) } // 发送 ID 到内存队列(非阻塞) // 精妙之处(CLEVER): select + default 实现非阻塞发送. // 文件已写入(持久化成功),即使 channel 满导致 ErrPlanQueueFull, // 计划仍然在磁盘上.RecoverPending 或下次 daemon 重启时会重新入队. // 替代方案:阻塞发送(Submit 可能阻塞客户端,体验差). select { case q.pending <- id: default: // channel 满:计划已写入文件,下次 RecoverPending 会捡起它 return "", ErrPlanQueueFull } return id, nil } // Status 查询计划状态(直接读文件,无锁). func (q *FilePlanQueue) Status(planID string) (*QueuedPlan, error) { return q.load(planID) } // Cancel 取消计划. // // pending 状态:直接改为 cancelled 并写文件. // running 状态:调用 context.CancelFunc 中断执行,runLoop 会写 cancelled 状态. func (q *FilePlanQueue) Cancel(planID string) error { plan, err := q.load(planID) if err != nil { return err } if plan.IsTerminal() { return ErrPlanTerminal } if plan.Status == PlanStatusRunning { // 中断正在执行的计划 q.mu.Lock() cancel, ok := q.cancelFuncs[planID] q.mu.Unlock() if ok { cancel() // context 取消 → execFunc 应退出 → runLoop 写 cancelled 状态 } return nil } // pending 状态:直接写 cancelled now := time.Now().UTC() plan.Status = PlanStatusCancelled plan.FinishedAt = &now return q.saveAtomic(plan) } // List 返回所有计划,按提交时间倒序. func (q *FilePlanQueue) List() ([]*QueuedPlan, error) { entries, err := os.ReadDir(q.dir) if err != nil { return nil, fmt.Errorf("plan_queue: list dir: %w", err) } var plans []*QueuedPlan for _, e := range entries { if e.IsDir() || !strings.HasSuffix(e.Name(), ".json") { continue } id := strings.TrimSuffix(e.Name(), ".json") plan, err := q.load(id) if err != nil { continue // 损坏的文件跳过 } plans = append(plans, plan) } // 按提交时间倒序(最新的在前) // 精妙之处(CLEVER): plan ID 前缀是时间戳纳秒,字典序等价于时间序. // 反向遍历 entries(目录项已按文件名字典序排序)即可,无需 sort.Slice. // 前提:plan ID 格式 "plan-{ns}-{hex}",ns 零填充保证字典序正确. reversed := make([]*QueuedPlan, len(plans)) for i, p := range plans { reversed[len(plans)-1-i] = p } return reversed, nil } // RecoverPending 在 daemon 启动时调用,恢复崩溃前处于 running 状态的计划. // // 升华改进(ELEVATED): daemon 崩溃时 running 状态的计划是"进行中但结果未知", // 早期实现 无此机制(无 daemon 模式). // 我们将其重置为 pending 重新入队,保证 at-least-once 执行语义. // 替代方案:标记为 failed(更保守,但浪费了已完成的工作). func (q *FilePlanQueue) RecoverPending() error { entries, err := os.ReadDir(q.dir) if err != nil { if os.IsNotExist(err) { return nil // 目录不存在,没有需要恢复的计划 } return fmt.Errorf("plan_queue: recover scan: %w", err) } recovered := 0 for _, e := range entries { if e.IsDir() || !strings.HasSuffix(e.Name(), ".json") { continue } id := strings.TrimSuffix(e.Name(), ".json") plan, err := q.load(id) if err != nil { continue } if plan.Status == PlanStatusRunning { // 重置为 pending,清除运行时状态 plan.Status = PlanStatusPending plan.StartedAt = nil plan.CurrentStepID = "" // 将所有 running 步骤重置为 pending for k, v := range plan.StepStatuses { if v == StepExecRunning { plan.StepStatuses[k] = StepExecPending } } if err := q.saveAtomic(plan); err != nil { continue } // 重新入队(非阻塞,channel 满时计划仍在文件里) select { case q.pending <- id: default: } recovered++ } } if recovered > 0 { fmt.Fprintf(os.Stderr, "plan_queue: recovered %d pending plans from previous crash\n", recovered) } return nil } // Close 关闭队列,等待当前执行的计划完成(或超时 30 秒). func (q *FilePlanQueue) Close() error { q.closeOnce.Do(func() { close(q.closeCh) }) // 等待后台 goroutine 退出(最多 30 秒) done := make(chan struct{}) go func() { q.wg.Wait() close(done) }() select { case <-done: case <-time.After(30 * time.Second): // 超时:强制取消所有 running 计划 q.mu.Lock() for _, cancel := range q.cancelFuncs { cancel() } q.mu.Unlock() <-done } return nil } // ───────────────────────────────────────────────────────────────────── // 内部方法 // ───────────────────────────────────────────────────────────────────── // runLoop 是后台执行 goroutine.串行处理 pending channel 中的计划 ID. // // 精妙之处(CLEVER): 串行执行(一次只跑一个计划)是默认策略. // 原因:并行执行的计划可能操作同一批文件,产生冲突(类似两个 git branch 同时修改同一文件). // 串行是最安全的默认值.将来可以加 "file lock 检查" 来判断两个计划是否可以安全并行. // 替代方案:并行执行(更快但可能产生冲突,需要上层协调). func (q *FilePlanQueue) runLoop() { defer q.wg.Done() for { select { case <-q.closeCh: return case id := <-q.pending: q.executePlan(id) } } } // cleanupLoop 每小时扫描一次,删除超过 planResultTTL 的终态计划文件. func (q *FilePlanQueue) cleanupLoop() { defer q.wg.Done() ticker := time.NewTicker(1 * time.Hour) defer ticker.Stop() for { select { case <-q.closeCh: return case <-ticker.C: q.cleanupExpired() } } } // cleanupExpired 删除超过 TTL 的终态计划文件. func (q *FilePlanQueue) cleanupExpired() { entries, err := os.ReadDir(q.dir) if err != nil { return } cutoff := time.Now().Add(-planResultTTL) for _, e := range entries { if e.IsDir() || !strings.HasSuffix(e.Name(), ".json") { continue } id := strings.TrimSuffix(e.Name(), ".json") plan, err := q.load(id) if err != nil { continue } if !plan.IsTerminal() { continue } if plan.FinishedAt != nil && plan.FinishedAt.Before(cutoff) { _ = os.Remove(q.planPath(id)) } } } // executePlan 执行单个计划.由 runLoop goroutine 调用. func (q *FilePlanQueue) executePlan(id string) { plan, err := q.load(id) if err != nil { return // 文件损坏或已删除,跳过 } // 跳过已取消或其他终态的计划 if plan.Status != PlanStatusPending { return } // 标记为 running now := time.Now().UTC() plan.Status = PlanStatusRunning plan.StartedAt = &now if err := q.saveAtomic(plan); err != nil { return } // 创建带超时的 context,保存 cancel 函数供 Cancel() 调用 timeout := time.Duration(plan.TimeoutSecs) * time.Second ctx, cancel := context.WithTimeout(context.Background(), timeout) q.mu.Lock() q.cancelFuncs[id] = cancel q.mu.Unlock() defer func() { cancel() q.mu.Lock() delete(q.cancelFuncs, id) q.mu.Unlock() }() // 执行(如果没有注入 execFunc,直接标记为 done) var execErr error if q.execFunc != nil { execErr = q.execFunc(ctx, plan, func(stepID string, stepErr error) { // onStepDone 回调:更新步骤状态并立即持久化 // 这样客户端轮询时能看到逐步进度,而非只有最终结果 reloaded, err := q.load(id) if err != nil { return } if stepErr != nil { reloaded.StepStatuses[stepID] = StepExecFailed } else { reloaded.StepStatuses[stepID] = StepExecDone } reloaded.CurrentStepID = stepID _ = q.saveAtomic(reloaded) }) } // 写入终态 finished, err := q.load(id) if err != nil { return } finishedAt := time.Now().UTC() finished.FinishedAt = &finishedAt finished.CurrentStepID = "" switch { case ctx.Err() == context.DeadlineExceeded: finished.Status = PlanStatusFailed finished.ErrorMsg = fmt.Sprintf("execution timeout after %d seconds", plan.TimeoutSecs) case ctx.Err() == context.Canceled: // 精妙之处(CLEVER): context.Canceled 意味着 Cancel() 被调用,而非超时. // 区分这两者对于用户理解很重要("我取消了" vs "超时了"). finished.Status = PlanStatusCancelled case execErr != nil: finished.Status = PlanStatusFailed finished.ErrorMsg = execErr.Error() default: finished.Status = PlanStatusDone // 将所有仍是 pending 的步骤标记为 done(execFunc 可能没有逐步回调) for k, v := range finished.StepStatuses { if v == StepExecPending || v == StepExecRunning { finished.StepStatuses[k] = StepExecDone } } } _ = q.saveAtomic(finished) } // saveAtomic 原子写入计划文件(write-then-rename). // // 精妙之处(CLEVER): 与 compact_persist.go 复用同一模式-- // 写 .tmp 文件后 rename,防止 daemon 崩溃产生截断的 JSON. // POSIX rename 是原子操作,读者要么看到旧文件要么看到新文件. func (q *FilePlanQueue) saveAtomic(plan *QueuedPlan) error { data, err := json.MarshalIndent(plan, "", " ") if err != nil { return fmt.Errorf("plan_queue: marshal: %w", err) } path := q.planPath(plan.ID) tmp := path + ".tmp" if err := os.WriteFile(tmp, data, 0o600); err != nil { return fmt.Errorf("plan_queue: write tmp: %w", err) } if err := os.Rename(tmp, path); err != nil { _ = os.Remove(tmp) return fmt.Errorf("plan_queue: rename: %w", err) } return nil } // load 从文件加载计划. func (q *FilePlanQueue) load(planID string) (*QueuedPlan, error) { data, err := os.ReadFile(q.planPath(planID)) if os.IsNotExist(err) { return nil, ErrPlanNotFound } if err != nil { return nil, fmt.Errorf("plan_queue: read %s: %w", planID, err) } var plan QueuedPlan if err := json.Unmarshal(data, &plan); err != nil { return nil, fmt.Errorf("plan_queue: unmarshal %s: %w", planID, err) } return &plan, nil } // planPath 返回计划文件的完整路径. func (q *FilePlanQueue) planPath(planID string) string { return filepath.Join(q.dir, planID+".json") } // ───────────────────────────────────────────────────────────────────── // ID 生成 // ───────────────────────────────────────────────────────────────────── // generatePlanID 生成计划唯一 ID. // // 格式:plan-{timestamp_ns_zero_padded}-{8_random_hex} // // 示例:plan-001736000000000000000-a1b2c3d4 // // 精妙之处(CLEVER): 零填充的纳秒时间戳(20位)保证字典序 == 时间序. // 这让 ls 命令直接显示提交顺序,无需解析文件内容. // 替代方案:UUID v4(纯随机,调试时无法直观判断顺序). func generatePlanID() (string, error) { buf := make([]byte, 4) if _, err := rand.Read(buf); err != nil { return "", err } // 零填充到 20 位保证字典序正确(纳秒时间戳 < 10^19,20 位足够) return fmt.Sprintf("plan-%020d-%s", time.Now().UnixNano(), hex.EncodeToString(buf)), nil }