package builtin // tasklist_tools.go 实现 Agent Teams shared task list 的 4 个内置工具. // // 模块定位: // // 让 Agent (Worker 或 Leader) 通过工具调用操作共享任务清单: // - add_shared_task 发布新任务到清单 (任意角色可用) // - list_shared_tasks 查看当前任务状态 (按状态过滤可选) // - claim_shared_task 认领 Pending 任务 (Worker 主动抢单) // - complete_shared_task 标记任务完成并附带结果 // // 命名约定: // // 所有工具带 "shared_" 前缀, 与引擎内部 Task 工具 (task_create / task_list / // task_update -- 见 builtin/task.go) 明确区分: // - task_* = 单 Agent 的本地待办 (仅当前 Agent 可见) // - shared_task_* = 多 Agent 共享的 Team 级清单 (TaskList 接口支持的存储后端) // // 核心设计决策: // 1. Context 注入 TaskList (对齐 send_message / AgentExecutor): // 每个 Worker 的 ctx 携带 Team 级 TaskList 引用, 工具从 ctx 读. // 非 Team 场景 ctx 无此值, 工具返回 "not in a Team" 错误. // 跨行业扩展: 金融/医疗/仓储客户用自己的 Store (PostgresStore / // HIPAAStore / WMSStore), TaskList 统一业务层一致调用. // 2. list_shared_tasks 默认不过滤: // Worker 通常想看"哪些可抢", 但过滤由模型决定 (LLM 灵活组合), // 工具输出全量 + status 过滤参数. 避免隐藏信息导致模型误判. // 3. 返回结构化 JSON (vs. 自然语言文本): // 工具输出含 {id, subject, status, claimed_by, ...} JSON, 模型可解析; // LLM 对 JSON 的处理能力比纯文本列表更稳定, 支持后续程序化回调. // 4. claim 失败不致命: // 并发冲突 (ErrConcurrentModification) 返回提示"请重试或选其他任务", // 模型可再调 list_shared_tasks 后选另一个; 不 panic / 不终止. import ( "context" "encoding/json" "fmt" "strings" "git.flytoex.net/yuanwei/flyto-agent/pkg/tasklist" "git.flytoex.net/yuanwei/flyto-agent/pkg/tools" ) // TaskListProvider 是 tasklist 工具的后端接口. // engine 包通过 context 注入实际 TaskList 实例. type TaskListProvider interface { SharedTaskList() *tasklist.TaskList // ActorName 返回当前调用方的 agent 名 (用于 Claim by 字段). ActorName() string } // taskListProviderKey 是 context 注入 provider 的 key. type taskListProviderKey struct{} // WithTaskListProvider 在 ctx 中注入 TaskListProvider. // 由 engine.Team 在 runWorker / ContextForLeader 中调用. func WithTaskListProvider(ctx context.Context, provider TaskListProvider) context.Context { return context.WithValue(ctx, taskListProviderKey{}, provider) } // taskListProviderFromContext 从 ctx 读取 provider. func taskListProviderFromContext(ctx context.Context) TaskListProvider { v := ctx.Value(taskListProviderKey{}) if v == nil { return nil } p, _ := v.(TaskListProvider) return p } // --- add_shared_task --- type addSharedTaskInput struct { Subject string `json:"subject"` Description string `json:"description,omitempty"` } type addSharedTaskTool struct{} // NewAddSharedTaskTool 创建 add_shared_task 工具. func NewAddSharedTaskTool() tools.Tool { return &addSharedTaskTool{} } func (t *addSharedTaskTool) Name() string { return "add_shared_task" } func (t *addSharedTaskTool) Description(ctx context.Context) string { return `Add a task to the Team's shared task list (Agent Teams coordination). Use this to publish a new task that any teammate can claim and work on. Parameters: - subject (string, required): Short task title (1-80 chars) - description (string, optional): Detailed description / requirements Returns JSON: {"id": "...", "subject": "...", "status": "pending", "version": 1} If no Team context is active, returns an error.` } func (t *addSharedTaskTool) InputSchema() json.RawMessage { return json.RawMessage(`{ "type": "object", "properties": { "subject": {"type": "string", "description": "Short task title"}, "description": {"type": "string", "description": "Detailed description"} }, "required": ["subject"] }`) } func (t *addSharedTaskTool) Execute(ctx context.Context, input json.RawMessage, progress tools.ProgressFunc) (*tools.Result, error) { tl, err := getTaskList(ctx) if err != nil { return errorResult(err), nil } var req addSharedTaskInput if err := json.Unmarshal(input, &req); err != nil { return errorResult(fmt.Errorf("invalid input: %w", err)), nil } if strings.TrimSpace(req.Subject) == "" { return errorResult(fmt.Errorf("subject is required")), nil } task, err := tl.Add(ctx, req.Subject, req.Description) if err != nil { return errorResult(err), nil } return taskResult(task), nil } func (t *addSharedTaskTool) Metadata() tools.Metadata { return tools.Metadata{ ConcurrencySafe: true, AuditOperation: "write", PermissionClass: "generic", SearchHint: "add team task", } } // --- list_shared_tasks --- type listSharedTasksInput struct { // Status 过滤 (可选): "pending" / "claimed" / "completed" / "failed" / ""=全部 Status string `json:"status,omitempty"` } type listSharedTasksTool struct{} // NewListSharedTasksTool 创建 list_shared_tasks 工具. func NewListSharedTasksTool() tools.Tool { return &listSharedTasksTool{} } func (t *listSharedTasksTool) Name() string { return "list_shared_tasks" } func (t *listSharedTasksTool) Description(ctx context.Context) string { return `List tasks in the Team's shared task list (optionally filtered by status). Use this to see what's pending / who's working on what / what's done. Parameters: - status (string, optional): Filter by status ("pending"/"claimed"/"completed"/"failed"). Empty = all. Returns JSON array of tasks, each with {id, subject, status, claimed_by, ...}.` } func (t *listSharedTasksTool) InputSchema() json.RawMessage { return json.RawMessage(`{ "type": "object", "properties": { "status": { "type": "string", "enum": ["", "pending", "claimed", "completed", "failed"] } } }`) } func (t *listSharedTasksTool) Execute(ctx context.Context, input json.RawMessage, progress tools.ProgressFunc) (*tools.Result, error) { tl, err := getTaskList(ctx) if err != nil { return errorResult(err), nil } var req listSharedTasksInput if len(input) > 0 { _ = json.Unmarshal(input, &req) } all, err := tl.List(ctx) if err != nil { return errorResult(err), nil } var filtered []tasklist.Task if req.Status == "" { filtered = all } else { want := tasklist.TaskStatus(req.Status) for _, task := range all { if task.Status == want { filtered = append(filtered, task) } } } data, err := json.MarshalIndent(filtered, "", " ") if err != nil { return errorResult(err), nil } return &tools.Result{Output: string(data)}, nil } func (t *listSharedTasksTool) Metadata() tools.Metadata { return tools.Metadata{ ConcurrencySafe: true, ReadOnly: true, AuditOperation: "read", PermissionClass: "readonly", SearchHint: "list team tasks", } } // --- claim_shared_task --- type claimSharedTaskInput struct { TaskID string `json:"task_id"` // By 可选, 默认使用 ctx 携带的 actor name. By string `json:"by,omitempty"` } type claimSharedTaskTool struct{} // NewClaimSharedTaskTool 创建 claim_shared_task 工具. func NewClaimSharedTaskTool() tools.Tool { return &claimSharedTaskTool{} } func (t *claimSharedTaskTool) Name() string { return "claim_shared_task" } func (t *claimSharedTaskTool) Description(ctx context.Context) string { return `Claim a pending task in the Team's shared task list. Only 'pending' tasks can be claimed. After claim, status becomes 'claimed' and you become the owner (ClaimedBy field). Other Agents attempting to claim the same task concurrently will fail (ErrConcurrentModification or ErrNotClaimable). Parameters: - task_id (string, required): ID of the task to claim - by (string, optional): Claimer name (defaults to this Agent's name) Returns JSON with updated task. If claim fails (already claimed / not pending), the error message suggests re-listing and choosing another task.` } func (t *claimSharedTaskTool) InputSchema() json.RawMessage { return json.RawMessage(`{ "type": "object", "properties": { "task_id": {"type": "string"}, "by": {"type": "string", "description": "Claimer name (optional)"} }, "required": ["task_id"] }`) } func (t *claimSharedTaskTool) Execute(ctx context.Context, input json.RawMessage, progress tools.ProgressFunc) (*tools.Result, error) { provider := taskListProviderFromContext(ctx) if provider == nil || provider.SharedTaskList() == nil { return errorResult(fmt.Errorf("not in a Team context (no TaskListProvider)")), nil } var req claimSharedTaskInput if err := json.Unmarshal(input, &req); err != nil { return errorResult(fmt.Errorf("invalid input: %w", err)), nil } if req.TaskID == "" { return errorResult(fmt.Errorf("task_id is required")), nil } by := req.By if by == "" { by = provider.ActorName() } if by == "" { by = "anonymous" } task, err := provider.SharedTaskList().Claim(ctx, req.TaskID, by) if err != nil { return errorResult(err), nil } return taskResult(task), nil } func (t *claimSharedTaskTool) Metadata() tools.Metadata { return tools.Metadata{ ConcurrencySafe: true, AuditOperation: "write", PermissionClass: "generic", SearchHint: "claim team task", } } // --- complete_shared_task --- type completeSharedTaskInput struct { TaskID string `json:"task_id"` Result string `json:"result"` } type completeSharedTaskTool struct{} // NewCompleteSharedTaskTool 创建 complete_shared_task 工具. func NewCompleteSharedTaskTool() tools.Tool { return &completeSharedTaskTool{} } func (t *completeSharedTaskTool) Name() string { return "complete_shared_task" } func (t *completeSharedTaskTool) Description(ctx context.Context) string { return `Mark a shared task as completed with a result. Parameters: - task_id (string, required): ID of the task - result (string, required): Result text (free-form, can be markdown/json/xml) Returns JSON with updated task (status=completed, completed_at set). Already-completed tasks return an error.` } func (t *completeSharedTaskTool) InputSchema() json.RawMessage { return json.RawMessage(`{ "type": "object", "properties": { "task_id": {"type": "string"}, "result": {"type": "string", "description": "Result text"} }, "required": ["task_id", "result"] }`) } func (t *completeSharedTaskTool) Execute(ctx context.Context, input json.RawMessage, progress tools.ProgressFunc) (*tools.Result, error) { tl, err := getTaskList(ctx) if err != nil { return errorResult(err), nil } var req completeSharedTaskInput if err := json.Unmarshal(input, &req); err != nil { return errorResult(fmt.Errorf("invalid input: %w", err)), nil } if req.TaskID == "" { return errorResult(fmt.Errorf("task_id is required")), nil } task, err := tl.Complete(ctx, req.TaskID, req.Result) if err != nil { return errorResult(err), nil } return taskResult(task), nil } func (t *completeSharedTaskTool) Metadata() tools.Metadata { return tools.Metadata{ ConcurrencySafe: true, AuditOperation: "write", PermissionClass: "generic", SearchHint: "complete team task", } } // --- helpers --- func getTaskList(ctx context.Context) (*tasklist.TaskList, error) { provider := taskListProviderFromContext(ctx) if provider == nil { return nil, fmt.Errorf("not in a Team context (no TaskListProvider)") } tl := provider.SharedTaskList() if tl == nil { return nil, fmt.Errorf("Team has no shared TaskList configured") } return tl, nil } func errorResult(err error) *tools.Result { return &tools.Result{Output: err.Error(), IsError: true} } func taskResult(t tasklist.Task) *tools.Result { data, err := json.MarshalIndent(t, "", " ") if err != nil { return errorResult(err) } return &tools.Result{Output: string(data)} } // 编译时接口检查. var _ tools.Tool = (*addSharedTaskTool)(nil) var _ tools.Tool = (*listSharedTasksTool)(nil) var _ tools.Tool = (*claimSharedTaskTool)(nil) var _ tools.Tool = (*completeSharedTaskTool)(nil)