package evolve

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"
)

// DefaultLogReplayer is the reference LogReplayer implementation. It composes
// a LogSource and a FeedbackChannel; either can be any implementation
// (file / SQL / object-storage), so the struct is not named "FileLogReplayer".
//
// Pairing strategy:
// For every LogEntry read from the source, the replayer looks up the entity's
// feedback history and pairs it with at most one feedback per metric. The
// "first-touch" rule picks the earliest feedback whose Timestamp falls in
// [entry.Timestamp, entry.Timestamp + window). Each matched metric yields its
// own ReplayEvent. Zero matches produce a single ReplayEvent with
// Feedback=nil (contract: nil = KPI not yet arrived). Entries whose Entity is
// empty skip the feedback lookup entirely and are always dispatched with
// Feedback=nil.
//
// CLEVER: feedback lookup is lazy per-entity. The first time an entity is
// seen in the stream we call FeedbackChannel.Query once and cache the result
// for the rest of the replay. A 1000-entity replay makes 1000 queries total,
// not 1000 per decision. The cache lives for the duration of one Replay call.
//
// LEGACY: aggregation is fixed at "first-touch per metric". Mean / last /
// median are deliberately not surfaced -- consumers that need them can
// compute from raw feedback inside their Reflector. Adding aggregation
// knobs here would pollute the interface without clear v0.3 demand.
//
// Reflector dispatch:
//   - Serial in registration order. Reflectors may hold in-process state;
//     parallel dispatch would force every implementor to be thread-safe.
//   - OnEvent errors are sent to the configured logger (log.Printf unless
//     WithLogger overrides it) but not returned. The contract says the
//     replayer does not stop on reflector errors.
//   - The reflector slice is snapshotted at the start of each Replay, so a
//     RegisterReflector call mid-replay does not affect the current run.
//
// Construct instances with NewDefaultLogReplayer: the zero value has no
// LogSource or FeedbackChannel and cannot replay anything.
type DefaultLogReplayer struct {
	// Collaborators and settings configured by NewDefaultLogReplayer and its
	// options.
	source   LogSource
	feedback FeedbackChannel
	window   time.Duration                    // pairing window; defaults to 7 days
	logger   func(format string, args ...any) // sink for reflector errors

	// mu guards reflectors: RegisterReflector appends under the write lock
	// and Replay copies the slice under the read lock.
	mu         sync.RWMutex
	reflectors []Reflector
}

// ReplayerOption configures a DefaultLogReplayer at construction time.
// NewDefaultLogReplayer applies options in the order given, so a later option
// overrides an earlier one that sets the same field.
type ReplayerOption func(*DefaultLogReplayer)

// WithFeedbackWindow overrides the pairing window (default: 7 days).
//
// d must be positive: a zero or negative window can never match any feedback.
// The check runs when the option is applied, so an invalid d makes
// NewDefaultLogReplayer panic.
func WithFeedbackWindow(d time.Duration) ReplayerOption {
	return func(r *DefaultLogReplayer) {
		if d > 0 {
			r.window = d
			return
		}
		panic("evolve: WithFeedbackWindow requires d > 0")
	}
}

// WithLogger sends reflector error messages to fn instead of the default
// log.Printf. Pass a function that does nothing to silence them. A nil fn is
// ignored and leaves the current logger in place.
func WithLogger(fn func(format string, args ...any)) ReplayerOption {
	return func(r *DefaultLogReplayer) {
		if fn == nil {
			return
		}
		r.logger = fn
	}
}

// NewDefaultLogReplayer builds a replayer over source and feedback. The
// defaults are a 7-day pairing window and log.Printf as the error sink, and
// opts are then applied in order on top of them.
//
// Both source and feedback are mandatory. A nil value for either is treated
// as a programming error and causes a panic, with source checked first.
func NewDefaultLogReplayer(source LogSource, feedback FeedbackChannel, opts ...ReplayerOption) *DefaultLogReplayer {
	switch {
	case source == nil:
		panic("evolve: NewDefaultLogReplayer requires non-nil LogSource")
	case feedback == nil:
		panic("evolve: NewDefaultLogReplayer requires non-nil FeedbackChannel")
	}

	const defaultWindow = 7 * 24 * time.Hour
	replayer := &DefaultLogReplayer{
		source:   source,
		feedback: feedback,
		window:   defaultWindow,
		logger:   log.Printf,
	}
	for i := range opts {
		opts[i](replayer)
	}
	return replayer
}

