tasklist

package
v0.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 26, 2026 License: None detected not legal advice Imports: 0 Imported by: 0

Documentation

Overview

Package tasklist provides the task list abstraction used by Agent Teams for work assignment, progress tracking, and result aggregation. The Store interface is consumer-pluggable so the same task list semantics work over in-memory (CLI / test), filesystem (single-process daemon), or remote (SaaS multi-tenant) backends.

API Consumption Shapes

Store spans two Flyto API shapes (see `docs/api-reference.md` section "API 消费形态 / API Consumption Patterns"):

  • Mutations (Create / Update / Delete / Assign / etc.): form 3 (synchronous callback). Engine / Leader SubAgent calls synchronously while orchestrating the team; the Store implementation persists and returns error on failure.
  • Queries (Get / List / FindByStatus): form 2 (pull). Consumer reads current state for UI rendering, leader-side scheduling decisions, or audit.

Package tasklist 提供 Agent Teams 的任务列表抽象: 工作分配 / 进度追踪 / 结果聚合. Store 接口面向消费者可插拔, 同一套任务列表语义可跑在内存 (CLI / 测试) / 文件系统 (单进程 daemon) / 远端 (SaaS 多租户) 后端上.

API 消费形态

Store 跨越两种 Flyto API 形态 (见 `docs/api-reference.md` "API 消费形态 / API Consumption Patterns" 章节):

  • 写 (Create / Update / Delete / Assign 等): 形态三 (同步回调). 引擎 / Leader SubAgent 在编排 team 时同步调; Store 实现持久化, 失败返 error.
  • 读 (Get / List / FindByStatus): 形态二 (调取 pull). 消费者读当前状态 供 UI 渲染 / leader 调度决策 / 审计.

Package tasklist 实现 Agent Teams 的 shared task list 业务层.

模块定位:

TaskList 让多个 Agent (Worker 或 Leader) 共享一张任务清单, 实现:
- 列出待办 (List): 看板视角
- 添加任务 (Add): 任意 Agent 可发布新任务到共享队列
- 抢单 (Claim): Worker 主动认领待办, 原子 CAS 避免重复抢
- 完成 (Complete): 记录结果, 驱动 TaskCompleted 事件 (Observer 订阅)
- 标记失败 (Fail): 记录失败原因, 支持其他 Worker 重新抢单

分层设计 (业务层 + 存储层):

TaskList (业务层, 一份实现): 管 claim 状态机, 走 Store.CAS 原子更新.
Store (存储层, 多实现): 只负责 Get/CAS/List.
- MemoryStore: 同进程 dogfood / 测试, 零外部依赖.
- MarkdownStore (另见 markdown_store.go): 文件 + flock, 对齐 Anthropic
  Claude Code v2.1.32 tasks.md 格式, 实现双向互操作.
- CustomStore: 消费层实现 Store 接口, 接入金融 DB / 医疗合规存储 /
  仓储 WMS / Redis 等. 引擎层零改动.

核心设计决策:

  1. 业务逻辑单一 (vs. "每个实现自带 claim 状态机"): Claim 语义由 TaskList.Claim 唯一控制, Store 只保证原子 CAS -- 所有后端 (memory / markdown / DB) 行为一致. 精妙之处(CLEVER): 业务和存储分层对齐 database/sql + driver 标准模式, Go 生态熟悉, 消费层零学习成本. 替代方案: <每个实现内置 claim> - 否决: 行为漂移, Markdown 实现可能 和 Memory 实现微妙不一致.
  2. CAS 原子性 (vs. "把锁放在 TaskList 层"): 不同 backend 的并发原语不同 (memory: mutex / markdown: flock / DB: transaction), 由 Store 自己实现 CAS 契约, TaskList 只调用. 跨行业扩展: 金融客户的 DB 事务原子性比文件锁强, 接 PostgresStore 能复用 Flyto 全套业务逻辑, 同时享受 DB 级事务保证.
  3. Version 字段用 int (vs. "timestamp / hash"): int 递增简单可预测, 并发 CAS 不丢更新; timestamp 有 skew 风险, hash 需要内容哈希额外 CPU. MVP 简单够用.

跨行业对照:

领域       | 任务示例                     | 典型 Store 选择
-----------|------------------------------|------------------
编程       | "写单测 / code review"       | MarkdownStore (互操作 Anthropic)
金融       | "分析股票 / 风控审核"        | CustomStore(PostgreSQL)
医疗       | "诊断影像 / 审方"            | CustomStore(HIPAA 加密存储)
仓储       | "波次 / 拣货 / 盘点"         | CustomStore(WMS DB 集成)
法律       | "合同审阅 / 案例检索"        | CustomStore(事务所文档管理)

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrTaskNotFound 任务不存在.
	ErrTaskNotFound = errors.New("tasklist: task not found")
	// ErrTaskAlreadyExists 任务 ID 冲突 (Add 时).
	ErrTaskAlreadyExists = errors.New("tasklist: task already exists")
	// ErrConcurrentModification 乐观锁冲突, 应重试.
	ErrConcurrentModification = errors.New("tasklist: concurrent modification (version mismatch)")
	// ErrNotClaimable 任务当前状态不可认领.
	ErrNotClaimable = errors.New("tasklist: task is not in a claimable state")
	// ErrAlreadyCompleted 任务已完成, 不可再操作.
	ErrAlreadyCompleted = errors.New("tasklist: task already completed")
)

