// plan_queue_test.go - 模块 20.3 异步计划队列测试 // // 测试覆盖: // FilePlanQueue: // - Submit: 正常提交,队列满拒绝,无步骤拒绝(命令层) // - Status: 查找已提交计划,不存在返回 ErrPlanNotFound // - Cancel: pending 直接取消,running 通过 context 取消,终态返回 ErrPlanTerminal // - List: 按提交时间倒序 // - RecoverPending: running→pending 重置 // - Close: 等待执行完成 // - generatePlanID: 格式 + 字典序 // - cleanupExpired: TTL 清理 // // PlanCommandServer: // - submit_plan → plan_id // - plan_status → QueuedPlan // - plan_cancel → ok // - plan_list → []QueuedPlan // - 未知命令返回 error // - queue=nil 返回 "queue not available" // - SendPlanCommand 辅助函数 package engine import ( "context" "encoding/json" "errors" "fmt" "os" "path/filepath" "sort" "strings" "sync" "testing" "time" ) // ───────────────────────────────────────────────────────────────────────────── // 测试辅助 // ───────────────────────────────────────────────────────────────────────────── // makeTempQueue 创建一个使用临时目录的 FilePlanQueue. // execFunc 为 nil 时队列仅做状态管理(不执行计划). func makeTempQueue(t *testing.T, execFunc PlanExecFunc) *FilePlanQueue { t.Helper() dir := t.TempDir() q, err := NewFilePlanQueue(dir, execFunc) if err != nil { t.Fatalf("NewFilePlanQueue: %v", err) } t.Cleanup(func() { _ = q.Close() }) return q } // sampleSteps 生成若干测试用 PlanStep. func sampleSteps(n int) []PlanStep { steps := make([]PlanStep, n) for i := range steps { steps[i] = PlanStep{ ID: fmt.Sprintf("step-%d", i+1), Description: fmt.Sprintf("Step %d description", i+1), } } return steps } // waitStatus 轮询直到计划达到期望状态(最多等 3 秒). func waitStatus(t *testing.T, q PlanQueue, planID string, want PlanStatus) *QueuedPlan { t.Helper() deadline := time.Now().Add(3 * time.Second) for time.Now().Before(deadline) { plan, err := q.Status(planID) if err != nil { t.Fatalf("waitStatus: Status(%s): %v", planID, err) } if plan.Status == want { return plan } time.Sleep(20 * time.Millisecond) } plan, _ := q.Status(planID) t.Fatalf("waitStatus: plan %s never reached %s (current: %v)", planID, want, plan.Status) return nil } // ───────────────────────────────────────────────────────────────────────────── // generatePlanID 测试 // ───────────────────────────────────────────────────────────────────────────── func TestGeneratePlanID_Format(t *testing.T) { id, err := generatePlanID() if err != nil { t.Fatalf("generatePlanID: %v", err) } if !strings.HasPrefix(id, "plan-") { t.Errorf("expected prefix 'plan-', got %q", id) } // 格式:plan-{20位数字}-{8位hex} parts := strings.Split(id, "-") if len(parts) != 3 { t.Fatalf("expected 3 parts, got %d: %q", len(parts), id) } if len(parts[1]) != 20 { t.Errorf("timestamp part should be 20 digits, got %d: %q", len(parts[1]), parts[1]) } if len(parts[2]) != 8 { t.Errorf("random hex part should be 8 chars, got %d: %q", len(parts[2]), parts[2]) } } func TestGeneratePlanID_LexicographicOrder(t *testing.T) { // 快速生成多个 ID,验证字典序 == 时间序 ids := make([]string, 20) for i := range ids { id, err := generatePlanID() if err != nil { t.Fatalf("generatePlanID[%d]: %v", i, err) } ids[i] = id } sorted := make([]string, len(ids)) copy(sorted, ids) sort.Strings(sorted) for i := range ids { if ids[i] != sorted[i] { t.Errorf("position %d: generation order (%q) != sorted order (%q)", i, ids[i], sorted[i]) } } } // ───────────────────────────────────────────────────────────────────────────── // FilePlanQueue: 基本操作 // ───────────────────────────────────────────────────────────────────────────── func TestFilePlanQueue_SubmitAndStatus(t *testing.T) { q := makeTempQueue(t, nil) // 无执行函数,计划停在 pending steps := sampleSteps(3) planID, err := q.Submit(steps, PlanSubmitOptions{}) if err != nil { t.Fatalf("Submit: %v", err) } if planID == "" { t.Fatal("Submit returned empty planID") } plan, err := q.Status(planID) if err != nil { t.Fatalf("Status: %v", err) } // nil execFunc:runLoop 收到 pending 但 executePlan 立即返回 done(或跳过) // 实际:execFunc==nil 时 executePlan 直接标记 done // 等待最终状态 if plan.ID != planID { t.Errorf("plan.ID = %q, want %q", plan.ID, planID) } if len(plan.Steps) != 3 { t.Errorf("len(steps) = %d, want 3", len(plan.Steps)) } if plan.TimeoutSecs != planDefaultTimeoutSecs { t.Errorf("TimeoutSecs = %d, want %d", plan.TimeoutSecs, planDefaultTimeoutSecs) } } func TestFilePlanQueue_StatusNotFound(t *testing.T) { q := makeTempQueue(t, nil) _, err := q.Status("plan-00000000000000000000-nonexist") if !errors.Is(err, ErrPlanNotFound) { t.Errorf("expected ErrPlanNotFound, got %v", err) } } func TestFilePlanQueue_ExecFunc_Done(t *testing.T) { // execFunc 正常完成 stepsDone := make([]string, 0) var mu sync.Mutex exec := func(_ context.Context, plan *QueuedPlan, onStepDone func(string, error)) error { for _, s := range plan.Steps { onStepDone(s.ID, nil) mu.Lock() stepsDone = append(stepsDone, s.ID) mu.Unlock() } return nil } q := makeTempQueue(t, exec) steps := sampleSteps(2) planID, err := q.Submit(steps, PlanSubmitOptions{}) if err != nil { t.Fatalf("Submit: %v", err) } plan := waitStatus(t, q, planID, PlanStatusDone) if plan.Status != PlanStatusDone { t.Errorf("status = %q, want done", plan.Status) } if plan.FinishedAt == nil { t.Error("FinishedAt should be set") } mu.Lock() defer mu.Unlock() if len(stepsDone) != 2 { t.Errorf("onStepDone called %d times, want 2", len(stepsDone)) } } func TestFilePlanQueue_ExecFunc_Failed(t *testing.T) { exec := func(_ context.Context, _ *QueuedPlan, _ func(string, error)) error { return errors.New("exec failed: disk full") } q := makeTempQueue(t, exec) planID, _ := q.Submit(sampleSteps(1), PlanSubmitOptions{}) plan := waitStatus(t, q, planID, PlanStatusFailed) if plan.ErrorMsg != "exec failed: disk full" { t.Errorf("ErrorMsg = %q, want 'exec failed: disk full'", plan.ErrorMsg) } } func TestFilePlanQueue_ExecFunc_Timeout(t *testing.T) { // execFunc 阻塞直到 ctx 取消(超时触发) exec := func(ctx context.Context, _ *QueuedPlan, _ func(string, error)) error { <-ctx.Done() return ctx.Err() } q := makeTempQueue(t, exec) planID, _ := q.Submit(sampleSteps(1), PlanSubmitOptions{TimeoutSecs: 1}) plan := waitStatus(t, q, planID, PlanStatusFailed) if !strings.Contains(plan.ErrorMsg, "timeout") { t.Errorf("ErrorMsg should mention timeout, got %q", plan.ErrorMsg) } } // ───────────────────────────────────────────────────────────────────────────── // FilePlanQueue: Cancel // ───────────────────────────────────────────────────────────────────────────── func TestFilePlanQueue_CancelTerminal(t *testing.T) { // nil execFunc → plan 立即 done(runLoop 中 executePlan 读 pending 但 execFunc=nil 直接 done) // 所以用有阻塞的 execFunc 来确保 done 状态 exec := func(_ context.Context, _ *QueuedPlan, _ func(string, error)) error { return nil } q := makeTempQueue(t, exec) planID, _ := q.Submit(sampleSteps(1), PlanSubmitOptions{}) waitStatus(t, q, planID, PlanStatusDone) err := q.Cancel(planID) if !errors.Is(err, ErrPlanTerminal) { t.Errorf("expected ErrPlanTerminal after done, got %v", err) } } func TestFilePlanQueue_CancelRunning(t *testing.T) { started := make(chan struct{}) exec := func(ctx context.Context, _ *QueuedPlan, _ func(string, error)) error { close(started) <-ctx.Done() return ctx.Err() } q := makeTempQueue(t, exec) planID, _ := q.Submit(sampleSteps(1), PlanSubmitOptions{TimeoutSecs: 60}) <-started // 等 execFunc 开始 if err := q.Cancel(planID); err != nil { t.Fatalf("Cancel running plan: %v", err) } plan := waitStatus(t, q, planID, PlanStatusCancelled) if plan.Status != PlanStatusCancelled { t.Errorf("status = %q, want cancelled", plan.Status) } } func TestFilePlanQueue_CancelPending_Direct(t *testing.T) { // 创建队列但不会执行(execFunc 会阻塞第一个 plan,第二个停在 pending) blocker := make(chan struct{}) exec := func(ctx context.Context, _ *QueuedPlan, _ func(string, error)) error { select { case <-blocker: case <-ctx.Done(): } return nil } q := makeTempQueue(t, exec) // 先提交一个占住执行槽 planID1, _ := q.Submit(sampleSteps(1), PlanSubmitOptions{TimeoutSecs: 10}) waitStatus(t, q, planID1, PlanStatusRunning) // 第二个会停在 pending(因为 runLoop 是串行的) planID2, _ := q.Submit(sampleSteps(1), PlanSubmitOptions{TimeoutSecs: 10}) // 等一小段让 planID2 写入文件 time.Sleep(50 * time.Millisecond) if err := q.Cancel(planID2); err != nil { t.Fatalf("Cancel pending plan: %v", err) } plan, _ := q.Status(planID2) if plan.Status != PlanStatusCancelled { t.Errorf("status = %q, want cancelled", plan.Status) } // 释放第一个 plan close(blocker) } // ───────────────────────────────────────────────────────────────────────────── // FilePlanQueue: List // ───────────────────────────────────────────────────────────────────────────── func TestFilePlanQueue_List_ReverseOrder(t *testing.T) { exec := func(_ context.Context, _ *QueuedPlan, _ func(string, error)) error { return nil } q := makeTempQueue(t, exec) ids := make([]string, 3) for i := range ids { id, err := q.Submit(sampleSteps(1), PlanSubmitOptions{}) if err != nil { t.Fatalf("Submit[%d]: %v", i, err) } ids[i] = id time.Sleep(2 * time.Millisecond) // 保证时间戳不同 } // 等所有计划完成 for _, id := range ids { waitStatus(t, q, id, PlanStatusDone) } plans, err := q.List() if err != nil { t.Fatalf("List: %v", err) } if len(plans) < 3 { t.Fatalf("List returned %d plans, want >= 3", len(plans)) } // 验证倒序:plans[0] 是最新的(ID 最大) for i := 1; i < len(plans); i++ { if plans[i-1].ID < plans[i].ID { t.Errorf("List not in reverse order: plans[%d]=%s < plans[%d]=%s", i-1, plans[i-1].ID, i, plans[i].ID) } } } // ───────────────────────────────────────────────────────────────────────────── // FilePlanQueue: RecoverPending // ───────────────────────────────────────────────────────────────────────────── func TestFilePlanQueue_RecoverPending(t *testing.T) { // 精妙之处(CLEVER): 这里测试的是"行为"而非"中间状态". // RecoverPending 将 running→pending,runLoop 立即会捡起重新执行. // 测试不能假设中间的 pending 状态能被捕捉到(runLoop goroutine 并发). // 正确验证方式: // 1. 用阻塞 execFunc 让"步骤是否被 RecoverPending 正确重置"可被追踪 // 2. 验证崩溃遗留的计划确实被重新执行(最终到达 done) // 3. 验证 done 计划不被改动 dir := t.TempDir() // 追踪哪些步骤被执行了 var executedSteps []string var mu sync.Mutex execDone := make(chan struct{}) exec := func(_ context.Context, plan *QueuedPlan, onStepDone func(string, error)) error { defer close(execDone) for _, s := range plan.Steps { onStepDone(s.ID, nil) mu.Lock() executedSteps = append(executedSteps, s.ID) mu.Unlock() } return nil } // 手动写一个 running 状态的计划文件(模拟 daemon 崩溃遗留) id := "plan-00000000000000000001-aabbccdd" crashPlan := &QueuedPlan{ ID: id, Steps: sampleSteps(2), Status: PlanStatusRunning, // 崩溃时处于 running StepStatuses: map[string]StepExecStatus{ "step-1": StepExecDone, // 崩溃前已完成 "step-2": StepExecRunning, // 崩溃中断 }, SubmittedAt: time.Now().Add(-10 * time.Minute).UTC(), TimeoutSecs: 1800, } data, _ := json.Marshal(crashPlan) if err := os.WriteFile(filepath.Join(dir, id+".json"), data, 0o600); err != nil { t.Fatalf("write fixture: %v", err) } // 写一个 done 状态的计划(不应被 RecoverPending 改动) donePlan := &QueuedPlan{ ID: "plan-00000000000000000002-aabbccdd", Status: PlanStatusDone, SubmittedAt: time.Now().UTC(), TimeoutSecs: 1800, } doneData, _ := json.Marshal(donePlan) _ = os.WriteFile(filepath.Join(dir, donePlan.ID+".json"), doneData, 0o600) // 创建队列(runLoop 已启动) q, err := NewFilePlanQueue(dir, exec) if err != nil { t.Fatalf("NewFilePlanQueue: %v", err) } defer q.Close() // RecoverPending:将 running 的计划文件重置为 pending 并入队 if err := q.RecoverPending(); err != nil { t.Fatalf("RecoverPending: %v", err) } // 等待计划被重新执行完成(说明 RecoverPending 成功把计划重新入队) select { case <-execDone: case <-time.After(5 * time.Second): t.Fatal("RecoverPending plan was not re-executed within 5s") } // 验证计划最终到达 done final := waitStatus(t, q, id, PlanStatusDone) if final.Status != PlanStatusDone { t.Errorf("status = %q, want done", final.Status) } // 验证所有步骤都被重新执行 mu.Lock() n := len(executedSteps) mu.Unlock() if n != 2 { t.Errorf("executed %d steps, want 2 (all steps should be re-run)", n) } // done 计划不应被改动 doneRecovered, _ := q.Status(donePlan.ID) if doneRecovered.Status != PlanStatusDone { t.Errorf("done plan should stay done, got %q", doneRecovered.Status) } } // ───────────────────────────────────────────────────────────────────────────── // FilePlanQueue: 原子写入 + cleanupExpired // ───────────────────────────────────────────────────────────────────────────── func TestFilePlanQueue_AtomicWrite_NoTmpLeft(t *testing.T) { q := makeTempQueue(t, nil) planID, _ := q.Submit(sampleSteps(1), PlanSubmitOptions{}) // 等计划完成(nil execFunc → done) waitStatus(t, q, planID, PlanStatusDone) // 确认没有留下 .tmp 文件 entries, _ := os.ReadDir(q.dir) for _, e := range entries { if strings.HasSuffix(e.Name(), ".tmp") { t.Errorf("stray .tmp file found: %s", e.Name()) } } } func TestFilePlanQueue_CleanupExpired(t *testing.T) { q := makeTempQueue(t, nil) // 手动写入一个"很久以前"完成的计划文件 oldTime := time.Now().Add(-25 * time.Hour).UTC() id := "plan-00000000000000000099-deadbeef" plan := &QueuedPlan{ ID: id, Status: PlanStatusDone, SubmittedAt: oldTime, FinishedAt: &oldTime, TimeoutSecs: 1800, } data, _ := json.Marshal(plan) _ = os.WriteFile(filepath.Join(q.dir, id+".json"), data, 0o600) // 手动触发清理(不等 1 小时计时器) q.cleanupExpired() // 文件应已删除 if _, err := os.Stat(filepath.Join(q.dir, id+".json")); !os.IsNotExist(err) { t.Error("expired plan file should have been deleted") } } // ───────────────────────────────────────────────────────────────────────────── // FilePlanQueue: Close // ───────────────────────────────────────────────────────────────────────────── func TestFilePlanQueue_CloseIdempotent(t *testing.T) { q := makeTempQueue(t, nil) if err := q.Close(); err != nil { t.Fatalf("first Close: %v", err) } if err := q.Close(); err != nil { t.Fatalf("second Close (should be idempotent): %v", err) } } func TestFilePlanQueue_CloseWaitsForRunning(t *testing.T) { finished := make(chan struct{}) exec := func(ctx context.Context, _ *QueuedPlan, _ func(string, error)) error { // 模拟正在执行的计划(短暂,不会触发 Close 超时) select { case <-ctx.Done(): return ctx.Err() case <-time.After(100 * time.Millisecond): close(finished) return nil } } dir := t.TempDir() q, err := NewFilePlanQueue(dir, exec) if err != nil { t.Fatalf("NewFilePlanQueue: %v", err) } planID, _ := q.Submit(sampleSteps(1), PlanSubmitOptions{TimeoutSecs: 60}) waitStatus(t, q, planID, PlanStatusRunning) if err := q.Close(); err != nil { t.Fatalf("Close: %v", err) } select { case <-finished: // 正常:Close 等待 execFunc 完成 case <-time.After(5 * time.Second): t.Error("Close returned before exec finished") } } // ───────────────────────────────────────────────────────────────────────────── // PlanCommandServer 测试 // ───────────────────────────────────────────────────────────────────────────── // startTestServer 创建并启动一个 PlanCommandServer(使用真实的 FilePlanQueue). func startTestServer(t *testing.T, execFunc PlanExecFunc) (*PlanCommandServer, *FilePlanQueue) { t.Helper() q := makeTempQueue(t, execFunc) // 用时间戳 + 测试名哈希做 sessionID sessionID := fmt.Sprintf("test%d", time.Now().UnixNano()) srv, err := NewPlanCommandServer(sessionID, q) if err != nil { t.Fatalf("NewPlanCommandServer: %v", err) } if err := srv.Start(); err != nil { t.Fatalf("Start: %v", err) } t.Cleanup(func() { _ = srv.Close() }) return srv, q } // sendCmd 是测试用的便捷发命令函数. func sendCmd(t *testing.T, sockPath string, req planCmdRequest) *planCmdResponse { t.Helper() resp, err := SendPlanCommand(context.Background(), sockPath, req) if err != nil { t.Fatalf("SendPlanCommand: %v", err) } return resp } func TestPlanCommandServer_SubmitPlan(t *testing.T) { srv, _ := startTestServer(t, nil) resp := sendCmd(t, srv.SockPath(), planCmdRequest{ Command: "submit_plan", Steps: sampleSteps(2), }) if !resp.Ok { t.Fatalf("submit_plan failed: %s", resp.Error) } if resp.PlanID == "" { t.Error("PlanID should not be empty") } if !strings.HasPrefix(resp.PlanID, "plan-") { t.Errorf("PlanID should start with 'plan-', got %q", resp.PlanID) } } func TestPlanCommandServer_SubmitPlan_EmptySteps(t *testing.T) { srv, _ := startTestServer(t, nil) resp := sendCmd(t, srv.SockPath(), planCmdRequest{ Command: "submit_plan", Steps: []PlanStep{}, // 空步骤 }) if resp.Ok { t.Error("submit_plan with empty steps should fail") } } func TestPlanCommandServer_PlanStatus(t *testing.T) { srv, q := startTestServer(t, nil) planID, _ := q.Submit(sampleSteps(1), PlanSubmitOptions{}) resp := sendCmd(t, srv.SockPath(), planCmdRequest{ Command: "plan_status", PlanID: planID, }) if !resp.Ok { t.Fatalf("plan_status failed: %s", resp.Error) } if resp.Plan == nil { t.Fatal("plan should not be nil") } if resp.Plan.ID != planID { t.Errorf("plan.ID = %q, want %q", resp.Plan.ID, planID) } } func TestPlanCommandServer_PlanStatus_NotFound(t *testing.T) { srv, _ := startTestServer(t, nil) resp := sendCmd(t, srv.SockPath(), planCmdRequest{ Command: "plan_status", PlanID: "plan-00000000000000000000-notexist", }) if resp.Ok { t.Error("plan_status for nonexistent plan should fail") } if !strings.Contains(resp.Error, "not found") { t.Errorf("error should mention 'not found', got %q", resp.Error) } } func TestPlanCommandServer_PlanCancel(t *testing.T) { // 使用阻塞 execFunc,让计划保持在 running 状态 started := make(chan struct{}) exec := func(ctx context.Context, _ *QueuedPlan, _ func(string, error)) error { close(started) <-ctx.Done() return ctx.Err() } srv, q := startTestServer(t, exec) planID, _ := q.Submit(sampleSteps(1), PlanSubmitOptions{TimeoutSecs: 60}) <-started // 等 exec 开始 resp := sendCmd(t, srv.SockPath(), planCmdRequest{ Command: "plan_cancel", PlanID: planID, }) if !resp.Ok { t.Fatalf("plan_cancel failed: %s", resp.Error) } // 等待状态变为 cancelled waitStatus(t, q, planID, PlanStatusCancelled) } func TestPlanCommandServer_PlanList(t *testing.T) { exec := func(_ context.Context, _ *QueuedPlan, _ func(string, error)) error { return nil } srv, q := startTestServer(t, exec) // 提交 3 个计划 for i := 0; i < 3; i++ { _, err := q.Submit(sampleSteps(1), PlanSubmitOptions{}) if err != nil { t.Fatalf("Submit[%d]: %v", i, err) } } resp := sendCmd(t, srv.SockPath(), planCmdRequest{ Command: "plan_list", }) if !resp.Ok { t.Fatalf("plan_list failed: %s", resp.Error) } if resp.Plans == nil { t.Fatal("plans should not be nil (even if empty)") } if len(resp.Plans) < 3 { t.Errorf("expected >= 3 plans, got %d", len(resp.Plans)) } } func TestPlanCommandServer_UnknownCommand(t *testing.T) { srv, _ := startTestServer(t, nil) resp := sendCmd(t, srv.SockPath(), planCmdRequest{ Command: "do_magic", }) if resp.Ok { t.Error("unknown command should return ok=false") } if !strings.Contains(resp.Error, "unknown command") { t.Errorf("error should mention 'unknown command', got %q", resp.Error) } } func TestPlanCommandServer_NilQueue(t *testing.T) { sessionID := fmt.Sprintf("niltest%d", time.Now().UnixNano()) srv, err := NewPlanCommandServer(sessionID, nil) if err != nil { t.Fatalf("NewPlanCommandServer: %v", err) } if err := srv.Start(); err != nil { t.Fatalf("Start: %v", err) } defer srv.Close() resp := sendCmd(t, srv.SockPath(), planCmdRequest{Command: "plan_list"}) if resp.Ok { t.Error("nil queue should return ok=false") } if !strings.Contains(resp.Error, "not available") { t.Errorf("error should mention 'not available', got %q", resp.Error) } } func TestPlanCommandServer_CloseIdempotent(t *testing.T) { sessionID := fmt.Sprintf("closetest%d", time.Now().UnixNano()) srv, err := NewPlanCommandServer(sessionID, nil) if err != nil { t.Fatalf("NewPlanCommandServer: %v", err) } if err := srv.Start(); err != nil { t.Fatalf("Start: %v", err) } if err := srv.Close(); err != nil { t.Fatalf("first Close: %v", err) } if err := srv.Close(); err != nil { t.Fatalf("second Close should be idempotent: %v", err) } } func TestPlanCommandServer_SockPathFormat(t *testing.T) { sessionID := fmt.Sprintf("pathtest%d", time.Now().UnixNano()) srv, _ := NewPlanCommandServer(sessionID, nil) sockPath := srv.SockPath() if !strings.Contains(sockPath, "flyto-plan-") { t.Errorf("SockPath should contain 'flyto-plan-', got %q", sockPath) } if !strings.HasSuffix(sockPath, ".sock") { t.Errorf("SockPath should end with '.sock', got %q", sockPath) } } func TestPlanCommandServer_PlanStatus_MissingPlanID(t *testing.T) { srv, _ := startTestServer(t, nil) resp := sendCmd(t, srv.SockPath(), planCmdRequest{ Command: "plan_status", PlanID: "", // 缺少 plan_id }) if resp.Ok { t.Error("plan_status without plan_id should fail") } if !strings.Contains(resp.Error, "required") { t.Errorf("error should mention 'required', got %q", resp.Error) } } func TestPlanCommandServer_SubmitAndStatusEndToEnd(t *testing.T) { // 端到端测试:通过命令服务器提交计划,再通过命令服务器查询状态 exec := func(_ context.Context, plan *QueuedPlan, onStepDone func(string, error)) error { for _, s := range plan.Steps { onStepDone(s.ID, nil) } return nil } srv, q := startTestServer(t, exec) // 通过命令服务器提交 submitResp := sendCmd(t, srv.SockPath(), planCmdRequest{ Command: "submit_plan", Steps: sampleSteps(3), TimeoutSecs: 30, }) if !submitResp.Ok { t.Fatalf("submit_plan: %s", submitResp.Error) } planID := submitResp.PlanID // 等待完成(通过 FilePlanQueue 直接轮询,避免命令服务器延迟) waitStatus(t, q, planID, PlanStatusDone) // 通过命令服务器查询最终状态 statusResp := sendCmd(t, srv.SockPath(), planCmdRequest{ Command: "plan_status", PlanID: planID, }) if !statusResp.Ok { t.Fatalf("plan_status: %s", statusResp.Error) } if statusResp.Plan.Status != PlanStatusDone { t.Errorf("final status = %q, want done", statusResp.Plan.Status) } if statusResp.Plan.StepsDone() != 3 { t.Errorf("StepsDone = %d, want 3", statusResp.Plan.StepsDone()) } }