// RegisterReflector appends r to the list of reflectors that receive events.
// A nil r is silently ignored.
//
// It is safe to call concurrently. A Replay that is already running works
// from its own copy of the list and will not see r.
//
// Registering the same reflector twice is permitted, and it will then receive
// every event twice. Avoiding that is the caller's job.
func (d *DefaultLogReplayer) RegisterReflector(r Reflector) {
	if r != nil {
		d.mu.Lock()
		defer d.mu.Unlock()
		d.reflectors = append(d.reflectors, r)
	}
}

// Replay implements LogReplayer. It scans LogSource over [from, to], pairs
// each entry accepted by filter with its first-touch feedback, and dispatches
// the resulting events to every registered Reflector.
func (d *DefaultLogReplayer) Replay(ctx context.Context, from, to time.Time, filter FilterFunc) error { if filter == nil { filter = func(LogEntry) bool { return true } } if err := ctx.Err(); err != nil { return err } ch, err := d.source.Read(ctx, from, to) if err != nil { return fmt.Errorf("evolve: replay read: %w", err) } d.mu.RLock() reflectors := make([]Reflector, len(d.reflectors)) copy(reflectors, d.reflectors) d.mu.RUnlock() // Drain the source even on early return, so the reader goroutine exits // and its file handles close. ctx cancel propagates via the source, but // a filter-only early exit has no such channel. defer func() { for range ch { } }() fbCache := make(map[string][]Feedback) for entry := range ch { if err := ctx.Err(); err != nil { return err } if !filter(entry) { continue } var matched []Feedback if entry.Entity != "" { fbs, cacheErr := d.fetchFeedback(ctx, entry.Entity, from, fbCache) if cacheErr != nil { return fmt.Errorf("evolve: replay feedback query %q: %w", entry.Entity, cacheErr) } matched = matchFeedback(fbs, entry.Timestamp, d.window) } if len(matched) == 0 { d.dispatch(ctx, reflectors, ReplayEvent{Log: entry}) continue } for i := range matched { fb := matched[i] d.dispatch(ctx, reflectors, ReplayEvent{Log: entry, Feedback: &fb}) } } // Source may close its channel when its own ctx check trips, producing // a clean EOF rather than an error. Re-check ctx so a cancel between // read start and channel drain still surfaces as context.Canceled. if err := ctx.Err(); err != nil { return err } return nil } // fetchFeedback populates and reads fbCache for entity. from bounds the // Query's since parameter (Query has no upper bound; we filter per-decision // inside matchFeedback). 
func (d *DefaultLogReplayer) fetchFeedback(ctx context.Context, entity string, from time.Time, fbCache map[string][]Feedback) ([]Feedback, error) { if fbs, ok := fbCache[entity]; ok { return fbs, nil } fbs, err := d.feedback.Query(ctx, entity, from, "") if err != nil { return nil, err } fbCache[entity] = fbs return fbs, nil } // dispatch fans an event to every reflector. Errors are logged, never fatal. func (d *DefaultLogReplayer) dispatch(ctx context.Context, reflectors []Reflector, event ReplayEvent) { for _, rf := range reflectors { if err := rf.OnEvent(ctx, event); err != nil { d.logger("evolve: reflector error on %s: %v", event.Log.DecisionID, err) } } } // matchFeedback returns first-touch feedback per metric within // [decisionTime, decisionTime + window). fbs must be sorted ascending by // Timestamp (FeedbackChannel.Query guarantees this). // // CLEVER: first-touch semantics. If metric "on_time_rate" appears at T+1h // and again at T+2h, only the T+1h record pairs with the decision. // Rationale: reflectors want "what happened right after this decision", // not a summary; later updates are separate events that will pair with // later decisions of the same entity. func matchFeedback(fbs []Feedback, decisionTime time.Time, window time.Duration) []Feedback { cutoff := decisionTime.Add(window) seen := make(map[string]bool) var out []Feedback for i := range fbs { if fbs[i].Timestamp.Before(decisionTime) { continue } if !fbs[i].Timestamp.Before(cutoff) { // fbs is sorted; everything after this is out of window too. break } if seen[fbs[i].Metric] { continue } seen[fbs[i].Metric] = true out = append(out, fbs[i]) } return out }