错误定义.

Functions

This section is empty.

Types

type MarkdownStore

type MarkdownStore struct {
	// contains filtered or unexported fields
}

MarkdownStore 是文件持久化的 Store 实现.

func NewMarkdownStore

func NewMarkdownStore(path string) *MarkdownStore

NewMarkdownStore 构造 MarkdownStore. 文件不存在会在第一次 CAS 时创建. 目录必须已存在 (错误时第一次写入抛 I/O error).

使用示例:

store := tasklist.NewMarkdownStore("/home/user/.flyto/teams/default/tasks.md")
tl := tasklist.New(store)

func (*MarkdownStore) CAS

func (s *MarkdownStore) CAS(ctx context.Context, id string, expectedVersion int, newTask Task) error

CAS 原子比较并交换.

func (*MarkdownStore) Close

func (s *MarkdownStore) Close() error

Close 无操作 (每次 CAS 自行 open/close fd). 返回 nil.

func (*MarkdownStore) Get

func (s *MarkdownStore) Get(ctx context.Context, id string) (Task, error)

Get 读取任务.

func (*MarkdownStore) List

func (s *MarkdownStore) List(ctx context.Context) ([]Task, error)

List 返回所有任务 (按 CreatedAt 升序).

func (*MarkdownStore) Path

func (s *MarkdownStore) Path() string

Path 返回底层文件路径 (调试 / 文档).

type MemoryStore

type MemoryStore struct {
	// contains filtered or unexported fields
}

MemoryStore 是 tasklist.Store 的同进程实现.

func NewMemoryStore

func NewMemoryStore() *MemoryStore

NewMemoryStore 创建空的内存 Store.

func (*MemoryStore) CAS

func (s *MemoryStore) CAS(ctx context.Context, id string, expectedVersion int, newTask Task) error

CAS 原子比较并交换.

expectedVersion == 0 语义: 新增任务, 仅当 id 不存在时成功. 否则: 现有 Version 必须等于 expectedVersion 才写入, 不等返回 ErrConcurrentModification.

func (*MemoryStore) Close

func (s *MemoryStore) Close() error

Close 无操作返回 nil (内存 Store 无需清理).

func (*MemoryStore) Get

func (s *MemoryStore) Get(ctx context.Context, id string) (Task, error)

Get 读取任务. 不存在返回 ErrTaskNotFound.

func (*MemoryStore) List

func (s *MemoryStore) List(ctx context.Context) ([]Task, error)

List 返回所有任务 (按 CreatedAt 升序).

精妙之处(CLEVER): 返回切片副本而非 map values -- 调用方可自由排序 / 过滤, 不影响内部状态; map 遍历顺序不稳定, 副本排序后语义明确.

type Store

type Store interface {
	// Get 读取指定任务. 不存在返回 ErrTaskNotFound.
	Get(ctx context.Context, id string) (Task, error)

	// CAS 原子"比较并交换": 如果 id 对应任务的当前 Version 等于 expectedVersion,
	// 写入 newTask; 否则返回 ErrConcurrentModification.
	//
	// 特殊语义: expectedVersion == 0 表示"新增" -- 仅当 id 不存在时写入,
	// 已存在则返回 ErrTaskAlreadyExists.
	//
	// 原子性由实现保证: MemoryStore 走 mutex, MarkdownStore 走 flock,
	// DBStore 走 transaction.
	CAS(ctx context.Context, id string, expectedVersion int, newTask Task) error

	// List 返回所有任务 (顺序由实现决定, 建议按 CreatedAt 升序).
	List(ctx context.Context) ([]Task, error)

	// Close 释放资源 (文件句柄 / 数据库连接等). 幂等.
	Close() error
}

Store 是任务存储层接口.

升华改进(ELEVATED): 接口只定义 CAS 原子性契约, 不定义 claim 业务逻辑 -- business/storage 分层, 消费层接新 Store 只需实现 4 方法 (Get/CAS/List/Close). 替代方案: <把 Claim 方法放在 Store 上> - 否决: 每个实现都要重写状态机, 行为漂移风险高.

type Task

