// Package tasklist 实现 Agent Teams 的 shared task list 业务层. // // 模块定位: // // TaskList 让多个 Agent (Worker 或 Leader) 共享一张任务清单, 实现: // - 列出待办 (List): 看板视角 // - 添加任务 (Add): 任意 Agent 可发布新任务到共享队列 // - 抢单 (Claim): Worker 主动认领待办, 原子 CAS 避免重复抢 // - 完成 (Complete): 记录结果, 驱动 TaskCompleted 事件 (Observer 订阅) // - 标记失败 (Fail): 记录失败原因, 支持其他 Worker 重新抢单 // // 分层设计 (业务层 + 存储层): // // TaskList (业务层, 一份实现): 管 claim 状态机, 走 Store.CAS 原子更新. // Store (存储层, 多实现): 只负责 Get/CAS/List. // - MemoryStore: 同进程 dogfood / 测试, 零外部依赖. // - MarkdownStore (另见 markdown_store.go): 文件 + flock, 对齐 Anthropic // Claude Code v2.1.32 tasks.md 格式, 实现双向互操作. // - CustomStore: 消费层实现 Store 接口, 接入金融 DB / 医疗合规存储 / // 仓储 WMS / Redis 等. 引擎层零改动. // // 核心设计决策: // 1. 业务逻辑单一 (vs. "每个实现自带 claim 状态机"): // Claim 语义由 TaskList.Claim 唯一控制, Store 只保证原子 CAS -- // 所有后端 (memory / markdown / DB) 行为一致. // 精妙之处(CLEVER): 业务和存储分层对齐 database/sql + driver 标准模式, // Go 生态熟悉, 消费层零学习成本. // 替代方案: <每个实现内置 claim> - 否决: 行为漂移, Markdown 实现可能 // 和 Memory 实现微妙不一致. // 2. CAS 原子性 (vs. "把锁放在 TaskList 层"): // 不同 backend 的并发原语不同 (memory: mutex / markdown: flock / DB: // transaction), 由 Store 自己实现 CAS 契约, TaskList 只调用. // 跨行业扩展: 金融客户的 DB 事务原子性比文件锁强, 接 PostgresStore // 能复用 Flyto 全套业务逻辑, 同时享受 DB 级事务保证. // 3. Version 字段用 int (vs. "timestamp / hash"): // int 递增简单可预测, 并发 CAS 不丢更新; timestamp 有 skew 风险, // hash 需要内容哈希额外 CPU. MVP 简单够用. // // 跨行业对照: // // 领域 | 任务示例 | 典型 Store 选择 // -----------|------------------------------|------------------ // 编程 | "写单测 / code review" | MarkdownStore (互操作 Anthropic) // 金融 | "分析股票 / 风控审核" | CustomStore(PostgreSQL) // 医疗 | "诊断影像 / 审方" | CustomStore(HIPAA 加密存储) // 仓储 | "波次 / 拣货 / 盘点" | CustomStore(WMS DB 集成) // 法律 | "合同审阅 / 案例检索" | CustomStore(事务所文档管理) package tasklist import ( "context" "crypto/rand" "encoding/hex" "errors" "fmt" "time" ) // TaskStatus 是任务状态枚举. type TaskStatus string const ( // StatusPending 新建, 等待认领. StatusPending TaskStatus = "pending" // StatusClaimed 已被某 Worker 认领, 正在进行. StatusClaimed TaskStatus = "claimed" // StatusCompleted 成功完成, 有结果. StatusCompleted TaskStatus = "completed" // StatusFailed 失败, 有失败原因. // // 精妙之处(CLEVER): Failed 不等于删除 -- 允许其他 Worker 查看失败原因, // 通过 Claim 重新领取 (claim 会校验状态, 默认 Failed 不可重领, 消费层 // 可自定义策略). StatusFailed TaskStatus = "failed" ) // Task 是共享任务清单中的一项. // // 升华改进(ELEVATED): 字段覆盖编程/金融/医疗/仓储场景共性需求 -- // Subject (短标题) + Description (详情) 是所有任务系统通用结构; // ClaimedBy 可为 agent 名/员工工号/系统账号, 跨行业中立; // Result 是纯文本, 消费层可存 JSON/XML/富文本, 引擎不限制. type Task struct { // ID 唯一标识符. ID string `json:"id"` // Subject 短标题 (1-80 字). Subject string `json:"subject"` // Description 详细描述. Description string `json:"description"` // Status 当前状态. Status TaskStatus `json:"status"` // ClaimedBy 认领者名 (agent 名或其他标识). ClaimedBy string `json:"claimed_by,omitempty"` // Result 完成结果 (纯文本, 消费层可约定格式). Result string `json:"result,omitempty"` // FailReason 失败原因. FailReason string `json:"fail_reason,omitempty"` // CreatedAt 创建时间. CreatedAt time.Time `json:"created_at"` // ClaimedAt 认领时间 (未认领为零值). ClaimedAt time.Time `json:"claimed_at,omitempty"` // CompletedAt 完成时间 (未完成为零值). CompletedAt time.Time `json:"completed_at,omitempty"` // Version 乐观并发控制版本号, 每次修改 +1. // // 精妙之处(CLEVER): Store.CAS 用 Version 做乐观锁 -- // 读 → 业务计算 → 写 的模式下, 两个 Agent 并发 Claim 同一任务, // 只有一个 CAS 成功, 另一个收到 ErrConcurrentModification, 重试 // 或放弃, 避免重复认领. Version int `json:"version"` } // 错误定义. var ( // ErrTaskNotFound 任务不存在. ErrTaskNotFound = errors.New("tasklist: task not found") // ErrTaskAlreadyExists 任务 ID 冲突 (Add 时). ErrTaskAlreadyExists = errors.New("tasklist: task already exists") // ErrConcurrentModification 乐观锁冲突, 应重试. ErrConcurrentModification = errors.New("tasklist: concurrent modification (version mismatch)") // ErrNotClaimable 任务当前状态不可认领. ErrNotClaimable = errors.New("tasklist: task is not in a claimable state") // ErrAlreadyCompleted 任务已完成, 不可再操作. ErrAlreadyCompleted = errors.New("tasklist: task already completed") ) // Store 是任务存储层接口. // // 升华改进(ELEVATED): 接口只定义 CAS 原子性契约, 不定义 claim 业务逻辑 -- // business/storage 分层, 消费层接新 Store 只需实现 4 方法 (Get/CAS/List/Close). // 替代方案: <把 Claim 方法放在 Store 上> - 否决: 每个实现都要重写状态机, 行为漂移风险高. type Store interface { // Get 读取指定任务. 不存在返回 ErrTaskNotFound. Get(ctx context.Context, id string) (Task, error) // CAS 原子"比较并交换": 如果 id 对应任务的当前 Version 等于 expectedVersion, // 写入 newTask; 否则返回 ErrConcurrentModification. // // 特殊语义: expectedVersion == 0 表示"新增" -- 仅当 id 不存在时写入, // 已存在则返回 ErrTaskAlreadyExists. // // 原子性由实现保证: MemoryStore 走 mutex, MarkdownStore 走 flock, // DBStore 走 transaction. CAS(ctx context.Context, id string, expectedVersion int, newTask Task) error // List 返回所有任务 (顺序由实现决定, 建议按 CreatedAt 升序). List(ctx context.Context) ([]Task, error) // Close 释放资源 (文件句柄 / 数据库连接等). 幂等. Close() error } // TaskList 是任务清单业务层, 管 claim/complete/fail 状态机. // // 精妙之处(CLEVER): 结构体只持有 Store 引用, 无其他状态 -- // 并发安全完全委托给 Store.CAS; TaskList 方法是纯业务逻辑, 易测试. type TaskList struct { store Store } // New 构造 TaskList. func New(store Store) *TaskList { return &TaskList{store: store} } // Store 返回底层 Store 引用 (仅供高级用例, 消费层通常不需要直接访问). func (tl *TaskList) Store() Store { return tl.store } // Add 添加新任务到清单. // 若 id 冲突返回 ErrTaskAlreadyExists (极低概率, ID 含随机后缀). // // 升华改进(ELEVATED): 返回完整 Task 对象 (含自动生成的 ID/CreatedAt/Version) -- // 调用方直接用返回值, 不需要 Get 一次. func (tl *TaskList) Add(ctx context.Context, subject, description string) (Task, error) { if subject == "" { return Task{}, fmt.Errorf("tasklist: subject is required") } id, err := generateTaskID() if err != nil { return Task{}, fmt.Errorf("tasklist: generate id: %w", err) } task := Task{ ID: id, Subject: subject, Description: description, Status: StatusPending, CreatedAt: time.Now(), Version: 1, } if err := tl.store.CAS(ctx, id, 0, task); err != nil { return Task{}, err } return task, nil } // List 返回清单内所有任务. func (tl *TaskList) List(ctx context.Context) ([]Task, error) { return tl.store.List(ctx) } // Get 读取单个任务. func (tl *TaskList) Get(ctx context.Context, id string) (Task, error) { return tl.store.Get(ctx, id) } // Claim 认领任务. 只有 Pending 状态的任务可被认领. // // 并发安全: 两个 Agent 并发 Claim 同一任务, 只有一个成功, 另一个收到 // ErrConcurrentModification (可重试 List + Claim 另一个). func (tl *TaskList) Claim(ctx context.Context, taskID, by string) (Task, error) { if by == "" { return Task{}, fmt.Errorf("tasklist: by (claimer name) is required") } current, err := tl.store.Get(ctx, taskID) if err != nil { return Task{}, err } if current.Status != StatusPending { return Task{}, fmt.Errorf("%w (current status: %s)", ErrNotClaimable, current.Status) } updated := current updated.Status = StatusClaimed updated.ClaimedBy = by updated.ClaimedAt = time.Now() updated.Version++ if err := tl.store.CAS(ctx, taskID, current.Version, updated); err != nil { return Task{}, err } return updated, nil } // Complete 标记任务为完成, 附带结果文本. // // 可从 Claimed / Pending 状态直接完成 (支持 Leader 直接标记, 不一定要先 Claim). // 已完成的任务再次调用返回 ErrAlreadyCompleted. func (tl *TaskList) Complete(ctx context.Context, taskID, result string) (Task, error) { current, err := tl.store.Get(ctx, taskID) if err != nil { return Task{}, err } if current.Status == StatusCompleted { return Task{}, ErrAlreadyCompleted } updated := current updated.Status = StatusCompleted updated.CompletedAt = time.Now() updated.Result = result updated.Version++ if err := tl.store.CAS(ctx, taskID, current.Version, updated); err != nil { return Task{}, err } return updated, nil } // Fail 标记任务为失败, 附带失败原因. // // 失败的任务默认不可被重新 Claim (Status != Pending), 消费层可自定义 // 策略在业务代码里调 Fail → Reopen (手动改状态). func (tl *TaskList) Fail(ctx context.Context, taskID, reason string) (Task, error) { current, err := tl.store.Get(ctx, taskID) if err != nil { return Task{}, err } if current.Status == StatusCompleted { return Task{}, ErrAlreadyCompleted } updated := current updated.Status = StatusFailed updated.FailReason = reason updated.Version++ if err := tl.store.CAS(ctx, taskID, current.Version, updated); err != nil { return Task{}, err } return updated, nil } // Close 关闭底层 Store. func (tl *TaskList) Close() error { return tl.store.Close() } // generateTaskID 生成任务 ID: 纳秒时间戳 + 随机 4 字节 hex (对齐 inbox.Message ID 风格). // // 精妙之处(CLEVER): 时间戳前缀 = ID 自带时序, List 返回按 ID 排序近似按创建时间排序, // 调试友好. func generateTaskID() (string, error) { buf := make([]byte, 4) if _, err := rand.Read(buf); err != nil { return "", err } return fmt.Sprintf("task-%d-%s", time.Now().UnixNano(), hex.EncodeToString(buf)), nil }