// tasklist_test.go 验证 TaskList 业务层 + MemoryStore 行为. // // 测试覆盖: // - Add / Get / List 基础 CRUD // - Claim 认领 + 重复认领 (并发 CAS 冲突) // - Complete 完成 + 已完成的二次调用 // - Fail 失败 + 失败不可被 Complete // - 并发 Claim 竞争 (只有一个 goroutine 成功) package tasklist import ( "context" "errors" "sync" "sync/atomic" "testing" ) func TestTaskList_AddListGet(t *testing.T) { ctx := context.Background() tl := New(NewMemoryStore()) defer tl.Close() task, err := tl.Add(ctx, "单测任务 1", "这是第一条测试任务") if err != nil { t.Fatalf("Add error: %v", err) } if task.Status != StatusPending { t.Errorf("expected status pending, got %s", task.Status) } if task.Version != 1 { t.Errorf("expected version 1, got %d", task.Version) } got, err := tl.Get(ctx, task.ID) if err != nil { t.Fatalf("Get error: %v", err) } if got.Subject != task.Subject { t.Errorf("subject mismatch: %s vs %s", got.Subject, task.Subject) } all, err := tl.List(ctx) if err != nil { t.Fatalf("List error: %v", err) } if len(all) != 1 { t.Errorf("expected 1 task, got %d", len(all)) } } func TestTaskList_ClaimFlow(t *testing.T) { ctx := context.Background() tl := New(NewMemoryStore()) task, err := tl.Add(ctx, "可认领任务", "") if err != nil { t.Fatalf("Add: %v", err) } claimed, err := tl.Claim(ctx, task.ID, "worker-A") if err != nil { t.Fatalf("Claim: %v", err) } if claimed.Status != StatusClaimed { t.Errorf("expected claimed, got %s", claimed.Status) } if claimed.ClaimedBy != "worker-A" { t.Errorf("claimed_by mismatch: %s", claimed.ClaimedBy) } if claimed.Version != 2 { t.Errorf("expected version 2 after claim, got %d", claimed.Version) } // 重复 claim 同一任务应该失败 (不在 Pending 状态) if _, err := tl.Claim(ctx, task.ID, "worker-B"); !errors.Is(err, ErrNotClaimable) { t.Errorf("expected ErrNotClaimable, got %v", err) } } func TestTaskList_CompleteFlow(t *testing.T) { ctx := context.Background() tl := New(NewMemoryStore()) task, _ := tl.Add(ctx, "完成流程", "") _, _ = tl.Claim(ctx, task.ID, "worker-1") completed, err := tl.Complete(ctx, task.ID, "任务已完成, 结果 OK") if err != nil { t.Fatalf("Complete: %v", err) } if completed.Status != StatusCompleted { t.Errorf("expected completed, got %s", completed.Status) } if completed.Result != "任务已完成, 结果 OK" { t.Errorf("result mismatch: %s", completed.Result) } // 已完成的任务再次 Complete 返回 ErrAlreadyCompleted if _, err := tl.Complete(ctx, task.ID, "again"); !errors.Is(err, ErrAlreadyCompleted) { t.Errorf("expected ErrAlreadyCompleted, got %v", err) } } func TestTaskList_FailFlow(t *testing.T) { ctx := context.Background() tl := New(NewMemoryStore()) task, _ := tl.Add(ctx, "失败流程", "") failed, err := tl.Fail(ctx, task.ID, "网络中断") if err != nil { t.Fatalf("Fail: %v", err) } if failed.Status != StatusFailed { t.Errorf("expected failed, got %s", failed.Status) } if failed.FailReason != "网络中断" { t.Errorf("fail reason: %s", failed.FailReason) } // 失败后不可再 Claim if _, err := tl.Claim(ctx, task.ID, "worker"); !errors.Is(err, ErrNotClaimable) { t.Errorf("expected ErrNotClaimable after Fail, got %v", err) } } // TestTaskList_ConcurrentClaim 验证并发 Claim 下 CAS 原子性 -- // 100 个 goroutine 同时抢 1 个任务, 只有 1 个应该成功. func TestTaskList_ConcurrentClaim(t *testing.T) { ctx := context.Background() tl := New(NewMemoryStore()) task, _ := tl.Add(ctx, "并发抢单", "") const N = 100 var wg sync.WaitGroup var successCount int32 var conflictCount int32 var notClaimableCount int32 for i := range N { wg.Add(1) go func(idx int) { defer wg.Done() _, err := tl.Claim(ctx, task.ID, "worker-"+string(rune('A'+idx%26))) switch { case err == nil: atomic.AddInt32(&successCount, 1) case errors.Is(err, ErrConcurrentModification): atomic.AddInt32(&conflictCount, 1) case errors.Is(err, ErrNotClaimable): atomic.AddInt32(¬ClaimableCount, 1) default: t.Errorf("unexpected error: %v", err) } }(i) } wg.Wait() if successCount != 1 { t.Errorf("expected exactly 1 successful claim, got %d", successCount) } if int(successCount+conflictCount+notClaimableCount) != N { t.Errorf("counts do not sum to N=%d (success=%d conflict=%d not_claimable=%d)", N, successCount, conflictCount, notClaimableCount) } } func TestMemoryStore_CAS_NewAndExists(t *testing.T) { ctx := context.Background() s := NewMemoryStore() // 新增 task := Task{ID: "t1", Subject: "first", Status: StatusPending, Version: 1} if err := s.CAS(ctx, "t1", 0, task); err != nil { t.Fatalf("CAS new: %v", err) } // 再次新增同 ID 应冲突 if err := s.CAS(ctx, "t1", 0, task); !errors.Is(err, ErrTaskAlreadyExists) { t.Errorf("expected ErrTaskAlreadyExists, got %v", err) } // 版本不匹配 updated := task updated.Subject = "updated" updated.Version = 2 if err := s.CAS(ctx, "t1", 99, updated); !errors.Is(err, ErrConcurrentModification) { t.Errorf("expected ErrConcurrentModification, got %v", err) } // 正确版本匹配 if err := s.CAS(ctx, "t1", 1, updated); err != nil { t.Errorf("CAS with correct version: %v", err) } got, _ := s.Get(ctx, "t1") if got.Subject != "updated" { t.Errorf("update not applied, got %s", got.Subject) } } func TestTaskList_AddEmptySubject(t *testing.T) { ctx := context.Background() tl := New(NewMemoryStore()) if _, err := tl.Add(ctx, "", "desc"); err == nil { t.Error("expected error for empty subject") } } func TestTaskList_ClaimEmptyBy(t *testing.T) { ctx := context.Background() tl := New(NewMemoryStore()) task, _ := tl.Add(ctx, "t", "") if _, err := tl.Claim(ctx, task.ID, ""); err == nil { t.Error("expected error for empty by") } }