package bridge

import (
	"context"
	"errors"
	"sync"
	"sync/atomic"
	"testing"
	"time"
)

// TestSerialBatchEventUploader_SingleEvent verifies that a single enqueued
// event is uploaded as exactly one batch containing exactly one event.
func TestSerialBatchEventUploader_SingleEvent(t *testing.T) {
	ctx := context.Background()
	var uploaded [][]BridgeEvent
	var mu sync.Mutex
	u := NewSerialBatchEventUploader(ctx, func(_ context.Context, evts []BridgeEvent) error {
		mu.Lock()
		uploaded = append(uploaded, evts)
		mu.Unlock()
		return nil
	}, nil)
	u.Enqueue(BridgeEvent{ID: "1", Type: "text_delta"})
	if err := u.Flush(ctx); err != nil {
		t.Fatal(err)
	}
	mu.Lock()
	defer mu.Unlock()
	if len(uploaded) != 1 || len(uploaded[0]) != 1 {
		t.Errorf("expected 1 batch of 1 event, got %v batches", len(uploaded))
	}
}

// TestSerialBatchEventUploader_BatchAccumulation adds a 50ms delay to the
// upload function to simulate an in-flight upload, and checks that events
// enqueued during that window accumulate into a single follow-up batch.
func TestSerialBatchEventUploader_BatchAccumulation(t *testing.T) {
	ctx := context.Background()
	var batchSizes []int
	var mu sync.Mutex
	ready := make(chan struct{}) // signals that the first upload has started
	firstCall := true            // only touched inside the (serial) upload fn
	u := NewSerialBatchEventUploader(ctx, func(_ context.Context, evts []BridgeEvent) error {
		if firstCall {
			firstCall = false
			close(ready)                       // notify: first upload has started
			time.Sleep(50 * time.Millisecond) // simulate a slow upload
		}
		mu.Lock()
		batchSizes = append(batchSizes, len(evts))
		mu.Unlock()
		return nil
	}, nil)
	u.Enqueue(BridgeEvent{ID: "1"}) // triggers the first (slow) upload
	<-ready                         // wait until the first upload is underway
	u.Enqueue(BridgeEvent{ID: "2"}) // enqueued while first upload is in flight
	u.Enqueue(BridgeEvent{ID: "3"}) // enqueued while first upload is in flight
	if err := u.Flush(context.Background()); err != nil {
		t.Fatal(err)
	}
	mu.Lock()
	defer mu.Unlock()
	// Expect 2 batches: [1] and [2,3] (backpressure merges 2 and 3).
	// Fatalf, not Errorf: the index accesses below would panic on a
	// short slice if we continued after a length mismatch.
	if len(batchSizes) != 2 {
		t.Fatalf("expected 2 batches, got %d: %v", len(batchSizes), batchSizes)
	}
	if batchSizes[0] != 1 {
		t.Errorf("first batch should have 1 event, got %d", batchSizes[0])
	}
	if batchSizes[1] != 2 {
		t.Errorf("second batch should have 2 events (accumulated), got %d", batchSizes[1])
	}
}

// TestSerialBatchEventUploader_Serial verifies serial execution: upload N+1
// must not start before upload N has completed (max in-flight == 1).
func TestSerialBatchEventUploader_Serial(t *testing.T) {
	ctx := context.Background()
	var inFlight atomic.Int32
	var maxInFlight atomic.Int32
	u := NewSerialBatchEventUploader(ctx, func(_ context.Context, _ []BridgeEvent) error {
		cur := inFlight.Add(1)
		// Record the maximum observed concurrency via a CAS loop.
		for {
			m := maxInFlight.Load()
			if cur <= m || maxInFlight.CompareAndSwap(m, cur) {
				break
			}
		}
		time.Sleep(10 * time.Millisecond)
		inFlight.Add(-1)
		return nil
	}, nil)
	for i := 0; i < 5; i++ {
		u.Enqueue(BridgeEvent{ID: "evt"})
		time.Sleep(5 * time.Millisecond)
	}
	if err := u.Flush(ctx); err != nil {
		t.Fatal(err)
	}
	if maxInFlight.Load() > 1 {
		t.Errorf("max concurrent uploads = %d, expected 1 (serial)", maxInFlight.Load())
	}
}

// TestSerialBatchEventUploader_OnError verifies that the onError callback is
// invoked when the upload function fails.
func TestSerialBatchEventUploader_OnError(t *testing.T) {
	ctx := context.Background()
	var errCount atomic.Int32
	u := NewSerialBatchEventUploader(ctx,
		func(_ context.Context, _ []BridgeEvent) error {
			return errors.New("upload failed")
		},
		func(_ []BridgeEvent, err error) {
			if err != nil {
				errCount.Add(1)
			}
		},
	)
	u.Enqueue(BridgeEvent{ID: "1"})
	// Flush error deliberately ignored: the upload is expected to fail;
	// the assertion below is on the onError callback instead.
	u.Flush(ctx)
	if errCount.Load() == 0 {
		t.Error("onError should have been called")
	}
}

// TestSerialBatchEventUploader_CtxCancelled verifies that enqueuing after the
// context is cancelled neither panics nor invokes the upload function.
func TestSerialBatchEventUploader_CtxCancelled(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // cancel immediately
	u := NewSerialBatchEventUploader(ctx, func(_ context.Context, _ []BridgeEvent) error {
		t.Error("upload should not be called after ctx cancel")
		return nil
	}, nil)
	// ctx is already cancelled; Enqueue should drop the event.
	u.Enqueue(BridgeEvent{ID: "1"})
	// Must not panic, must not call upload.
}

// TestSerialBatchEventUploader_PendingCount verifies that PendingCount reports
// the number of events queued behind an in-flight (blocked) upload.
func TestSerialBatchEventUploader_PendingCount(t *testing.T) {
	ctx := context.Background()
	hold := make(chan struct{})
	u := NewSerialBatchEventUploader(ctx, func(_ context.Context, _ []BridgeEvent) error {
		<-hold // block the upload until released
		return nil
	}, nil)
	u.Enqueue(BridgeEvent{ID: "1"}) // triggers the upload (held by hold)
	time.Sleep(10 * time.Millisecond)
	u.Enqueue(BridgeEvent{ID: "2"}) // goes to pending
	u.Enqueue(BridgeEvent{ID: "3"}) // goes to pending
	if cnt := u.PendingCount(); cnt != 2 {
		t.Errorf("expected 2 pending, got %d", cnt)
	}
	close(hold)
	if err := u.Flush(ctx); err != nil {
		t.Fatal(err)
	}
}