type Task struct {
	// ID 唯一标识符.
	ID string `json:"id"`
	// Subject 短标题 (1-80 字).
	Subject string `json:"subject"`
	// Description 详细描述.
	Description string `json:"description"`
	// Status 当前状态.
	Status TaskStatus `json:"status"`
	// ClaimedBy 认领者名 (agent 名或其他标识).
	ClaimedBy string `json:"claimed_by,omitempty"`
	// Result 完成结果 (纯文本, 消费层可约定格式).
	Result string `json:"result,omitempty"`
	// FailReason 失败原因.
	FailReason string `json:"fail_reason,omitempty"`
	// CreatedAt 创建时间.
	CreatedAt time.Time `json:"created_at"`
	// ClaimedAt 认领时间 (未认领为零值).
	ClaimedAt time.Time `json:"claimed_at,omitempty"`
	// CompletedAt 完成时间 (未完成为零值).
	CompletedAt time.Time `json:"completed_at,omitempty"`
	// Version 乐观并发控制版本号, 每次修改 +1.
	//
	// 精妙之处(CLEVER): Store.CAS 用 Version 做乐观锁 --
	// 读 → 业务计算 → 写 的模式下, 两个 Agent 并发 Claim 同一任务,
	// 只有一个 CAS 成功, 另一个收到 ErrConcurrentModification, 重试
	// 或放弃, 避免重复认领.
	Version int `json:"version"`
}

Task 是共享任务清单中的一项.

升华改进(ELEVATED): 字段覆盖编程/金融/医疗/仓储场景共性需求 -- Subject (短标题) + Description (详情) 是所有任务系统通用结构; ClaimedBy 可为 agent 名/员工工号/系统账号, 跨行业中立; Result 是纯文本, 消费层可存 JSON/XML/富文本, 引擎不限制.

type TaskList

type TaskList struct {
	// contains filtered or unexported fields
}

TaskList 是任务清单业务层, 管 claim/complete/fail 状态机.

精妙之处(CLEVER): 结构体只持有 Store 引用, 无其他状态 -- 并发安全完全委托给 Store.CAS; TaskList 方法是纯业务逻辑, 易测试.

func New

func New(store Store) *TaskList

New 构造 TaskList.

func (*TaskList) Add

func (tl *TaskList) Add(ctx context.Context, subject, description string) (Task, error)

Add 添加新任务到清单. 若 id 冲突返回 ErrTaskAlreadyExists (极低概率, ID 含随机后缀).

升华改进(ELEVATED): 返回完整 Task 对象 (含自动生成的 ID/CreatedAt/Version) -- 调用方直接用返回值, 不需要 Get 一次.

func (*TaskList) Claim

func (tl *TaskList) Claim(ctx context.Context, taskID, by string) (Task, error)

Claim 认领任务. 只有 Pending 状态的任务可被认领.

并发安全: 两个 Agent 并发 Claim 同一任务, 只有一个成功, 另一个收到 ErrConcurrentModification (可重试 List + Claim 另一个).

func (*TaskList) Close

func (tl *TaskList) Close() error

Close 关闭底层 Store.

func (*TaskList) Complete

func (tl *TaskList) Complete(ctx context.Context, taskID, result string) (Task, error)

Complete 标记任务为完成, 附带结果文本.

可从 Claimed / Pending 状态直接完成 (支持 Leader 直接标记, 不一定要先 Claim). 已完成的任务再次调用返回 ErrAlreadyCompleted.

func (*TaskList) Fail

func (tl *TaskList) Fail(ctx context.Context, taskID, reason string) (Task, error)

Fail 标记任务为失败, 附带失败原因.

失败的任务默认不可被重新 Claim (Status != Pending), 消费层可自定义 策略在业务代码里调 Fail → Reopen (手动改状态).

func (*TaskList) Get

func (tl *TaskList) Get(ctx context.Context, id string) (Task, error)

Get 读取单个任务.

func (*TaskList) List

func (tl *TaskList) List(ctx context.Context) ([]Task, error)

List 返回清单内所有任务.

func (*TaskList) Store

func (tl *TaskList) Store() Store

Store 返回底层 Store 引用 (仅供高级用例, 消费层通常不需要直接访问).

type TaskStatus

type TaskStatus string

TaskStatus 是任务状态枚举.

const (
	// StatusPending 新建, 等待认领.
	StatusPending TaskStatus = "pending"
	// StatusClaimed 已被某 Worker 认领, 正在进行.
	StatusClaimed TaskStatus = "claimed"
	// StatusCompleted 成功完成, 有结果.
	StatusCompleted TaskStatus = "completed"
	// StatusFailed 失败, 有失败原因.
	//
	// 精妙之处(CLEVER): Failed 不等于删除 -- 允许其他 Worker 查看失败原因,
	// 通过 Claim 重新领取 (claim 会校验状态, 默认 Failed 不可重领, 消费层
	// 可自定义策略).
	StatusFailed TaskStatus = "failed"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL