bridge

package
v0.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 26, 2026 License: None detected not legal advice Imports: 0 Imported by: 0

Documentation

Overview

Package bridge 定义 Flyto Agent Platform Layer 的 Bridge 基础设施.

Bridge 解决的核心问题:客户端(IDE 插件/Web/IoT 设备)与引擎之间的网络通信协议. 引擎本身不知道网络,Bridge 是引擎事件流到远程客户端的"最后一公里".

架构位置:

客户端 (IDE/Web/Device)
        ↕  BridgeTransport (SSE/WS/LongPoll)
   Flyto Platform
        └─ DaemonManager (pkg/daemon)
             └─ engine.Session (flyto-agent/pkg/engine)

核心抽象:

  • SessionConn:一个客户端连接的抽象,隔离传输协议细节
  • BridgeTransport:SessionConn 的工厂 + HTTP 入口
  • BridgeEvent:engine.Event 的 JSON 可序列化包装
  • ClientMessage:客户端发来的消息(prompt/权限回复/关闭)

升华改进(ELEVATED): 早期方案 将 v1 轮询和 v2 SSE 的传输逻辑硬编码在 两个不同的代码路径中,无法互换.我们定义 BridgeTransport 接口,SSE (transport/sse.go) 与 WebSocket (transport/websocket.go, 2026-04-16 L1328b 接线) 两个独立实现, DaemonManager 只持有接口--仓储 IoT 设备可以注入 MQTT 实现. LongPoll 占位已移除,真实客户(企业内网 WAF 拦 WS 的场景)驱动再加,非假设性需求. 替代方案:<为每种传输类型写独立的服务器> - 否决:代码重复,平台层无法统一路由.

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrConnClosed 表示连接已关闭,Send() 无法继续.
	ErrConnClosed = &BridgeError{Code: "conn_closed", Message: "连接已关闭"}

	// ErrTransportClosed 表示传输层已关闭,不接受新连接.
	ErrTransportClosed = &BridgeError{Code: "transport_closed", Message: "传输层已关闭"}

	// ErrMessageTooLarge 表示客户端消息超过 MaxMessageSize 限制.
	ErrMessageTooLarge = &BridgeError{Code: "message_too_large", Message: "消息超过大小限制"}

	// ErrInvalidMessage 表示客户端发送了格式错误的消息.
	ErrInvalidMessage = &BridgeError{Code: "invalid_message", Message: "无效的消息格式"}
)

Functions

This section is empty.

Types

type BoundedUUIDSet

type BoundedUUIDSet struct {
	// contains filtered or unexported fields
}

BoundedUUIDSet 是固定容量的 ID 去重集合.

内部结构:

  • ring: 环形数组,记录 ID 的插入顺序(FIFO 淘汰)
  • set: 哈希表,O(1) 查询
  • head: 下一次写入位置(当 size == cap 时,覆盖 head 处的旧 ID)

精妙之处(CLEVER): ring[head] 既是"下一个写入位置"也是"最老的元素位置"-- 当容量满时,直接覆盖 head 就完成了"淘汰旧 + 写入新"两步操作,无需额外指针. 如果用 head/tail 两个指针,逻辑更复杂但性能没有提升.

func NewBoundedUUIDSet

func NewBoundedUUIDSet(cap int) *BoundedUUIDSet

NewBoundedUUIDSet 创建指定容量的 ID 集合. cap 应大于"最坏重试窗口内的最大重复消息数". 推荐默认值:1000(约覆盖 5 分钟 × 200 msg/min 的重试窗口).

func (*BoundedUUIDSet) Add

func (s *BoundedUUIDSet) Add(id string) bool

Add 将 id 加入集合.如果 id 已存在,返回 false(重复). 如果容量已满,淘汰最老的 ID 后再插入,始终返回 true.

func (*BoundedUUIDSet) Cap

func (s *BoundedUUIDSet) Cap() int

Cap 返回集合容量上限.

func (*BoundedUUIDSet) Contains

func (s *BoundedUUIDSet) Contains(id string) bool

Contains 检查 id 是否已在集合中(不改变状态).

func (*BoundedUUIDSet) Len

func (s *BoundedUUIDSet) Len() int

Len 返回当前集合中的元素数量.

type BridgeConfig

type BridgeConfig struct {
	// MaxMessageSize 是客户端单条消息的最大字节数,默认 1MB.
	// 防止恶意客户端发送超大 prompt 打爆服务器内存.
	MaxMessageSize int64

	// EventBufferSize 是出方向事件 channel 的缓冲大小,默认 256.
	// 精妙之处(CLEVER): 缓冲不能太大(内存浪费)也不能太小(背压传播太快).
	// 256 事件 × ~500 字节/事件 ≈ 128KB,是合理的上限.
	EventBufferSize int

	// PingInterval 是 keepalive ping 的发送间隔,默认 15s.
	// SSE 的 HTTP 连接可能被代理或负载均衡的 idle timeout 强制关闭.
	// 定期 ping(SSE 注释行 ": ping")维持连接活跃.
	PingInterval time.Duration

	// WriteTimeout 是单次写操作的超时,默认 10s.
	// 防止慢客户端导致 goroutine 泄漏(连接半死不活).
	WriteTimeout time.Duration

	// ResumeWindow 是断线重连时可以重放的最大事件数,默认 1000.
	// 超出 window 的旧事件不再重放(客户端需要重新加载完整状态).
	ResumeWindow int
}

