package evolve import ( "context" "encoding/json" "errors" "os" "path/filepath" "sync" "testing" "time" ) func newLogSource(t *testing.T) *FileLogSource { t.Helper() s, err := NewFileLogSource(t.TempDir()) if err != nil { t.Fatalf("NewFileLogSource: %v", err) } return s } func drain(t *testing.T, ch <-chan LogEntry) []LogEntry { t.Helper() var out []LogEntry for e := range ch { out = append(out, e) } return out } func TestFileLogSource_AppendThenRead(t *testing.T) { s := newLogSource(t) ctx := context.Background() base := time.Date(2026, 4, 18, 12, 0, 0, 0, time.UTC) want := []LogEntry{ {Timestamp: base, DecisionID: "d1", Entity: "carrier-A", Payload: map[string]any{"score": 0.8}}, {Timestamp: base.Add(time.Hour), DecisionID: "d2", Entity: "carrier-B", Payload: map[string]any{"score": 0.6}}, {Timestamp: base.Add(2 * time.Hour), DecisionID: "d3", Entity: "carrier-A", Payload: map[string]any{"score": 0.9}}, } for _, e := range want { if err := s.Append(ctx, e); err != nil { t.Fatalf("Append: %v", err) } } ch, err := s.Read(ctx, base.Add(-time.Second), base.Add(3*time.Hour)) if err != nil { t.Fatalf("Read: %v", err) } got := drain(t, ch) if len(got) != len(want) { t.Fatalf("count: want %d, got %d", len(want), len(got)) } for i := range want { if got[i].DecisionID != want[i].DecisionID { t.Errorf("entry %d: DecisionID want %q got %q", i, want[i].DecisionID, got[i].DecisionID) } if got[i].Entity != want[i].Entity { t.Errorf("entry %d: Entity want %q got %q", i, want[i].Entity, got[i].Entity) } if !got[i].Timestamp.Equal(want[i].Timestamp) { t.Errorf("entry %d: Timestamp want %v got %v", i, want[i].Timestamp, got[i].Timestamp) } } } func TestFileLogSource_ReadTimeWindow(t *testing.T) { s := newLogSource(t) ctx := context.Background() base := time.Date(2026, 4, 18, 0, 0, 0, 0, time.UTC) for i := 0; i < 10; i++ { e := LogEntry{Timestamp: base.Add(time.Duration(i) * time.Hour), DecisionID: "d"} if err := s.Append(ctx, e); err != nil { t.Fatalf("Append %d: %v", i, err) } } ch, err := s.Read(ctx, base.Add(3*time.Hour), base.Add(6*time.Hour)) if err != nil { t.Fatalf("Read: %v", err) } got := drain(t, ch) if len(got) != 4 { t.Fatalf("window [3h, 6h] inclusive: want 4, got %d", len(got)) } } func TestFileLogSource_ReadCrossDay(t *testing.T) { s := newLogSource(t) ctx := context.Background() day1 := time.Date(2026, 4, 17, 23, 30, 0, 0, time.UTC) day2 := time.Date(2026, 4, 18, 0, 30, 0, 0, time.UTC) day3 := time.Date(2026, 4, 19, 1, 0, 0, 0, time.UTC) for i, ts := range []time.Time{day1, day2, day3} { if err := s.Append(ctx, LogEntry{Timestamp: ts, DecisionID: string(rune('a' + i))}); err != nil { t.Fatalf("Append: %v", err) } } ch, err := s.Read(ctx, day1.Add(-time.Hour), day3.Add(time.Hour)) if err != nil { t.Fatalf("Read: %v", err) } got := drain(t, ch) if len(got) != 3 { t.Fatalf("cross-day: want 3, got %d", len(got)) } days, err := s.Days() if err != nil { t.Fatalf("Days: %v", err) } if len(days) != 3 { t.Fatalf("Days count: want 3, got %d (%v)", len(days), days) } } func TestFileLogSource_ReadMissingDay(t *testing.T) { s := newLogSource(t) ctx := context.Background() base := time.Date(2026, 4, 18, 12, 0, 0, 0, time.UTC) if err := s.Append(ctx, LogEntry{Timestamp: base, DecisionID: "d"}); err != nil { t.Fatal(err) } // Window spans several days; only the one with data contributes. ch, err := s.Read(ctx, base.Add(-72*time.Hour), base.Add(72*time.Hour)) if err != nil { t.Fatalf("Read: %v", err) } got := drain(t, ch) if len(got) != 1 { t.Fatalf("want 1, got %d", len(got)) } } func TestFileLogSource_ReadEmptyRoot(t *testing.T) { s := newLogSource(t) ctx := context.Background() base := time.Date(2026, 4, 18, 12, 0, 0, 0, time.UTC) ch, err := s.Read(ctx, base, base.Add(time.Hour)) if err != nil { t.Fatalf("Read: %v", err) } got := drain(t, ch) if len(got) != 0 { t.Fatalf("want 0, got %d", len(got)) } } func TestFileLogSource_ReadToBeforeFrom(t *testing.T) { s := newLogSource(t) ctx := context.Background() now := time.Now().UTC() if _, err := s.Read(ctx, now, now.Add(-time.Hour)); err == nil { t.Errorf("Read with to < from: want error, got nil") } } func TestFileLogSource_ReadCtxCancel(t *testing.T) { s := newLogSource(t) base := time.Date(2026, 4, 18, 0, 0, 0, 0, time.UTC) for i := 0; i < 200; i++ { if err := s.Append(context.Background(), LogEntry{Timestamp: base.Add(time.Duration(i) * time.Minute), DecisionID: "d"}); err != nil { t.Fatalf("Append: %v", err) } } ctx, cancel := context.WithCancel(context.Background()) ch, err := s.Read(ctx, base, base.Add(24*time.Hour)) if err != nil { t.Fatalf("Read: %v", err) } // Drain a few then cancel; channel must close without blocking. <-ch <-ch cancel() // Drain to completion; must terminate. done := make(chan struct{}) go func() { for range ch { } close(done) }() select { case <-done: case <-time.After(2 * time.Second): t.Fatalf("Read did not terminate after ctx cancel") } } func TestFileLogSource_AppendFillsZeroTimestamp(t *testing.T) { s := newLogSource(t) ctx := context.Background() before := time.Now().UTC() if err := s.Append(ctx, LogEntry{DecisionID: "auto"}); err != nil { t.Fatalf("Append: %v", err) } after := time.Now().UTC() ch, err := s.Read(ctx, before.Add(-time.Minute), after.Add(time.Minute)) if err != nil { t.Fatalf("Read: %v", err) } got := drain(t, ch) if len(got) != 1 { t.Fatalf("want 1, got %d", len(got)) } if got[0].Timestamp.Before(before) || got[0].Timestamp.After(after) { t.Errorf("Timestamp %v outside [%v, %v]", got[0].Timestamp, before, after) } } func TestFileLogSource_AppendCoercesToUTC(t *testing.T) { s := newLogSource(t) ctx := context.Background() loc, err := time.LoadLocation("Asia/Shanghai") if err != nil { t.Skip("tz data unavailable") } shanghai := time.Date(2026, 4, 18, 7, 30, 0, 0, loc) // 2026-04-17 23:30 UTC if err := s.Append(ctx, LogEntry{Timestamp: shanghai, DecisionID: "tz"}); err != nil { t.Fatalf("Append: %v", err) } // The entry's UTC day is 2026-04-17, so file should be named accordingly. if _, err := os.Stat(filepath.Join(s.root, "2026-04-17.jsonl")); err != nil { t.Errorf("expected 2026-04-17.jsonl under %s: %v", s.root, err) } } func TestFileLogSource_AppendConcurrent(t *testing.T) { s := newLogSource(t) ctx := context.Background() const workers = 8 const perWorker = 50 base := time.Date(2026, 4, 18, 12, 0, 0, 0, time.UTC) var wg sync.WaitGroup for w := 0; w < workers; w++ { wg.Add(1) go func(w int) { defer wg.Done() for i := 0; i < perWorker; i++ { e := LogEntry{ Timestamp: base.Add(time.Duration(w*perWorker+i) * time.Millisecond), DecisionID: "d", Entity: "carrier", } if err := s.Append(ctx, e); err != nil { t.Errorf("Append: %v", err) return } } }(w) } wg.Wait() ch, err := s.Read(ctx, base, base.Add(24*time.Hour)) if err != nil { t.Fatalf("Read: %v", err) } got := drain(t, ch) if len(got) != workers*perWorker { t.Errorf("concurrent Append count: want %d, got %d", workers*perWorker, len(got)) } } func TestFileLogSource_MalformedLineAborts(t *testing.T) { s := newLogSource(t) ctx := context.Background() day := time.Date(2026, 4, 18, 0, 0, 0, 0, time.UTC) // Write a valid line then a malformed one. good, _ := json.Marshal(LogEntry{Timestamp: day, DecisionID: "ok"}) path := s.dayFile(day) if err := os.WriteFile(path, append(append(good, '\n'), []byte("not-json\n")...), 0o644); err != nil { t.Fatal(err) } ch, err := s.Read(ctx, day, day.Add(24*time.Hour)) if err != nil { t.Fatalf("Read: %v", err) } // First entry must come through; second line aborts the stream, channel closes. got := drain(t, ch) if len(got) != 1 || got[0].DecisionID != "ok" { t.Errorf("malformed line handling: got %+v", got) } } func TestFileLogSource_NewFileLogSourceCreatesRoot(t *testing.T) { dir := filepath.Join(t.TempDir(), "does", "not", "exist") if _, err := NewFileLogSource(dir); err != nil { t.Fatalf("NewFileLogSource: %v", err) } info, err := os.Stat(dir) if err != nil { t.Fatalf("stat: %v", err) } if !info.IsDir() { t.Errorf("want dir, got %v", info.Mode()) } } func TestFileLogSource_AppendCtxCanceled(t *testing.T) { s := newLogSource(t) ctx, cancel := context.WithCancel(context.Background()) cancel() err := s.Append(ctx, LogEntry{DecisionID: "x"}) if !errors.Is(err, context.Canceled) { t.Errorf("Append ctx canceled: want context.Canceled, got %v", err) } }