inbox

package
v0.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 26, 2026 License: None detected not legal advice Imports: 0 Imported by: 0

Documentation

Overview

Package inbox 提供 Agent 间消息传递的邮箱抽象.

每个 Agent 持有自己的 Inbox, 其他 Agent 通过 Router 向其 Send 消息. Send 非阻塞 (丢失优于卡死), Recv 阻塞 (推送优于轮询), Poll 非阻塞 (紧急检查). 内存实现适用单进程; UDS 实现跨进程, 为 Agent Teams 多进程协作预留.

API Consumption Shapes

The Inbox interface spans two Flyto API shapes (see `docs/api-reference.md` "API 消费形态 / API Consumption Patterns"):

  • Send: form 3 (synchronous callback). Non-blocking from the sender's perspective but a direct method call; router / Agent Teams pipeline drops on a full queue rather than blocking.
  • Recv / Poll: form 2 (pull). Consumer (the owning Agent) actively reads pending messages; Recv blocks until a message arrives or the context is cancelled, Poll returns immediately.

API 消费形态:

Inbox 接口跨越两种 Flyto API 形态 (见 `docs/api-reference.md` "API 消费 形态 / API Consumption Patterns"):

  • Send: 形态三 (同步回调). 对发送方是非阻塞语义但仍是直接方法调用; router / Agent Teams 管道在满队列时丢弃而不阻塞.
  • Recv / Poll: 形态二 (调取 pull). 消费者 (持有 Inbox 的 Agent) 主动 读待处理消息; Recv 阻塞直到有消息到或 ctx 取消, Poll 立即返回.

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrInboxClosed 收件箱已关闭.
	ErrInboxClosed = errors.New("inbox: closed")
	// ErrInboxFull 收件箱已满(背压信号).
	ErrInboxFull = fmt.Errorf("inbox: full (capacity %d)", memoryInboxCapacity)
)

错误定义.

Functions

This section is empty.

Types

type IdleNotificationPayload

type IdleNotificationPayload struct {
	IdleReason      string `json:"idle_reason"` // "available" | "interrupted" | "failed"
	Summary         string `json:"summary,omitempty"`
	CompletedTaskID string `json:"completed_task_id,omitempty"`
	CompletedStatus string `json:"completed_status,omitempty"` // "resolved" | "blocked" | "failed"
	FailureReason   string `json:"failure_reason,omitempty"`
}

IdleNotificationPayload 是空闲通知消息的载荷. Worker 完成任务或进入空闲状态时通知 Leader.

升华改进(ELEVATED): IdleReason 区分"完成"/"中断"/"失败"三种状态-- Leader 可以根据状态决定下一步动作(分配新任务/重试/告警). 替代方案:布尔 Completed 字段(无法区分中断和失败).

type Inbox

type Inbox interface {
	// Send 向此 Inbox 投递一条消息(非阻塞).
	// 如果 Inbox 已满或已关闭,返回错误.
	Send(msg *Message) error

	// Recv 阻塞等待下一条消息,ctx 取消时返回 ctx.Err().
	// Inbox 关闭后,Recv 返回 ErrInboxClosed.
	Recv(ctx context.Context) (*Message, error)

	// Poll 非阻塞尝试获取下一条消息,无消息返回 nil, nil.
	// Inbox 关闭后,Poll 返回 nil, ErrInboxClosed.
	Poll() (*Message, error)

	// Close 关闭 Inbox,所有等待中的 Recv 返回错误.
	// Close 是幂等的,多次调用不 panic.
	Close() error
}

Inbox 是消息收件箱接口. 每个 Agent 有自己的 Inbox,其他 Agent 通过 Router 向其 Send 消息.

升华改进(ELEVATED): 接口而非具体实现-- 内存实现(MemoryInbox)用于同进程 Agent 协作; 未来可扩展 RedisInbox(跨进程),GRPCInbox(跨节点)而无需修改上层代码. 替代方案:直接暴露 MemoryInbox(绑定实现,无法测试替换).

type MemoryInbox

type MemoryInbox struct {
	// contains filtered or unexported fields
}

MemoryInbox 用 buffered channel 实现的内存收件箱.

精妙之处(CLEVER): MemoryInbox 零外部依赖,完全用标准库实现-- 不依赖 Redis,Kafka 等基础设施,可嵌入任何 Go 程序. 替代方案:基于 sync.Mutex + []Message 的滑动队列(实现更复杂,性能相当).

func NewMemoryInbox

func NewMemoryInbox() *MemoryInbox

NewMemoryInbox 创建容量为 256 的内存收件箱.

func (*MemoryInbox) Close

func (m *MemoryInbox) Close() error

Close 关闭收件箱,广播关闭信号给所有等待中的 Recv. 幂等:多次调用不 panic.

func (*MemoryInbox) Len

func (m *MemoryInbox) Len() int

Len 返回当前积压的消息数量(仅供监控/测试使用).

func (*MemoryInbox) Poll

func (m *MemoryInbox) Poll() (*Message, error)

