Documentation
¶
Overview ¶
Package tasklist provides the task list abstraction used by Agent Teams for work assignment, progress tracking, and result aggregation. The Store interface is consumer-pluggable so the same task list semantics work over in-memory (CLI / test), filesystem (single-process daemon), or remote (SaaS multi-tenant) backends.
API Consumption Shapes ¶
Store spans two Flyto API shapes (see `docs/api-reference.md` section "API 消费形态 / API Consumption Patterns"):
- Mutations (Create / Update / Delete / Assign / etc.): form 3 (synchronous callback). Engine / Leader SubAgent calls synchronously while orchestrating the team; the Store implementation persists and returns error on failure.
- Queries (Get / List / FindByStatus): form 2 (pull). Consumer reads current state for UI rendering, leader-side scheduling decisions, or audit.
Package tasklist 提供 Agent Teams 的任务列表抽象: 工作分配 / 进度追踪 / 结果聚合. Store 接口面向消费者可插拔, 同一套任务列表语义可跑在内存 (CLI / 测试) / 文件系统 (单进程 daemon) / 远端 (SaaS 多租户) 后端上.
API 消费形态 ¶
Store 跨越两种 Flyto API 形态 (见 `docs/api-reference.md` "API 消费形态 / API Consumption Patterns" 章节):
- 写 (Create / Update / Delete / Assign 等): 形态三 (同步回调). 引擎 / Leader SubAgent 在编排 team 时同步调; Store 实现持久化, 失败返 error.
- 读 (Get / List / FindByStatus): 形态二 (调取 pull). 消费者读当前状态 供 UI 渲染 / leader 调度决策 / 审计.
Package tasklist 实现 Agent Teams 的 shared task list 业务层.
模块定位:
TaskList 让多个 Agent (Worker 或 Leader) 共享一张任务清单, 实现: - 列出待办 (List): 看板视角 - 添加任务 (Add): 任意 Agent 可发布新任务到共享队列 - 抢单 (Claim): Worker 主动认领待办, 原子 CAS 避免重复抢 - 完成 (Complete): 记录结果, 驱动 TaskCompleted 事件 (Observer 订阅) - 标记失败 (Fail): 记录失败原因, 支持其他 Worker 重新抢单
分层设计 (业务层 + 存储层):
TaskList (业务层, 一份实现): 管 claim 状态机, 走 Store.CAS 原子更新. Store (存储层, 多实现): 只负责 Get/CAS/List. - MemoryStore: 同进程 dogfood / 测试, 零外部依赖. - MarkdownStore (另见 markdown_store.go): 文件 + flock, 对齐 Anthropic Claude Code v2.1.32 tasks.md 格式, 实现双向互操作. - CustomStore: 消费层实现 Store 接口, 接入金融 DB / 医疗合规存储 / 仓储 WMS / Redis 等. 引擎层零改动.
核心设计决策:
- 业务逻辑单一 (vs. "每个实现自带 claim 状态机"): Claim 语义由 TaskList.Claim 唯一控制, Store 只保证原子 CAS -- 所有后端 (memory / markdown / DB) 行为一致. 精妙之处(CLEVER): 业务和存储分层对齐 database/sql + driver 标准模式, Go 生态熟悉, 消费层零学习成本. 替代方案: <每个实现内置 claim> - 否决: 行为漂移, Markdown 实现可能 和 Memory 实现微妙不一致.
- CAS 原子性 (vs. "把锁放在 TaskList 层"): 不同 backend 的并发原语不同 (memory: mutex / markdown: flock / DB: transaction), 由 Store 自己实现 CAS 契约, TaskList 只调用. 跨行业扩展: 金融客户的 DB 事务原子性比文件锁强, 接 PostgresStore 能复用 Flyto 全套业务逻辑, 同时享受 DB 级事务保证.
- Version 字段用 int (vs. "timestamp / hash"): int 递增简单可预测, 并发 CAS 不丢更新; timestamp 有 skew 风险, hash 需要内容哈希额外 CPU. MVP 简单够用.
跨行业对照:
领域 | 任务示例 | 典型 Store 选择 -----------|------------------------------|------------------ 编程 | "写单测 / code review" | MarkdownStore (互操作 Anthropic) 金融 | "分析股票 / 风控审核" | CustomStore(PostgreSQL) 医疗 | "诊断影像 / 审方" | CustomStore(HIPAA 加密存储) 仓储 | "波次 / 拣货 / 盘点" | CustomStore(WMS DB 集成) 法律 | "合同审阅 / 案例检索" | CustomStore(事务所文档管理)
Index ¶
- Variables
- type MarkdownStore
- func (s *MarkdownStore) CAS(ctx context.Context, id string, expectedVersion int, newTask Task) error
- func (s *MarkdownStore) Close() error
- func (s *MarkdownStore) Get(ctx context.Context, id string) (Task, error)
- func (s *MarkdownStore) List(ctx context.Context) ([]Task, error)
- func (s *MarkdownStore) Path() string
- type MemoryStore
- type Store
- type Task
- type TaskList
- func (tl *TaskList) Add(ctx context.Context, subject, description string) (Task, error)
- func (tl *TaskList) Claim(ctx context.Context, taskID, by string) (Task, error)
- func (tl *TaskList) Close() error
- func (tl *TaskList) Complete(ctx context.Context, taskID, result string) (Task, error)
- func (tl *TaskList) Fail(ctx context.Context, taskID, reason string) (Task, error)
- func (tl *TaskList) Get(ctx context.Context, id string) (Task, error)
- func (tl *TaskList) List(ctx context.Context) ([]Task, error)
- func (tl *TaskList) Store() Store
- type TaskStatus
Constants ¶
This section is empty.
Variables ¶
var ( // ErrTaskNotFound 任务不存在. ErrTaskNotFound = errors.New("tasklist: task not found") // ErrTaskAlreadyExists 任务 ID 冲突 (Add 时). ErrTaskAlreadyExists = errors.New("tasklist: task already exists") // ErrConcurrentModification 乐观锁冲突, 应重试. ErrConcurrentModification = errors.New("tasklist: concurrent modification (version mismatch)") // ErrNotClaimable 任务当前状态不可认领. ErrNotClaimable = errors.New("tasklist: task is not in a claimable state") // ErrAlreadyCompleted 任务已完成, 不可再操作. ErrAlreadyCompleted = errors.New("tasklist: task already completed") )
错误定义.
Functions ¶
This section is empty.
Types ¶
type MarkdownStore ¶
type MarkdownStore struct {
// contains filtered or unexported fields
}
MarkdownStore 是文件持久化的 Store 实现.
func NewMarkdownStore ¶
func NewMarkdownStore(path string) *MarkdownStore
NewMarkdownStore 构造 MarkdownStore. 文件不存在会在第一次 CAS 时创建. 目录必须已存在 (错误时第一次写入抛 I/O error).
使用示例:
store := tasklist.NewMarkdownStore("/home/user/.flyto/teams/default/tasks.md")
tl := tasklist.New(store)
func (*MarkdownStore) CAS ¶
func (s *MarkdownStore) CAS(ctx context.Context, id string, expectedVersion int, newTask Task) error
CAS 原子比较并交换.
func (*MarkdownStore) Close ¶
func (s *MarkdownStore) Close() error
Close 无操作 (每次 CAS 自行 open/close fd). 返回 nil.
type MemoryStore ¶
type MemoryStore struct {
// contains filtered or unexported fields
}
MemoryStore 是 tasklist.Store 的同进程实现.
func (*MemoryStore) CAS ¶
CAS 原子比较并交换.
expectedVersion == 0 语义: 新增任务, 仅当 id 不存在时成功. 否则: 现有 Version 必须等于 expectedVersion 才写入, 不等返回 ErrConcurrentModification.
type Store ¶
type Store interface {
// Get 读取指定任务. 不存在返回 ErrTaskNotFound.
Get(ctx context.Context, id string) (Task, error)
// CAS 原子"比较并交换": 如果 id 对应任务的当前 Version 等于 expectedVersion,
// 写入 newTask; 否则返回 ErrConcurrentModification.
//
// 特殊语义: expectedVersion == 0 表示"新增" -- 仅当 id 不存在时写入,
// 已存在则返回 ErrTaskAlreadyExists.
//
// 原子性由实现保证: MemoryStore 走 mutex, MarkdownStore 走 flock,
// DBStore 走 transaction.
CAS(ctx context.Context, id string, expectedVersion int, newTask Task) error
// List 返回所有任务 (顺序由实现决定, 建议按 CreatedAt 升序).
List(ctx context.Context) ([]Task, error)
// Close 释放资源 (文件句柄 / 数据库连接等). 幂等.
Close() error
}
Store 是任务存储层接口.
升华改进(ELEVATED): 接口只定义 CAS 原子性契约, 不定义 claim 业务逻辑 -- business/storage 分层, 消费层接新 Store 只需实现 4 方法 (Get/CAS/List/Close). 替代方案: <把 Claim 方法放在 Store 上> - 否决: 每个实现都要重写状态机, 行为漂移风险高.
type Task ¶
type Task struct {
// ID 唯一标识符.
ID string `json:"id"`
// Subject 短标题 (1-80 字).
Subject string `json:"subject"`
// Description 详细描述.
Description string `json:"description"`
// Status 当前状态.
Status TaskStatus `json:"status"`
// ClaimedBy 认领者名 (agent 名或其他标识).
ClaimedBy string `json:"claimed_by,omitempty"`
// Result 完成结果 (纯文本, 消费层可约定格式).
Result string `json:"result,omitempty"`
// FailReason 失败原因.
FailReason string `json:"fail_reason,omitempty"`
// CreatedAt 创建时间.
CreatedAt time.Time `json:"created_at"`
// ClaimedAt 认领时间 (未认领为零值).
ClaimedAt time.Time `json:"claimed_at,omitempty"`
// CompletedAt 完成时间 (未完成为零值).
CompletedAt time.Time `json:"completed_at,omitempty"`
// Version 乐观并发控制版本号, 每次修改 +1.
//
// 精妙之处(CLEVER): Store.CAS 用 Version 做乐观锁 --
// 读 → 业务计算 → 写 的模式下, 两个 Agent 并发 Claim 同一任务,
// 只有一个 CAS 成功, 另一个收到 ErrConcurrentModification, 重试
// 或放弃, 避免重复认领.
Version int `json:"version"`
}
Task 是共享任务清单中的一项.
升华改进(ELEVATED): 字段覆盖编程/金融/医疗/仓储场景共性需求 -- Subject (短标题) + Description (详情) 是所有任务系统通用结构; ClaimedBy 可为 agent 名/员工工号/系统账号, 跨行业中立; Result 是纯文本, 消费层可存 JSON/XML/富文本, 引擎不限制.
type TaskList ¶
type TaskList struct {
// contains filtered or unexported fields
}
TaskList 是任务清单业务层, 管 claim/complete/fail 状态机.
精妙之处(CLEVER): 结构体只持有 Store 引用, 无其他状态 -- 并发安全完全委托给 Store.CAS; TaskList 方法是纯业务逻辑, 易测试.
func (*TaskList) Add ¶
Add 添加新任务到清单. 若 id 冲突返回 ErrTaskAlreadyExists (极低概率, ID 含随机后缀).
升华改进(ELEVATED): 返回完整 Task 对象 (含自动生成的 ID/CreatedAt/Version) -- 调用方直接用返回值, 不需要 Get 一次.
func (*TaskList) Claim ¶
Claim 认领任务. 只有 Pending 状态的任务可被认领.
并发安全: 两个 Agent 并发 Claim 同一任务, 只有一个成功, 另一个收到 ErrConcurrentModification (可重试 List + Claim 另一个).
func (*TaskList) Complete ¶
Complete 标记任务为完成, 附带结果文本.
可从 Claimed / Pending 状态直接完成 (支持 Leader 直接标记, 不一定要先 Claim). 已完成的任务再次调用返回 ErrAlreadyCompleted.
func (*TaskList) Fail ¶
Fail 标记任务为失败, 附带失败原因.
失败的任务默认不可被重新 Claim (Status != Pending), 消费层可自定义 策略在业务代码里调 Fail → Reopen (手动改状态).
type TaskStatus ¶
type TaskStatus string
TaskStatus 是任务状态枚举.
const ( // StatusPending 新建, 等待认领. StatusPending TaskStatus = "pending" // StatusClaimed 已被某 Worker 认领, 正在进行. StatusClaimed TaskStatus = "claimed" // StatusCompleted 成功完成, 有结果. StatusCompleted TaskStatus = "completed" // StatusFailed 失败, 有失败原因. // // 精妙之处(CLEVER): Failed 不等于删除 -- 允许其他 Worker 查看失败原因, // 通过 Claim 重新领取 (claim 会校验状态, 默认 Failed 不可重领, 消费层 // 可自定义策略). StatusFailed TaskStatus = "failed" )