package circuitbreaker

import (
	"errors"
	"sync"
	"testing"
	"time"

	"git.flytoex.net/yuanwei/flyto-agent/pkg/validator"
)

// fakeClock is a controllable time source for deterministic cooldown
// tests. The mutex guards now, so Now and Advance may be called from
// several goroutines.
type fakeClock struct {
	mu  sync.Mutex
	now time.Time
}

// Now returns the instant the clock currently reports.
func (f *fakeClock) Now() time.Time {
	f.mu.Lock()
	defer f.mu.Unlock()
	return f.now
}

// Advance shifts the reported instant by d.
func (f *fakeClock) Advance(d time.Duration) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.now = f.now.Add(d)
}

// newTestBreaker builds a breaker through New with the given threshold
// and cooldown, wired to a fakeClock that starts at a fixed instant
// (time.Unix(1000, 0)). Both the breaker and the clock are returned so
// a test can move time forward explicitly.
func newTestBreaker(threshold int, cooldown time.Duration) (*CircuitBreaker, *fakeClock) {
	clk := &fakeClock{now: time.Unix(1000, 0)}
	b := New(Config{
		Threshold: threshold,
		Cooldown:  cooldown,
		Clock:     clk,
	})
	return b, clk
}

// -- defaults --

// TestNew_AppliesDefaults checks that a zero Config yields the default
// threshold, the default cooldown and a non-nil clock.
func TestNew_AppliesDefaults(t *testing.T) {
	b := New(Config{})
	if b.threshold != DefaultThreshold {
		t.Errorf("expected default threshold %d, got %d", DefaultThreshold, b.threshold)
	}
	if b.cooldown != DefaultCooldown {
		t.Errorf("expected default cooldown %v, got %v", DefaultCooldown, b.cooldown)
	}
	if b.clock == nil {
		t.Errorf("expected default clock to be non-nil")
	}
}

// TestNew_NegativeValuesUseDefaults checks that negative threshold and
// cooldown values are replaced by the defaults.
func TestNew_NegativeValuesUseDefaults(t *testing.T) {
	b := New(Config{Threshold: -1, Cooldown: -time.Second})
	if b.threshold != DefaultThreshold {
		t.Errorf("negative threshold should use default, got %d", b.threshold)
	}
	if b.cooldown != DefaultCooldown {
		t.Errorf("negative cooldown should use default, got %v", b.cooldown)
	}
}

// -- Closed state --

// TestBreaker_StartsClosed checks that a fresh breaker reports
// StateClosed and that Allow returns nil.
func TestBreaker_StartsClosed(t *testing.T) {
	b, _ := newTestBreaker(3, time.Minute)
	if b.State() != StateClosed {
		t.Errorf("should start Closed, got %v", b.State())
	}
	if err := b.Allow(); err != nil {
		t.Errorf("Closed should allow, got %v", err)
	}
}

// TestBreaker_Closed_FailuresBelowThreshold checks that failures below
// the threshold are counted but leave the breaker Closed and allowing.
func TestBreaker_Closed_FailuresBelowThreshold(t *testing.T) {
	b, _ := newTestBreaker(3, time.Minute)
	b.RecordFailure()
	b.RecordFailure()
	if b.State() != StateClosed {
		t.Errorf("2/3 failures should keep Closed, got %v", b.State())
	}
	if b.FailureCount() != 2 {
		t.Errorf("failCount should be 2, got %d", b.FailureCount())
	}
	if err := b.Allow(); err != nil {
		t.Errorf("still Closed, Allow should pass, got %v", err)
	}
}

// TestBreaker_Closed_SuccessResetsCounter checks that RecordSuccess
// clears the failure count accumulated while Closed.
func TestBreaker_Closed_SuccessResetsCounter(t *testing.T) {
	b, _ := newTestBreaker(3, time.Minute)
	b.RecordFailure()
	b.RecordFailure()
	b.RecordSuccess()
	if b.FailureCount() != 0 {
		t.Errorf("success should reset failCount, got %d", b.FailureCount())
	}
}

// -- Trip to Open --

// TestBreaker_TripsAtThreshold checks that the failure that reaches the
// threshold moves the breaker to Open and that Allow then yields ErrOpen.
func TestBreaker_TripsAtThreshold(t *testing.T) {
	b, _ := newTestBreaker(3, time.Minute)
	b.RecordFailure()
	b.RecordFailure()
	b.RecordFailure()
	if b.State() != StateOpen {
		t.Errorf("3rd failure should trip to Open, got %v", b.State())
	}
	if err := b.Allow(); !errors.Is(err, ErrOpen) {
		t.Errorf("Open should return ErrOpen, got %v", err)
	}
}

