// SSETransport 测试 - 覆盖路由,错误路径,历史缓冲,断线重连重放. // // 精妙之处(CLEVER): 不真正建立 SSE 长连接(会阻塞测试 goroutine), // 而是单元测试每个可独立调用的方法 + 用 httptest.ResponseRecorder 测错误路径. // 长连接的端到端验证留给集成测试(pkg/daemon 那一层). package transport import ( "bytes" "context" "encoding/json" "net/http" "net/http/httptest" "strings" "sync" "testing" "time" "git.flytoex.net/yuanwei/flyto-agent/pkg/bridge" ) // === 构造函数 === func TestNewSSETransport_DefaultPrefix(t *testing.T) { tr := NewSSETransport("", bridge.DefaultBridgeConfig()) if tr.prefix != "/bridge" { t.Errorf("空 prefix 应使用默认 /bridge,实际: %q", tr.prefix) } } func TestNewSSETransport_TrimsTrailingSlash(t *testing.T) { tr := NewSSETransport("/api/", bridge.DefaultBridgeConfig()) if tr.prefix != "/api" { t.Errorf("尾部斜杠应被去掉,实际: %q", tr.prefix) } } func TestNewSSETransport_AcceptChannelExists(t *testing.T) { tr := NewSSETransport("/bridge", bridge.DefaultBridgeConfig()) defer tr.Close() if tr.Accept() == nil { t.Error("Accept() 不应返回 nil channel") } } // === ServeHTTP 路由 === func TestServeHTTP_NotFoundForUnknownPath(t *testing.T) { tr := NewSSETransport("/bridge", bridge.DefaultBridgeConfig()) defer tr.Close() w := httptest.NewRecorder() r := httptest.NewRequest("GET", "/bridge/unknown", nil) tr.ServeHTTP(w, r) if w.Code != http.StatusNotFound { t.Errorf("未知路径应 404,实际 %d", w.Code) } } func TestServeHTTP_MethodMismatch(t *testing.T) { tr := NewSSETransport("/bridge", bridge.DefaultBridgeConfig()) defer tr.Close() // POST /events 应 404(只接受 GET) w := httptest.NewRecorder() r := httptest.NewRequest("POST", "/bridge/events", nil) tr.ServeHTTP(w, r) if w.Code != http.StatusNotFound { t.Errorf("POST /events 应 404,实际 %d", w.Code) } // GET /messages 应 404(只接受 POST) w = httptest.NewRecorder() r = httptest.NewRequest("GET", "/bridge/messages", nil) tr.ServeHTTP(w, r) if w.Code != http.StatusNotFound { t.Errorf("GET /messages 应 404,实际 %d", w.Code) } } // === handleSSEConnect 非阻塞错误路径 === func TestSSEConnect_MissingSessionID(t *testing.T) { tr := NewSSETransport("/bridge", bridge.DefaultBridgeConfig()) defer tr.Close() w := httptest.NewRecorder() r := httptest.NewRequest("GET", "/bridge/events", nil) tr.ServeHTTP(w, r) if w.Code != http.StatusBadRequest { t.Errorf("缺 session_id 应 400,实际 %d", w.Code) } if !strings.Contains(w.Body.String(), "session_id") { t.Errorf("错误信息应提到 session_id,实际: %q", w.Body.String()) } } func TestSSEConnect_AfterClose(t *testing.T) { tr := NewSSETransport("/bridge", bridge.DefaultBridgeConfig()) tr.Close() // 立即关 w := httptest.NewRecorder() r := httptest.NewRequest("GET", "/bridge/events?session_id=s1", nil) tr.ServeHTTP(w, r) if w.Code != http.StatusServiceUnavailable { t.Errorf("关闭后请求应 503,实际 %d", w.Code) } } // === handleMessage === func TestHandleMessage_MalformedJSON(t *testing.T) { tr := NewSSETransport("/bridge", bridge.DefaultBridgeConfig()) defer tr.Close() w := httptest.NewRecorder() r := httptest.NewRequest("POST", "/bridge/messages", bytes.NewBufferString("{not valid json")) tr.ServeHTTP(w, r) if w.Code != http.StatusBadRequest { t.Errorf("坏 JSON 应 400,实际 %d", w.Code) } } func TestHandleMessage_MissingSessionID(t *testing.T) { tr := NewSSETransport("/bridge", bridge.DefaultBridgeConfig()) defer tr.Close() body, _ := json.Marshal(bridge.ClientMessage{Type: bridge.ClientMessagePrompt, Prompt: "hi"}) w := httptest.NewRecorder() r := httptest.NewRequest("POST", "/bridge/messages", bytes.NewBuffer(body)) tr.ServeHTTP(w, r) if w.Code != http.StatusBadRequest { t.Errorf("缺 session_id 应 400,实际 %d", w.Code) } } func TestHandleMessage_SessionNotFound(t *testing.T) { tr := NewSSETransport("/bridge", bridge.DefaultBridgeConfig()) defer tr.Close() body, _ := json.Marshal(bridge.ClientMessage{ Type: bridge.ClientMessagePrompt, SessionID: "nonexistent", Prompt: "hi", }) w := httptest.NewRecorder() r := httptest.NewRequest("POST", "/bridge/messages", bytes.NewBuffer(body)) tr.ServeHTTP(w, r) if w.Code != http.StatusNotFound { t.Errorf("不存在的 session 应 404,实际 %d", w.Code) } } func TestHandleMessage_DeliveredToSession(t *testing.T) { tr := NewSSETransport("/bridge", bridge.DefaultBridgeConfig()) defer tr.Close() // 直接构造一个 SSESessionConn 注入 conns map(不走真实 SSE 流) conn := newSSESessionConn("s1", httptest.NewRecorder(), &noopFlusher{}, bridge.DefaultBridgeConfig()) defer conn.Close() tr.mu.Lock() tr.conns["s1"] = conn tr.mu.Unlock() body, _ := json.Marshal(bridge.ClientMessage{ Type: bridge.ClientMessagePrompt, SessionID: "s1", Prompt: "hello", }) w := httptest.NewRecorder() r := httptest.NewRequest("POST", "/bridge/messages", bytes.NewBuffer(body)) tr.ServeHTTP(w, r) if w.Code != http.StatusAccepted { t.Errorf("成功投递应 202,实际 %d", w.Code) } // 接收方应能从 Recv 拿到 select { case msg := <-conn.Recv(): if msg.Prompt != "hello" { t.Errorf("Prompt 丢失: %q", msg.Prompt) } case <-time.After(time.Second): t.Fatal("Recv 超时,消息未投递") } } func TestHandleMessage_BodyTooLarge(t *testing.T) { cfg := bridge.DefaultBridgeConfig() cfg.MaxMessageSize = 100 // 强制 100 字节上限 tr := NewSSETransport("/bridge", cfg) defer tr.Close() // 写一个超过 100 字节的 body huge := strings.Repeat("a", 200) body := `{"type":"prompt","session_id":"s1","prompt":"` + huge + `"}` w := httptest.NewRecorder() r := httptest.NewRequest("POST", "/bridge/messages", strings.NewReader(body)) tr.ServeHTTP(w, r) if w.Code != http.StatusRequestEntityTooLarge { t.Errorf("超大 body 应 413,实际 %d", w.Code) } } // === SSESessionConn === func TestSessionConn_SendAndHistory(t *testing.T) { rec := httptest.NewRecorder() cfg := bridge.DefaultBridgeConfig() cfg.ResumeWindow = 5 // 小窗口便于测试 conn := newSSESessionConn("s1", rec, &noopFlusher{}, cfg) defer conn.Close() ctx := context.Background() for i := 0; i < 3; i++ { evt := bridge.BridgeEvent{ ID: "e" + string(rune('0'+i)), Type: "text_delta", SessionID: "s1", Payload: json.RawMessage(`{"text":"hi"}`), } if err := conn.Send(ctx, evt); err != nil { t.Fatalf("Send #%d failed: %v", i, err) } } // 历史应保留 3 条 conn.historyMu.Lock() if conn.histSize != 3 { t.Errorf("histSize = %d, want 3", conn.histSize) } conn.historyMu.Unlock() // rec 中应有 3 条 SSE 事件 out := rec.Body.String() if strings.Count(out, "event: text_delta") != 3 { t.Errorf("应写入 3 条事件,实际:\n%s", out) } } func TestSessionConn_HistoryRingBufferOverflow(t *testing.T) { rec := httptest.NewRecorder() cfg := bridge.DefaultBridgeConfig() cfg.ResumeWindow = 3 // 只保留最近 3 条 conn := newSSESessionConn("s1", rec, &noopFlusher{}, cfg) defer conn.Close() ctx := context.Background() for i := 0; i < 5; i++ { evt := bridge.BridgeEvent{ ID: "e" + string(rune('0'+i)), Type: "text_delta", SessionID: "s1", } if err := conn.Send(ctx, evt); err != nil { t.Fatalf("Send #%d failed: %v", i, err) } } // histSize 应封顶在 3 conn.historyMu.Lock() if conn.histSize != 3 { t.Errorf("histSize 应封顶 3,实际 %d", conn.histSize) } conn.historyMu.Unlock() } func TestSessionConn_SendAfterClose(t *testing.T) { conn := newSSESessionConn("s1", httptest.NewRecorder(), &noopFlusher{}, bridge.DefaultBridgeConfig()) conn.Close() err := conn.Send(context.Background(), bridge.BridgeEvent{ID: "e1", Type: "x"}) if err != bridge.ErrConnClosed { t.Errorf("关闭后 Send 应返回 ErrConnClosed,实际: %v", err) } } func TestSessionConn_CloseIdempotent(t *testing.T) { conn := newSSESessionConn("s1", httptest.NewRecorder(), &noopFlusher{}, bridge.DefaultBridgeConfig()) // 多次 close 不应 panic for i := 0; i < 5; i++ { if err := conn.Close(); err != nil { t.Errorf("Close #%d 应幂等,实际: %v", i, err) } } // Done() channel 应已关闭 select { case <-conn.Done(): // ok case <-time.After(100 * time.Millisecond): t.Error("Close 后 Done() 应立即就绪") } } func TestSessionConn_ReplayFromKnownID(t *testing.T) { rec := httptest.NewRecorder() cfg := bridge.DefaultBridgeConfig() cfg.ResumeWindow = 10 conn := newSSESessionConn("s1", rec, &noopFlusher{}, cfg) defer conn.Close() ctx := context.Background() for i := 0; i < 5; i++ { conn.Send(ctx, bridge.BridgeEvent{ ID: "e" + string(rune('0'+i)), Type: "text", SessionID: "s1", }) } // 重放从 e2 之后开始(应包含 e3, e4) rec.Body.Reset() conn.replayFrom("e2") out := rec.Body.String() if !strings.Contains(out, "id: e3") { t.Errorf("重放应包含 e3,实际:\n%s", out) } if !strings.Contains(out, "id: e4") { t.Errorf("重放应包含 e4,实际:\n%s", out) } if strings.Contains(out, "id: e2") { t.Errorf("重放不应包含 e2 自己(从其后开始),实际:\n%s", out) } } func TestSessionConn_ReplayFromUnknownID_FallsBackToFull(t *testing.T) { rec := httptest.NewRecorder() conn := newSSESessionConn("s1", rec, &noopFlusher{}, bridge.DefaultBridgeConfig()) defer conn.Close() ctx := context.Background() for i := 0; i < 3; i++ { conn.Send(ctx, bridge.BridgeEvent{ ID: "e" + string(rune('0'+i)), Type: "text", SessionID: "s1", }) } // 不存在的 ID 应触发全量重放 rec.Body.Reset() conn.replayFrom("nonexistent") out := rec.Body.String() for i := 0; i < 3; i++ { marker := "id: e" + string(rune('0'+i)) if !strings.Contains(out, marker) { t.Errorf("全量重放应包含 %s,实际:\n%s", marker, out) } } } func TestSessionConn_ReplayFromEmptyHistory(t *testing.T) { rec := httptest.NewRecorder() conn := newSSESessionConn("s1", rec, &noopFlusher{}, bridge.DefaultBridgeConfig()) defer conn.Close() // 空历史调用 replayFrom 应不写任何东西,也不 panic conn.replayFrom("e1") if rec.Body.Len() != 0 { t.Errorf("空历史不应写任何输出,实际: %q", rec.Body.String()) } } func TestSessionConn_SessionID(t *testing.T) { conn := newSSESessionConn("my-session", httptest.NewRecorder(), &noopFlusher{}, bridge.DefaultBridgeConfig()) defer conn.Close() if conn.SessionID() != "my-session" { t.Errorf("SessionID() = %q, want my-session", conn.SessionID()) } } // === Transport.Close === func TestTransport_CloseIdempotent(t *testing.T) { tr := NewSSETransport("/bridge", bridge.DefaultBridgeConfig()) for i := 0; i < 3; i++ { if err := tr.Close(); err != nil { t.Errorf("Close #%d 应幂等,实际: %v", i, err) } } } func TestTransport_CloseClosesActiveConns(t *testing.T) { tr := NewSSETransport("/bridge", bridge.DefaultBridgeConfig()) // 注入一个连接 conn := newSSESessionConn("s1", httptest.NewRecorder(), &noopFlusher{}, bridge.DefaultBridgeConfig()) tr.mu.Lock() tr.conns["s1"] = conn tr.mu.Unlock() tr.Close() // 注入的连接应被关闭 select { case <-conn.Done(): // ok case <-time.After(time.Second): t.Error("Close transport 应触发活跃连接的 Done()") } } // === 并发安全性 === // // 精妙之处(CLEVER): SSE 的 Send 走 writeMu,appendHistory 走 historyMu, // 多 goroutine 并发 Send 不应数据竞争或死锁. func TestSessionConn_ConcurrentSend(t *testing.T) { cfg := bridge.DefaultBridgeConfig() cfg.ResumeWindow = 100 conn := newSSESessionConn("s1", httptest.NewRecorder(), &noopFlusher{}, cfg) defer conn.Close() var wg sync.WaitGroup for i := 0; i < 10; i++ { wg.Add(1) go func(id int) { defer wg.Done() ctx := context.Background() for j := 0; j < 5; j++ { _ = conn.Send(ctx, bridge.BridgeEvent{ ID: "g", Type: "text", }) } }(i) } wg.Wait() // 50 次 Send,histSize 应封顶在 50(小于 ResumeWindow 100) conn.historyMu.Lock() if conn.histSize != 50 { t.Errorf("histSize = %d, want 50", conn.histSize) } conn.historyMu.Unlock() } // noopFlusher 是 http.Flusher 的空实现. // // 历史包袱(LEGACY): httptest.ResponseRecorder 自 Go 1.7 起实现了 http.Flusher, // 但 Flush() 是 no-op,本身已经够用. // 我们额外提供这个 noopFlusher 是因为 newSSESessionConn 接受独立的 http.Flusher 参数, // 在某些测试中需要把 ResponseWriter 和 Flusher 解耦传入. type noopFlusher struct{} func (n *noopFlusher) Flush() {}