package engine import ( "sync" "testing" ) // TestFlushGate_BasicFlow 测试基本的 Start/Enqueue/End 流程 func TestFlushGate_BasicFlow(t *testing.T) { g := NewFlushGate[string]() // 未 Start 时 Enqueue 应返回 false if g.Enqueue("a") { t.Error("Enqueue should return false when not active") } // Start 后 Enqueue 应返回 true g.Start() if !g.Enqueue("a", "b") { t.Error("Enqueue should return true when active") } if !g.Enqueue("c") { t.Error("Enqueue should return true when active") } // End 应返回所有排队的项 items := g.End() if len(items) != 3 { t.Fatalf("End should return 3 items, got %d", len(items)) } if items[0] != "a" || items[1] != "b" || items[2] != "c" { t.Errorf("items = %v, want [a b c]", items) } // End 后 Enqueue 应返回 false if g.Enqueue("d") { t.Error("Enqueue should return false after End") } } // TestFlushGate_Drop 测试丢弃排队项 func TestFlushGate_Drop(t *testing.T) { g := NewFlushGate[int]() g.Start() g.Enqueue(1, 2, 3) n := g.Drop() if n != 3 { t.Errorf("Drop should return 3, got %d", n) } // Drop 后 End 应返回空 items := g.End() if len(items) != 0 { t.Errorf("End after Drop should return 0 items, got %d", len(items)) } } // TestFlushGate_Deactivate 测试清除激活状态但保留 pending func TestFlushGate_Deactivate(t *testing.T) { g := NewFlushGate[string]() g.Start() g.Enqueue("a", "b") g.Deactivate() // Deactivate 后不再接受新项 if g.Enqueue("c") { t.Error("Enqueue should return false after Deactivate") } // 但 pending 仍然保留 if g.PendingCount() != 2 { t.Errorf("PendingCount should be 2 after Deactivate, got %d", g.PendingCount()) } // 重新 Start 后可以继续接受 g.Start() g.Enqueue("c") items := g.End() if len(items) != 3 { t.Fatalf("End should return 3 items (preserved + new), got %d", len(items)) } } // TestFlushGate_IsActive 测试活动状态查询 func TestFlushGate_IsActive(t *testing.T) { g := NewFlushGate[int]() if g.IsActive() { t.Error("should not be active initially") } g.Start() if !g.IsActive() { t.Error("should be active after Start") } g.End() if g.IsActive() { t.Error("should not be active after End") } } // TestFlushGate_ConcurrentAccess 测试并发安全性 func TestFlushGate_ConcurrentAccess(t *testing.T) { g := NewFlushGate[int]() g.Start() var wg sync.WaitGroup const n = 100 // 并发 Enqueue for i := 0; i < n; i++ { wg.Add(1) go func(v int) { defer wg.Done() g.Enqueue(v) }(i) } wg.Wait() items := g.End() if len(items) != n { t.Errorf("expected %d items after concurrent Enqueue, got %d", n, len(items)) } } // TestFlushGate_EmptyEnd 测试无排队项时 End func TestFlushGate_EmptyEnd(t *testing.T) { g := NewFlushGate[string]() g.Start() items := g.End() if items != nil && len(items) != 0 { t.Errorf("End with no items should return nil or empty, got %v", items) } } // TestFlushGate_MultipleStartEnd 测试多轮 Start/End func TestFlushGate_MultipleStartEnd(t *testing.T) { g := NewFlushGate[int]() // 第一轮 g.Start() g.Enqueue(1) items := g.End() if len(items) != 1 || items[0] != 1 { t.Errorf("round 1: got %v, want [1]", items) } // 第二轮 g.Start() g.Enqueue(2, 3) items = g.End() if len(items) != 2 || items[0] != 2 || items[1] != 3 { t.Errorf("round 2: got %v, want [2 3]", items) } }