package evolve import ( "context" "errors" "sort" "sync" "time" ) // AggregatedStats summarises a stream of feedback events for one // entity+metric pair. Returned by AggregatorReflector.Stats() as a pull-form // snapshot (value copy, safe for external consumers to mutate). // // Sum sits in scan-baseline as dead because no in-tree code reads it after // AggregatorReflector writes it -- the consumer is an external watch-only // panel / operator dashboard that calls Stats() and renders rolling totals. // test lock is in reflector_impls_test.go. // // AggregatedStats 汇总单条 (entity, metric) 的反馈事件流. 由 // AggregatorReflector.Stats() 以 pull 形态返回快照 (值拷贝, 外部消费方自由 // 修改不污染内部状态). // // Sum 在 scan-baseline 里标 dead 是因为 core 内无 reader -- 消费者是外部 // watch-only 面板 / 运营 dashboard, 调 Stats() 取滚动累加展示. test 锁在 // reflector_impls_test.go. type AggregatedStats struct { Count int // events whose Feedback matched this (entity, metric) Sum float64 // sum of Feedback.Value Mean float64 // Sum / Count; 0 when Count == 0 Min, Max float64 // bounds of observed Feedback.Value; 0 when Count == 0 PendingCount int // events whose Feedback was nil for this entity LastTimestamp time.Time // most recent event timestamp (decision or feedback) } // AggregatorReflector is the zero-ML reference Reflector: it consumes // ReplayEvent stream and maintains rolling descriptive statistics keyed by // (entity, metric). Stats are read-only queryable; the reflector does not // push back into ParameterEvolver or ParameterStore. // // Why this is a useful ref impl even without ML: // A staged rollout of evolve typically runs a "watch-only" phase where the // operator just wants to see "what would the reflector see?" A stats view // is a cheap way to get there and doubles as the input signal for an // eventual ML reflector. The interface contract for Reflector makes no // assumption that reflection has to trigger writes. // // Concurrency: // - OnEvent acquires a write lock for the duration of one update. Contention // is minor under typical replay rates (hundreds of events/sec). // - Stats / Entities / Metrics / Reset take RLock / Lock as needed. // - All exported reads return copies; internal maps are never leaked. type AggregatorReflector struct { mu sync.RWMutex stats map[entityMetricKey]*statsCell } type entityMetricKey struct { Entity string Metric string } type statsCell struct { count int sum, min, max float64 pending int lastTs time.Time } // NewAggregatorReflector returns an empty reflector. func NewAggregatorReflector() *AggregatorReflector { return &AggregatorReflector{ stats: make(map[entityMetricKey]*statsCell), } } // OnEvent implements Reflector. // // Event classification: // - event.Feedback == nil: counted as pending under key (entity, "") so // the operator can see decisions still awaiting feedback. // - event.Feedback != nil: contributes to (entity, metric) count / sum / // min / max / mean / lastTs. PendingCount of the matched cell is also // decremented up to zero (first-touch pairing "resolved" the earlier // pending event), but never below zero -- feedback arriving before any // pending event is legal (e.g. external feedback ingest). func (r *AggregatorReflector) OnEvent(ctx context.Context, event ReplayEvent) error { if err := ctx.Err(); err != nil { return err } entity := event.Log.Entity eventTs := event.Log.Timestamp r.mu.Lock() defer r.mu.Unlock() if event.Feedback == nil { pendingKey := entityMetricKey{Entity: entity, Metric: ""} cell := r.stats[pendingKey] if cell == nil { cell = &statsCell{} r.stats[pendingKey] = cell } cell.pending++ cell.lastTs = maxTime(cell.lastTs, eventTs) return nil } fb := event.Feedback // Decrement the pending counter for this entity (cap at 0). The event's // feedback has arrived; one fewer outstanding decision. pendingKey := entityMetricKey{Entity: entity, Metric: ""} if pcell := r.stats[pendingKey]; pcell != nil && pcell.pending > 0 { pcell.pending-- } key := entityMetricKey{Entity: entity, Metric: fb.Metric} cell := r.stats[key] if cell == nil { cell = &statsCell{min: fb.Value, max: fb.Value} r.stats[key] = cell } cell.count++ cell.sum += fb.Value if fb.Value < cell.min { cell.min = fb.Value } if fb.Value > cell.max { cell.max = fb.Value } cell.lastTs = maxTime(cell.lastTs, maxTime(eventTs, fb.Timestamp)) return nil } // Stats returns a snapshot of (entity, metric) statistics. Unknown pair // returns the zero AggregatedStats (all fields zero) with no error. // metric="" returns the pending-only view for entity (Count=0, Sum=0, // PendingCount filled). func (r *AggregatorReflector) Stats(entity, metric string) AggregatedStats { r.mu.RLock() defer r.mu.RUnlock() key := entityMetricKey{Entity: entity, Metric: metric} cell := r.stats[key] var pendingCell *statsCell if metric != "" { pendingCell = r.stats[entityMetricKey{Entity: entity, Metric: ""}] } var out AggregatedStats if cell != nil { out.Count = cell.count out.Sum = cell.sum out.Min = cell.min out.Max = cell.max if cell.count > 0 { out.Mean = cell.sum / float64(cell.count) } out.LastTimestamp = cell.lastTs if metric == "" { out.PendingCount = cell.pending } } if pendingCell != nil { out.PendingCount = pendingCell.pending if pendingCell.lastTs.After(out.LastTimestamp) { out.LastTimestamp = pendingCell.lastTs } } return out } // Entities returns every known entity, sorted. func (r *AggregatorReflector) Entities() []string { r.mu.RLock() defer r.mu.RUnlock() seen := make(map[string]struct{}, len(r.stats)) for k := range r.stats { seen[k.Entity] = struct{}{} } out := make([]string, 0, len(seen)) for e := range seen { out = append(out, e) } sort.Strings(out) return out } // Metrics returns every known metric for the given entity, sorted. The // pending pseudo-metric ("") is excluded. func (r *AggregatorReflector) Metrics(entity string) []string { r.mu.RLock() defer r.mu.RUnlock() var out []string for k := range r.stats { if k.Entity == entity && k.Metric != "" { out = append(out, k.Metric) } } sort.Strings(out) return out } // Reset drops all aggregated state. func (r *AggregatorReflector) Reset() { r.mu.Lock() defer r.mu.Unlock() r.stats = make(map[entityMetricKey]*statsCell) } func maxTime(a, b time.Time) time.Time { if a.After(b) { return a } return b } // ============================================================================ // FuncReflector: escape hatch for caller-supplied OnEvent logic // ============================================================================ // FuncReflector wraps a plain function as a Reflector. Use this when the // reflector needs bespoke behaviour (direct Apply via ParameterEvolver, // webhook push, custom aggregation) that does not fit AggregatorReflector's // descriptive-stats shape. type FuncReflector struct { fn func(ctx context.Context, event ReplayEvent) error } // NewFuncReflector wraps fn. fn must be non-nil. func NewFuncReflector(fn func(ctx context.Context, event ReplayEvent) error) (*FuncReflector, error) { if fn == nil { return nil, errors.New("evolve: NewFuncReflector requires non-nil fn") } return &FuncReflector{fn: fn}, nil } // OnEvent implements Reflector. ctx cancellation short-circuits before the // wrapped fn runs. func (r *FuncReflector) OnEvent(ctx context.Context, event ReplayEvent) error { if err := ctx.Err(); err != nil { return err } return r.fn(ctx, event) }