Documentation
¶
Overview ¶
Package inbox 提供 Agent 间消息传递的邮箱抽象.
每个 Agent 持有自己的 Inbox, 其他 Agent 通过 Router 向其 Send 消息. Send 非阻塞 (丢失优于卡死), Recv 阻塞 (推送优于轮询), Poll 非阻塞 (紧急检查). 内存实现适用单进程; UDS 实现跨进程, 为 Agent Teams 多进程协作预留.
API Consumption Shapes ¶
The Inbox interface spans two Flyto API shapes (see `docs/api-reference.md` "API 消费形态 / API Consumption Patterns"):
- Send: form 3 (synchronous callback). Non-blocking from the sender's perspective but a direct method call; router / Agent Teams pipeline drops on a full queue rather than blocking.
- Recv / Poll: form 2 (pull). Consumer (the owning Agent) actively reads pending messages; Recv blocks until a message arrives or the context is cancelled, Poll returns immediately.
API 消费形态:
Inbox 接口跨越两种 Flyto API 形态 (见 `docs/api-reference.md` "API 消费 形态 / API Consumption Patterns"):
- Send: 形态三 (同步回调). 对发送方是非阻塞语义但仍是直接方法调用; router / Agent Teams 管道在满队列时丢弃而不阻塞.
- Recv / Poll: 形态二 (调取 pull). 消费者 (持有 Inbox 的 Agent) 主动 读待处理消息; Recv 阻塞直到有消息到或 ctx 取消, Poll 立即返回.
Index ¶
Constants ¶
This section is empty.
Variables ¶
var ( // ErrInboxClosed 收件箱已关闭. ErrInboxClosed = errors.New("inbox: closed") // ErrInboxFull 收件箱已满(背压信号). ErrInboxFull = fmt.Errorf("inbox: full (capacity %d)", memoryInboxCapacity) )
错误定义.
Functions ¶
This section is empty.
Types ¶
type IdleNotificationPayload ¶
type IdleNotificationPayload struct {
IdleReason string `json:"idle_reason"` // "available" | "interrupted" | "failed"
Summary string `json:"summary,omitempty"`
CompletedTaskID string `json:"completed_task_id,omitempty"`
CompletedStatus string `json:"completed_status,omitempty"` // "resolved" | "blocked" | "failed"
FailureReason string `json:"failure_reason,omitempty"`
}
IdleNotificationPayload 是空闲通知消息的载荷. Worker 完成任务或进入空闲状态时通知 Leader.
升华改进(ELEVATED): IdleReason 区分"完成"/"中断"/"失败"三种状态-- Leader 可以根据状态决定下一步动作(分配新任务/重试/告警). 替代方案:布尔 Completed 字段(无法区分中断和失败).
type Inbox ¶
type Inbox interface {
// Send 向此 Inbox 投递一条消息(非阻塞).
// 如果 Inbox 已满或已关闭,返回错误.
Send(msg *Message) error
// Recv 阻塞等待下一条消息,ctx 取消时返回 ctx.Err().
// Inbox 关闭后,Recv 返回 ErrInboxClosed.
Recv(ctx context.Context) (*Message, error)
// Poll 非阻塞尝试获取下一条消息,无消息返回 nil, nil.
// Inbox 关闭后,Poll 返回 nil, ErrInboxClosed.
Poll() (*Message, error)
// Close 关闭 Inbox,所有等待中的 Recv 返回错误.
// Close 是幂等的,多次调用不 panic.
Close() error
}
Inbox 是消息收件箱接口. 每个 Agent 有自己的 Inbox,其他 Agent 通过 Router 向其 Send 消息.
升华改进(ELEVATED): 接口而非具体实现-- 内存实现(MemoryInbox)用于同进程 Agent 协作; 未来可扩展 RedisInbox(跨进程),GRPCInbox(跨节点)而无需修改上层代码. 替代方案:直接暴露 MemoryInbox(绑定实现,无法测试替换).
type MemoryInbox ¶
type MemoryInbox struct {
// contains filtered or unexported fields
}
MemoryInbox 用 buffered channel 实现的内存收件箱.
精妙之处(CLEVER): MemoryInbox 零外部依赖,完全用标准库实现-- 不依赖 Redis,Kafka 等基础设施,可嵌入任何 Go 程序. 替代方案:基于 sync.Mutex + []Message 的滑动队列(实现更复杂,性能相当).
func (*MemoryInbox) Close ¶
func (m *MemoryInbox) Close() error
Close 关闭收件箱,广播关闭信号给所有等待中的 Recv. 幂等:多次调用不 panic.
func (*MemoryInbox) Poll ¶
func (m *MemoryInbox) Poll() (*Message, error)
Poll 非阻塞尝试获取消息. 无消息返回 nil, nil;Inbox 关闭返回 nil, ErrInboxClosed.
func (*MemoryInbox) Recv ¶
func (m *MemoryInbox) Recv(ctx context.Context) (*Message, error)
Recv 阻塞等待下一条消息,ctx 取消时返回 ctx.Err(). Inbox 关闭后返回 ErrInboxClosed.
func (*MemoryInbox) Send ¶
func (m *MemoryInbox) Send(msg *Message) error
Send 非阻塞投递消息. 若 Inbox 已关闭返回 ErrInboxClosed,若已满返回 ErrInboxFull.
type Message ¶
type Message struct {
ID string `json:"id"`
Type MessageType `json:"type"`
From string `json:"from"` // 发送方 agent 名称
To string `json:"to"` // 接收方 agent 名称
Timestamp time.Time `json:"timestamp"`
Payload json.RawMessage `json:"payload"` // 类型特定的数据
}
Message 是统一的消息格式. 所有 Agent 间通信都通过此结构体传递,保证格式统一可追踪.
升华改进(ELEVATED): 统一消息格式使得 Inbox 可以无感知地 做消息持久化,审计,重放--运维工具的基础设施. 跨行业扩展:金融场景的交易指令,医疗场景的诊断请求都可复用此格式. 替代方案:每种消息类型独立 channel(类型安全但无法统一路由/审计).
func NewMessage ¶
func NewMessage(from, to string, msgType MessageType, payload any) (*Message, error)
NewMessage 构造一条消息,自动生成 ID 和 Timestamp.
payload 会被 JSON 序列化为 json.RawMessage; 若 payload 本身已是 json.RawMessage,直接使用(避免二次编码).
精妙之处(CLEVER): payload 已是 json.RawMessage 时直接赋值-- 避免 json.Marshal(json.RawMessage) 再次编码导致字节串被当作 base64 处理的陷阱. 替代方案:始终 json.Marshal(会导致 RawMessage 被双重编码).
type MessageType ¶
type MessageType string
MessageType 是消息类型枚举.
const ( // MsgPermissionRequest Worker 向 Leader 申请工具执行权限 MsgPermissionRequest MessageType = "permission_request" // MsgPermissionResponse Leader 向 Worker 回应权限请求 MsgPermissionResponse MessageType = "permission_response" // MsgIdleNotification Worker 完成或空闲时通知 Leader MsgIdleNotification MessageType = "idle_notification" // MsgTaskAssignment Leader 向 Worker 分配新任务 MsgTaskAssignment MessageType = "task_assignment" // MsgShutdownRequest 请求关闭 Agent MsgShutdownRequest MessageType = "shutdown_request" // MsgShutdownApproved 批准关闭请求 MsgShutdownApproved MessageType = "shutdown_approved" // MsgModeSetRequest 请求切换运行模式 MsgModeSetRequest MessageType = "mode_set_request" // MsgTeamPermUpdate 更新 Team 权限配置 MsgTeamPermUpdate MessageType = "team_permission_update" )
type PermissionRequestPayload ¶
type PermissionRequestPayload struct {
RequestID string `json:"request_id"`
ToolName string `json:"tool_name"`
ToolUseID string `json:"tool_use_id"`
Description string `json:"description"`
Input map[string]any `json:"input"`
}
PermissionRequestPayload 是权限请求消息的载荷. Worker 工具执行前向 Leader 申请权限时使用.
type PermissionResponsePayload ¶
type PermissionResponsePayload struct {
RequestID string `json:"request_id"`
Approved bool `json:"approved"`
UpdatedInput map[string]any `json:"updated_input,omitempty"`
Reason string `json:"reason,omitempty"`
}
PermissionResponsePayload 是权限响应消息的载荷. Leader 审批/拒绝 Worker 的权限请求时使用.
type Router ¶
type Router struct {
// contains filtered or unexported fields
}
Router 管理 agentName → Inbox 的映射.
升华改进(ELEVATED): Router 是 Agent 协作的中枢-- 未来可在 Send 前做消息审计,优先级排序,流量控制. 替代方案:每个 Agent 直接持有对方的 Inbox 引用(耦合高,无法集中管理).
func NewRouterWithFactory ¶
NewRouterWithFactory 使用自定义 Inbox 工厂创建 Router(主要用于测试).
func (*Router) AgentNames ¶
AgentNames 返回当前注册的所有 Agent 名称(按任意顺序,仅供调试).
func (*Router) Close ¶
Close 关闭所有 Inbox. Router 生命周期结束时调用,确保所有等待中的 Recv goroutine 能退出.
精妙之处(CLEVER): 先快照再关闭,避免持锁时调用 Close(可能导致死锁 如果 Close 内部也需要加锁的话). 替代方案:持写锁逐一关闭(风险:Close 内部有锁时死锁).
func (*Router) Inbox ¶
Inbox 获取指定 Agent 的收件箱,不存在则自动创建.
精妙之处(CLEVER): 双重检查锁(读锁快路径 + 写锁慢路径)-- 大多数情况下 Inbox 已存在,读锁足够; 只有首次访问才需要写锁,争用极少. 替代方案:始终持有写锁(性能损失大).
func (*Router) Remove ¶
Remove 移除指定 Agent 的 Inbox 记录(不关闭 Inbox). 适用于 Agent 退出但 Inbox 还有等待者的场景.
注意:调用者负责在适当时机调用 box.Close().
func (*Router) RemoveAndClose ¶
RemoveAndClose 移除并关闭指定 Agent 的 Inbox.
type TaskAssignmentPayload ¶
type TaskAssignmentPayload struct {
TaskID string `json:"task_id"`
Subject string `json:"subject"`
Description string `json:"description"`
AssignedBy string `json:"assigned_by"`
}
TaskAssignmentPayload 是任务分配消息的载荷. Leader 向 Worker 分配具体任务时使用.
type UDSInboxMessage ¶
type UDSInboxMessage struct {
Type string `json:"type"` // "progress", "log", "result"
ToolUseID string `json:"tool_use_id,omitempty"` // 关联的工具调用 ID(可选)
Data string `json:"data"` // 消息主体内容
Meta json.RawMessage `json:"meta,omitempty"` // 额外元数据(可选)
}
UDSInboxMessage 是工具子进程发送给引擎的消息格式.
精妙之处(CLEVER): 这是独立于 inbox.Message 的新类型--
inbox.Message 是 Agent 间的富类型消息(含 From/To/ID 路由字段), UDSInboxMessage 是工具进程的轻量消息(工具不知道路由,只说"我的进度是 X"). 两者在不同层次服务不同目的,不能混用. 替代方案:复用 inbox.Message(工具进程需要理解路由,耦合过重).
type UDSServer ¶
type UDSServer struct {
// contains filtered or unexported fields
}
UDSServer 是 Unix Domain Socket 消息接收服务端.
生命周期:
- NewUDSServer(sessionID) → 创建实例,生成 socket 路径
- Start() → 开始监听(后台 goroutine 处理连接)
- Messages() → 消费消息 channel
- Close() → 停止监听,删除 socket 文件
func NewUDSServer ¶
NewUDSServer 创建 UDS 服务端. sessionID 用于生成唯一的 socket 文件名,避免多实例冲突.
精妙之处(CLEVER): 使用 sessionID 而非随机数--
sessionID 已经是全局唯一的(包含时间戳+随机后缀),复用它避免二次随机生成. 同时 socket 路径可预测(知道 sessionID 就能找到 socket),方便调试和测试. 替代方案:纯随机路径(每次都不一样,调试时需要读 FLYTO_SESSION_SOCK 才能找到).
func (*UDSServer) Messages ¶
func (s *UDSServer) Messages() <-chan UDSInboxMessage
Messages 返回只读消息 channel. 消费者从此 channel 接收工具子进程发来的消息.
精妙之处(CLEVER): 返回只读 channel(<-chan)--
编译器保证消费者不能向 channel 写入,防止误用. 服务端内部用双向 channel(chan),对外暴露只读视图. 替代方案:暴露双向 channel(允许外部写入,可能破坏状态机).
func (*UDSServer) Send ¶
func (s *UDSServer) Send(msg UDSInboxMessage)
Send 向 msgs channel 投递一条消息(非阻塞). 供引擎内部组件(如 MonitorTool)直接注入消息,无需经过 UDS 连接. channel 满时静默丢弃(fire-and-forget 语义).
精妙之处(CLEVER): 只有 UDSServer 自身和引擎内部可以 Send--
外部消费者只能通过 Messages() 读取; 通过工具子进程(UDS 连接)发送的消息由 handleConn 调用内部同逻辑. 替代方案:暴露双向 channel(任何人都能写,包括意外的外部调用).