package tasklist // markdown_store.go 实现基于 markdown 文件 + flock 的 Store. // // 模块定位: // // MarkdownStore 是 tasklist.Store 接口的文件持久化实现, 对齐 Anthropic // Claude Code Agent Teams (2026-02 研究预览版) 的 tasks.md 文件协议, // 实现双向互操作: // - Flyto 写的 tasks.md 可被 Anthropic Claude Code 读取理解 // - Anthropic Claude Code 写的 tasks.md 可被 Flyto 读取继续处理 // // 跨行业定位: // // Markdown 不是编程独占 -- 广泛用于 Notion / Confluence / 会议纪要 / 法律 // 草稿 / 病历模板等. 本 Store 的价值是"人类可读文件 + 多工具兼容": // - 编程场景: 开发者直接 cat / vim / code tasks.md 查看 // - 法律场景: 律师用 Obsidian / Typora 可视化协同 // - 产品经理: Notion 导入导出 markdown 做任务看板 // // 不适合的场景 (应选 CustomStore 自行实现 Store 接口): // - 医疗合规 (HIPAA): 需加密存储, 明文 markdown 违规 // - 金融审计: 需事务型数据库 + 审计表, 文件系统无审计粒度 // - 高并发 SaaS: 文件锁争用瓶颈, 应走 Redis / PostgreSQL // // 核心设计决策: // 1. 每次操作全文读写 + flock (vs. append-only 日志): // CAS 需要基于最新状态判断 Version, append 模式难以原子拼读-改-写. // Full read-modify-write 简单可靠, 代价是 O(N) 任务时写放大. // MVP 场景任务数通常 <1000, 开销可接受. // 精妙之处(CLEVER): 每个操作成为一个"事务" (flock 时间窗口内), // 异常中断最多丢一次写; 已写入的文件内容始终一致. // 2. HTML 注释存元数据 (vs. YAML frontmatter / 隐藏字段): // - Anthropic Claude Code 用 HTML 注释存元数据 (兼容性第一) // - Markdown 渲染器忽略 HTML 注释, 不破坏人类阅读体验 // - 键值对紧凑 (task:ID status:X version:N), 解析简单 // 替代方案: - 否决: 对齐 Anthropic 优先. // 3. 状态 → checkbox 字符映射 (GitHub 习惯 + Anthropic 对齐): // pending → [ ] // claimed → [~] (Anthropic-style in-progress) // completed → [x] // failed → [!] // 精妙之处(CLEVER): 四种状态一眼可见, 人类浏览 tasks.md 不需要解析 // HTML 注释就能判断进度. // 4. flock advisory lock (vs. os.Rename 原子替换): // advisory flock 是 Linux 标准文件锁, 支持同进程/跨进程协调; // os.Rename 原子但无法阻塞读取方, 并发读可能读到中间态. // flock 的 lost update 风险很低 (与 Version CAS 叠加双重保护). // 替代方案: - 否决: 额外文件管理成本. // // 与 Anthropic 的格式差异 (文档化): // // Flyto 和 Anthropic Claude Code 在 MVP 阶段用同一套 HTML 注释 + checkbox, // 但 Flyto 元数据字段可能多出 flyto-version 等 Flyto 专属字段. Anthropic // 读取时会忽略未知字段, 不影响互操作. 反之 Flyto 读 Anthropic 写的文件, // 未识别字段存到 Task.Raw (未实现, 预留扩展点). import ( "bufio" "context" "fmt" "os" "path/filepath" "regexp" "sort" "strconv" "strings" "sync" "syscall" "time" ) // markdownHeader 是生成 tasks.md 文件的标题行 (人类友好的提示). const markdownHeader = `# Team Shared Tasks _This file is managed by Flyto Agent Teams (shared task list). Direct editing is allowed but concurrent modifications may conflict with running Agents. Anthropic Claude Code v2.1.32+ can read/write this format interoperably._ ` // 状态 ↔ checkbox 字符映射. var ( statusToChar = map[TaskStatus]string{ StatusPending: " ", StatusClaimed: "~", StatusCompleted: "x", StatusFailed: "!", } charToStatus = map[string]TaskStatus{ " ": StatusPending, "~": StatusClaimed, "x": StatusCompleted, "X": StatusCompleted, // 兼容大写 "!": StatusFailed, } ) // MarkdownStore 是文件持久化的 Store 实现. type MarkdownStore struct { path string // mu 是进程内锁 (flock 是跨进程锁, 但同进程多 goroutine 仍需互斥). mu sync.Mutex } // NewMarkdownStore 构造 MarkdownStore. 文件不存在会在第一次 CAS 时创建. // 目录必须已存在 (错误时第一次写入抛 I/O error). // // 使用示例: // // store := tasklist.NewMarkdownStore("/home/user/.flyto/teams/default/tasks.md") // tl := tasklist.New(store) func NewMarkdownStore(path string) *MarkdownStore { return &MarkdownStore{path: path} } // Path 返回底层文件路径 (调试 / 文档). func (s *MarkdownStore) Path() string { return s.path } // Get 读取任务. func (s *MarkdownStore) Get(ctx context.Context, id string) (Task, error) { s.mu.Lock() defer s.mu.Unlock() tasks, _, err := s.readLocked(false) if err != nil { return Task{}, err } task, ok := tasks[id] if !ok { return Task{}, ErrTaskNotFound } return task, nil } // CAS 原子比较并交换. func (s *MarkdownStore) CAS(ctx context.Context, id string, expectedVersion int, newTask Task) error { s.mu.Lock() defer s.mu.Unlock() tasks, fd, err := s.readLocked(true) if err != nil { return err } defer closeFD(fd) current, exists := tasks[id] if expectedVersion == 0 { if exists { return ErrTaskAlreadyExists } tasks[id] = newTask } else { if !exists { return ErrTaskNotFound } if current.Version != expectedVersion { return ErrConcurrentModification } tasks[id] = newTask } return s.writeLocked(fd, tasks) } // List 返回所有任务 (按 CreatedAt 升序). func (s *MarkdownStore) List(ctx context.Context) ([]Task, error) { s.mu.Lock() defer s.mu.Unlock() tasks, _, err := s.readLocked(false) if err != nil { return nil, err } out := make([]Task, 0, len(tasks)) for _, t := range tasks { out = append(out, t) } sort.Slice(out, func(i, j int) bool { return out[i].CreatedAt.Before(out[j].CreatedAt) }) return out, nil } // Close 无操作 (每次 CAS 自行 open/close fd). 返回 nil. func (s *MarkdownStore) Close() error { return nil } // readLocked 打开文件并 flock, 解析内容. // // exclusive=true 时获取写锁 (用于 CAS); false 时获取读锁 (Get/List). // 返回的 fd 在写模式下需要 caller 调 closeFD; 读模式下函数内部已关闭, 返回 nil fd. func (s *MarkdownStore) readLocked(exclusive bool) (map[string]Task, *os.File, error) { // 确保父目录存在 (CAS 第一次调用时可能还没创建). if exclusive { if err := os.MkdirAll(filepath.Dir(s.path), 0o755); err != nil { return nil, nil, fmt.Errorf("markdown store: mkdir parent: %w", err) } } flag := os.O_RDONLY if exclusive { flag = os.O_RDWR | os.O_CREATE } f, err := os.OpenFile(s.path, flag, 0o644) if err != nil { if !exclusive && os.IsNotExist(err) { // 文件不存在时, 视为空清单 (List / Get 操作友好降级) return make(map[string]Task), nil, nil } return nil, nil, fmt.Errorf("markdown store: open %s: %w", s.path, err) } lockOp := syscall.LOCK_SH if exclusive { lockOp = syscall.LOCK_EX } if err := syscall.Flock(int(f.Fd()), lockOp); err != nil { f.Close() return nil, nil, fmt.Errorf("markdown store: flock: %w", err) } tasks, err := parseMarkdown(f) if err != nil { unlockAndClose(f) return nil, nil, err } if !exclusive { unlockAndClose(f) return tasks, nil, nil } return tasks, f, nil } // writeLocked 写入完整 markdown 内容, 保持文件锁直到 close. func (s *MarkdownStore) writeLocked(f *os.File, tasks map[string]Task) error { if _, err := f.Seek(0, 0); err != nil { return fmt.Errorf("markdown store: seek: %w", err) } if err := f.Truncate(0); err != nil { return fmt.Errorf("markdown store: truncate: %w", err) } buf := renderMarkdown(tasks) if _, err := f.WriteString(buf); err != nil { return fmt.Errorf("markdown store: write: %w", err) } return f.Sync() } // closeFD 解锁并关闭 fd. func closeFD(f *os.File) { if f != nil { unlockAndClose(f) } } func unlockAndClose(f *os.File) { _ = syscall.Flock(int(f.Fd()), syscall.LOCK_UN) _ = f.Close() } // --- 解析 --- // metaLineRe 匹配元数据 HTML 注释行: // var metaLineRe = regexp.MustCompile(`^$`) var metaKVRe = regexp.MustCompile(`(\w+):(\S+)`) var checkboxLineRe = regexp.MustCompile(`^- \[([ x~!X])\]\s*(.*)$`) var descLineRe = regexp.MustCompile(`^>\s*(.*)$`) // parseMarkdown 解析文件内容为 tasks map. // // 精妙之处(CLEVER): 状态机式逐行解析 -- meta 注释 → checkbox → 描述三行 // 构成一条 task record. 遇到不匹配行跳过 (兼容手动编辑 + 未来 Anthropic // 可能新增的格式). func parseMarkdown(r *os.File) (map[string]Task, error) { tasks := make(map[string]Task) scanner := bufio.NewScanner(r) scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024) var pendingMeta map[string]string var pendingID string flushPending := func(subject string) { if pendingID == "" || pendingMeta == nil { return } task := Task{ ID: pendingID, Subject: subject, Status: TaskStatus(pendingMeta["status"]), ClaimedBy: pendingMeta["claimed_by"], Result: unescapeMetaValue(pendingMeta["result"]), FailReason: unescapeMetaValue(pendingMeta["fail_reason"]), Description: unescapeMetaValue(pendingMeta["description"]), } if v, ok := pendingMeta["version"]; ok { n, _ := strconv.Atoi(v) task.Version = n } if v, ok := pendingMeta["created_at"]; ok { task.CreatedAt, _ = time.Parse(time.RFC3339Nano, v) } if v, ok := pendingMeta["claimed_at"]; ok { task.ClaimedAt, _ = time.Parse(time.RFC3339Nano, v) } if v, ok := pendingMeta["completed_at"]; ok { task.CompletedAt, _ = time.Parse(time.RFC3339Nano, v) } if task.Status == "" { task.Status = StatusPending } tasks[pendingID] = task pendingMeta = nil pendingID = "" } for scanner.Scan() { line := scanner.Text() if m := metaLineRe.FindStringSubmatch(line); m != nil { pendingID = m[1] pendingMeta = parseMetaKV(m[2]) continue } if m := checkboxLineRe.FindStringSubmatch(line); m != nil { if pendingID == "" { // checkbox without meta: infer minimal task (兼容 Anthropic / 手工) pendingID = inferTaskID(m[2]) pendingMeta = map[string]string{} } status := charToStatus[m[1]] if status == "" { status = StatusPending } if pendingMeta["status"] == "" { pendingMeta["status"] = string(status) } subject := strings.TrimSpace(m[2]) flushPending(subject) continue } } if err := scanner.Err(); err != nil { return nil, fmt.Errorf("markdown store: scan: %w", err) } return tasks, nil } // parseMetaKV 把 "status:claimed claimed_by:worker-1 version:2" 解析成 map. // 元数据 value 不含空格; 含空格的值 (description/result) 用 \s escape. func parseMetaKV(s string) map[string]string { out := make(map[string]string) for _, m := range metaKVRe.FindAllStringSubmatch(s, -1) { out[m[1]] = m[2] } return out } // inferTaskID 给没有 meta 的 checkbox 生成一个 stub ID (导入 Anthropic 手写文件时用). func inferTaskID(subject string) string { h := strings.ReplaceAll(subject, " ", "-") if len(h) > 40 { h = h[:40] } return fmt.Sprintf("imported-%s-%d", h, time.Now().UnixNano()) } // unescapeMetaValue 把 \s → space, \n → newline. func unescapeMetaValue(v string) string { v = strings.ReplaceAll(v, `\s`, " ") v = strings.ReplaceAll(v, `\n`, "\n") return v } func escapeMetaValue(v string) string { v = strings.ReplaceAll(v, "\n", `\n`) v = strings.ReplaceAll(v, " ", `\s`) return v } // --- 渲染 --- // renderMarkdown 把 tasks 序列化为 markdown 文本. // // 精妙之处(CLEVER): 按 CreatedAt 排序输出 -- 文件内容可预测, 稳定的 diff // 利于 git 协作 / code review / 审计. func renderMarkdown(tasks map[string]Task) string { list := make([]Task, 0, len(tasks)) for _, t := range tasks { list = append(list, t) } sort.Slice(list, func(i, j int) bool { return list[i].CreatedAt.Before(list[j].CreatedAt) }) var b strings.Builder b.WriteString(markdownHeader) for _, t := range list { b.WriteString(renderTask(t)) b.WriteString("\n") } return b.String() } func renderTask(t Task) string { var b strings.Builder // 元数据注释行 b.WriteString("\n") // Checkbox 行 char := statusToChar[t.Status] if char == "" { char = " " } b.WriteString("- [") b.WriteString(char) b.WriteString("] ") b.WriteString(t.Subject) b.WriteString("\n") return b.String() } // 编译时接口检查. var _ Store = (*MarkdownStore)(nil)