// compact_persist.go -- 断路器状态跨进程持久化. // // 解决的问题: // CompactCircuitBreaker 是纯内存对象.进程重启后失败计数清零, // 导致已知"此会话无法压缩"的状态(如 prompt_too_long)在每次重启后 // 都要再浪费 3 次失败的 API 调用才能重新触发断路器. // 在 daemon(systemd/K8s)崩溃重启场景下尤为严重:崩溃 → 自动重启 → 再失败 3 次 → 崩溃. // // 设计决策: // 1. 接口 + 默认实现分离(FilePersister) // 调用方可注入 Redis 适配器用于多副本共享,不改核心逻辑. // nil persister = 原行为(纯内存),向后兼容. // // 2. per-project 隔离(cwd hash) // 同一台机器跑多个项目时,A 项目的压缩失败不污染 B 项目的断路器状态. // 反向思考:全局单文件更简单,但 A 项目的 prompt_too_long 失败会误判 B 项目正常的压缩. // // 3. TTL = 1 小时 // 防止恶意/意外输入永久锁死压缩功能.超过 TTL 的状态在加载时丢弃,断路器自动恢复. // 比内存层的5分钟时间重置更长(重启成本更高,值得更长的保护期), // 比永久持久化更安全(不会因为一次历史事故永久禁用压缩). // // 4. CompactCircuitBreaker 本身不感知存储 // 断路器保持纯内存计数器.Compressor 负责在状态变化后调用 persister.Save(). // 测试断路器逻辑不需要 mock 文件系统. // // 历史包袱(LEGACY): 早期方案完全没有持久化,失败计数仅在会话内存中. // 早期实现有一处跨进程持久化(OAuth dead token),但用的是全局 config 文件, // 不是通用机制.我们设计了通用接口,可覆盖任意需要持久化断路器状态的场景. package context import ( "crypto/sha256" "encoding/json" "fmt" "os" "path/filepath" "time" ) // breakerStateTTL 是持久化状态的有效期. // 超过此时间的状态在加载时自动丢弃,断路器恢复初始状态. // // 精妙之处(CLEVER): TTL 是防御深度的一部分: // - 内存层:5 分钟时间重置(ShouldAttempt 内检查) // - 持久化层:1 小时 TTL(加载时检查) // // 两层保证最坏情况下 1 小时后恢复,即使进程从未重启. const breakerStateTTL = 1 * time.Hour // BreakerState 是断路器的可序列化状态. // 精妙之处(CLEVER): 只持久化需要跨进程传递的最小状态-- // failures 和 lastFailedAt.maxFailures 是配置,不需要持久化. type BreakerState struct { Failures int `json:"failures"` LastFailedAt time.Time `json:"last_failed_at"` } // CircuitBreakerPersister 是断路器状态持久化的接口. // // 升华改进(ELEVATED): 接口驱动设计-- // // 默认实现:FilePersister(JSON 文件,per-project by cwd hash) // 扩展实现:Redis 适配器(多副本共享,P2 TODO) // 测试替代:NoopPersister(不写文件,测试无副作用) // // Save/Load 都是幂等操作:失败时静默返回 error(不影响压缩主流程), // 因为持久化是增强功能,不是核心路径. type CircuitBreakerPersister interface { // Save 将断路器状态持久化. // 在 RecordFailure / Reset 后调用. Save(state BreakerState) error // Load 加载上次持久化的状态. // 返回 (nil, nil) 表示无历史状态(首次启动或状态过期). Load() (*BreakerState, error) } // ─────────────────��─────────────────────────────���───────────────────── // FilePersister -- 默认实现 // ───────────────────────────────────────────────────────────────────���─ // FilePersister 将断路器状态写入 JSON 文件. // // 文件路径:`{baseDir}/{cwdHash}.json` // // baseDir 默认为 `~/.flyto/compact_breaker/` // cwdHash = SHA-256(cwd)[:16](16 个 hex 字符,防止路径冲突) // // 精妙之处(CLEVER): 用 cwd 的 SHA-256 前缀而非 cwd 本身做文件名, // 避免路径中的斜杠,空格等特殊字符导致文件名不合法. // 16 hex 字符 = 64 位熵,碰撞概率极低(生日悖论:10^6 个 cwd 碰撞概率 < 0.001%). type FilePersister struct { path string // 完整文件路径(含文件名) } // NewFilePersister 创建 FilePersister. // // baseDir:持久化文件目录,空字符串时使用默认路径 `~/.flyto/compact_breaker/`. // cwd:当前工作目录,用于 per-project 隔离(传入 os.Getwd() 的结果). func NewFilePersister(baseDir, cwd string) (*FilePersister, error) { if baseDir == "" { home, err := os.UserHomeDir() if err != nil { return nil, fmt.Errorf("compact_persist: get home dir: %w", err) } baseDir = filepath.Join(home, ".flyto", "compact_breaker") } if err := os.MkdirAll(baseDir, 0o700); err != nil { return nil, fmt.Errorf("compact_persist: mkdir %s: %w", baseDir, err) } hash := cwdHash(cwd) return &FilePersister{ path: filepath.Join(baseDir, hash+".json"), }, nil } // Save 将状态原子写入文件(write-then-rename 防止写一半的文件被读到). // // 升华改进(ELEVATED): atomic write = 写临时文件 → rename. // 直接写文件(os.WriteFile)在进程崩溃时会产生截断的 JSON, // 下次 Load 会 json.Unmarshal 失败并返回 error. // rename 是 POSIX 原子操作,要么旧文件要么新文件,没有中间态. func (f *FilePersister) Save(state BreakerState) error { data, err := json.Marshal(state) if err != nil { return fmt.Errorf("compact_persist: marshal: %w", err) } // 写临时文件 tmp := f.path + ".tmp" if err := os.WriteFile(tmp, data, 0o600); err != nil { return fmt.Errorf("compact_persist: write tmp: %w", err) } // 原子 rename if err := os.Rename(tmp, f.path); err != nil { _ = os.Remove(tmp) // 清理临时文件 return fmt.Errorf("compact_persist: rename: %w", err) } return nil } // Load 从文件加载状态,并检查 TTL. // // 以下情况返回 (nil, nil)(视为"无历史状态"): // - 文件不存在(首次启动) // - 状态已过期(LastFailedAt 超过 breakerStateTTL) // - Failures == 0(成功后 Reset 写入的清零状态) // // 以下情况返回 (nil, error)(IO 或解析失败,调用方 fail-open 忽略): // - 文件读取失败 // - JSON 解析失败(如截断文件,理论上不应发生因为有原子写入) func (f *FilePersister) Load() (*BreakerState, error) { data, err := os.ReadFile(f.path) if os.IsNotExist(err) { return nil, nil // 首次启动,正常 } if err != nil { return nil, fmt.Errorf("compact_persist: read: %w", err) } var state BreakerState if err := json.Unmarshal(data, &state); err != nil { return nil, fmt.Errorf("compact_persist: unmarshal: %w", err) } // 清零状态(Reset 后写入的),视为无历史 if state.Failures == 0 { return nil, nil } // TTL 检查:超过 1 小时的失败记录丢弃 if !state.LastFailedAt.IsZero() && time.Since(state.LastFailedAt) > breakerStateTTL { return nil, nil // 状态已过期,断路器自动恢复 } return &state, nil } // ───────────────────────────────────────────────────────────────────── // NoopPersister -- 测试/禁用用 // ────────────────────────────────────────���──────────────────────────── // NoopPersister 不做任何持久化. // 用于测试(避免测试污染文件系统)或明确禁用持久化的场景. type NoopPersister struct{} func (n *NoopPersister) Save(_ BreakerState) error { return nil } func (n *NoopPersister) Load() (*BreakerState, error) { return nil, nil } // ───────────────────────────────────────────────────────────────────── // 工具函数 // ───────────────────────────────────��───────────────────────────────── // cwdHash 返回 cwd 的 SHA-256 前 16 个 hex 字符,用作文件名. // 精妙之处(CLEVER): 前 16 hex = 64 位,碰撞概率对任何实际使用量可忽略不计. // 全 64 hex(256 位)作文件名更安全但更长,16 位已经足够. func cwdHash(cwd string) string { h := sha256.Sum256([]byte(cwd)) return fmt.Sprintf("%x", h[:8]) // 8 bytes = 16 hex chars }