package evolve

import (
	"bufio"
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"sort"
	"strings"
	"time"
)

// FileLogSource is the local-filesystem implementation of LogSource.
//
// On-disk layout:
//
//	<root>/2026-04-18.jsonl # one file per UTC calendar day
//	<root>/2026-04-19.jsonl
//
// Each line is one LogEntry serialised as JSON. Date-sharded so that
// Read(from, to) only opens files covering the requested time window rather
// than scanning the whole corpus.
//
// CLEVER: POSIX O_APPEND guarantees atomicity for writes smaller than
// PIPE_BUF (4 KiB on Linux). A single LogEntry line is far below that in the
// expected shape, so concurrent writers do not need a mutex and readers will
// never observe a half-written line. This is the append-only counterpart to
// FileParameterStore's rename-based atomicity.
//
// LEGACY: the reader does an in-process linear scan. v0.3 MVP log volume is
// expected to stay well under 1M lines/day per client, so a full scan is
// acceptable. An index-backed implementation belongs in the platform-layer
// SQL LogSource; adding an index here would drag in a CGO/boltdb dependency
// that CLAUDE.md rule 8 (zero external deps) forbids.
//
// Append is a method on *FileLogSource, not a member of the LogSource
// interface. Different backends (SQL INSERT, object-storage upload) have
// wildly different write semantics; forcing Append into the interface would
// produce awkward wrappers. Callers who need to write hold a *FileLogSource
// directly.
type FileLogSource struct {
	root string
}

// NewFileLogSource returns a log source rooted at root. Creates the
// directory if missing.
func NewFileLogSource(root string) (*FileLogSource, error) {
	if err := os.MkdirAll(root, 0o755); err != nil {
		return nil, fmt.Errorf("evolve: create log source root: %w", err)
	}
	return &FileLogSource{root: root}, nil
}

// dayFile returns the JSONL path for the UTC day containing t.
func (s *FileLogSource) dayFile(t time.Time) string {
	return filepath.Join(s.root, t.UTC().Format("2006-01-02")+".jsonl")
}

// Append writes one LogEntry to the current UTC day file as a JSON line.
// If entry.Timestamp is the zero value it is set to time.Now().UTC(); any
// non-zero value is coerced to UTC so the day-file selection and the stored
// timestamp agree.
//
// Not part of the LogSource interface. See the type-level comment for the
// rationale.
func (s *FileLogSource) Append(ctx context.Context, entry LogEntry) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	if entry.Timestamp.IsZero() {
		entry.Timestamp = time.Now().UTC()
	} else {
		entry.Timestamp = entry.Timestamp.UTC()
	}
	data, err := json.Marshal(entry)
	if err != nil {
		return fmt.Errorf("evolve: encode log entry: %w", err)
	}
	if bytes.ContainsAny(data, "\n\r") {
		// Defensive: json.Marshal never emits raw newlines, but if a future
		// encoder swap did, one corrupt line would poison the whole file.
		return fmt.Errorf("evolve: encoded log entry contains newline")
	}
	data = append(data, '\n')
	// Recreate the root if it vanished after construction (e.g. external
	// cleanup). Wrapped like every other error in this file for consistency.
	if err := os.MkdirAll(s.root, 0o755); err != nil {
		return fmt.Errorf("evolve: create log source root: %w", err)
	}
	path := s.dayFile(entry.Timestamp)
	f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
	if err != nil {
		return fmt.Errorf("evolve: open log file: %w", err)
	}
	if _, werr := f.Write(data); werr != nil {
		f.Close() // best-effort; the write error is the one worth reporting
		return fmt.Errorf("evolve: append log entry: %w", werr)
	}
	// Do not discard the Close error: on some filesystems a failed close is
	// the only signal that the appended line was not durably written.
	if cerr := f.Close(); cerr != nil {
		return fmt.Errorf("evolve: close log file: %w", cerr)
	}
	return nil
}
func (s *FileLogSource) Read(ctx context.Context, from, to time.Time) (<-chan LogEntry, error) { if to.Before(from) { return nil, fmt.Errorf("evolve: Read: to (%v) is before from (%v)", to, from) } fromUTC := from.UTC() toUTC := to.UTC() ch := make(chan LogEntry, 64) go func() { defer close(ch) startDay := time.Date(fromUTC.Year(), fromUTC.Month(), fromUTC.Day(), 0, 0, 0, 0, time.UTC) endDay := time.Date(toUTC.Year(), toUTC.Month(), toUTC.Day(), 0, 0, 0, 0, time.UTC) for day := startDay; !day.After(endDay); day = day.Add(24 * time.Hour) { if ctx.Err() != nil { return } if !streamDay(ctx, s.dayFile(day), fromUTC, toUTC, ch) { return } } }() return ch, nil } // streamDay reads one JSONL file line-by-line, pushing entries whose // Timestamp falls in [from, to] onto out. Returns false to abort the whole // Read (ctx canceled or decode error); true to continue with the next day. // A non-existent file is not an error. func streamDay(ctx context.Context, path string, from, to time.Time, out chan<- LogEntry) bool { f, err := os.Open(path) if errors.Is(err, os.ErrNotExist) { return true } if err != nil { return false } defer f.Close() scanner := bufio.NewScanner(f) // CLEVER: 1 MiB max line. A LogEntry Payload is typically small (IDs, // decision metadata) but is user-controlled; lift the 64 KiB default so // reasonably large payloads do not corrupt the whole stream. scanner.Buffer(make([]byte, 64*1024), 1024*1024) for scanner.Scan() { if ctx.Err() != nil { return false } line := scanner.Bytes() if len(bytes.TrimSpace(line)) == 0 { continue } var entry LogEntry if err := json.Unmarshal(line, &entry); err != nil { return false } if entry.Timestamp.Before(from) || entry.Timestamp.After(to) { continue } select { case out <- entry: case <-ctx.Done(): return false } } return scanner.Err() == nil } // Days returns all UTC calendar days for which at least one entry has been // appended, sorted ascending. Helper for tooling (e.g. tail viewers, // housekeeping). 
Not part of the LogSource interface. func (s *FileLogSource) Days() ([]time.Time, error) { entries, err := os.ReadDir(s.root) if err != nil { if errors.Is(err, os.ErrNotExist) { return nil, nil } return nil, err } var days []time.Time for _, e := range entries { if e.IsDir() { continue } name := e.Name() if !strings.HasSuffix(name, ".jsonl") { continue } day, err := time.Parse("2006-01-02", strings.TrimSuffix(name, ".jsonl")) if err != nil { continue } days = append(days, day) } sort.Slice(days, func(i, j int) bool { return days[i].Before(days[j]) }) return days, nil }