package evolve import ( "context" "errors" "sync" "sync/atomic" "testing" "time" ) // recordingReflector captures every event it receives. type recordingReflector struct { mu sync.Mutex events []ReplayEvent err error // returned from OnEvent when non-nil } func (r *recordingReflector) OnEvent(ctx context.Context, event ReplayEvent) error { r.mu.Lock() defer r.mu.Unlock() r.events = append(r.events, event) return r.err } func (r *recordingReflector) count() int { r.mu.Lock() defer r.mu.Unlock() return len(r.events) } func (r *recordingReflector) snapshot() []ReplayEvent { r.mu.Lock() defer r.mu.Unlock() out := make([]ReplayEvent, len(r.events)) copy(out, r.events) return out } // fixture builds a replayer backed by temp-dir file source + feedback channel. func fixture(t *testing.T, opts ...ReplayerOption) (*DefaultLogReplayer, *FileLogSource, *FileFeedbackChannel) { t.Helper() src := newLogSource(t) fb := newFeedbackChannel(t) return NewDefaultLogReplayer(src, fb, opts...), src, fb } func TestDefaultLogReplayer_EmptySource(t *testing.T) { r, _, _ := fixture(t) rf := &recordingReflector{} r.RegisterReflector(rf) base := time.Date(2026, 4, 18, 12, 0, 0, 0, time.UTC) if err := r.Replay(context.Background(), base, base.Add(time.Hour), nil); err != nil { t.Fatalf("Replay: %v", err) } if rf.count() != 0 { t.Errorf("empty source: want 0 events, got %d", rf.count()) } } func TestDefaultLogReplayer_DecisionNoFeedback(t *testing.T) { r, src, _ := fixture(t) rf := &recordingReflector{} r.RegisterReflector(rf) base := time.Date(2026, 4, 18, 12, 0, 0, 0, time.UTC) if err := src.Append(context.Background(), LogEntry{Timestamp: base, DecisionID: "d1", Entity: "carrier-A"}); err != nil { t.Fatal(err) } if err := r.Replay(context.Background(), base.Add(-time.Hour), base.Add(time.Hour), nil); err != nil { t.Fatal(err) } events := rf.snapshot() if len(events) != 1 { t.Fatalf("want 1 event, got %d", len(events)) } if events[0].Feedback != nil { t.Errorf("no feedback yet: want nil, got %+v", events[0].Feedback) } } func TestDefaultLogReplayer_DecisionPairsWithFeedback(t *testing.T) { r, src, fb := fixture(t) rf := &recordingReflector{} r.RegisterReflector(rf) ctx := context.Background() decisionTime := time.Now().UTC().Add(-time.Hour) if err := src.Append(ctx, LogEntry{Timestamp: decisionTime, DecisionID: "d1", Entity: "carrier-A"}); err != nil { t.Fatal(err) } // Feedback arrives after the decision, within window. if err := fb.Report(ctx, "carrier-A", "on_time_rate", 0.95, 0.8, nil); err != nil { t.Fatal(err) } from := decisionTime.Add(-time.Hour) to := time.Now().UTC().Add(time.Second) if err := r.Replay(ctx, from, to, nil); err != nil { t.Fatal(err) } events := rf.snapshot() if len(events) != 1 { t.Fatalf("want 1 event, got %d", len(events)) } if events[0].Feedback == nil { t.Fatalf("feedback nil, expected paired") } if events[0].Feedback.Metric != "on_time_rate" { t.Errorf("metric: got %q", events[0].Feedback.Metric) } } func TestDefaultLogReplayer_MultiMetricFirstTouch(t *testing.T) { r, src, fb := fixture(t) rf := &recordingReflector{} r.RegisterReflector(rf) ctx := context.Background() decisionTime := time.Now().UTC().Add(-time.Hour) if err := src.Append(ctx, LogEntry{Timestamp: decisionTime, DecisionID: "d1", Entity: "carrier-A"}); err != nil { t.Fatal(err) } // First-touch for on_time + damage_rate, and a later on_time that should NOT pair. if err := fb.Report(ctx, "carrier-A", "on_time", 0.9, 1, nil); err != nil { t.Fatal(err) } if err := fb.Report(ctx, "carrier-A", "damage_rate", 0.02, 1, nil); err != nil { t.Fatal(err) } if err := fb.Report(ctx, "carrier-A", "on_time", 0.7, 1, nil); err != nil { t.Fatal(err) } if err := r.Replay(ctx, decisionTime.Add(-time.Hour), time.Now().UTC().Add(time.Second), nil); err != nil { t.Fatal(err) } events := rf.snapshot() if len(events) != 2 { t.Fatalf("want 2 events (one per metric), got %d", len(events)) } metrics := map[string]int{} for _, e := range events { if e.Feedback == nil { t.Errorf("nil feedback in multi-metric test") continue } metrics[e.Feedback.Metric]++ } if metrics["on_time"] != 1 || metrics["damage_rate"] != 1 { t.Errorf("metrics histogram: %+v", metrics) } } func TestDefaultLogReplayer_WindowExcludes(t *testing.T) { r, src, fb := fixture(t, WithFeedbackWindow(time.Minute)) rf := &recordingReflector{} r.RegisterReflector(rf) ctx := context.Background() decisionTime := time.Now().UTC().Add(-time.Hour) if err := src.Append(ctx, LogEntry{Timestamp: decisionTime, DecisionID: "d1", Entity: "carrier-A"}); err != nil { t.Fatal(err) } // Feedback is 1h after decision but window is 1min => should NOT pair. if err := fb.Report(ctx, "carrier-A", "on_time", 0.9, 1, nil); err != nil { t.Fatal(err) } if err := r.Replay(ctx, decisionTime.Add(-time.Hour), time.Now().UTC().Add(time.Second), nil); err != nil { t.Fatal(err) } events := rf.snapshot() if len(events) != 1 || events[0].Feedback != nil { t.Errorf("want 1 event with nil feedback, got %+v", events) } } func TestDefaultLogReplayer_PreDecisionFeedbackIgnored(t *testing.T) { r, src, fb := fixture(t) rf := &recordingReflector{} r.RegisterReflector(rf) ctx := context.Background() // Feedback first, then the decision. Feedback predates decision => ignored. if err := fb.Report(ctx, "carrier-A", "on_time", 0.9, 1, nil); err != nil { t.Fatal(err) } time.Sleep(20 * time.Millisecond) decisionTime := time.Now().UTC() if err := src.Append(ctx, LogEntry{Timestamp: decisionTime, DecisionID: "d1", Entity: "carrier-A"}); err != nil { t.Fatal(err) } if err := r.Replay(ctx, decisionTime.Add(-time.Hour), time.Now().UTC().Add(time.Second), nil); err != nil { t.Fatal(err) } events := rf.snapshot() if len(events) != 1 || events[0].Feedback != nil { t.Errorf("pre-decision feedback should not pair: %+v", events) } } func TestDefaultLogReplayer_FilterSkipsFeedbackQuery(t *testing.T) { r, src, fb := fixture(t) rf := &recordingReflector{} r.RegisterReflector(rf) ctx := context.Background() decisionTime := time.Now().UTC().Add(-time.Hour) if err := src.Append(ctx, LogEntry{Timestamp: decisionTime, DecisionID: "keep", Entity: "carrier-A"}); err != nil { t.Fatal(err) } if err := src.Append(ctx, LogEntry{Timestamp: decisionTime.Add(time.Minute), DecisionID: "drop", Entity: "carrier-B"}); err != nil { t.Fatal(err) } if err := fb.Report(ctx, "carrier-A", "on_time", 0.9, 1, nil); err != nil { t.Fatal(err) } filter := func(e LogEntry) bool { return e.DecisionID == "keep" } if err := r.Replay(ctx, decisionTime.Add(-time.Hour), time.Now().UTC().Add(time.Second), filter); err != nil { t.Fatal(err) } events := rf.snapshot() if len(events) != 1 || events[0].Log.DecisionID != "keep" { t.Errorf("filter: got %+v", events) } } func TestDefaultLogReplayer_MultipleReflectorsAllReceive(t *testing.T) { r, src, _ := fixture(t) a, b, c := &recordingReflector{}, &recordingReflector{}, &recordingReflector{} r.RegisterReflector(a) r.RegisterReflector(b) r.RegisterReflector(c) base := time.Date(2026, 4, 18, 12, 0, 0, 0, time.UTC) if err := src.Append(context.Background(), LogEntry{Timestamp: base, DecisionID: "d"}); err != nil { t.Fatal(err) } if err := r.Replay(context.Background(), base.Add(-time.Hour), base.Add(time.Hour), nil); err != nil { t.Fatal(err) } if a.count() != 1 || b.count() != 1 || c.count() != 1 { t.Errorf("counts: a=%d b=%d c=%d", a.count(), b.count(), c.count()) } } func TestDefaultLogReplayer_ReflectorErrorDoesNotStop(t *testing.T) { var logCalls atomic.Int32 logger := func(format string, args ...any) { logCalls.Add(1) } r, src, _ := fixture(t, WithLogger(logger)) bad := &recordingReflector{err: errors.New("boom")} good := &recordingReflector{} r.RegisterReflector(bad) r.RegisterReflector(good) base := time.Date(2026, 4, 18, 12, 0, 0, 0, time.UTC) for i := 0; i < 3; i++ { if err := src.Append(context.Background(), LogEntry{Timestamp: base.Add(time.Duration(i) * time.Minute), DecisionID: "d"}); err != nil { t.Fatal(err) } } if err := r.Replay(context.Background(), base.Add(-time.Hour), base.Add(time.Hour), nil); err != nil { t.Fatalf("Replay must not return reflector error: %v", err) } if bad.count() != 3 { t.Errorf("bad reflector count: want 3, got %d", bad.count()) } if good.count() != 3 { t.Errorf("good reflector count: want 3, got %d", good.count()) } if logCalls.Load() != 3 { t.Errorf("logger calls: want 3, got %d", logCalls.Load()) } } func TestDefaultLogReplayer_CtxCancel(t *testing.T) { r, src, _ := fixture(t) r.RegisterReflector(&recordingReflector{}) base := time.Date(2026, 4, 18, 0, 0, 0, 0, time.UTC) for i := 0; i < 500; i++ { if err := src.Append(context.Background(), LogEntry{Timestamp: base.Add(time.Duration(i) * time.Second), DecisionID: "d"}); err != nil { t.Fatal(err) } } ctx, cancel := context.WithCancel(context.Background()) cancel() err := r.Replay(ctx, base, base.Add(24*time.Hour), nil) if !errors.Is(err, context.Canceled) { t.Errorf("cancelled ctx: want context.Canceled, got %v", err) } } func TestDefaultLogReplayer_EntryWithoutEntity(t *testing.T) { r, src, _ := fixture(t) rf := &recordingReflector{} r.RegisterReflector(rf) base := time.Date(2026, 4, 18, 12, 0, 0, 0, time.UTC) if err := src.Append(context.Background(), LogEntry{Timestamp: base, DecisionID: "d-no-entity"}); err != nil { t.Fatal(err) } if err := r.Replay(context.Background(), base.Add(-time.Hour), base.Add(time.Hour), nil); err != nil { t.Fatal(err) } events := rf.snapshot() if len(events) != 1 || events[0].Feedback != nil || events[0].Log.DecisionID != "d-no-entity" { t.Errorf("no-entity entry: got %+v", events) } } func TestDefaultLogReplayer_ConcurrentRegisterDuringReplay(t *testing.T) { r, src, _ := fixture(t) rf := &recordingReflector{} r.RegisterReflector(rf) base := time.Date(2026, 4, 18, 0, 0, 0, 0, time.UTC) for i := 0; i < 200; i++ { if err := src.Append(context.Background(), LogEntry{Timestamp: base.Add(time.Duration(i) * time.Minute), DecisionID: "d"}); err != nil { t.Fatal(err) } } var wg sync.WaitGroup wg.Add(2) go func() { defer wg.Done() _ = r.Replay(context.Background(), base, base.Add(24*time.Hour), nil) }() go func() { defer wg.Done() for i := 0; i < 20; i++ { r.RegisterReflector(&recordingReflector{}) } }() wg.Wait() // No race (go test -race) + initial reflector saw every event. if rf.count() != 200 { t.Errorf("initial reflector: want 200 events, got %d", rf.count()) } } func TestDefaultLogReplayer_WindowOptionApplied(t *testing.T) { r, _, _ := fixture(t, WithFeedbackWindow(42*time.Second)) if r.window != 42*time.Second { t.Errorf("WithFeedbackWindow: want 42s, got %v", r.window) } } func TestDefaultLogReplayer_NilReflectorIgnored(t *testing.T) { r, src, _ := fixture(t) r.RegisterReflector(nil) // must not panic, must not be dispatched to rf := &recordingReflector{} r.RegisterReflector(rf) base := time.Date(2026, 4, 18, 12, 0, 0, 0, time.UTC) if err := src.Append(context.Background(), LogEntry{Timestamp: base, DecisionID: "d"}); err != nil { t.Fatal(err) } if err := r.Replay(context.Background(), base.Add(-time.Hour), base.Add(time.Hour), nil); err != nil { t.Fatal(err) } if rf.count() != 1 { t.Errorf("count: %d", rf.count()) } } func TestDefaultLogReplayer_NilSourcePanics(t *testing.T) { defer func() { if r := recover(); r == nil { t.Errorf("nil source: expected panic") } }() _ = NewDefaultLogReplayer(nil, newFeedbackChannel(t)) } func TestDefaultLogReplayer_NilFeedbackPanics(t *testing.T) { defer func() { if r := recover(); r == nil { t.Errorf("nil feedback: expected panic") } }() _ = NewDefaultLogReplayer(newLogSource(t), nil) } func TestDefaultLogReplayer_WithFeedbackWindowZeroPanics(t *testing.T) { defer func() { if r := recover(); r == nil { t.Errorf("zero window: expected panic") } }() _, _, _ = fixture(t, WithFeedbackWindow(0)) } func TestMatchFeedback_FirstTouchPerMetric(t *testing.T) { t0 := time.Date(2026, 4, 18, 12, 0, 0, 0, time.UTC) fbs := []Feedback{ {Timestamp: t0.Add(-time.Hour), Metric: "on_time"}, // before decision {Timestamp: t0.Add(5 * time.Minute), Metric: "on_time"}, // first-touch on_time {Timestamp: t0.Add(10 * time.Minute), Metric: "on_time"}, // later, ignored {Timestamp: t0.Add(20 * time.Minute), Metric: "damage_rate"}, // first-touch damage {Timestamp: t0.Add(48 * time.Hour), Metric: "on_time"}, // out of window } matched := matchFeedback(fbs, t0, time.Hour) if len(matched) != 2 { t.Fatalf("want 2, got %d (%+v)", len(matched), matched) } gotMetrics := map[string]time.Time{} for _, m := range matched { gotMetrics[m.Metric] = m.Timestamp } if gotMetrics["on_time"] != t0.Add(5*time.Minute) { t.Errorf("on_time first-touch: want +5min, got %v", gotMetrics["on_time"]) } if gotMetrics["damage_rate"] != t0.Add(20*time.Minute) { t.Errorf("damage_rate first-touch: want +20min, got %v", gotMetrics["damage_rate"]) } }