// Package cache 提供通用的 TTL 缓存实现. // // TTLCache 是一个泛型的带过期时间的缓存,支持: // - 过期后返回旧值 + 后台异步刷新(stale-while-revalidate 模式) // - Thundering herd 防护(同一 key 只有一个 goroutine 在刷新) // - 缓存 miss 时同步加载(GetOrLoad) // - 手动设置和失效 // // 设计要点: // - 读操作用 RWMutex,高并发读性能好 // - 后台刷新用 goroutine,不阻塞读请求 // - 刷新中标记防止多个 goroutine 同时刷新同一 key package cache import ( "sync" "time" ) // TTLCache 是一个泛型的带 TTL 的缓存. // K 是键类型(必须 comparable),V 是值类型. type TTLCache[K comparable, V any] struct { mu sync.RWMutex items map[K]*cacheEntry[V] ttl time.Duration loader func(K) (V, error) // 缓存 miss 时的加载函数 // 精妙之处(CLEVER): 可替换时间源--测试中可注入假时间,不需要 sleep 就能测试过期逻辑. // 比 monkey-patching time.Now 更优雅,且不影响其他 goroutine. now func() time.Time // 后台清理 stopCh chan struct{} stopOnce sync.Once // 防止 Close() 重复调用 panic(close of closed channel) } // cacheEntry 是缓存中的一个条目. type cacheEntry[V any] struct { value V expireAt time.Time refreshing bool // 是否正在后台刷新(thundering herd 防护) } // NewTTLCache 创建一个新的 TTL 缓存. // ttl 是条目的生存时间. // loader 是缓存 miss 时的加载函数(可以为 nil,但 GetOrLoad 会返回错误). func NewTTLCache[K comparable, V any](ttl time.Duration, loader func(K) (V, error)) *TTLCache[K, V] { c := &TTLCache[K, V]{ items: make(map[K]*cacheEntry[V]), ttl: ttl, loader: loader, now: time.Now, stopCh: make(chan struct{}), } // 启动后台清理 goroutine: // 升华改进(ELEVATED): 间隔设为 max(ttl*2, 1s)-- // 若 interval == ttl,sweep 会在条目 TTL 到期时立即删除, // 干扰 stale-while-revalidate(Get 在 sweep 之后才看到过期条目). // 2x TTL 保证了:即使条目刚过期,sweep 也不会立刻删除它, // 给 Get 的 stale-while-revalidate 留出窗口. // 上限 5 分钟防止 TTL 很大的场景下等待过久. interval := ttl * 2 if interval <= 0 { interval = 5 * time.Minute } else if interval < time.Second { interval = time.Second } else if interval > 5*time.Minute { interval = 5 * time.Minute } go c.sweepExpired(interval) return c } // Get 获取缓存值. // 如果 key 存在且未过期,返回 (value, true). // 如果 key 存在但已过期,返回旧值 (value, true) 并触发后台刷新. // 如果 key 不存在,返回 (zero, false). func (c *TTLCache[K, V]) Get(key K) (V, bool) { c.mu.RLock() entry, exists := c.items[key] if !exists { c.mu.RUnlock() var zero V return zero, false } value := entry.value expired := c.now().After(entry.expireAt) refreshing := entry.refreshing c.mu.RUnlock() // 如果已过期且没有人在刷新,触发后台刷新 if expired && !refreshing && c.loader != nil { c.triggerBackgroundRefresh(key) } // 精妙之处(CLEVER): stale-while-revalidate 模式--过期后仍返回旧值,同时在后台异步刷新. // 这确保了读取永不阻塞,即使 loader 耗时较长也不会影响调用方的响应时间. // 这是 HTTP 缓存中 stale-while-revalidate 指令在内存缓存中的对应实现. return value, true } // GetOrLoad 获取缓存值,如果 miss 则同步加载. // 与 Get 不同,这个方法在缓存 miss 时会阻塞等待加载完成. // 如果 loader 为 nil,缓存 miss 时返回错误. func (c *TTLCache[K, V]) GetOrLoad(key K) (V, error) { // 先尝试读取 c.mu.RLock() entry, exists := c.items[key] if exists { value := entry.value expired := c.now().After(entry.expireAt) refreshing := entry.refreshing c.mu.RUnlock() if expired && !refreshing && c.loader != nil { c.triggerBackgroundRefresh(key) } return value, nil } c.mu.RUnlock() // 缓存 miss,同步加载 if c.loader == nil { var zero V return zero, ErrNoLoader } value, err := c.loader(key) if err != nil { var zero V return zero, err } c.Set(key, value) return value, nil } // Set 手动设置缓存条目. func (c *TTLCache[K, V]) Set(key K, value V) { c.mu.Lock() defer c.mu.Unlock() c.items[key] = &cacheEntry[V]{ value: value, expireAt: c.now().Add(c.ttl), } } // Invalidate 使指定 key 的条目失效. // 下次 Get 会触发后台刷新(如果有 loader),GetOrLoad 会同步加载. func (c *TTLCache[K, V]) Invalidate(key K) { c.mu.Lock() defer c.mu.Unlock() delete(c.items, key) } // Peek 读取缓存值,不触发后台刷新. // 纯观察性操作,适合监控和调试. func (c *TTLCache[K, V]) Peek(key K) (V, bool) { c.mu.RLock() defer c.mu.RUnlock() entry, exists := c.items[key] if !exists { var zero V return zero, false } return entry.value, true } // Close 停止后台清理 goroutine 并释放资源. // // 精妙之处(CLEVER): sync.Once 保护 close(stopCh)-- // 重复调用 Close()(如 defer + 显式调用)会 panic "close of closed channel". // Once 使 Close() 幂等,调用方无需追踪是否已关闭. func (c *TTLCache[K, V]) Close() { c.stopOnce.Do(func() { close(c.stopCh) }) } // sweepExpired 后台定期清理过期条目. // // 升华改进(ELEVATED): 早期方案 TTLCache 无后台清理--过期条目永久在 map 累积, // 在高频缓存场景下(如聊天消息缓存)造成内存泄漏. // 后台 goroutine 定期清理,既保证过期数据不会无限累积, // 又通过 interval 下限(min(ttl, 5min))避免过度清理带来的 CPU 开销. // // 精妙之处(CLEVER): 清理时跳过正在 refresh 的条目-- // stale-while-revalidate 依赖过期条目在 refresh 期间保留旧值. // 若 sweep 在 refresh 完成前删除了条目,调用方会拿到 miss 而非 stale value. func (c *TTLCache[K, V]) sweepExpired(interval time.Duration) { ticker := time.NewTicker(interval) defer ticker.Stop() for { select { case <-c.stopCh: return case <-ticker.C: c.mu.Lock() now := c.now() for key, entry := range c.items { if now.After(entry.expireAt) && !entry.refreshing { delete(c.items, key) } } c.mu.Unlock() } } } // Len 返回缓存中的条目数量(包括已过期但未清理的). func (c *TTLCache[K, V]) Len() int { c.mu.RLock() defer c.mu.RUnlock() return len(c.items) } // Clear 清空缓存. func (c *TTLCache[K, V]) Clear() { c.mu.Lock() defer c.mu.Unlock() c.items = make(map[K]*cacheEntry[V]) } // triggerBackgroundRefresh 触发后台刷新. // 使用 refreshing 标记防止同一 key 被多个 goroutine 同时刷新(thundering herd). func (c *TTLCache[K, V]) triggerBackgroundRefresh(key K) { c.mu.Lock() entry, exists := c.items[key] if !exists || entry.refreshing { c.mu.Unlock() return } entry.refreshing = true c.mu.Unlock() go func() { value, err := c.loader(key) c.mu.Lock() defer c.mu.Unlock() if err != nil { // 刷新失败,清除 refreshing 标记,下次 Get 可以重试 if e, ok := c.items[key]; ok { e.refreshing = false } return } // 刷新成功,更新条目 c.items[key] = &cacheEntry[V]{ value: value, expireAt: c.now().Add(c.ttl), } }() } // errNoLoader 是 loader 为 nil 时 GetOrLoad 返回的错误. type errNoLoader struct{} func (e errNoLoader) Error() string { return "cache: no loader configured, cannot load missing key" } // ErrNoLoader 是 loader 为 nil 时 GetOrLoad 返回的哨兵错误. var ErrNoLoader error = errNoLoader{}