// -- Open -> HalfOpen --

// TestBreaker_Open_RejectsDuringCooldown checks that Allow still yields
// ErrOpen halfway through the cooldown.
func TestBreaker_Open_RejectsDuringCooldown(t *testing.T) {
	b, clk := newTestBreaker(1, 60*time.Second)
	b.RecordFailure()
	clk.Advance(30 * time.Second)
	if err := b.Allow(); !errors.Is(err, ErrOpen) {
		t.Errorf("mid-cooldown Allow should ErrOpen, got %v", err)
	}
}

// TestBreaker_Open_AfterCooldown_AllowsProbe checks that once the
// cooldown has elapsed Allow returns nil and the state is HalfOpen.
func TestBreaker_Open_AfterCooldown_AllowsProbe(t *testing.T) {
	b, clk := newTestBreaker(1, 60*time.Second)
	b.RecordFailure()
	clk.Advance(61 * time.Second)
	if err := b.Allow(); err != nil {
		t.Errorf("post-cooldown Allow should pass (probe), got %v", err)
	}
	if b.State() != StateHalfOpen {
		t.Errorf("post-cooldown should be HalfOpen, got %v", b.State())
	}
}

// TestBreaker_HalfOpen_OnlyOneProbe checks that after the probe has been
// granted, a second Allow yields ErrOpen.
func TestBreaker_HalfOpen_OnlyOneProbe(t *testing.T) {
	b, clk := newTestBreaker(1, 60*time.Second)
	b.RecordFailure()
	clk.Advance(61 * time.Second)
	// The probe must actually be granted here; otherwise the ErrOpen
	// check below would also pass on a breaker that never left Open.
	if err := b.Allow(); err != nil {
		t.Fatalf("first post-cooldown Allow should grant the probe, got %v", err)
	}
	if err := b.Allow(); !errors.Is(err, ErrOpen) {
		t.Errorf("second HalfOpen Allow should ErrOpen, got %v", err)
	}
}

// TestBreaker_HalfOpen_SuccessRestoresClosed checks that a success
// recorded after the probe was granted returns the breaker to Closed
// with a zero failure count.
func TestBreaker_HalfOpen_SuccessRestoresClosed(t *testing.T) {
	b, clk := newTestBreaker(2, 60*time.Second)
	b.RecordFailure()
	b.RecordFailure()
	clk.Advance(61 * time.Second)
	// Make sure the success below is recorded against a granted probe.
	if err := b.Allow(); err != nil {
		t.Fatalf("post-cooldown Allow should grant the probe, got %v", err)
	}
	b.RecordSuccess()
	if b.State() != StateClosed {
		t.Errorf("HalfOpen + success should restore Closed, got %v", b.State())
	}
	if b.FailureCount() != 0 {
		t.Errorf("restore should clear failCount, got %d", b.FailureCount())
	}
}

// TestBreaker_HalfOpen_FailureRestartsCooldown checks that a failed
// probe sends the breaker back to Open and that the cooldown is measured
// from the probe failure, not from the original trip. Times in the
// comments are relative to the fake clock's starting instant.
func TestBreaker_HalfOpen_FailureRestartsCooldown(t *testing.T) {
	b, clk := newTestBreaker(1, 60*time.Second)
	b.RecordFailure() // Open at t=0
	clk.Advance(61 * time.Second)
	// HalfOpen at t=61: the probe has to be granted for the failure
	// below to count as a failed probe.
	if err := b.Allow(); err != nil {
		t.Fatalf("post-cooldown Allow should grant the probe, got %v", err)
	}
	clk.Advance(10 * time.Second)
	b.RecordFailure() // probe failed at t=71
	if b.State() != StateOpen {
		t.Errorf("HalfOpen + failure should trip back to Open, got %v", b.State())
	}
	// cooldown must restart: at t=71+30s=101s still Open
	clk.Advance(30 * time.Second)
	if err := b.Allow(); !errors.Is(err, ErrOpen) {
		t.Errorf("after HalfOpen fail, cooldown restarted; Allow should ErrOpen, got %v", err)
	}
	// at t=71+60s=131s cooldown elapsed -> probe
	clk.Advance(30 * time.Second)
	if err := b.Allow(); err != nil {
		t.Errorf("restart cooldown elapsed, Allow should probe, got %v", err)
	}
}

// -- VerdictSink bridge --

// TestBreaker_VerdictSink_BlockTriggersFailure checks that unapproved
// Block-severity verdicts delivered through the sink count as failures
// and trip the breaker at the threshold.
func TestBreaker_VerdictSink_BlockTriggersFailure(t *testing.T) {
	b, _ := newTestBreaker(2, time.Minute)
	sink := b.VerdictSink()
	sink("SQLCAS", validator.Verdict{Approved: false, Severity: validator.SeverityBlock})
	sink("SQLCAS", validator.Verdict{Approved: false, Severity: validator.SeverityBlock})
	if b.State() != StateOpen {
		t.Errorf("2 Block verdicts should trip breaker (threshold=2), got %v", b.State())
	}
}

// TestBreaker_VerdictSink_WarnRecordedAsSuccess checks that an
// unapproved Warn-severity verdict resets the failure count left by a
// preceding Block verdict.
func TestBreaker_VerdictSink_WarnRecordedAsSuccess(t *testing.T) {
	b, _ := newTestBreaker(2, time.Minute)
	sink := b.VerdictSink()
	sink("SQLCAS", validator.Verdict{Approved: false, Severity: validator.SeverityBlock})
	sink("SQLCAS", validator.Verdict{Approved: false, Severity: validator.SeverityWarn})
	if b.FailureCount() != 0 {
		t.Errorf("Warn should record as success clearing failCount, got %d", b.FailureCount())
	}
}

// TestBreaker_VerdictSink_ApprovedRecordedAsSuccess checks that an
// approved verdict resets the failure count left by a preceding Block
// verdict.
func TestBreaker_VerdictSink_ApprovedRecordedAsSuccess(t *testing.T) {
	b, _ := newTestBreaker(2, time.Minute)
	sink := b.VerdictSink()
	sink("SQLCAS", validator.Verdict{Approved: false, Severity: validator.SeverityBlock})
	sink("SQLCAS", validator.Verdict{Approved: true, Severity: validator.SeverityWarn})
	if b.FailureCount() != 0 {
		t.Errorf("Approved should reset failCount, got %d", b.FailureCount())
	}
}

// TestBreaker_VerdictSink_IgnoresToolName checks that Block verdicts
// reported under different tool names all feed one failure counter.
func TestBreaker_VerdictSink_IgnoresToolName(t *testing.T) {
	// Baseline implementation: single breaker protects all tools.
	// Different toolNames should hit the same failure counter.
	b, _ := newTestBreaker(3, time.Minute)
	sink := b.VerdictSink()
	sink("SQLCAS", validator.Verdict{Severity: validator.SeverityBlock})
	sink("SQLDryRun", validator.Verdict{Severity: validator.SeverityBlock})
	sink("anything-else", validator.Verdict{Severity: validator.SeverityBlock})
	if b.State() != StateOpen {
		t.Errorf("3 Block verdicts from any tools should trip (threshold=3), got %v", b.State())
	}
}

// -- concurrency --

// TestBreaker_ConcurrentAccess calls RecordFailure and Allow from ten
// goroutines at once and checks that the breaker ends up Open.
func TestBreaker_ConcurrentAccess(t *testing.T) {
	b, _ := newTestBreaker(100, time.Minute)
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 50; j++ {
				b.RecordFailure()
				// The result depends on how the goroutines interleave,
				// so it is deliberately ignored; the call only mixes
				// Allow in with the concurrent RecordFailure calls.
				_ = b.Allow()
			}
		}()
	}
	wg.Wait()
	// After 10 goroutines x 50 failures = 500 failures with threshold=100,
	// the breaker must be Open.
	if b.State() != StateOpen {
		t.Errorf("expected Open after 500 failures (threshold 100), got %v", b.State())
	}
}

// -- State.String --

// TestState_String checks the string form of every named State and of
// an out-of-range value.
func TestState_String(t *testing.T) {
	tests := []struct {
		s    State
		want string
	}{
		{StateClosed, "closed"},
		{StateOpen, "open"},
		{StateHalfOpen, "half-open"},
		{State(99), "unknown"},
	}
	for _, tc := range tests {
		if got := tc.s.String(); got != tc.want {
			t.Errorf("State(%d).String() = %q, want %q", tc.s, got, tc.want)
		}
	}
}