package evolve

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"net/url"
	"os"
	"path/filepath"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"
)

// FileParameterStore is the local-filesystem implementation of ParameterStore.
//
// On-disk layout:
//
//	<root>/<url.PathEscape(key)>/
//	  current.json     # latest value + version number
//	  history/v1.json  # one file per version (Change struct)
//	  history/v2.json
//	  lock.json        # presence = locked, content has reason + timestamp
//
// Key escaping: url.PathEscape, so dotted/slashed keys
// (e.g. "evolve.carrier_risk_penalty/Y-express") map to a single safe dir name.
//
// Concurrency model:
//   - Single process: sync.RWMutex serialises writers, in-memory subscriber
//     list protected by the same lock.
//   - File atomicity: temp file + os.Rename (POSIX atomic rename on same FS).
//   - Cross-process: not guaranteed. The platform-layer SQL ParameterStore
//     handles multi-tenant + multi-process concurrency.
//
// LEGACY: Watch is in-process only. External edits to files (or another
// process holding the same root) are not detected. fsnotify is intentionally
// excluded (CLAUDE.md rule 8: zero external deps). Multi-process change
// notification is the SQL impl's job (LISTEN/NOTIFY).
type FileParameterStore struct {
	// root is the directory that holds one subdirectory per key.
	root string
	// mu serialises writers and also guards subs.
	mu sync.RWMutex
	// subs is the list of active Watch subscriptions.
	subs []*subscription
}

// subscription is a single Watch registration.
type subscription struct {
	// prefix selects which keys are delivered; the empty prefix matches every key.
	prefix string
	// ch carries matching events to the watcher.
	ch chan ChangeEvent
}

// currentFile is the JSON shape of current.json: the latest value together
// with its version number.
type currentFile struct {
	Value   any `json:"value"`
	Version int `json:"version"`
}

// lockFile is the JSON shape of lock.json: why and when the key was locked.
type lockFile struct {
	Reason    string    `json:"reason"`
	Timestamp time.Time `json:"timestamp"`
}

