package memory // sync.go - 记忆同步的可插拔适配器接口(模块 10.2). // // # 定位 // // 早期实现(1256 行)把同步逻辑硬编码为: // - 认证:Anthropic 第一方 OAuth // - 作用域:GitHub repo slug // - 冲突策略:本地永远胜(local wins) // - Pull 时机:session 启动一次 // // 这套设计对 CLI 单用户 + GitHub 场景很合理,但对以下场景完全不适用: // - HTTP API 无状态模式("本地"是临时容器,本地胜 = 数据丢失) // - 私有部署 / 企业内网(无法访问 Anthropic OAuth) // - 仓储/医疗等离线场景(无网络) // - 多人 SDK 嵌入(每请求独立,Pull 频率需精细控制) // // # 升华改进(ELEVATED) // // 我们的设计:三层可配置 // 1. SyncAdapter 接口--后端完全可替换(Git,HTTP,NFS,noop) // 2. ConflictPolicy--冲突策略随场景切换(LocalWins/ServerWins/Merge/Fail) // 3. PullPolicy--Pull 时机精细控制(OnSessionStart/WithTTL/Always/Never) // // 调用方通过 WithSyncAdapter(adapter, config) 注入,引擎不感知任何具体后端或认证机制. // 替代方案:<像 TS 一样在 memory 包内硬编码 HTTP 同步> - 否决原因: // // HTTP API 模式下无法使用,私有部署无法替换后端,测试需要真实网络. // // # 使用示例 // // // CLI 模式:Git 后端,session 开始 Pull 一次,本地胜 // store := memory.NewFileStoreWithOptions(cwd, // memory.WithSyncAdapter( // sync_git.New(gitDir, sync_git.Options{Remote: "origin", Branch: "main"}), // memory.DefaultSyncConfig(), // ), // ) // // // API 模式:HTTP 后端,TTL=1min,服务器胜 // store := memory.NewFileStoreWithOptions(cwd, // memory.WithSyncAdapter(httpAdapter, memory.APISyncConfig(time.Minute)), // ) import ( "context" "sync" "time" ) // ───────────────────────────────────────────────────────────────────────────── // ConflictPolicy - 冲突解决策略 // ───────────────────────────────────────────────────────────────────────────── // ConflictPolicy 定义 Push 时遇到写冲突的处理策略. // // 冲突场景:两个 session 同时修改了同一记忆文件, // 后 Push 的一方发现服务器版本已被修改(HTTP 412 / git diverge). type ConflictPolicy int const ( // ConflictLocalWins 本地版本覆盖服务器版本. // // 适用:CLI 单用户,用户正在活跃编辑,不希望 teammate 的远端更新覆盖自己的工作. // 早期实现 的行为:412 时刷新 serverChecksums 再 push,等价于本地覆盖. // 注意:另一方的修改会丢失(静默),不适用于高并发写场景. ConflictLocalWins ConflictPolicy = iota // ConflictServerWins 服务器版本覆盖本地版本. // // 适用:HTTP API 无状态模式--"本地"是临时容器,一旦冲突说明本地已过时, // 应丢弃本地,以远端为准. // 语义:Pull 成功后 Push,若 Pull 前有未提交的本地更改则丢弃. ConflictServerWins // ConflictMerge 三路合并,冲突时产生冲突标记(<<<< HEAD). // // 适用:Git 后端,多人协作,需要完整历史记录. // Git 的 rebase/merge 处理大多数非交叉修改,只有真正冲突才产生标记. // AI 可以读取并解决冲突标记,不需要人工介入. ConflictMerge // ConflictFail 发生冲突时返回错误,由调用方决定如何处理. // // 适用:需要强一致性的场景(金融,医疗),任何冲突都需要显式确认. // 调用方可以提示用户,重试,或放弃本次写入. ConflictFail ) // String 返回策略名称,方便日志和错误消息. func (p ConflictPolicy) String() string { switch p { case ConflictLocalWins: return "local_wins" case ConflictServerWins: return "server_wins" case ConflictMerge: return "merge" case ConflictFail: return "fail" default: return "unknown" } } // ───────────────────────────────────────────────────────────────────────────── // PullPolicy - Pull 触发策略 // ───────────────────────────────────────────────────────────────────────────── // PullPolicy 定义何时自动触发 Pull 操作. // // Pull 的代价因后端不同而差异巨大: // - Git 本地 fetch:毫秒级 // - HTTP API(304 Not Modified):网络 RTT,通常 50-200ms // - HTTP API(需要传输内容):RTT + 传输时间,可达数秒 // // 精妙之处(CLEVER): PullPolicy 是 Pull 时机的纯策略描述,不含时间戳状态-- // 状态(lastPullTime,pulled 标志)保存在 fileStore 中. // 这样同一个 PullPolicy 值可以安全地在多个 store 实例间共享(无数据竞争). type PullPolicy int const ( // PullOnSessionStart 每个 store 实例生命周期内只 Pull 一次. // // 适用:CLI 模式,session 生命周期与 store 生命周期一致. // "首次读操作"触发 Pull,之后不再 Pull,保持 session 内的一致性. PullOnSessionStart PullPolicy = iota // PullWithTTL 距上次 Pull 超过 SyncConfig.PullTTL 才重新 Pull. // // 适用:SDK 嵌入,长生命周期服务,请求频繁但不希望每次都 Pull. // TTL 是新鲜度与性能的平衡点--TTL 越短一致性越强,TTL 越长 I/O 越少. PullWithTTL // PullAlways 每次读操作(List/FindRelevant)前都 Pull. // // 适用:强一致性要求,允许每次读都承受 Pull 开销(如审计日志场景). // 警告:高频读场景下会显著增加延迟,谨慎使用. PullAlways // PullNever 不自动 Pull. // // 适用:离线场景,纯写场景,调用方手动控制同步时机. // 也是 NoopSyncAdapter 的隐含策略(IsAvailable=false 时自动不 Pull). PullNever ) // ───────────────────────────────────────────────────────────────────────────── // SyncConfig - 同步行为配置 // ───────────────────────────────────────────────────────────────────────────── // SyncConfig 组合了冲突策略和 Pull 策略,描述 fileStore 的完整同步行为. type SyncConfig struct { // ConflictPolicy 是写冲突时的处理方式. ConflictPolicy ConflictPolicy // PullPolicy 是自动 Pull 的触发条件. PullPolicy PullPolicy // PullTTL 是 PullWithTTL 策略的冷却时间. // 仅在 PullPolicy == PullWithTTL 时有效,其他策略忽略此字段. PullTTL time.Duration } // DefaultSyncConfig 返回适合 CLI 单用户的默认配置. // // 策略:session 开始时 Pull 一次,本地修改优先. // 对应场景:开发者本地运行 CLI,低频写,期望自己的改动不被覆盖. func DefaultSyncConfig() SyncConfig { return SyncConfig{ ConflictPolicy: ConflictLocalWins, PullPolicy: PullOnSessionStart, } } // APISyncConfig 返回适合 HTTP API 高频无状态模式的配置. // // 策略:TTL 内缓存(避免每次请求都 Pull),冲突时服务器胜(本地是临时缓存). // 对应场景:多个无状态 API server 实例共享同一 memory 后端. // // ttl 建议值: // - 强一致性场景:30s-1min // - 一般场景:5min // - 只关心启动时状态:PullOnSessionStart(使用 DefaultSyncConfig) func APISyncConfig(ttl time.Duration) SyncConfig { return SyncConfig{ ConflictPolicy: ConflictServerWins, PullPolicy: PullWithTTL, PullTTL: ttl, } } // ───────────────────────────────────────────────────────────────────────────── // SyncAdapter - 同步后端接口 // ───────────────────────────────────────────────────────────────────────────── // SyncAdapter 是记忆同步的可插拔后端接口. // // 接口设计原则: // 1. 操作目录而非单文件--后端(git,HTTP)天然是批量操作, // 逐文件接口会丢失原子性保证. // 2. 不感知记忆格式--SyncAdapter 只传输 .md 文件, // 不解析 frontmatter,不懂 Memory.Entry,保持职责分离. // 3. 凭证/认证由实现者管理--接口不传 token, // GitSyncAdapter 用 SSH key,HTTPSyncAdapter 用 Bearer header, // 引擎不存任何凭证. // // 升华改进(ELEVATED): 早期实现没有接口抽象, // 直接在函数内调用 Anthropic API(硬编码 URL + OAuth). // 我们通过接口解耦,同一 fileStore 可在 CLI/SDK/API 模式下使用完全不同的后端, // 无需改引擎代码. // 替代方案:<像 TS 那样在 memory 包内内置 HTTP sync> - // 否决:耦合特定 API server,无法用于私有部署,测试需要 mock HTTP. // // Shape: synchronous callback. Engine (at configured sync triggers) calls // Push / Pull synchronously; the adapter talks to the remote backend // (git / HTTP / Notion / custom) and returns. // // 形态: 同步回调. 引擎在配置的同步触发点同步调 Push / Pull; adapter 对接 // 远端后端 (git / HTTP / Notion / 自定义) 然后返回. type SyncAdapter interface { // Pull 从远端拉取最新状态到本地目录. // // 实现约定: // - 必须是幂等的:多次 Pull 结果相同 // - 不应删除本地存在但远端不存在的文件(防止意外丢失) // 除非 ConflictServerWins 语义要求(由实现者决定) // - context 取消时应尽快返回 // // 返回值:pulled 是实际更新(新增或覆盖)的文件数,0 表示无变化. Pull(ctx context.Context, localDir string) (pulled int, err error) // Push 将本地目录的变化上传到远端. // // policy 控制写冲突时的行为: // - ConflictLocalWins:本地覆盖远端 // - ConflictServerWins:检测到冲突时先 Pull 再 Push(服务器版本保留) // - ConflictMerge:三路合并(Git 后端支持,HTTP 后端可能不支持) // - ConflictFail:冲突时直接返回 ErrSyncConflict // // 返回值:pushed 是实际上传的文件数(delta),0 表示无变化. Push(ctx context.Context, localDir string, policy ConflictPolicy) (pushed int, err error) // IsAvailable 检查后端是否可用(凭证有效,网络可达,工具存在等). // // 精妙之处(CLEVER): fileStore 在每次 shouldPull/shouldPush 前调用此方法. // NoopSyncAdapter 永远返回 false,使 fileStore 在无同步需求时完全跳过同步逻辑, // 零 overhead.同时允许后端在运行时动态上线(如网络恢复后返回 true). IsAvailable() bool } // ErrSyncConflict 是 ConflictFail 策略下检测到冲突时返回的错误. // 调用方可以用 errors.Is(err, memory.ErrSyncConflict) 判断. var ErrSyncConflict = &syncConflictError{} type syncConflictError struct{} func (e *syncConflictError) Error() string { return "memory: sync conflict detected" } func (e *syncConflictError) Is(target error) bool { _, ok := target.(*syncConflictError) return ok } // ───────────────────────────────────────────────────────────────────────────── // NoopSyncAdapter - 空实现(默认,向后兼容) // ───────────────────────────────────────────────────────────────────────────── // NoopSyncAdapter 是不执行任何同步的空实现. // // 所有现有 NewFileStore* 调用默认使用此实现(即不同步), // 向后兼容--现有代码无需任何修改即可升级到支持同步的版本. // // 精妙之处(CLEVER): IsAvailable() 始终返回 false, // fileStore 的 shouldPull/shouldPush 在 IsAvailable=false 时立即跳过, // 不会有任何锁竞争或时间戳检查开销. // 如果用"空方法直接返回"代替,shouldPull 逻辑里仍然需要 nil 检查,代码更复杂. type NoopSyncAdapter struct{} // Pull 是空操作,始终返回 (0, nil). func (n *NoopSyncAdapter) Pull(_ context.Context, _ string) (int, error) { return 0, nil } // Push 是空操作,始终返回 (0, nil). func (n *NoopSyncAdapter) Push(_ context.Context, _ string, _ ConflictPolicy) (int, error) { return 0, nil } // IsAvailable 始终返回 false,使 fileStore 完全跳过同步逻辑. func (n *NoopSyncAdapter) IsAvailable() bool { return false } // ───────────────────────────────────────────────────────────────────────────── // syncState - fileStore 内部同步状态(非导出) // ───────────────────────────────────────────────────────────────────────────── // syncState 追踪 fileStore 实例的同步状态. // // 与 SyncConfig 分离的原因:SyncConfig 是静态配置(只读), // syncState 是运行时可变状态(需要锁保护). // 精妙之处(CLEVER): 分离静态配置和动态状态-- // 同一 SyncConfig 可以被多个 store 共享(无数据竞争), // 每个 store 有独立的 syncState(互不干扰). type syncState struct { mu sync.Mutex lastPullTime time.Time // 上次 Pull 完成的时间(PullWithTTL 使用) pulled bool // 是否已 Pull 过(PullOnSessionStart 使用) } // shouldPull 根据 PullPolicy 判断是否需要执行 Pull. // // 线程安全:内部加锁,可并发调用. // 精妙之处(CLEVER): shouldPull 在判断"是否需要 Pull"的同时更新状态-- // PullOnSessionStart 在返回 true 的同时将 pulled 设为 true, // 防止并发 List() 调用触发两次 Pull(类似 sync.Once 的语义,但可重置). func (s *syncState) shouldPull(adapter SyncAdapter, cfg SyncConfig) bool { if adapter == nil || !adapter.IsAvailable() { return false } s.mu.Lock() defer s.mu.Unlock() switch cfg.PullPolicy { case PullOnSessionStart: // 精妙之处(CLEVER): pulled 标志在返回 true 的瞬间就设为 true, // 确保并发调用时只有第一个返回 true. // 如果先检查后设置(check-then-act),两个并发 goroutine 可能都通过检查, // 导致双重 Pull. if s.pulled { return false } s.pulled = true return true case PullWithTTL: if cfg.PullTTL <= 0 { // TTL 未设置时退化为 Always return true } if time.Since(s.lastPullTime) < cfg.PullTTL { return false } // 更新时间戳(在锁内,防止并发重复 Pull) s.lastPullTime = time.Now() return true case PullAlways: return true case PullNever: return false default: return false } } // resetForTest 重置同步状态(仅测试用). func (s *syncState) resetForTest() { s.mu.Lock() defer s.mu.Unlock() s.pulled = false s.lastPullTime = time.Time{} }