package shadowdb

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// InMemoryOpener is the reference Opener implementation. It holds a
// ShadowDB handle and an in-process session registry (map +
// sync.Mutex). Sessions are tracked by sessionID; Close / Reap
// consult the map, the shadow table rows are the physical state.
//
// Scope: suitable for tests, dev harnesses, and single-process
// demos. Production deployments that need cross-process session
// visibility or crash recovery belong in platform-layer Opener
// implementations (e.g. a SQL-backed registry table). InMemoryOpener
// intentionally keeps the registry in-memory so the reference
// implementation has zero storage assumptions beyond the injected
// ShadowDB.
//
// Thread safety: all Opener methods hold the mutex only for the
// registry bookkeeping window; SQL executions happen outside the
// lock so a slow DB does not stall concurrent Opens. The registry
// ensures at most one Open per sessionID.
type InMemoryOpener struct {
	// db is the injected ShadowDB handle; every seed and delete
	// statement issued by this type goes through it.
	db ShadowDB

	// mu guards sessions. It is held only around map reads and
	// writes, never across a SQL execution.
	mu sync.Mutex

	// sessions is the registry of currently-open sessions, keyed by
	// sessionID.
	sessions map[string]*trackedSession

	// clock is a test seam. Production code uses time.Now().UTC();
	// tests inject deterministic clocks to validate Reap thresholds
	// without real sleeps.
	clock func() time.Time
}