BridgeConfig 是 BridgeTransport 实现的通用配置. 每个传输实现可以有自己的扩展配置,但基础字段在此统一定义.

func DefaultBridgeConfig

func DefaultBridgeConfig() BridgeConfig

DefaultBridgeConfig 返回生产合理的默认配置.

type BridgeError

type BridgeError struct {
	Code    string // 机器可读错误码
	Message string // 人类可读描述
}

BridgeError 是平台 Bridge 层的结构化错误.

func (*BridgeError) Error

func (e *BridgeError) Error() string

type BridgeEvent

type BridgeEvent struct {
	// ID 是事件的唯一递增 ID,用于 SSE Last-Event-ID 断线重连.
	// 格式:"<session_id>-<seq>",在 SessionConn 实现层生成.
	ID string `json:"id"`

	// Type 是事件类型名,对应 engine.Event.eventType().
	// 例如:"text_delta","tool_use","permission_request","done".
	Type string `json:"type"`

	// SessionID 标识事件所属的会话.
	// 在多路复用场景(单 WebSocket 连接承载多会话)中必须有.
	SessionID string `json:"session_id"`

	// Payload 是具体事件的 JSON 数据,由 EventSerializer 填充.
	Payload json.RawMessage `json:"payload,omitempty"`

	// Timestamp 是事件生成的时间,便于客户端排序和去重.
	Timestamp time.Time `json:"ts"`
}

BridgeEvent 是引擎事件的 JSON 可序列化包装,推送给客户端.

精妙之处(CLEVER): ID 字段对 SSE 的 Last-Event-ID 机制至关重要-- 客户端断线重连时携带最后收到的 ID,服务端从该 ID 之后重放事件. 没有 ID 则断线后丢失的事件永远丢失,用户看到的输出不完整.

type BridgeTransport

type BridgeTransport interface {
	http.Handler

	// Accept 返回新建立的客户端连接 channel.
	// Channel 在 Close() 被调用后关闭.
	// 调用方应持续从 channel 读取,否则新连接请求会阻塞.
	Accept() <-chan SessionConn

	// Close 关闭传输层,拒绝新连接,等待现有连接优雅关闭.
	Close() error
}

BridgeTransport 是 SessionConn 的工厂和 HTTP 入口.

实现:

  • SSETransport (transport/sse.go) : 客户端 GET 建立 SSE 流, POST 发消息
  • WebSocketTransport (transport/websocket.go) : 双向 WS, 低延迟 (2026-04-16 L1328b 接线)
  • LongPoll 未实现: WS 前的兼容方案, 现代浏览器/现代网络不需要, 待真实客户驱动再加

精妙之处(CLEVER): BridgeTransport 实现 http.Handler--平台的 HTTP 路由器 只需 mux.Handle("/bridge/", transport) 即可,无需知道具体协议细节. DaemonManager 通过 Accept() 获得新连接,与 HTTP 层完全解耦.

type ClientMessage

type ClientMessage struct {
	// Type 指定消息类型.
	Type ClientMessageType `json:"type"`

	// SessionID 指定目标会话.在多路复用模式下必须有.
	SessionID string `json:"session_id"`

	// Prompt 是用户输入文本(仅 TypePrompt 有效).
	Prompt string `json:"prompt,omitempty"`

	// RequestID 是权限请求 ID(仅 TypePermissionReply 有效).
	// 对应 engine.PermissionRequestEvent.ID.
	RequestID string `json:"request_id,omitempty"`

	// Allow 是权限决策(仅 TypePermissionReply 有效).
	Allow bool `json:"allow,omitempty"`
}

ClientMessage 是客户端发送给平台的消息.

历史包袱(LEGACY): 早期方案用多个独立的 HTTP 端点区分消息类型(POST /messages, POST /permissions/:id),导致 WebSocket 实现时需要额外适配层. 我们用统一结构体 + Type 字段,SSE/WS/LongPoll 共享同一个消息协议. 未来改进:可以考虑 protobuf 替代 JSON,减少序列化开销.

type ClientMessageType

type ClientMessageType string

ClientMessageType 是客户端消息的类型标识.

const (
	// ClientMessagePrompt 客户端发送用户输入(对话轮次).
	ClientMessagePrompt ClientMessageType = "prompt"

	// ClientMessagePermissionReply 客户端回复权限请求(允许/拒绝).
	ClientMessagePermissionReply ClientMessageType = "permission_reply"

	// ClientMessageClose 客户端主动关闭会话.
	ClientMessageClose ClientMessageType = "close"
)

