package shadowdb_test import ( "context" "database/sql" "errors" "sync" "sync/atomic" "testing" "time" _ "modernc.org/sqlite" // test-only driver; prod build excludes. "git.flytoex.net/yuanwei/flyto-agent/pkg/shadowdb" ) // newTestDB opens an in-memory sqlite DB and creates: // - inventory (sku TEXT PRIMARY KEY, qty INTEGER) seeded with two rows // - shadow_inventory (session_id TEXT, sku TEXT, qty INTEGER) empty // // The column orders match the seeding contract documented on // InMemoryOpener.Open: shadow_inventory = (session_id, ...inventory cols). // // newTestDB 打开内存 sqlite 并建表: // - inventory (sku TEXT PRIMARY KEY, qty INTEGER) 种入 2 行 // - shadow_inventory (session_id TEXT, sku TEXT, qty INTEGER) 空 // // 列顺序对齐 InMemoryOpener.Open 的种子契约: shadow_inventory = // (session_id, ...inventory 列). func newTestDB(t *testing.T) shadowdb.ShadowDB { t.Helper() db, err := sql.Open("sqlite", ":memory:") if err != nil { t.Fatalf("open sqlite: %v", err) } // in-memory sqlite does not support concurrent writers; serialize // through a single connection so tests validate shadowdb's own // bookkeeping races rather than sqlite driver internals. // // in-memory sqlite 不支持并发写, 强制单连接串行, 让测试验证 // shadowdb 自身的 bookkeeping 竞态, 不测 sqlite driver 内部. db.SetMaxOpenConns(1) t.Cleanup(func() { _ = db.Close() }) ddl := []string{ `CREATE TABLE inventory (sku TEXT PRIMARY KEY, qty INTEGER)`, `INSERT INTO inventory VALUES ('SKU-1', 100), ('SKU-2', 50)`, `CREATE TABLE shadow_inventory (session_id TEXT, sku TEXT, qty INTEGER)`, } for _, stmt := range ddl { if _, err := db.Exec(stmt); err != nil { t.Fatalf("ddl %q: %v", stmt, err) } } return shadowdb.ShadowDB{DB: db} } // countShadow reads all rows from shadow_inventory for a given // sessionID. Used by tests to verify seed / close semantics. // // countShadow 读指定 sessionID 的 shadow_inventory 所有行. 测试用来 // 验证 seed / close 语义. func countShadow(t *testing.T, db shadowdb.ShadowDB, sessionID string) int { t.Helper() var n int err := db.QueryRow("SELECT COUNT(*) FROM shadow_inventory WHERE session_id=?", sessionID).Scan(&n) if err != nil { t.Fatalf("count shadow: %v", err) } return n } func TestOpen_SeedsShadowFromSource(t *testing.T) { db := newTestDB(t) opener := shadowdb.NewInMemoryOpener(db) sess, err := opener.Open(context.Background(), "sess-1", shadowdb.Options{ ShadowTable: "shadow_inventory", SeedFromTable: "inventory", }) if err != nil { t.Fatalf("Open: %v", err) } if sess.ID != "sess-1" { t.Errorf("Session.ID = %q, want %q", sess.ID, "sess-1") } if sess.ShadowTable != "shadow_inventory" { t.Errorf("Session.ShadowTable = %q", sess.ShadowTable) } if got := countShadow(t, db, "sess-1"); got != 2 { t.Errorf("seeded rows for sess-1 = %d, want 2", got) } } func TestOpen_NoSeedStartsEmpty(t *testing.T) { db := newTestDB(t) opener := shadowdb.NewInMemoryOpener(db) sess, err := opener.Open(context.Background(), "sess-x", shadowdb.Options{ ShadowTable: "shadow_inventory", }) if err != nil { t.Fatalf("Open: %v", err) } if got := countShadow(t, db, sess.ID); got != 0 { t.Errorf("unseeded shadow should be empty, got %d rows", got) } } func TestOpen_ReadYourWrites(t *testing.T) { db := newTestDB(t) opener := shadowdb.NewInMemoryOpener(db) ctx := context.Background() sess, err := opener.Open(ctx, "sess-rw", shadowdb.Options{ ShadowTable: "shadow_inventory", SeedFromTable: "inventory", }) if err != nil { t.Fatalf("Open: %v", err) } // Write through the session DB, read it back. if _, err := sess.DB.ExecContext(ctx, "UPDATE shadow_inventory SET qty=qty-10 WHERE sku=? AND session_id=?", "SKU-1", sess.ID); err != nil { t.Fatalf("update: %v", err) } var qty int err = sess.DB.QueryRowContext(ctx, "SELECT qty FROM shadow_inventory WHERE sku=? AND session_id=?", "SKU-1", sess.ID).Scan(&qty) if err != nil { t.Fatalf("select: %v", err) } if qty != 90 { t.Errorf("after -10 update, qty = %d, want 90", qty) } // Production inventory MUST be unchanged. var prodQty int if err := db.QueryRow("SELECT qty FROM inventory WHERE sku='SKU-1'").Scan(&prodQty); err != nil { t.Fatalf("read prod: %v", err) } if prodQty != 100 { t.Errorf("production inventory contaminated: qty=%d, want 100", prodQty) } } func TestOpen_ConcurrentSessionsDoNotCrossContaminate(t *testing.T) { db := newTestDB(t) opener := shadowdb.NewInMemoryOpener(db) ctx := context.Background() sessA, err := opener.Open(ctx, "A", shadowdb.Options{ShadowTable: "shadow_inventory", SeedFromTable: "inventory"}) if err != nil { t.Fatalf("open A: %v", err) } sessB, err := opener.Open(ctx, "B", shadowdb.Options{ShadowTable: "shadow_inventory", SeedFromTable: "inventory"}) if err != nil { t.Fatalf("open B: %v", err) } // A drains SKU-1 to 0; B decrements by 5. if _, err := sessA.DB.ExecContext(ctx, "UPDATE shadow_inventory SET qty=0 WHERE sku='SKU-1' AND session_id=?", sessA.ID); err != nil { t.Fatalf("A update: %v", err) } if _, err := sessB.DB.ExecContext(ctx, "UPDATE shadow_inventory SET qty=qty-5 WHERE sku='SKU-1' AND session_id=?", sessB.ID); err != nil { t.Fatalf("B update: %v", err) } // A sees 0, B sees 95 -- no cross-talk. var qtyA, qtyB int if err := sessA.DB.QueryRowContext(ctx, "SELECT qty FROM shadow_inventory WHERE sku='SKU-1' AND session_id=?", sessA.ID).Scan(&qtyA); err != nil { t.Fatalf("A read: %v", err) } if err := sessB.DB.QueryRowContext(ctx, "SELECT qty FROM shadow_inventory WHERE sku='SKU-1' AND session_id=?", sessB.ID).Scan(&qtyB); err != nil { t.Fatalf("B read: %v", err) } if qtyA != 0 { t.Errorf("A view of SKU-1 = %d, want 0", qtyA) } if qtyB != 95 { t.Errorf("B view of SKU-1 = %d, want 95", qtyB) } } func TestOpen_EmptySessionIDRejected(t *testing.T) { db := newTestDB(t) opener := shadowdb.NewInMemoryOpener(db) _, err := opener.Open(context.Background(), "", shadowdb.Options{ShadowTable: "shadow_inventory"}) if !errors.Is(err, shadowdb.ErrEmptySessionID) { t.Fatalf("got %v, want ErrEmptySessionID", err) } } func TestOpen_InvalidShadowTableRejected(t *testing.T) { db := newTestDB(t) opener := shadowdb.NewInMemoryOpener(db) _, err := opener.Open(context.Background(), "s", shadowdb.Options{ShadowTable: "bad table"}) if !errors.Is(err, shadowdb.ErrInvalidIdentifier) { t.Fatalf("got %v, want ErrInvalidIdentifier", err) } } func TestOpen_InvalidSeedIdentifierRejected(t *testing.T) { db := newTestDB(t) opener := shadowdb.NewInMemoryOpener(db) _, err := opener.Open(context.Background(), "s", shadowdb.Options{ ShadowTable: "shadow_inventory", SeedFromTable: `"bad";DROP`, }) if !errors.Is(err, shadowdb.ErrInvalidIdentifier) { t.Fatalf("got %v, want ErrInvalidIdentifier", err) } } func TestOpen_DuplicateSessionRejected(t *testing.T) { db := newTestDB(t) opener := shadowdb.NewInMemoryOpener(db) ctx := context.Background() if _, err := opener.Open(ctx, "dup", shadowdb.Options{ShadowTable: "shadow_inventory"}); err != nil { t.Fatalf("first Open: %v", err) } _, err := opener.Open(ctx, "dup", shadowdb.Options{ShadowTable: "shadow_inventory"}) if !errors.Is(err, shadowdb.ErrDuplicateSession) { t.Fatalf("got %v, want ErrDuplicateSession", err) } } func TestOpen_FailedSeedRollsBackRegistration(t *testing.T) { db := newTestDB(t) opener := shadowdb.NewInMemoryOpener(db) // nonexistent_src does not exist → seed SQL fails at INSERT. _, err := opener.Open(context.Background(), "rollback-me", shadowdb.Options{ ShadowTable: "shadow_inventory", SeedFromTable: "nonexistent_src", }) if err == nil { t.Fatalf("expected error from missing source table") } if opener.Len() != 0 { t.Errorf("registry should be empty after rollback, Len=%d", opener.Len()) } // Re-opening with same ID must not hit ErrDuplicateSession. if _, err := opener.Open(context.Background(), "rollback-me", shadowdb.Options{ShadowTable: "shadow_inventory"}); err != nil { t.Fatalf("reopen after rollback: %v", err) } } func TestClose_DeletesRowsAndDeregisters(t *testing.T) { db := newTestDB(t) opener := shadowdb.NewInMemoryOpener(db) ctx := context.Background() sess, err := opener.Open(ctx, "sess-close", shadowdb.Options{ ShadowTable: "shadow_inventory", SeedFromTable: "inventory", }) if err != nil { t.Fatalf("Open: %v", err) } if err := sess.Close(ctx); err != nil { t.Fatalf("Close: %v", err) } if got := countShadow(t, db, sess.ID); got != 0 { t.Errorf("after close, rows for sess-close = %d, want 0", got) } if opener.Len() != 0 { t.Errorf("after close, Len=%d, want 0", opener.Len()) } } func TestClose_IdempotentReturnsSentinelOnSecondCall(t *testing.T) { db := newTestDB(t) opener := shadowdb.NewInMemoryOpener(db) ctx := context.Background() sess, err := opener.Open(ctx, "sess-idem", shadowdb.Options{ShadowTable: "shadow_inventory"}) if err != nil { t.Fatalf("Open: %v", err) } if err := sess.Close(ctx); err != nil { t.Fatalf("first Close: %v", err) } err = sess.Close(ctx) if !errors.Is(err, shadowdb.ErrSessionNotFound) { t.Fatalf("second Close: got %v, want ErrSessionNotFound", err) } } func TestClose_NilSessionReturnsNil(t *testing.T) { var sess *shadowdb.Session if err := sess.Close(context.Background()); err != nil { t.Errorf("nil Session Close: got %v, want nil", err) } } func TestClose_OtherSessionsUnaffected(t *testing.T) { db := newTestDB(t) opener := shadowdb.NewInMemoryOpener(db) ctx := context.Background() sessA, _ := opener.Open(ctx, "A", shadowdb.Options{ShadowTable: "shadow_inventory", SeedFromTable: "inventory"}) sessB, _ := opener.Open(ctx, "B", shadowdb.Options{ShadowTable: "shadow_inventory", SeedFromTable: "inventory"}) if err := sessA.Close(ctx); err != nil { t.Fatalf("Close A: %v", err) } if got := countShadow(t, db, sessA.ID); got != 0 { t.Errorf("A rows after close = %d, want 0", got) } if got := countShadow(t, db, sessB.ID); got != 2 { t.Errorf("B rows after A close = %d, want 2 (untouched)", got) } } func TestReap_RemovesOlderThanThreshold(t *testing.T) { db := newTestDB(t) opener := shadowdb.NewInMemoryOpener(db) ctx := context.Background() // Inject a clock that starts at t0. Open sess-old at t0, advance // to t0+5s, open sess-new, advance to t0+10s, Reap at 7s // threshold -> sess-old (age 10s > 7s) reaped, sess-new (age 5s // < 7s) kept. base := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC) var now = base shadowdb.SetInMemoryOpenerClockForTest(opener, func() time.Time { return now }) if _, err := opener.Open(ctx, "sess-old", shadowdb.Options{ShadowTable: "shadow_inventory", SeedFromTable: "inventory"}); err != nil { t.Fatalf("Open old: %v", err) } now = base.Add(5 * time.Second) if _, err := opener.Open(ctx, "sess-new", shadowdb.Options{ShadowTable: "shadow_inventory", SeedFromTable: "inventory"}); err != nil { t.Fatalf("Open new: %v", err) } now = base.Add(10 * time.Second) result, err := opener.Reap(ctx, 7*time.Second) if err != nil { t.Fatalf("Reap: %v", err) } if result.Swept != 1 { t.Errorf("Swept = %d, want 1", result.Swept) } if len(result.Errors) != 0 { t.Errorf("Reap errors = %v, want empty", result.Errors) } if opener.Len() != 1 { t.Errorf("after reap Len=%d, want 1", opener.Len()) } if got := countShadow(t, db, "sess-old"); got != 0 { t.Errorf("reaped sess-old rows = %d, want 0", got) } if got := countShadow(t, db, "sess-new"); got != 2 { t.Errorf("kept sess-new rows = %d, want 2", got) } } func TestReap_ZeroOlderThanSweepsAll(t *testing.T) { db := newTestDB(t) opener := shadowdb.NewInMemoryOpener(db) ctx := context.Background() if _, err := opener.Open(ctx, "a", shadowdb.Options{ShadowTable: "shadow_inventory"}); err != nil { t.Fatalf("open a: %v", err) } if _, err := opener.Open(ctx, "b", shadowdb.Options{ShadowTable: "shadow_inventory"}); err != nil { t.Fatalf("open b: %v", err) } result, err := opener.Reap(ctx, 0) if err != nil { t.Fatalf("Reap: %v", err) } if result.Swept != 2 { t.Errorf("Swept = %d, want 2", result.Swept) } if opener.Len() != 0 { t.Errorf("after reap-all Len=%d, want 0", opener.Len()) } } func TestReap_ConcurrentCloseRace(t *testing.T) { // Open 10 sessions, then fire Reap and explicit Close concurrently. // Expectation: no ErrSessionNotFound leaks into ReapResult.Errors, // total rows cleaned, Len == 0. db := newTestDB(t) opener := shadowdb.NewInMemoryOpener(db) ctx := context.Background() var sessions []*shadowdb.Session for i := range 10 { s, err := opener.Open(ctx, "s-"+string(rune('0'+i)), shadowdb.Options{ShadowTable: "shadow_inventory", SeedFromTable: "inventory"}) if err != nil { t.Fatalf("open %d: %v", i, err) } sessions = append(sessions, s) } var wg sync.WaitGroup wg.Add(2) var reapErrs atomic.Int32 go func() { defer wg.Done() result, err := opener.Reap(ctx, 0) if err != nil { reapErrs.Add(1) } for _, e := range result.Errors { if !errors.Is(e, shadowdb.ErrSessionNotFound) { reapErrs.Add(1) t.Logf("unexpected reap error: %v", e) } } }() go func() { defer wg.Done() for _, s := range sessions { _ = s.Close(ctx) } }() wg.Wait() if reapErrs.Load() != 0 { t.Errorf("unexpected reap errors: %d", reapErrs.Load()) } if opener.Len() != 0 { t.Errorf("after race, Len=%d, want 0", opener.Len()) } } func TestNewInMemoryOpener_NilDBPanics(t *testing.T) { defer func() { if r := recover(); r == nil { t.Error("expected panic on nil DB") } }() shadowdb.NewInMemoryOpener(shadowdb.ShadowDB{}) } func TestValidateIdentifier(t *testing.T) { cases := []struct { in string ok bool }{ {"plain", true}, {"_underscore", true}, {"table1", true}, {"Camel_Case", true}, {"", false}, {"1leading", false}, {"has space", false}, {`"quoted"`, false}, {"`backtick`", false}, {"drop;--", false}, } for _, c := range cases { err := shadowdb.ValidateIdentifier(c.in) gotOK := err == nil if gotOK != c.ok { t.Errorf("ValidateIdentifier(%q) ok=%v, want %v (err=%v)", c.in, gotOK, c.ok, err) } } }