// trackedSession is the per-sessionID registry entry. Kept
// unexported: no consumer outside the Opener needs to peek at
// registry state.
type trackedSession struct { createdAt time.Time shadowTable string onClose func(error) } // NewInMemoryOpener constructs the reference Opener. Panics on // invalid DI (db.DB nil) so startup errors surface immediately // rather than on first Open. // // NewInMemoryOpener 构造参考 Opener. 非法 DI (db.DB 为 nil) panic, // 启动期暴露而非首次 Open 才炸. func NewInMemoryOpener(db ShadowDB) *InMemoryOpener { if db.DB == nil { panic("shadowdb.NewInMemoryOpener: db.DB must not be nil") } return &InMemoryOpener{ db: db, sessions: make(map[string]*trackedSession), clock: func() time.Time { return time.Now().UTC() }, } } // Open implements Opener.Open. // // Seeding SQL when opts.SeedFromTable is non-empty: // // INSERT INTO {ShadowTable} SELECT ?, * FROM {SeedFromTable} // // Contract: {ShadowTable} column order MUST be // (session_id, ...same as SeedFromTable columns in order). Caller // pre-provisions the shadow table that way; a schema mismatch // surfaces as a driver error from this INSERT, which is returned // verbatim (not wrapped in a shadowdb sentinel) so the caller sees // the exact driver message. // // Identifier validation happens before registry mutation so a // rejected Options leaves the registry untouched. // // Open 实现 Opener.Open. // // opts.SeedFromTable 非空时的种子 SQL: // // INSERT INTO {ShadowTable} SELECT ?, * FROM {SeedFromTable} // // 契约: {ShadowTable} 的列顺序必须是 (session_id, ...SeedFromTable // 同序列). 调用方预建影子表时即按此排布; schema 不匹配以 driver // 错误在本 INSERT 暴露, 原样返回 (不包 shadowdb 哨兵), 调用方可见 // 原始 driver 信息. // // 标识符校验在注册表变更前完成, Options 被拒时注册表状态不变. 
func (o *InMemoryOpener) Open(ctx context.Context, sessionID string, opts Options) (*Session, error) { if sessionID == "" { return nil, ErrEmptySessionID } if err := ValidateIdentifier(opts.ShadowTable); err != nil { return nil, fmt.Errorf("shadowdb: ShadowTable %q: %w", opts.ShadowTable, err) } if opts.SeedFromTable != "" { if err := ValidateIdentifier(opts.SeedFromTable); err != nil { return nil, fmt.Errorf("shadowdb: SeedFromTable %q: %w", opts.SeedFromTable, err) } } o.mu.Lock() if _, exists := o.sessions[sessionID]; exists { o.mu.Unlock() return nil, ErrDuplicateSession } now := o.clock() o.sessions[sessionID] = &trackedSession{ createdAt: now, shadowTable: opts.ShadowTable, onClose: opts.OnClose, } o.mu.Unlock() if opts.SeedFromTable != "" { seedSQL := fmt.Sprintf("INSERT INTO %s SELECT ?, * FROM %s", opts.ShadowTable, opts.SeedFromTable) if _, err := o.db.ExecContext(ctx, seedSQL, sessionID); err != nil { o.mu.Lock() delete(o.sessions, sessionID) o.mu.Unlock() return nil, fmt.Errorf("shadowdb: seed %s from %s: %w", opts.ShadowTable, opts.SeedFromTable, err) } } return &Session{ ID: sessionID, CreatedAt: now, ShadowTable: opts.ShadowTable, DB: o.db, close: o.makeCloser(sessionID), }, nil } // makeCloser builds the per-session Close closure captured on the // returned *Session. The closure deletes the session's rows and // deregisters it. If DELETE fails the registry entry is kept so // Reap or a retry Close can retry the delete; OnClose fires with // the driver error for platform alerting. // // makeCloser 构造挂在返回 *Session 上的 per-session Close 闭包. // 闭包删除 session 行并注销. DELETE 失败时注册表条目保留, Reap // 或重试 Close 可再试 delete; OnClose 以 driver 错误触发供平台告警. 
func (o *InMemoryOpener) makeCloser(sessionID string) func(context.Context) error { return func(ctx context.Context) error { o.mu.Lock() ts, exists := o.sessions[sessionID] if !exists { o.mu.Unlock() return ErrSessionNotFound } shadowTable := ts.shadowTable onClose := ts.onClose o.mu.Unlock() deleteSQL := fmt.Sprintf("DELETE FROM %s WHERE session_id=?", shadowTable) if _, err := o.db.ExecContext(ctx, deleteSQL, sessionID); err != nil { if onClose != nil { onClose(err) } return fmt.Errorf("shadowdb: close session %q: %w", sessionID, err) } o.mu.Lock() delete(o.sessions, sessionID) o.mu.Unlock() return nil } } // Reap implements Opener.Reap. Walks the registry, collects // sessions whose createdAt is at or before now-olderThan, and // closes them one by one. Per-session errors accumulate in // ReapResult.Errors; Reap returns a top-level error only when the // context is cancelled mid-sweep (so callers see a clean abort // signal). ErrSessionNotFound from the close closure is benign // (Close raced with Reap) and is not recorded. // // Reap 实现 Opener.Reap. 遍历注册表, 收集 createdAt 早于或等于 // now-olderThan 的 session 并逐个关闭. per-session 错误累积到 // ReapResult.Errors; Reap 仅在 context 取消时返回顶层错误 (干净 // 中断信号). close 闭包的 ErrSessionNotFound 是良性 (Close 与 // Reap 赛跑), 不记录. 
func (o *InMemoryOpener) Reap(ctx context.Context, olderThan time.Duration) (ReapResult, error) { threshold := o.clock().Add(-olderThan) o.mu.Lock() victims := make([]string, 0, len(o.sessions)) for id, ts := range o.sessions { if !ts.createdAt.After(threshold) { victims = append(victims, id) } } o.mu.Unlock() var result ReapResult for _, id := range victims { if err := ctx.Err(); err != nil { result.Errors = append(result.Errors, fmt.Errorf("shadowdb: reap aborted before session %q: %w", id, err)) return result, err } closer := o.makeCloser(id) err := closer(ctx) if err == nil { result.Swept++ continue } if errors.Is(err, ErrSessionNotFound) { continue } result.Errors = append(result.Errors, err) } return result, nil } // Len reports the number of currently-open sessions. Intended for // test assertions and small-scale observability; platform-layer // opener implementations may add richer List / Stats methods. // // Len 报告当前打开的 session 数. 给测试断言与小规模观测用; 平台层 // opener 实现可追加更丰富的 List / Stats 方法. func (o *InMemoryOpener) Len() int { o.mu.Lock() defer o.mu.Unlock() return len(o.sessions) }