Poll 非阻塞尝试获取消息. 无消息返回 nil, nil;Inbox 关闭返回 nil, ErrInboxClosed.

func (*MemoryInbox) Recv

func (m *MemoryInbox) Recv(ctx context.Context) (*Message, error)

Recv 阻塞等待下一条消息,ctx 取消时返回 ctx.Err(). Inbox 关闭后返回 ErrInboxClosed.

func (*MemoryInbox) Send

func (m *MemoryInbox) Send(msg *Message) error

Send 非阻塞投递消息. 若 Inbox 已关闭返回 ErrInboxClosed,若已满返回 ErrInboxFull.

type Message

type Message struct {
	ID        string          `json:"id"`
	Type      MessageType     `json:"type"`
	From      string          `json:"from"` // 发送方 agent 名称
	To        string          `json:"to"`   // 接收方 agent 名称
	Timestamp time.Time       `json:"timestamp"`
	Payload   json.RawMessage `json:"payload"` // 类型特定的数据
}

Message 是统一的消息格式. 所有 Agent 间通信都通过此结构体传递,保证格式统一可追踪.

升华改进(ELEVATED): 统一消息格式使得 Inbox 可以无感知地 做消息持久化,审计,重放--运维工具的基础设施. 跨行业扩展:金融场景的交易指令,医疗场景的诊断请求都可复用此格式. 替代方案:每种消息类型独立 channel(类型安全但无法统一路由/审计).

func NewMessage

func NewMessage(from, to string, msgType MessageType, payload any) (*Message, error)

NewMessage 构造一条消息,自动生成 ID 和 Timestamp.

payload 会被 JSON 序列化为 json.RawMessage; 若 payload 本身已是 json.RawMessage,直接使用(避免二次编码).

精妙之处(CLEVER): payload 已是 json.RawMessage 时直接赋值-- 避免 json.Marshal(json.RawMessage) 再次编码导致字节串被当作 base64 处理的陷阱. 替代方案:始终 json.Marshal(会导致 RawMessage 被双重编码).

type MessageType

type MessageType string

MessageType 是消息类型枚举.

const (
	// MsgPermissionRequest Worker 向 Leader 申请工具执行权限
	MsgPermissionRequest MessageType = "permission_request"
	// MsgPermissionResponse Leader 向 Worker 回应权限请求
	MsgPermissionResponse MessageType = "permission_response"
	// MsgIdleNotification Worker 完成或空闲时通知 Leader
	MsgIdleNotification MessageType = "idle_notification"
	// MsgTaskAssignment Leader 向 Worker 分配新任务
	MsgTaskAssignment MessageType = "task_assignment"
	// MsgShutdownRequest 请求关闭 Agent
	MsgShutdownRequest MessageType = "shutdown_request"
	// MsgShutdownApproved 批准关闭请求
	MsgShutdownApproved MessageType = "shutdown_approved"
	// MsgModeSetRequest 请求切换运行模式
	MsgModeSetRequest MessageType = "mode_set_request"
	// MsgTeamPermUpdate 更新 Team 权限配置
	MsgTeamPermUpdate MessageType = "team_permission_update"
)

type PermissionRequestPayload

type PermissionRequestPayload struct {
	RequestID   string         `json:"request_id"`
	ToolName    string         `json:"tool_name"`
	ToolUseID   string         `json:"tool_use_id"`
	Description string         `json:"description"`
	Input       map[string]any `json:"input"`
}

PermissionRequestPayload 是权限请求消息的载荷. Worker 工具执行前向 Leader 申请权限时使用.

type PermissionResponsePayload

type PermissionResponsePayload struct {
	RequestID    string         `json:"request_id"`
	Approved     bool           `json:"approved"`
	UpdatedInput map[string]any `json:"updated_input,omitempty"`
	Reason       string         `json:"reason,omitempty"`
}

PermissionResponsePayload 是权限响应消息的载荷. Leader 审批/拒绝 Worker 的权限请求时使用.

type Router

type Router struct {
	// contains filtered or unexported fields
}

Router 管理 agentName → Inbox 的映射.

升华改进(ELEVATED): Router 是 Agent 协作的中枢-- 未来可在 Send 前做消息审计,优先级排序,流量控制. 替代方案:每个 Agent 直接持有对方的 Inbox 引用(耦合高,无法集中管理).

func NewRouter

func NewRouter() *Router

NewRouter 创建 Router,默认使用 MemoryInbox.

func NewRouterWithFactory

func NewRouterWithFactory(factory func() Inbox) *Router

NewRouterWithFactory 使用自定义 Inbox 工厂创建 Router(主要用于测试).

func (*Router) AgentNames

func (r *Router) AgentNames() []string

AgentNames 返回当前注册的所有 Agent 名称(按任意顺序,仅供调试).

func (*Router) Close

func (r *Router) Close() error

Close 关闭所有 Inbox. Router 生命周期结束时调用,确保所有等待中的 Recv goroutine 能退出.

精妙之处(CLEVER): 先快照再关闭,避免持锁时调用 Close(可能导致死锁 如果 Close 内部也需要加锁的话). 替代方案:持写锁逐一关闭(风险:Close 内部有锁时死锁).

