// Package bridge 定义 Flyto Agent Platform Layer 的 Bridge 基础设施. // // Bridge 解决的核心问题:客户端(IDE 插件/Web/IoT 设备)与引擎之间的网络通信协议. // 引擎本身不知道网络,Bridge 是引擎事件流到远程客户端的"最后一公里". // // 架构位置: // // 客户端 (IDE/Web/Device) // ↕ BridgeTransport (SSE/WS/LongPoll) // Flyto Platform // └─ DaemonManager (pkg/daemon) // └─ engine.Session (flyto-agent/pkg/engine) // // 核心抽象: // - SessionConn:一个客户端连接的抽象,隔离传输协议细节 // - BridgeTransport:SessionConn 的工厂 + HTTP 入口 // - BridgeEvent:engine.Event 的 JSON 可序列化包装 // - ClientMessage:客户端发来的消息(prompt/权限回复/关闭) // // 升华改进(ELEVATED): 早期方案 将 v1 轮询和 v2 SSE 的传输逻辑硬编码在 // 两个不同的代码路径中,无法互换.我们定义 BridgeTransport 接口,SSE (transport/sse.go) // 与 WebSocket (transport/websocket.go, 2026-04-16 L1328b 接线) 两个独立实现, // DaemonManager 只持有接口--仓储 IoT 设备可以注入 MQTT 实现. // LongPoll 占位已移除,真实客户(企业内网 WAF 拦 WS 的场景)驱动再加,非假设性需求. // 替代方案:<为每种传输类型写独立的服务器> - 否决:代码重复,平台层无法统一路由. package bridge import ( "context" "encoding/json" "net/http" "time" ) // ─── 出方向:引擎事件 → 客户端 ──────────────────────────────────────────────── // BridgeEvent 是引擎事件的 JSON 可序列化包装,推送给客户端. // // 精妙之处(CLEVER): ID 字段对 SSE 的 Last-Event-ID 机制至关重要-- // 客户端断线重连时携带最后收到的 ID,服务端从该 ID 之后重放事件. // 没有 ID 则断线后丢失的事件永远丢失,用户看到的输出不完整. type BridgeEvent struct { // ID 是事件的唯一递增 ID,用于 SSE Last-Event-ID 断线重连. // 格式:"-",在 SessionConn 实现层生成. ID string `json:"id"` // Type 是事件类型名,对应 engine.Event.eventType(). // 例如:"text_delta","tool_use","permission_request","done". Type string `json:"type"` // SessionID 标识事件所属的会话. // 在多路复用场景(单 WebSocket 连接承载多会话)中必须有. SessionID string `json:"session_id"` // Payload 是具体事件的 JSON 数据,由 EventSerializer 填充. Payload json.RawMessage `json:"payload,omitempty"` // Timestamp 是事件生成的时间,便于客户端排序和去重. Timestamp time.Time `json:"ts"` } // ─── 入方向:客户端消息 → 引擎 ──────────────────────────────────────────────── // ClientMessageType 是客户端消息的类型标识. type ClientMessageType string const ( // ClientMessagePrompt 客户端发送用户输入(对话轮次). ClientMessagePrompt ClientMessageType = "prompt" // ClientMessagePermissionReply 客户端回复权限请求(允许/拒绝). ClientMessagePermissionReply ClientMessageType = "permission_reply" // ClientMessageClose 客户端主动关闭会话. ClientMessageClose ClientMessageType = "close" ) // ClientMessage 是客户端发送给平台的消息. // // 历史包袱(LEGACY): 早期方案用多个独立的 HTTP 端点区分消息类型(POST /messages, // POST /permissions/:id),导致 WebSocket 实现时需要额外适配层. // 我们用统一结构体 + Type 字段,SSE/WS/LongPoll 共享同一个消息协议. // 未来改进:可以考虑 protobuf 替代 JSON,减少序列化开销. type ClientMessage struct { // Type 指定消息类型. Type ClientMessageType `json:"type"` // SessionID 指定目标会话.在多路复用模式下必须有. SessionID string `json:"session_id"` // Prompt 是用户输入文本(仅 TypePrompt 有效). Prompt string `json:"prompt,omitempty"` // RequestID 是权限请求 ID(仅 TypePermissionReply 有效). // 对应 engine.PermissionRequestEvent.ID. RequestID string `json:"request_id,omitempty"` // Allow 是权限决策(仅 TypePermissionReply 有效). Allow bool `json:"allow,omitempty"` } // ─── 核心接口 ──────────────────────────────────────────────────────────────── // SessionConn 代表一个已建立的客户端连接,绑定到某个会话. // // 升华改进(ELEVATED): 早期方案在每种传输实现中重复 send/receive 逻辑. // SessionConn 接口让 DaemonManager 不依赖任何具体传输-- // SSESessionConn,WebSocketSessionConn,LongPollSessionConn 都实现此接口. // CLI 测试场景可以注入 MemorySessionConn(直接 channel 通信,无网络开销). // // 接口调用方(DaemonManager)的职责: // 1. 从引擎 Session.Send() 读取事件,调用 conn.Send() 推给客户端 // 2. 从 conn.Recv() 读取客户端消息,调用 engine.Session.ResolvePermission() 等 // 3. 监听 conn.Done(),会话断开时清理 type SessionConn interface { // SessionID 返回此连接绑定的会话 ID. SessionID() string // Send 将事件推送给客户端. // 如果连接已断开,返回 ErrConnClosed. // 实现层负责序列化为协议格式(SSE data: 行,WS binary frame 等). Send(ctx context.Context, evt BridgeEvent) error // Recv 返回客户端消息的只读 channel. // Channel 在连接断开时关闭. Recv() <-chan ClientMessage // Done 返回连接终止信号 channel. // 客户端断开 OR 服务端主动 Close() 时关闭. Done() <-chan struct{} // Close 主动终止连接(服务端触发,如 idle timeout,daemon shutdown). // 幂等,多次调用安全. Close() error } // BridgeTransport 是 SessionConn 的工厂和 HTTP 入口. // // 实现: // - SSETransport (transport/sse.go) : 客户端 GET 建立 SSE 流, POST 发消息 // - WebSocketTransport (transport/websocket.go) : 双向 WS, 低延迟 (2026-04-16 L1328b 接线) // - LongPoll 未实现: WS 前的兼容方案, 现代浏览器/现代网络不需要, 待真实客户驱动再加 // // 精妙之处(CLEVER): BridgeTransport 实现 http.Handler--平台的 HTTP 路由器 // 只需 mux.Handle("/bridge/", transport) 即可,无需知道具体协议细节. // DaemonManager 通过 Accept() 获得新连接,与 HTTP 层完全解耦. type BridgeTransport interface { http.Handler // Accept 返回新建立的客户端连接 channel. // Channel 在 Close() 被调用后关闭. // 调用方应持续从 channel 读取,否则新连接请求会阻塞. Accept() <-chan SessionConn // Close 关闭传输层,拒绝新连接,等待现有连接优雅关闭. Close() error } // ─── 配置 ──────────────────────────────────────────────────────────────────── // BridgeConfig 是 BridgeTransport 实现的通用配置. // 每个传输实现可以有自己的扩展配置,但基础字段在此统一定义. type BridgeConfig struct { // MaxMessageSize 是客户端单条消息的最大字节数,默认 1MB. // 防止恶意客户端发送超大 prompt 打爆服务器内存. MaxMessageSize int64 // EventBufferSize 是出方向事件 channel 的缓冲大小,默认 256. // 精妙之处(CLEVER): 缓冲不能太大(内存浪费)也不能太小(背压传播太快). // 256 事件 × ~500 字节/事件 ≈ 128KB,是合理的上限. EventBufferSize int // PingInterval 是 keepalive ping 的发送间隔,默认 15s. // SSE 的 HTTP 连接可能被代理或负载均衡的 idle timeout 强制关闭. // 定期 ping(SSE 注释行 ": ping")维持连接活跃. PingInterval time.Duration // WriteTimeout 是单次写操作的超时,默认 10s. // 防止慢客户端导致 goroutine 泄漏(连接半死不活). WriteTimeout time.Duration // ResumeWindow 是断线重连时可以重放的最大事件数,默认 1000. // 超出 window 的旧事件不再重放(客户端需要重新加载完整状态). ResumeWindow int } // DefaultBridgeConfig 返回生产合理的默认配置. func DefaultBridgeConfig() BridgeConfig { return BridgeConfig{ MaxMessageSize: 1 << 20, // 1MB EventBufferSize: 256, PingInterval: 15 * time.Second, WriteTimeout: 10 * time.Second, ResumeWindow: 1000, } } // ─── 错误类型 ───────────────────────────────────────────────────────────────── // BridgeError 是平台 Bridge 层的结构化错误. type BridgeError struct { Code string // 机器可读错误码 Message string // 人类可读描述 } func (e *BridgeError) Error() string { return e.Code + ": " + e.Message } var ( // ErrConnClosed 表示连接已关闭,Send() 无法继续. ErrConnClosed = &BridgeError{Code: "conn_closed", Message: "连接已关闭"} // ErrTransportClosed 表示传输层已关闭,不接受新连接. ErrTransportClosed = &BridgeError{Code: "transport_closed", Message: "传输层已关闭"} // ErrMessageTooLarge 表示客户端消息超过 MaxMessageSize 限制. ErrMessageTooLarge = &BridgeError{Code: "message_too_large", Message: "消息超过大小限制"} // ErrInvalidMessage 表示客户端发送了格式错误的消息. ErrInvalidMessage = &BridgeError{Code: "invalid_message", Message: "无效的消息格式"} )