package evolve

import (
	"bufio"
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"net/url"
	"os"
	"path/filepath"
	"sort"
	"strings"
	"time"
)

// FileFeedbackChannel is the local-filesystem implementation of
// FeedbackChannel.
//
// On-disk layout:
//
//	<root>/<escaped-entity>/2026-04-18.jsonl
//	<root>/<escaped-entity>/2026-04-19.jsonl
//
// Each line is one Feedback record serialised as JSON. Two-level sharding
// (entity first, then UTC day) matches the Query access pattern: Query is
// always scoped to a single entity and a time window, so we never open files
// belonging to other entities.
//
// Entity is escaped with url.PathEscape (same convention as
// FileParameterStore keys), so entity values containing slashes /
// whitespace / CJK characters map to a single directory name. The two
// names PathEscape leaves untouched but the filesystem treats specially,
// "." and "..", are additionally percent-encoded (see entityDir) so that
// every entity stays directly beneath root.
//
// Concurrency: see the type-level comment on FileLogSource; the same POSIX
// O_APPEND atomicity argument applies here. Writers do not take a lock;
// readers never observe a partial line.
//
// LEGACY: Query does an in-process linear scan over the matching files.
// Platform-layer SQL FeedbackChannel uses indexed time + metric queries; for
// v0.3 MVP volumes (client per day < ~10k feedback events) the scan is fast
// enough and avoids an external dependency.
type FileFeedbackChannel struct {
	// root is the directory under which one sub-directory per entity lives.
	root string
}

// NewFileFeedbackChannel returns a feedback channel rooted at root. It
// creates the directory (and any missing parents) with mode 0o755 if it
// does not exist yet, and returns a wrapped error if that fails.
func NewFileFeedbackChannel(root string) (*FileFeedbackChannel, error) {
	if err := os.MkdirAll(root, 0o755); err != nil {
		return nil, fmt.Errorf("evolve: create feedback root: %w", err)
	}
	return &FileFeedbackChannel{root: root}, nil
}

// entityDir returns the directory holding all daily files for entity.
//
// The entity is escaped with url.PathEscape so that a path separator inside
// it cannot split the name into several path components. PathEscape leaves
// '.' untouched, so the names "." and ".." would otherwise be resolved by
// filepath.Join to root itself or to root's parent; for exactly those two
// names the dots are percent-encoded as well. No other entity can map to the
// resulting names, because PathEscape encodes a literal '%' as "%25".
func (c *FileFeedbackChannel) entityDir(entity string) string {
	name := url.PathEscape(entity)
	if name == "." || name == ".." {
		name = strings.ReplaceAll(name, ".", "%2E")
	}
	return filepath.Join(c.root, name)
}

// Report implements FeedbackChannel. Appends one Feedback record to the
// current UTC day file under the entity's directory.
Timestamp is stamped // with time.Now().UTC() so the chosen file and the stored timestamp agree; // callers cannot override this (if backdating is needed it belongs in // Meta, not the canonical Timestamp). func (c *FileFeedbackChannel) Report(ctx context.Context, entity, metric string, value, confidence float64, meta map[string]any) error { if err := ctx.Err(); err != nil { return err } if entity == "" { return ErrEntityRequired } if metric == "" { return ErrMetricRequired } fb := Feedback{ Timestamp: time.Now().UTC(), Entity: entity, Metric: metric, Value: value, Confidence: confidence, Meta: meta, } data, err := json.Marshal(fb) if err != nil { return fmt.Errorf("evolve: encode feedback: %w", err) } if bytes.ContainsAny(data, "\n\r") { return fmt.Errorf("evolve: encoded feedback contains newline") } data = append(data, '\n') dir := c.entityDir(entity) if err := os.MkdirAll(dir, 0o755); err != nil { return err } path := filepath.Join(dir, fb.Timestamp.Format("2006-01-02")+".jsonl") f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644) if err != nil { return fmt.Errorf("evolve: open feedback file: %w", err) } defer f.Close() if _, err := f.Write(data); err != nil { return fmt.Errorf("evolve: append feedback: %w", err) } return nil } // Query implements FeedbackChannel. Returns all records for the entity whose // Timestamp >= since. Empty metric matches every metric; a non-empty metric // is matched exactly. Results are sorted by Timestamp ascending. // // An unknown entity (directory does not exist) is not an error: new entities // legitimately have no history. Empty string entity is a caller error and // returns ErrEntityRequired. 
func (c *FileFeedbackChannel) Query(ctx context.Context, entity string, since time.Time, metric string) ([]Feedback, error) { if entity == "" { return nil, ErrEntityRequired } dir := c.entityDir(entity) entries, err := os.ReadDir(dir) if errors.Is(err, os.ErrNotExist) { return []Feedback{}, nil } if err != nil { return nil, err } sinceUTC := since.UTC() sinceDay := time.Date(sinceUTC.Year(), sinceUTC.Month(), sinceUTC.Day(), 0, 0, 0, 0, time.UTC) var dayFiles []string for _, e := range entries { if e.IsDir() { continue } name := e.Name() if !strings.HasSuffix(name, ".jsonl") { continue } day, err := time.Parse("2006-01-02", strings.TrimSuffix(name, ".jsonl")) if err != nil { continue } if day.Before(sinceDay) { continue } dayFiles = append(dayFiles, name) } // Lexicographic sort on YYYY-MM-DD matches chronological order. sort.Strings(dayFiles) var out []Feedback for _, name := range dayFiles { if err := ctx.Err(); err != nil { return nil, err } path := filepath.Join(dir, name) records, err := scanFeedbackFile(path, sinceUTC, metric) if err != nil { return nil, err } out = append(out, records...) } // Belt-and-suspenders: append-only writes are near-monotonic but clock // skew or batched backfills could produce out-of-order lines within a // file. A final sort guarantees the interface contract. sort.SliceStable(out, func(i, j int) bool { return out[i].Timestamp.Before(out[j].Timestamp) }) return out, nil } // scanFeedbackFile reads one JSONL file line-by-line and returns records // satisfying the filter (Timestamp >= since, and metric match when non-empty). 
func scanFeedbackFile(path string, since time.Time, metric string) ([]Feedback, error) { f, err := os.Open(path) if err != nil { return nil, err } defer f.Close() scanner := bufio.NewScanner(f) scanner.Buffer(make([]byte, 64*1024), 1024*1024) var out []Feedback for scanner.Scan() { line := scanner.Bytes() if len(bytes.TrimSpace(line)) == 0 { continue } var fb Feedback if err := json.Unmarshal(line, &fb); err != nil { return nil, fmt.Errorf("evolve: decode feedback in %s: %w", filepath.Base(path), err) } if fb.Timestamp.Before(since) { continue } if metric != "" && fb.Metric != metric { continue } out = append(out, fb) } if err := scanner.Err(); err != nil { return nil, err } return out, nil }