// NewFileParameterStore returns a store rooted at root.
// Creates the root directory if missing.
func NewFileParameterStore(root string) (*FileParameterStore, error) { if err := os.MkdirAll(root, 0o755); err != nil { return nil, fmt.Errorf("evolve: create store root: %w", err) } return &FileParameterStore{root: root}, nil } func (s *FileParameterStore) keyDir(key string) string { return filepath.Join(s.root, url.PathEscape(key)) } func (s *FileParameterStore) currentPath(key string) string { return filepath.Join(s.keyDir(key), "current.json") } func (s *FileParameterStore) lockPath(key string) string { return filepath.Join(s.keyDir(key), "lock.json") } func (s *FileParameterStore) historyDir(key string) string { return filepath.Join(s.keyDir(key), "history") } func (s *FileParameterStore) historyPath(key string, version int) string { return filepath.Join(s.historyDir(key), fmt.Sprintf("v%d.json", version)) } // atomicWrite writes data to path atomically via temp file + rename. // CLEVER: POSIX rename is atomic on the same filesystem, so concurrent // readers either see the old file or the new file, never a partial write. func atomicWrite(path string, data []byte, perm os.FileMode) error { dir := filepath.Dir(path) if err := os.MkdirAll(dir, 0o755); err != nil { return err } tmp, err := os.CreateTemp(dir, ".tmp-*") if err != nil { return err } tmpPath := tmp.Name() cleanup := func() { _ = os.Remove(tmpPath) } if _, err := tmp.Write(data); err != nil { _ = tmp.Close() cleanup() return err } if err := tmp.Chmod(perm); err != nil { _ = tmp.Close() cleanup() return err } if err := tmp.Sync(); err != nil { _ = tmp.Close() cleanup() return err } if err := tmp.Close(); err != nil { cleanup() return err } if err := os.Rename(tmpPath, path); err != nil { cleanup() return err } return nil } // readCurrent reads current.json. Returns ErrParameterNotFound if missing. 
func (s *FileParameterStore) readCurrent(key string) (*currentFile, error) { data, err := os.ReadFile(s.currentPath(key)) if errors.Is(err, os.ErrNotExist) { return nil, ErrParameterNotFound } if err != nil { return nil, err } var c currentFile if err := json.Unmarshal(data, &c); err != nil { return nil, fmt.Errorf("evolve: decode current: %w", err) } return &c, nil } // readHistory reads a history entry by version. func (s *FileParameterStore) readHistory(key string, version int) (*Change, error) { data, err := os.ReadFile(s.historyPath(key, version)) if errors.Is(err, os.ErrNotExist) { return nil, ErrVersionNotFound } if err != nil { return nil, err } var c Change if err := json.Unmarshal(data, &c); err != nil { return nil, fmt.Errorf("evolve: decode history: %w", err) } return &c, nil } // isLocked checks lock.json existence. Caller must hold s.mu. func (s *FileParameterStore) isLocked(key string) bool { _, err := os.Stat(s.lockPath(key)) return err == nil } // Get implements ParameterStore. func (s *FileParameterStore) Get(ctx context.Context, key string) (any, int, error) { s.mu.RLock() defer s.mu.RUnlock() c, err := s.readCurrent(key) if err != nil { return nil, 0, err } return c.Value, c.Version, nil } // Set implements ParameterStore. Returns ErrParameterLocked if the key is // currently locked, ErrReasonRequired on empty reason. func (s *FileParameterStore) Set(ctx context.Context, key string, value any, reason string) (int, error) { if reason == "" { return 0, ErrReasonRequired } s.mu.Lock() defer s.mu.Unlock() if s.isLocked(key) { return 0, ErrParameterLocked } return s.writeNewVersion(key, value, reason, "user") } // writeNewVersion appends a new history entry and updates current.json. // Caller must hold s.mu (write lock). 
func (s *FileParameterStore) writeNewVersion(key string, value any, reason, author string) (int, error) { var nextVersion int cur, err := s.readCurrent(key) switch { case err == nil: nextVersion = cur.Version + 1 case errors.Is(err, ErrParameterNotFound): nextVersion = 1 default: return 0, err } change := Change{ Version: nextVersion, Value: value, Reason: reason, Timestamp: time.Now().UTC(), Author: author, } histData, err := json.MarshalIndent(change, "", " ") if err != nil { return 0, err } if err := atomicWrite(s.historyPath(key, nextVersion), histData, 0o644); err != nil { return 0, err } curData, err := json.MarshalIndent(currentFile{Value: value, Version: nextVersion}, "", " ") if err != nil { return 0, err } if err := atomicWrite(s.currentPath(key), curData, 0o644); err != nil { return 0, err } s.broadcast(ChangeEvent{Key: key, Change: change, IsLock: false}) return nextVersion, nil } // List implements ParameterStore. Returns keys sorted ascending. func (s *FileParameterStore) List(ctx context.Context, prefix string) ([]string, error) { s.mu.RLock() defer s.mu.RUnlock() entries, err := os.ReadDir(s.root) if err != nil { if errors.Is(err, os.ErrNotExist) { return nil, nil } return nil, err } var keys []string for _, e := range entries { if !e.IsDir() { continue } decoded, err := url.PathUnescape(e.Name()) if err != nil { continue } if _, err := os.Stat(filepath.Join(s.root, e.Name(), "current.json")); err != nil { continue } if prefix == "" || strings.HasPrefix(decoded, prefix) { keys = append(keys, decoded) } } sort.Strings(keys) return keys, nil } // History implements ParameterStore. // Returns versions in ascending order. limit <= 0 means no cap; otherwise // the most-recent `limit` versions are returned. 
func (s *FileParameterStore) History(ctx context.Context, key string, limit int) ([]Change, error) { s.mu.RLock() defer s.mu.RUnlock() entries, err := os.ReadDir(s.historyDir(key)) if err != nil { if errors.Is(err, os.ErrNotExist) { if _, e2 := os.Stat(s.keyDir(key)); errors.Is(e2, os.ErrNotExist) { return nil, ErrParameterNotFound } return []Change{}, nil } return nil, err } var versions []int for _, e := range entries { if e.IsDir() { continue } name := e.Name() if !strings.HasPrefix(name, "v") || !strings.HasSuffix(name, ".json") { continue } v, err := strconv.Atoi(name[1 : len(name)-len(".json")]) if err != nil { continue } versions = append(versions, v) } sort.Ints(versions) if limit > 0 && len(versions) > limit { versions = versions[len(versions)-limit:] } out := make([]Change, 0, len(versions)) for _, v := range versions { c, err := s.readHistory(key, v) if err != nil { return nil, err } out = append(out, *c) } return out, nil } // Rollback implements ParameterStore. // Allowed even when the key is locked (per interface contract: locked params // can still be rolled back to escape a bad release without first unlocking). // Rollback creates a new version that copies the target's value, preserving // the audit chain. func (s *FileParameterStore) Rollback(ctx context.Context, key string, toVersion int, reason string) (int, error) { if reason == "" { return 0, ErrReasonRequired } s.mu.Lock() defer s.mu.Unlock() target, err := s.readHistory(key, toVersion) if err != nil { return 0, err } rollbackReason := fmt.Sprintf("rollback to v%d: %s", toVersion, reason) return s.writeNewVersion(key, target.Value, rollbackReason, "rollback") } // Lock implements ParameterStore. Requires the key to exist (returns // ErrParameterNotFound otherwise). Re-locking an already-locked key // overwrites the lock reason and emits a fresh lock event. 
func (s *FileParameterStore) Lock(ctx context.Context, key string, reason string) error { if reason == "" { return ErrReasonRequired } s.mu.Lock() defer s.mu.Unlock() cur, err := s.readCurrent(key) if err != nil { return err } now := time.Now().UTC() data, err := json.MarshalIndent(lockFile{Reason: reason, Timestamp: now}, "", " ") if err != nil { return err } if err := atomicWrite(s.lockPath(key), data, 0o644); err != nil { return err } s.broadcast(ChangeEvent{ Key: key, Change: Change{ Version: cur.Version, Reason: reason, Timestamp: now, Author: "lock", }, IsLock: true, }) return nil } // Unlock implements ParameterStore. No-op if the key is not currently locked // (idempotent), in which case no event is emitted. func (s *FileParameterStore) Unlock(ctx context.Context, key string, reason string) error { if reason == "" { return ErrReasonRequired } s.mu.Lock() defer s.mu.Unlock() if !s.isLocked(key) { return nil } if err := os.Remove(s.lockPath(key)); err != nil && !errors.Is(err, os.ErrNotExist) { return err } s.broadcast(ChangeEvent{ Key: key, Change: Change{ Reason: reason, Timestamp: time.Now().UTC(), Author: "unlock", }, IsLock: true, }) return nil } // Watch implements ParameterStore. // Returns a buffered channel receiving events whose Key starts with keyPrefix // (empty prefix matches all). Channel closes when ctx is canceled. // // CLEVER: events are dropped on a full subscriber buffer. A slow consumer // must not block the writer path; production watchers should drain promptly // or accept lossy semantics. Buffer size 16 absorbs short bursts. func (s *FileParameterStore) Watch(ctx context.Context, keyPrefix string) (<-chan ChangeEvent, error) { ch := make(chan ChangeEvent, 16) sub := &subscription{prefix: keyPrefix, ch: ch} s.mu.Lock() s.subs = append(s.subs, sub) s.mu.Unlock() go func() { <-ctx.Done() s.mu.Lock() for i, x := range s.subs { if x == sub { s.subs = append(s.subs[:i], s.subs[i+1:]...) 
break } } s.mu.Unlock() close(ch) }() return ch, nil } // broadcast pushes evt to subscribers whose prefix matches evt.Key. // Caller must hold s.mu (write lock). Non-blocking: drops on full buffer. func (s *FileParameterStore) broadcast(evt ChangeEvent) { for _, sub := range s.subs { if sub.prefix != "" && !strings.HasPrefix(evt.Key, sub.prefix) { continue } select { case sub.ch <- evt: default: } } }