package examples_test import ( "context" "encoding/json" "fmt" "log" "os" "sort" "sync" "time" "git.flytoex.net/yuanwei/flyto-agent/pkg/inbox" "git.flytoex.net/yuanwei/flyto-agent/pkg/tasklist" ) // Example_debate demonstrates Agent Teams peer-to-peer messaging primitives // without calling a real LLM. // // Two workers (proposer/critic) exchange messages through inbox.Router, // simulating a proposal debate. Replace agent names and content strings // to adapt to finance / medical / logistics / legal scenarios. func Example_debate() { ctx := context.Background() router := inbox.NewRouter() defer router.Close() const ( proposer = "proposer" critic = "critic" leader = "leader" ) mustSend(router, proposer, critic, "建议采用方案 A: Redis 做 task list. 低延迟, 成熟.") msg := mustRecvFrom(ctx, router, critic) fmt.Printf("[critic from=%s] %s\n", msg.From, payloadContent(msg)) mustSend(router, critic, proposer, "反对. Redis 非持久化, SaaS 需要审计表. 方案 B: PostgreSQL.") msg = mustRecvFrom(ctx, router, proposer) fmt.Printf("[proposer from=%s] %s\n", msg.From, payloadContent(msg)) mustSend(router, proposer, critic, "折中: Postgres 主存 + Redis 缓存, CDC 同步.") msg = mustRecvFrom(ctx, router, critic) fmt.Printf("[critic from=%s] %s\n", msg.From, payloadContent(msg)) mustSend(router, critic, leader, "达成折中方案, 请裁决.") msg = mustRecvFrom(ctx, router, leader) fmt.Printf("[leader from=%s] %s\n", msg.From, payloadContent(msg)) } // Example_tasklistMarkdown demonstrates MarkdownStore cross-tool interop. // // The tasks.md file format is compatible with Claude Code v2.1.32: // files written by Flyto can be read by Claude Code and vice versa. func Example_tasklistMarkdown() { ctx := context.Background() path := "/tmp/flyto_example_tasks.md" defer os.Remove(path) tl := tasklist.New(tasklist.NewMarkdownStore(path)) defer tl.Close() var ids []string for _, s := range []string{"分析日志找错误模式", "编写 login 模块单元测试"} { t, err := tl.Add(ctx, s, "") if err != nil { fmt.Println("error:", err) return } ids = append(ids, t.ID) fmt.Printf("[add] %s\n", t.Subject) } if _, err := tl.Claim(ctx, ids[0], "worker-A"); err != nil { fmt.Println("error:", err) return } fmt.Println("[worker-A claimed]") done, err := tl.Complete(ctx, ids[1], "8 edge cases covered") if err != nil { fmt.Println("error:", err) return } fmt.Printf("[worker-B completed] %s\n", done.Subject) } // Example_tasklistCustom demonstrates plugging in a custom Store backend // (finance compliance audit scenario). // // Implement tasklist.Store (Get / CAS / List / Close) to connect any // storage: PostgreSQL, HIPAA-encrypted store, WMS wave records, etc. func Example_tasklistCustom() { ctx := context.Background() store := newFinanceAuditStore() tl := tasklist.New(store) defer tl.Close() trade, err := tl.Add(ctx, "审核: 买入 AAPL 10000 股", "价格 $180, 客户 XYZ-001") if err != nil { fmt.Println("error:", err) return } if _, err := tl.Claim(ctx, trade.ID, "risk-analyst-03"); err != nil { fmt.Println("error:", err) return } done, err := tl.Complete(ctx, trade.ID, "批准: VaR=2.1% 在阈值内") if err != nil { fmt.Println("error:", err) return } fmt.Printf("completed: %s\n", done.Result) for i, e := range store.auditLog() { fmt.Printf("%d. op=%s\n", i+1, e.operation) } } // --- debate helpers --- func mustSend(router *inbox.Router, from, to, content string) { msg, err := inbox.NewMessage(from, to, inbox.MsgTaskAssignment, map[string]string{"content": content}) if err != nil { log.Fatal(err) } if err := router.Send(to, msg); err != nil { log.Fatal(err) } } func payloadContent(msg *inbox.Message) string { var p map[string]string if err := json.Unmarshal(msg.Payload, &p); err != nil { return string(msg.Payload) } return p["content"] } func mustRecvFrom(ctx context.Context, router *inbox.Router, to string) *inbox.Message { msg, err := router.Inbox(to).Recv(ctx) if err != nil { log.Fatalf("recv %s: %v", to, err) } return msg } // --- financeAuditStore: tasklist.Store implementation example --- type auditEntry struct { operation string taskID string } type financeAuditStore struct { mu sync.Mutex data map[string]tasklist.Task log []auditEntry } func newFinanceAuditStore() *financeAuditStore { return &financeAuditStore{data: make(map[string]tasklist.Task)} } func (s *financeAuditStore) Get(_ context.Context, id string) (tasklist.Task, error) { s.mu.Lock() defer s.mu.Unlock() t, ok := s.data[id] if !ok { return tasklist.Task{}, tasklist.ErrTaskNotFound } return t, nil } func (s *financeAuditStore) CAS(_ context.Context, id string, expectedVersion int, newTask tasklist.Task) error { s.mu.Lock() defer s.mu.Unlock() current, exists := s.data[id] if expectedVersion == 0 { if exists { return tasklist.ErrTaskAlreadyExists } s.data[id] = newTask s.log = append(s.log, auditEntry{"CREATE", id}) return nil } if !exists { return tasklist.ErrTaskNotFound } if current.Version != expectedVersion { return tasklist.ErrConcurrentModification } s.data[id] = newTask op := "UPDATE" switch newTask.Status { case tasklist.StatusClaimed: op = "CLAIM" case tasklist.StatusCompleted: op = "COMPLETE" case tasklist.StatusFailed: op = "FAIL" } s.log = append(s.log, auditEntry{op, id}) return nil } func (s *financeAuditStore) List(_ context.Context) ([]tasklist.Task, error) { s.mu.Lock() defer s.mu.Unlock() out := make([]tasklist.Task, 0, len(s.data)) for _, t := range s.data { out = append(out, t) } sort.Slice(out, func(i, j int) bool { return out[i].CreatedAt.Before(out[j].CreatedAt) }) return out, nil } func (s *financeAuditStore) Close() error { return nil } func (s *financeAuditStore) auditLog() []auditEntry { return s.log } // compile-time interface check var _ tasklist.Store = (*financeAuditStore)(nil) // suppress unused import warning for time (used in auditEntry if extended) var _ = time.Now