func (*Router) Inbox

func (r *Router) Inbox(agentName string) Inbox

Inbox 获取指定 Agent 的收件箱,不存在则自动创建.

精妙之处(CLEVER): 双重检查锁(读锁快路径 + 写锁慢路径)-- 大多数情况下 Inbox 已存在,读锁足够; 只有首次访问才需要写锁,争用极少. 替代方案:始终持有写锁(性能损失大).

func (*Router) Remove

func (r *Router) Remove(agentName string)

Remove 移除指定 Agent 的 Inbox 记录(不关闭 Inbox). 适用于 Agent 退出但 Inbox 还有等待者的场景.

注意:调用者负责在适当时机调用 box.Close().

func (*Router) RemoveAndClose

func (r *Router) RemoveAndClose(agentName string)

RemoveAndClose 移除并关闭指定 Agent 的 Inbox.

func (*Router) Send

func (r *Router) Send(to string, msg *Message) error

Send 向指定 Agent 发送消息. 若目标 Agent 不存在,自动创建其 Inbox.

type TaskAssignmentPayload

type TaskAssignmentPayload struct {
	TaskID      string `json:"task_id"`
	Subject     string `json:"subject"`
	Description string `json:"description"`
	AssignedBy  string `json:"assigned_by"`
}

TaskAssignmentPayload 是任务分配消息的载荷. Leader 向 Worker 分配具体任务时使用.

type UDSInboxMessage

type UDSInboxMessage struct {
	Type      string          `json:"type"`                  // "progress", "log", "result"
	ToolUseID string          `json:"tool_use_id,omitempty"` // 关联的工具调用 ID(可选)
	Data      string          `json:"data"`                  // 消息主体内容
	Meta      json.RawMessage `json:"meta,omitempty"`        // 额外元数据(可选)
}

UDSInboxMessage 是工具子进程发送给引擎的消息格式.

精妙之处(CLEVER): 这是独立于 inbox.Message 的新类型--

inbox.Message 是 Agent 间的富类型消息(含 From/To/ID 路由字段),
UDSInboxMessage 是工具进程的轻量消息(工具不知道路由,只说"我的进度是 X").
两者在不同层次服务不同目的,不能混用.
替代方案:复用 inbox.Message(工具进程需要理解路由,耦合过重).

type UDSServer

type UDSServer struct {
	// contains filtered or unexported fields
}

UDSServer 是 Unix Domain Socket 消息接收服务端.

生命周期:

  1. NewUDSServer(sessionID) → 创建实例,生成 socket 路径
  2. Start() → 开始监听(后台 goroutine 处理连接)
  3. Messages() → 消费消息 channel
  4. Close() → 停止监听,删除 socket 文件

func NewUDSServer

func NewUDSServer(sessionID string) (*UDSServer, error)

NewUDSServer 创建 UDS 服务端. sessionID 用于生成唯一的 socket 文件名,避免多实例冲突.

精妙之处(CLEVER): 使用 sessionID 而非随机数--

sessionID 已经是全局唯一的(包含时间戳+随机后缀),复用它避免二次随机生成.
同时 socket 路径可预测(知道 sessionID 就能找到 socket),方便调试和测试.
替代方案:纯随机路径(每次都不一样,调试时需要读 FLYTO_SESSION_SOCK 才能找到).

func (*UDSServer) Close

func (s *UDSServer) Close() error

Close 停止服务器并清理 socket 文件.幂等,多次调用安全.

func (*UDSServer) Messages

func (s *UDSServer) Messages() <-chan UDSInboxMessage

Messages 返回只读消息 channel. 消费者从此 channel 接收工具子进程发来的消息.

精妙之处(CLEVER): 返回只读 channel(<-chan)--

编译器保证消费者不能向 channel 写入,防止误用.
服务端内部用双向 channel(chan),对外暴露只读视图.
替代方案:暴露双向 channel(允许外部写入,可能破坏状态机).

func (*UDSServer) Send

func (s *UDSServer) Send(msg UDSInboxMessage)

Send 向 msgs channel 投递一条消息(非阻塞). 供引擎内部组件(如 MonitorTool)直接注入消息,无需经过 UDS 连接. channel 满时静默丢弃(fire-and-forget 语义).

精妙之处(CLEVER): 只有 UDSServer 自身和引擎内部可以 Send--

外部消费者只能通过 Messages() 读取;
通过工具子进程(UDS 连接)发送的消息由 handleConn 调用内部同逻辑.
替代方案:暴露双向 channel(任何人都能写,包括意外的外部调用).

func (*UDSServer) SockPath

func (s *UDSServer) SockPath() string

SockPath 返回 socket 文件路径. 工具子进程需要通过 FLYTO_SESSION_SOCK 环境变量获得此路径.

func (*UDSServer) Start

func (s *UDSServer) Start() error

Start 开始监听 UDS. 必须在 Close() 之前最多调用一次.后台 goroutine 处理所有连接.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL