package evolve

import (
	"context"
	"errors"
	"reflect"
	"sync"
	"testing"
	"time"
)

// mkEvent builds a ReplayEvent whose Log carries the given entity, decision ID
// and timestamp, with fb attached as its Feedback. fb may be nil.
func mkEvent(entity, decisionID string, ts time.Time, fb *Feedback) ReplayEvent {
	return ReplayEvent{
		Log:      LogEntry{Entity: entity, DecisionID: decisionID, Timestamp: ts},
		Feedback: fb,
	}
}

// mkFeedback returns a pointer to a Feedback populated with the given metric
// name, value and timestamp.
func mkFeedback(metric string, val float64, ts time.Time) *Feedback {
	return &Feedback{Metric: metric, Value: val, Timestamp: ts}
}

// mustOnEvent delivers ev to r using a background context and fails the
// calling test immediately if OnEvent returns an error. It exists so that
// setup events are never silently dropped: a failed delivery is reported at
// the call site instead of surfacing later as a confusing stats mismatch.
//
// It must only be called from the test goroutine because it uses t.Fatalf.
func mustOnEvent(t *testing.T, r interface{ OnEvent(context.Context, ReplayEvent) error }, ev ReplayEvent) {
	t.Helper()
	if err := r.OnEvent(context.Background(), ev); err != nil {
		t.Fatalf("OnEvent(entity=%q, decision=%q): %v", ev.Log.Entity, ev.Log.DecisionID, err)
	}
}

// ============================================================================
// AggregatorReflector
// ============================================================================

// TestAggregator_SingleEvent checks that one feedback event yields Count 1 and
// that Sum, Mean, Min and Max all equal the single observed value.
func TestAggregator_SingleEvent(t *testing.T) {
	r := NewAggregatorReflector()
	now := time.Now().UTC()

	mustOnEvent(t, r, mkEvent("A", "d1", now, mkFeedback("m", 5, now)))

	s := r.Stats("A", "m")
	if s.Count != 1 || s.Sum != 5 || s.Mean != 5 || s.Min != 5 || s.Max != 5 {
		t.Errorf("single event stats: %+v", s)
	}
}

// TestAggregator_MultipleSameMetric feeds five values for one entity/metric
// pair and checks the aggregated Count, Sum, Mean, Min and Max.
func TestAggregator_MultipleSameMetric(t *testing.T) {
	r := NewAggregatorReflector()
	now := time.Now().UTC()

	for _, v := range []float64{1, 3, 5, 7, 9} {
		mustOnEvent(t, r, mkEvent("A", "d", now, mkFeedback("m", v, now)))
	}

	s := r.Stats("A", "m")
	if s.Count != 5 || s.Sum != 25 || s.Mean != 5 || s.Min != 1 || s.Max != 9 {
		t.Errorf("aggregated stats: %+v", s)
	}
}

// TestAggregator_MultipleMetricsSeparate checks that two different metrics on
// the same entity are tracked independently.
func TestAggregator_MultipleMetricsSeparate(t *testing.T) {
	r := NewAggregatorReflector()
	now := time.Now().UTC()

	mustOnEvent(t, r, mkEvent("A", "d", now, mkFeedback("on_time", 0.9, now)))
	mustOnEvent(t, r, mkEvent("A", "d", now, mkFeedback("damage", 0.02, now)))

	onTime := r.Stats("A", "on_time")
	damage := r.Stats("A", "damage")
	if onTime.Count != 1 || onTime.Sum != 0.9 {
		t.Errorf("on_time: %+v", onTime)
	}
	if damage.Count != 1 || damage.Sum != 0.02 {
		t.Errorf("damage: %+v", damage)
	}
}

// TestAggregator_PendingCountsOnNilFeedback checks that events without
// feedback are counted as pending, and that the pending count is reported both
// for the empty metric name and for a named metric.
func TestAggregator_PendingCountsOnNilFeedback(t *testing.T) {
	r := NewAggregatorReflector()
	now := time.Now().UTC()

	mustOnEvent(t, r, mkEvent("A", "d1", now, nil))
	mustOnEvent(t, r, mkEvent("A", "d2", now, nil))
	mustOnEvent(t, r, mkEvent("A", "d3", now, nil))

	// Metric="" is the pending view.
	pending := r.Stats("A", "")
	if pending.PendingCount != 3 {
		t.Errorf("pending: want 3, got %+v", pending)
	}

	// Also exposed when querying a specific metric.
	m := r.Stats("A", "on_time")
	if m.PendingCount != 3 {
		t.Errorf("pending visible via metric view: %+v", m)
	}
}

// TestAggregator_FeedbackDecrementsPending checks that feedback arriving after
// two pending events lowers the pending count from 2 to 1 and is recorded
// under its metric.
func TestAggregator_FeedbackDecrementsPending(t *testing.T) {
	r := NewAggregatorReflector()
	now := time.Now().UTC()

	mustOnEvent(t, r, mkEvent("A", "d1", now, nil))
	mustOnEvent(t, r, mkEvent("A", "d2", now, nil))
	// Feedback arrives for one of them.
	mustOnEvent(t, r, mkEvent("A", "d1", now, mkFeedback("m", 0.5, now)))

	s := r.Stats("A", "m")
	if s.Count != 1 {
		t.Errorf("metric stats: %+v", s)
	}
	if s.PendingCount != 1 {
		t.Errorf("pending should decrement to 1: %+v", s)
	}
}

// TestAggregator_PendingNeverNegative checks that feedback with no preceding
// pending event leaves the pending count at zero.
func TestAggregator_PendingNeverNegative(t *testing.T) {
	r := NewAggregatorReflector()
	now := time.Now().UTC()

	// Feedback arrives without a prior pending. Pending stays 0.
	mustOnEvent(t, r, mkEvent("A", "d1", now, mkFeedback("m", 1, now)))

	s := r.Stats("A", "")
	if s.PendingCount != 0 {
		t.Errorf("pending must not go negative: %+v", s)
	}
}

// TestAggregator_UnknownEntityReturnsZero checks that Stats for an entity that
// was never observed reports zero Count, PendingCount and Sum.
func TestAggregator_UnknownEntityReturnsZero(t *testing.T) {
	r := NewAggregatorReflector()

	s := r.Stats("ghost", "m")
	if s.Count != 0 || s.PendingCount != 0 || s.Sum != 0 {
		t.Errorf("unknown entity: %+v", s)
	}
}

// TestAggregator_EntitiesMetricsGetters checks that Entities and Metrics
// return the observed names in sorted order (events are fed as B, A and y, z;
// the expected slices are [A B] and [y z]) and that Metrics is empty for an
// unknown entity.
func TestAggregator_EntitiesMetricsGetters(t *testing.T) {
	r := NewAggregatorReflector()
	now := time.Now().UTC()

	mustOnEvent(t, r, mkEvent("B", "d", now, mkFeedback("x", 1, now)))
	mustOnEvent(t, r, mkEvent("A", "d", now, mkFeedback("y", 1, now)))
	mustOnEvent(t, r, mkEvent("A", "d", now, mkFeedback("z", 1, now)))

	ents := r.Entities()
	if !reflect.DeepEqual(ents, []string{"A", "B"}) {
		t.Errorf("Entities: %v", ents)
	}

	metrics := r.Metrics("A")
	if !reflect.DeepEqual(metrics, []string{"y", "z"}) {
		t.Errorf("Metrics(A): %v", metrics)
	}

	if got := r.Metrics("ghost"); len(got) != 0 {
		t.Errorf("Metrics(unknown): want empty, got %v", got)
	}
}

// TestAggregator_Reset checks that Reset discards previously aggregated
// stats and the list of known entities.
func TestAggregator_Reset(t *testing.T) {
	r := NewAggregatorReflector()
	now := time.Now().UTC()

	mustOnEvent(t, r, mkEvent("A", "d", now, mkFeedback("m", 1, now)))
	r.Reset()

	s := r.Stats("A", "m")
	if s.Count != 0 {
		t.Errorf("after Reset: %+v", s)
	}
	if ents := r.Entities(); len(ents) != 0 {
		t.Errorf("Entities after Reset: %v", ents)
	}
}

// TestAggregator_CtxCanceled checks that OnEvent called with an already
// canceled context returns an error matching context.Canceled.
func TestAggregator_CtxCanceled(t *testing.T) {
	r := NewAggregatorReflector()
	ctx, cancel := context.WithCancel(context.Background())
	cancel()

	err := r.OnEvent(ctx, mkEvent("A", "d", time.Now(), nil))
	if !errors.Is(err, context.Canceled) {
		t.Errorf("ctx canceled: want context.Canceled, got %v", err)
	}
}

// TestAggregator_ConcurrentOnEvent sends events from several goroutines at
// once and checks that no update is lost: Count and Sum must both equal the
// total number of events sent. Run with -race to also detect data races.
func TestAggregator_ConcurrentOnEvent(t *testing.T) {
	r := NewAggregatorReflector()
	const workers = 8
	const per = 100
	now := time.Now().UTC()

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < per; i++ {
				ev := mkEvent("A", "d", now, mkFeedback("m", 1, now))
				if err := r.OnEvent(context.Background(), ev); err != nil {
					// t.Fatal may only be called from the test goroutine,
					// so report the failure and stop this worker instead.
					t.Errorf("concurrent OnEvent: %v", err)
					return
				}
			}
		}()
	}
	wg.Wait()

	s := r.Stats("A", "m")
	if s.Count != workers*per {
		t.Errorf("concurrent count: want %d, got %d", workers*per, s.Count)
	}
	if s.Sum != float64(workers*per) {
		t.Errorf("concurrent sum: want %d, got %v", workers*per, s.Sum)
	}
}

// TestAggregator_LastTimestamp checks that LastTimestamp keeps the newest
// feedback timestamp even when an older event is delivered afterwards.
func TestAggregator_LastTimestamp(t *testing.T) {
	r := NewAggregatorReflector()
	t1 := time.Date(2026, 4, 18, 10, 0, 0, 0, time.UTC)
	t2 := time.Date(2026, 4, 18, 11, 0, 0, 0, time.UTC)

	mustOnEvent(t, r, mkEvent("A", "d1", t2, mkFeedback("m", 1, t2)))
	mustOnEvent(t, r, mkEvent("A", "d2", t1, mkFeedback("m", 1, t1))) // older

	s := r.Stats("A", "m")
	if !s.LastTimestamp.Equal(t2) {
		t.Errorf("lastTs: want %v, got %v", t2, s.LastTimestamp)
	}
}

// TestAggregator_ImplementsInterface is a compile-time check that the value
// returned by NewAggregatorReflector is assignable to Reflector.
func TestAggregator_ImplementsInterface(t *testing.T) {
	r := NewAggregatorReflector()
	var _ Reflector = r
}

// ============================================================================
// FuncReflector
// ============================================================================

// TestFuncReflector_Basic checks that OnEvent forwards the event to the
// wrapped function.
func TestFuncReflector_Basic(t *testing.T) {
	var got ReplayEvent
	r, err := NewFuncReflector(func(_ context.Context, ev ReplayEvent) error {
		got = ev
		return nil
	})
	if err != nil {
		t.Fatal(err)
	}

	want := mkEvent("A", "d", time.Now(), nil)
	if err := r.OnEvent(context.Background(), want); err != nil {
		t.Fatal(err)
	}
	if got.Log.Entity != "A" || got.Log.DecisionID != "d" {
		t.Errorf("fn got %+v", got)
	}
}

// TestFuncReflector_ErrorPropagates checks that an error returned by the
// wrapped function is returned from OnEvent in a form errors.Is can match.
func TestFuncReflector_ErrorPropagates(t *testing.T) {
	boom := errors.New("boom")
	r, err := NewFuncReflector(func(context.Context, ReplayEvent) error { return boom })
	if err != nil {
		t.Fatal(err)
	}

	err = r.OnEvent(context.Background(), mkEvent("A", "d", time.Now(), nil))
	if !errors.Is(err, boom) {
		t.Errorf("error propagate: %v", err)
	}
}

// TestFuncReflector_NilFnRejected checks that NewFuncReflector returns an
// error when given a nil function.
func TestFuncReflector_NilFnRejected(t *testing.T) {
	if _, err := NewFuncReflector(nil); err == nil {
		t.Error("nil fn: want error")
	}
}

// TestFuncReflector_CtxCanceledShortCircuits checks that OnEvent with an
// already canceled context returns context.Canceled and never invokes the
// wrapped function.
func TestFuncReflector_CtxCanceledShortCircuits(t *testing.T) {
	called := false
	r, err := NewFuncReflector(func(context.Context, ReplayEvent) error {
		called = true
		return nil
	})
	if err != nil {
		t.Fatal(err)
	}

	ctx, cancel := context.WithCancel(context.Background())
	cancel()

	err = r.OnEvent(ctx, mkEvent("A", "d", time.Now(), nil))
	if !errors.Is(err, context.Canceled) {
		t.Errorf("ctx canceled: want context.Canceled, got %v", err)
	}
	if called {
		t.Errorf("fn must not be called with canceled ctx")
	}
}

// TestFuncReflector_ImplementsInterface is a compile-time check that the value
// returned by NewFuncReflector is assignable to Reflector.
func TestFuncReflector_ImplementsInterface(t *testing.T) {
	r, err := NewFuncReflector(func(context.Context, ReplayEvent) error { return nil })
	if err != nil {
		t.Fatal(err)
	}
	var _ Reflector = r
}