type EventSerializer

type EventSerializer struct {
	// contains filtered or unexported fields
}

EventSerializer 将 engine.Event 转换为可推送给客户端的 BridgeEvent. 每个 SessionConn 持有一个 EventSerializer 实例,保持事件 ID 单调递增.

func NewEventSerializer

func NewEventSerializer(sessionID string) *EventSerializer

NewEventSerializer 创建绑定到指定会话的序列化器.

func (*EventSerializer) Serialize

func (s *EventSerializer) Serialize(evt engine.Event) BridgeEvent

Serialize 将 engine.Event 转换为 BridgeEvent.

精妙之处(CLEVER): 每次调用原子递增 seq,即使并发调用也不会产生重复 ID. ID 格式 "<sessionID>-<seq>" 确保跨会话唯一--多会话场景下客户端能区分来源.

对未知事件类型返回 type="unknown" 而非 error--引擎新版本增加事件类型时, 旧版客户端不会崩溃(向前兼容).

Shape 提取到 eventPayload helper, 让 SubAgentEvent 能递归取 Inner 的 类型和 payload 做扁平化合并 (避免 17 case struct literal 在两处重复).

Shape extraction moved to eventPayload helper so SubAgentEvent can recursively obtain its Inner's type and payload for flat merging (avoids duplicating the 17 struct literals across two switches).

type SerialBatchEventUploader

type SerialBatchEventUploader struct {
	// contains filtered or unexported fields
}

SerialBatchEventUploader 收集事件并串行批量上传.

func NewSerialBatchEventUploader

func NewSerialBatchEventUploader(
	ctx context.Context,
	upload UploadFunc,
	onError func(events []BridgeEvent, err error),
) *SerialBatchEventUploader

NewSerialBatchEventUploader 创建上传器.

upload: 批量上传函数(调用方实现,如 HTTP POST) onError: 上传失败回调(传 nil 则忽略失败)

func (*SerialBatchEventUploader) Enqueue

func (u *SerialBatchEventUploader) Enqueue(evt BridgeEvent)

Enqueue 将事件加入上传队列.

精妙之处(CLEVER): 三种情况只有一条代码路径--

  1. 无上传在途:立即启动 goroutine 上传(batch = [evt])
  2. 上传在途:追加到 pending,当前批次完成后自动触发下一批
  3. ctx 已取消:事件被丢弃(调用方已放弃)

状态检查在锁内,goroutine 在锁外启动-- 避免在持锁状态下启动 goroutine(持锁期间 goroutine 调度可能死锁).

func (*SerialBatchEventUploader) Flush

Flush 阻塞直到所有待上传事件完成或 ctx 取消. 用于优雅关闭时确保事件不丢失.

历史包袱(LEGACY): 用轮询(10ms 间隔)而非 sync.Cond-- sync.Cond.Wait() 不支持 context 取消,需要额外 goroutine 唤醒, 代码复杂度提升但 Flush 是非热路径,轮询开销可接受. 未来改进:用 sync.Cond + 独立的 ctx 监听 goroutine 消除轮询延迟.

func (*SerialBatchEventUploader) PendingCount

func (u *SerialBatchEventUploader) PendingCount() int

PendingCount 返回当前等待上传的事件数(调试/监控用).

type SessionConn

type SessionConn interface {
	// SessionID 返回此连接绑定的会话 ID.
	SessionID() string

	// Send 将事件推送给客户端.
	// 如果连接已断开,返回 ErrConnClosed.
	// 实现层负责序列化为协议格式(SSE data: 行,WS binary frame 等).
	Send(ctx context.Context, evt BridgeEvent) error

	// Recv 返回客户端消息的只读 channel.
	// Channel 在连接断开时关闭.
	Recv() <-chan ClientMessage

	// Done 返回连接终止信号 channel.
	// 客户端断开 OR 服务端主动 Close() 时关闭.
	Done() <-chan struct{}

	// Close 主动终止连接(服务端触发,如 idle timeout,daemon shutdown).
	// 幂等,多次调用安全.
	Close() error
}

SessionConn 代表一个已建立的客户端连接,绑定到某个会话.

升华改进(ELEVATED): 早期方案在每种传输实现中重复 send/receive 逻辑. SessionConn 接口让 DaemonManager 不依赖任何具体传输-- SSESessionConn,WebSocketSessionConn,LongPollSessionConn 都实现此接口. CLI 测试场景可以注入 MemorySessionConn(直接 channel 通信,无网络开销).

接口调用方(DaemonManager)的职责:

  1. 从引擎 Session.Send() 读取事件,调用 conn.Send() 推给客户端
  2. 从 conn.Recv() 读取客户端消息,调用 engine.Session.ResolvePermission() 等
  3. 监听 conn.Done(),会话断开时清理

type UploadFunc

type UploadFunc func(ctx context.Context, events []BridgeEvent) error

UploadFunc 是批量上传函数的签名. 实现层负责网络传输,失败时返回 error.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL