// Package transport 提供 BridgeTransport 的具体实现. // // 当前实现: // - SSETransport : Server-Sent Events, 防火墙友好, 适合 Web/IDE 客户端 // - WebSocketTransport : 双向 WebSocket, 低延迟, 适合实时交互 (websocket.go, 2026-04-16 L1328b) // // 未实现 (待真实客户驱动): // - LongPoll : 最大兼容性, 现代网络环境基本不需要, 不做假设性实现 package transport import ( "context" "encoding/json" "fmt" "io" "log" "net/http" "strings" "sync" "time" "git.flytoex.net/yuanwei/flyto-agent/pkg/bridge" ) // SSETransport 实现 bridge.BridgeTransport,使用 Server-Sent Events 协议. // // 协议约定: // - 客户端通过 GET //events?session_id= 建立 SSE 流(接收事件) // - 客户端通过 POST //messages 发送消息(prompt/权限回复) // - 客户端可以携带 Last-Event-ID header 断线重连(服务端重放未收到的事件) // // 升华改进(ELEVATED): 早期方案 SSETransport 没有实现 Last-Event-ID 重连-- // 断线后客户端丢失中间的事件,用户看到输出缺失. // 我们维护每个会话的事件环形缓冲(ResumeWindow 条),断线重连时从指定 ID 重放. // 替代方案:<强制客户端重新加载完整会话状态> - 否决:对长时间运行的 Agent 任务 // 用户体验很差(任务跑了 10 分钟,断线后前 9 分钟的输出全没了). type SSETransport struct { cfg bridge.BridgeConfig prefix string // URL 前缀,如 "/bridge" mu sync.RWMutex conns map[string]*SSESessionConn // sessionID → conn acceptCh chan bridge.SessionConn closed bool closedCh chan struct{} } // NewSSETransport 创建 SSE 传输层. // // prefix: HTTP 路径前缀,如 "/bridge". // SSE 端点:GET /events?session_id= // 消息端点:POST /messages func NewSSETransport(prefix string, cfg bridge.BridgeConfig) *SSETransport { if prefix == "" { prefix = "/bridge" } return &SSETransport{ cfg: cfg, prefix: strings.TrimRight(prefix, "/"), conns: make(map[string]*SSESessionConn), acceptCh: make(chan bridge.SessionConn, 64), closedCh: make(chan struct{}), } } // ServeHTTP 实现 http.Handler,路由 SSE/消息端点请求. func (t *SSETransport) ServeHTTP(w http.ResponseWriter, r *http.Request) { path := strings.TrimPrefix(r.URL.Path, t.prefix) switch { case r.Method == http.MethodGet && path == "/events": t.handleSSEConnect(w, r) case r.Method == http.MethodPost && path == "/messages": t.handleMessage(w, r) default: http.NotFound(w, r) } } // Accept 实现 bridge.BridgeTransport. func (t *SSETransport) Accept() <-chan bridge.SessionConn { return t.acceptCh } // Close 关闭传输层,断开所有连接. func (t *SSETransport) Close() error { t.mu.Lock() defer t.mu.Unlock() if t.closed { return nil } t.closed = true close(t.closedCh) // 关闭所有活跃连接 for _, conn := range t.conns { conn.closeInternal() } close(t.acceptCh) return nil } // handleSSEConnect 处理 GET /events?session_id= 请求,建立 SSE 流. func (t *SSETransport) handleSSEConnect(w http.ResponseWriter, r *http.Request) { t.mu.RLock() if t.closed { t.mu.RUnlock() http.Error(w, "transport closed", http.StatusServiceUnavailable) return } t.mu.RUnlock() sessionID := r.URL.Query().Get("session_id") if sessionID == "" { http.Error(w, "missing session_id", http.StatusBadRequest) return } // 检查客户端是否支持 SSE(必须实现 http.Flusher) flusher, ok := w.(http.Flusher) if !ok { http.Error(w, "streaming not supported", http.StatusInternalServerError) return } // Last-Event-ID:断线重连时客户端携带最后收到的事件 ID lastEventID := r.Header.Get("Last-Event-ID") // 设置 SSE 响应头 w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") // 精妙之处(CLEVER): X-Accel-Buffering: no 告知 nginx 不要缓冲 SSE 响应-- // nginx 默认会缓冲后端响应,导致 SSE 事件被延迟推送给客户端. // 这是 SSE 在 nginx 反向代理后最常见的"为什么没有实时更新"问题. w.Header().Set("X-Accel-Buffering", "no") w.WriteHeader(http.StatusOK) flusher.Flush() conn := t.getOrCreateConn(sessionID, w, flusher, lastEventID) // 通知 DaemonManager 有新连接(或重连) select { case t.acceptCh <- conn: default: // acceptCh 满了(DaemonManager 没有及时消费),记录日志但不阻塞 log.Printf("sse: acceptCh full, dropping new conn for session %s", sessionID) } // 阻塞直到连接断开 select { case <-conn.Done(): case <-t.closedCh: conn.closeInternal() case <-r.Context().Done(): conn.closeInternal() } } // getOrCreateConn 获取或创建会话的 SSE 连接. // 如果会话已有连接(重连),替换旧连接(关闭旧的,新连接重放历史事件). func (t *SSETransport) getOrCreateConn( sessionID string, w http.ResponseWriter, flusher http.Flusher, lastEventID string, ) *SSESessionConn { t.mu.Lock() defer t.mu.Unlock() // 关闭旧连接(如果存在) if old, exists := t.conns[sessionID]; exists { old.closeInternal() } conn := newSSESessionConn(sessionID, w, flusher, t.cfg) t.conns[sessionID] = conn // 重放客户端未收到的历史事件 if lastEventID != "" { conn.replayFrom(lastEventID) } return conn } // handleMessage 处理 POST /messages,接收客户端消息. func (t *SSETransport) handleMessage(w http.ResponseWriter, r *http.Request) { // 大小限制 maxSize := t.cfg.MaxMessageSize if maxSize <= 0 { maxSize = 1 << 20 } r.Body = http.MaxBytesReader(w, r.Body, maxSize) body, err := io.ReadAll(r.Body) if err != nil { http.Error(w, bridge.ErrMessageTooLarge.Error(), http.StatusRequestEntityTooLarge) return } var msg bridge.ClientMessage if err := json.Unmarshal(body, &msg); err != nil { http.Error(w, bridge.ErrInvalidMessage.Error(), http.StatusBadRequest) return } if msg.SessionID == "" { http.Error(w, "missing session_id in message", http.StatusBadRequest) return } t.mu.RLock() conn, exists := t.conns[msg.SessionID] t.mu.RUnlock() if !exists { http.Error(w, "session not found", http.StatusNotFound) return } // 将消息投递给 SessionConn 的 Recv channel select { case conn.recvCh <- msg: w.WriteHeader(http.StatusAccepted) case <-conn.doneCh: http.Error(w, "session closed", http.StatusGone) case <-r.Context().Done(): http.Error(w, "request cancelled", http.StatusRequestTimeout) } } // ─── SSESessionConn ─────────────────────────────────────────────────────────── // SSESessionConn 实现 bridge.SessionConn,基于 SSE 协议. type SSESessionConn struct { sessionID string w http.ResponseWriter flusher http.Flusher cfg bridge.BridgeConfig recvCh chan bridge.ClientMessage doneCh chan struct{} closeOnce sync.Once // 事件历史环形缓冲(用于断线重连重放) historyMu sync.Mutex history []bridge.BridgeEvent // 最近 cfg.ResumeWindow 条事件 histHead int histSize int writeMu sync.Mutex // 保护 http.ResponseWriter 并发写 } func newSSESessionConn( sessionID string, w http.ResponseWriter, flusher http.Flusher, cfg bridge.BridgeConfig, ) *SSESessionConn { if cfg.ResumeWindow <= 0 { cfg.ResumeWindow = 1000 } if cfg.EventBufferSize <= 0 { cfg.EventBufferSize = 256 } c := &SSESessionConn{ sessionID: sessionID, w: w, flusher: flusher, cfg: cfg, recvCh: make(chan bridge.ClientMessage, cfg.EventBufferSize), doneCh: make(chan struct{}), history: make([]bridge.BridgeEvent, cfg.ResumeWindow), } // 启动 ping goroutine,维持 SSE 连接活跃 go c.pingLoop() return c } func (c *SSESessionConn) SessionID() string { return c.sessionID } // Send 将事件序列化为 SSE 格式推送给客户端. // // SSE 格式: // // id: \n // event: \n // data: \n // \n // // 精妙之处(CLEVER): id 字段对 Last-Event-ID 机制至关重要-- // 浏览器在断线重连时自动携带最后收到的 id,服务端据此重放. // 没有 id 则 EventSource API 无法感知断线期间的事件. func (c *SSESessionConn) Send(ctx context.Context, evt bridge.BridgeEvent) error { select { case <-c.doneCh: return bridge.ErrConnClosed default: } // 序列化为 SSE 格式 payload, err := json.Marshal(evt.Payload) if err != nil { payload = []byte("{}") } sseData := fmt.Sprintf("id: %s\nevent: %s\ndata: %s\n\n", evt.ID, evt.Type, string(payload)) // 写入超时 writeTimeout := c.cfg.WriteTimeout if writeTimeout <= 0 { writeTimeout = 10 * time.Second } writeCtx, cancel := context.WithTimeout(ctx, writeTimeout) defer cancel() done := make(chan error, 1) go func() { c.writeMu.Lock() defer c.writeMu.Unlock() _, writeErr := fmt.Fprint(c.w, sseData) if writeErr == nil { c.flusher.Flush() } done <- writeErr }() select { case err := <-done: if err != nil { c.closeInternal() return bridge.ErrConnClosed } case <-writeCtx.Done(): c.closeInternal() return bridge.ErrConnClosed case <-c.doneCh: return bridge.ErrConnClosed } // 追加到历史缓冲,用于断线重连重放 c.appendHistory(evt) return nil } func (c *SSESessionConn) Recv() <-chan bridge.ClientMessage { return c.recvCh } func (c *SSESessionConn) Done() <-chan struct{} { return c.doneCh } func (c *SSESessionConn) Close() error { c.closeInternal() return nil } func (c *SSESessionConn) closeInternal() { c.closeOnce.Do(func() { close(c.doneCh) }) } // pingLoop 定期发送 SSE 注释行(keepalive). // // 精妙之处(CLEVER): SSE 注释以 ":" 开头,浏览器 EventSource API 忽略注释, // 但这条数据会维持 TCP 连接活跃,防止负载均衡器/代理因 idle timeout 断开连接. // 15 秒间隔小于大多数代理的 60 秒 idle timeout 默认值. func (c *SSESessionConn) pingLoop() { interval := c.cfg.PingInterval if interval <= 0 { interval = 15 * time.Second } ticker := time.NewTicker(interval) defer ticker.Stop() for { select { case <-c.doneCh: return case <-ticker.C: c.writeMu.Lock() fmt.Fprintf(c.w, ": ping\n\n") c.flusher.Flush() c.writeMu.Unlock() } } } // appendHistory 将事件追加到历史环形缓冲. func (c *SSESessionConn) appendHistory(evt bridge.BridgeEvent) { c.historyMu.Lock() defer c.historyMu.Unlock() c.history[c.histHead] = evt c.histHead = (c.histHead + 1) % c.cfg.ResumeWindow if c.histSize < c.cfg.ResumeWindow { c.histSize++ } } // replayFrom 从 lastEventID 之后重放历史事件. // 用于断线重连后补发客户端未收到的事件. func (c *SSESessionConn) replayFrom(lastEventID string) { c.historyMu.Lock() defer c.historyMu.Unlock() if c.histSize == 0 { return } // 找到 lastEventID 在历史中的位置 // 历史是环形的,从最老的元素开始遍历 start := -1 cap := c.cfg.ResumeWindow oldest := (c.histHead - c.histSize + cap) % cap for i := 0; i < c.histSize; i++ { idx := (oldest + i) % cap if c.history[idx].ID == lastEventID { start = i + 1 // 从 lastEventID 之后开始重放 break } } if start < 0 { // lastEventID 不在历史窗口内,重放全部 start = 0 } // 重放 start 之后的所有事件 for i := start; i < c.histSize; i++ { idx := (oldest + i) % cap evt := c.history[idx] payload, _ := json.Marshal(evt.Payload) c.writeMu.Lock() fmt.Fprintf(c.w, "id: %s\nevent: %s\ndata: %s\n\n", evt.ID, evt.Type, string(payload)) c.flusher.Flush() c.writeMu.Unlock() } }