package daemon import ( "context" "sync" "sync/atomic" "testing" "time" ) func TestSessionPool_Unlimited(t *testing.T) { p := NewSessionPool(0) // 无限制 ctx := context.Background() for i := 0; i < 100; i++ { if err := p.Acquire(ctx); err != nil { t.Fatalf("unlimited pool should never block, got: %v", err) } } if p.Active() != 100 { t.Errorf("expected 100 active, got %d", p.Active()) } } func TestSessionPool_MaxEnforced(t *testing.T) { p := NewSessionPool(2) ctx := context.Background() if err := p.Acquire(ctx); err != nil { t.Fatal(err) } if err := p.Acquire(ctx); err != nil { t.Fatal(err) } // 第 3 次应该阻塞,ctx 超时后返回 timeoutCtx, cancel := context.WithTimeout(ctx, 50*time.Millisecond) defer cancel() err := p.Acquire(timeoutCtx) if err == nil { t.Error("should have blocked and returned ctx error") } } func TestSessionPool_ReleaseWakesAcquire(t *testing.T) { p := NewSessionPool(1) ctx := context.Background() p.Acquire(ctx) // 占满 var acquired atomic.Bool go func() { p.Acquire(ctx) // 应该阻塞 acquired.Store(true) }() time.Sleep(20 * time.Millisecond) if acquired.Load() { t.Error("should still be blocked") } p.Release() // 释放,唤醒等待者 deadline := time.Now().Add(100 * time.Millisecond) for time.Now().Before(deadline) { if acquired.Load() { return } time.Sleep(5 * time.Millisecond) } t.Error("Acquire should have unblocked after Release") } func TestSessionPool_MultipleWaiters(t *testing.T) { // 多个等待者:Release 一次只有一个能成功 p := NewSessionPool(1) ctx := context.Background() p.Acquire(ctx) // 占满 var count atomic.Int32 var wg sync.WaitGroup for i := 0; i < 3; i++ { wg.Add(1) go func() { defer wg.Done() timeoutCtx, cancel := context.WithTimeout(ctx, 200*time.Millisecond) defer cancel() if p.Acquire(timeoutCtx) == nil { count.Add(1) p.Release() } }() } time.Sleep(20 * time.Millisecond) p.Release() // 唤醒所有等待者 wg.Wait() // 三个等待者竞争,至少有 1 个应该成功(可能多个,因为每个成功后都 Release) if count.Load() == 0 { t.Error("at least one waiter should have acquired successfully") } } func TestSessionPool_ClosedReturnsError(t *testing.T) { p := NewSessionPool(5) p.Close() err := p.Acquire(context.Background()) if err != ErrPoolClosed { t.Errorf("expected ErrPoolClosed, got %v", err) } } func TestSessionPool_CloseWakesBlockedAcquire(t *testing.T) { p := NewSessionPool(1) ctx := context.Background() p.Acquire(ctx) // 占满 var gotErr error done := make(chan struct{}) go func() { gotErr = p.Acquire(ctx) // 阻塞 close(done) }() time.Sleep(20 * time.Millisecond) p.Close() select { case <-done: case <-time.After(100 * time.Millisecond): t.Fatal("Acquire should unblock after Close") } if gotErr != ErrPoolClosed { t.Errorf("expected ErrPoolClosed, got %v", gotErr) } } func TestSessionPool_Active(t *testing.T) { p := NewSessionPool(10) ctx := context.Background() for i := 0; i < 5; i++ { p.Acquire(ctx) } if p.Active() != 5 { t.Errorf("expected 5 active, got %d", p.Active()) } p.Release() p.Release() if p.Active() != 3 { t.Errorf("expected 3 active after 2 releases, got %d", p.Active()) } }