package reflector import ( "context" "errors" "fmt" "git.flytoex.net/yuanwei/flyto-agent/pkg/evolve" "git.flytoex.net/yuanwei/flyto-agent/pkg/validator" ) // ErrExtractFailed wraps any error returned by a caller-supplied // extractor function. Adapters return it with %w so callers can // detect the class with errors.Is. // // ErrExtractFailed 包装 extractor 回调返回的 error. adapter 以 %w // 透出, 调用方用 errors.Is 识别. var ErrExtractFailed = errors.New("reflector: extractor failed") // CandidateToDiff converts an evolve.Candidate into a validator.DiffInput. // A non-nil error aborts the conversion; adapters wrap it with // ErrExtractFailed before propagating. // // CandidateToDiff 把 evolve.Candidate 转为 validator.DiffInput. // 返回非 nil error 则终止转换, adapter 以 ErrExtractFailed 包装透出. type CandidateToDiff func(c evolve.Candidate) (validator.DiffInput, error) // DiffToCandidate converts a validator.DiffInput into an evolve.Candidate. // // DiffToCandidate 把 validator.DiffInput 转为 evolve.Candidate. type DiffToCandidate func(d validator.DiffInput) (evolve.Candidate, error) // EventToDiff converts an evolve.ReplayEvent into a validator.DiffInput // for side-channel validation during replay. // // EventToDiff 把 evolve.ReplayEvent 转为 validator.DiffInput, 用于 // 回放期旁路校验. type EventToDiff func(ev evolve.ReplayEvent) (validator.DiffInput, error) // EventToCandidate converts an evolve.ReplayEvent into an evolve.Candidate // for side-channel scoring during replay. // // EventToCandidate 把 evolve.ReplayEvent 转为 evolve.Candidate, 用于 // 回放期旁路打分. type EventToCandidate func(ev evolve.ReplayEvent) (evolve.Candidate, error) // VerdictSink receives each Verdict produced by ValidatorAsReflector, // along with the originating ReplayEvent and any Validate error (or // ErrExtractFailed from a failing extractor). The sink is how the // side-channel signal leaves the adapter (log, metric, audit, // parameter-evolver backpressure). OnEvent itself always returns nil // so it never stops the replayer loop. // // VerdictSink 接收 ValidatorAsReflector 产出的每个 Verdict, 附带触发 // 的 ReplayEvent 与 Validate error (或 extractor 失败时的 // ErrExtractFailed). sink 是旁路信号离开 adapter 的唯一出口 (日志 / // 指标 / 审计 / 反压给 ParameterEvolver). OnEvent 始终返回 nil, 不中断 // replayer. type VerdictSink func(ev evolve.ReplayEvent, v validator.Verdict, err error) // FitnessSink receives each fitness score produced by // EvaluatorAsReflector, plus breakdown and any Score error (or // ErrExtractFailed). Same contract as VerdictSink. // // FitnessSink 接收 EvaluatorAsReflector 产出的 fitness, 附带 breakdown // 与 Score error (或 ErrExtractFailed). 契约与 VerdictSink 一致. type FitnessSink func(ev evolve.ReplayEvent, fitness float64, breakdown map[string]float64, err error) // ---------------------------------------------------------------------------- // ValidatorAsEvaluator // ---------------------------------------------------------------------------- // ValidatorAsEvaluatorOption configures ValidatorAsEvaluator. // // ValidatorAsEvaluatorOption 配置 ValidatorAsEvaluator. type ValidatorAsEvaluatorOption func(*validatorAsEvaluatorConfig) type validatorAsEvaluatorConfig struct { rejectFitness float64 // default 0: Approved=false forces fitness=0 } // WithRejectFitness sets the fitness value ValidatorAsEvaluator emits // when Verdict.Approved is false. Default is 0, which is the safe // choice (GA tournaments will not prefer a rejected candidate over an // approved one with any positive score). Set to a positive value to // preserve gradient signal for GA use cases that need to differentiate // "barely rejected" from "hard rejected". // // WithRejectFitness 设置 ValidatorAsEvaluator 在 Verdict.Approved=false // 时产出的 fitness. 默认 0, 保守选择 (GA 锦标选择不会把"被拒候选" // 排在"通过+任意正分"之前). 设为正值可保留 GA 梯度信号, 用于区分 // "微拒" 与 "硬拒". func WithRejectFitness(f float64) ValidatorAsEvaluatorOption { return func(c *validatorAsEvaluatorConfig) { c.rejectFitness = f } } // ValidatorAsEvaluator wraps a validator.Validator so it satisfies // evolve.Evaluator. extract turns each Candidate into a DiffInput the // underlying Validator can review. If Verdict.Approved is true, the // returned fitness is Verdict.Score. Otherwise fitness is 0 (override // via WithRejectFitness). Verdict.Details is forwarded into breakdown // on a best-effort basis (only numeric entries survive). // // Panics at construction if src or extract is nil -- both are // mandatory and nil indicates a programming error that should surface // early rather than at the first Score call. // // ValidatorAsEvaluator 把 validator.Validator 包为 evolve.Evaluator. // extract 把每个 Candidate 转为 DiffInput 交给底层 Validator 审查. // Verdict.Approved=true 时返回 fitness=Verdict.Score; 否则返回 0 // (可用 WithRejectFitness 覆写). Verdict.Details 尽力透传为 breakdown // (仅数值条目存活). // // 构造时 src 或 extract 为 nil 直接 panic -- 两者均为必传, nil 是编程 // 错误应尽早暴露. func ValidatorAsEvaluator(src validator.Validator, extract CandidateToDiff, opts ...ValidatorAsEvaluatorOption) evolve.Evaluator { if src == nil { panic("reflector: ValidatorAsEvaluator: nil Validator") } if extract == nil { panic("reflector: ValidatorAsEvaluator: nil extractor") } cfg := validatorAsEvaluatorConfig{rejectFitness: 0} for _, o := range opts { o(&cfg) } return &validatorAsEvaluator{src: src, extract: extract, cfg: cfg} } type validatorAsEvaluator struct { src validator.Validator extract CandidateToDiff cfg validatorAsEvaluatorConfig } func (a *validatorAsEvaluator) Score(ctx context.Context, c evolve.Candidate) (float64, map[string]float64, error) { diff, err := a.extract(c) if err != nil { return 0, nil, fmt.Errorf("%w: %v", ErrExtractFailed, err) } v, err := a.src.Validate(ctx, diff) if err != nil { return 0, nil, err } fitness := v.Score if !v.Approved { fitness = a.cfg.rejectFitness } return fitness, detailsToBreakdown(v.Details), nil } // detailsToBreakdown converts Verdict.Details (map[string]any) into // the breakdown shape (map[string]float64) on a best-effort basis. // Non-numeric entries are skipped; a fully non-numeric map returns nil. // // detailsToBreakdown 尽力把 Verdict.Details (map[string]any) 转为 // breakdown (map[string]float64). 非数值条目跳过; 若全不可转则返回 nil. func detailsToBreakdown(d map[string]any) map[string]float64 { if len(d) == 0 { return nil } out := make(map[string]float64, len(d)) for k, v := range d { switch x := v.(type) { case float64: out[k] = x case float32: out[k] = float64(x) case int: out[k] = float64(x) case int64: out[k] = float64(x) } } if len(out) == 0 { return nil } return out } // ---------------------------------------------------------------------------- // EvaluatorAsValidator // ---------------------------------------------------------------------------- // EvaluatorAsValidatorOption configures EvaluatorAsValidator. // // EvaluatorAsValidatorOption 配置 EvaluatorAsValidator. type EvaluatorAsValidatorOption func(*evaluatorAsValidatorConfig) type evaluatorAsValidatorConfig struct { name string belowThresholdSeverity validator.Severity policyVersion string } // WithName overrides the adapter's Name(). Default is // "evaluator-as-validator". Validators in the same process must not // share a Name; if two EvaluatorAsValidator instances coexist, at // least one must carry an explicit WithName. // // WithName 覆写 adapter 的 Name(). 默认 "evaluator-as-validator". 同 // 进程内 Validator 不能同名; 若两个 EvaluatorAsValidator 共存, 至少 // 一个必须显式传 WithName. func WithName(n string) EvaluatorAsValidatorOption { return func(c *evaluatorAsValidatorConfig) { c.name = n } } // WithBelowThresholdSeverity controls the Severity attached to the // Verdict when fitness < threshold. Default is validator.SeverityBlock // (fail-closed). Set to validator.SeverityWarn for advisory-only // scoring. // // WithBelowThresholdSeverity 控制 fitness < threshold 时 Verdict 的 // Severity. 默认 validator.SeverityBlock (fail-closed). 可设为 // validator.SeverityWarn 作为 advisory-only 打分. func WithBelowThresholdSeverity(s validator.Severity) EvaluatorAsValidatorOption { return func(c *evaluatorAsValidatorConfig) { c.belowThresholdSeverity = s } } // WithPolicyVersion sets the PolicyVersion field the adapter stamps // on every Verdict it emits, so replay audits can tell which policy // version approved or rejected a past diff. // // WithPolicyVersion 设置 adapter 在产出 Verdict 时写入的 PolicyVersion // 字段, 便于回放审计判断当时的策略版本. func WithPolicyVersion(v string) EvaluatorAsValidatorOption { return func(c *evaluatorAsValidatorConfig) { c.policyVersion = v } } // EvaluatorAsValidator wraps an evolve.Evaluator so it satisfies // validator.Validator. extract turns each DiffInput into a Candidate // the underlying Evaluator can score. fitness >= threshold yields // Approved=true with SeverityWarn; below yields Approved=false with // Severity controlled by WithBelowThresholdSeverity (default Block). // breakdown is forwarded into Verdict.Details. // // Panics at construction if src or extract is nil. // // EvaluatorAsValidator 把 evolve.Evaluator 包为 validator.Validator. // extract 把每个 DiffInput 转为 Candidate 交给底层 Evaluator 打分. // fitness >= threshold 产出 Approved=true 且 SeverityWarn; 低于阈值 // 产出 Approved=false 且 Severity 由 WithBelowThresholdSeverity 控制 // (默认 Block). breakdown 透传至 Verdict.Details. // // 构造时 src 或 extract 为 nil 直接 panic. func EvaluatorAsValidator(src evolve.Evaluator, extract DiffToCandidate, threshold float64, opts ...EvaluatorAsValidatorOption) validator.Validator { if src == nil { panic("reflector: EvaluatorAsValidator: nil Evaluator") } if extract == nil { panic("reflector: EvaluatorAsValidator: nil extractor") } cfg := evaluatorAsValidatorConfig{ name: "evaluator-as-validator", belowThresholdSeverity: validator.SeverityBlock, } for _, o := range opts { o(&cfg) } return &evaluatorAsValidator{src: src, extract: extract, threshold: threshold, cfg: cfg} } type evaluatorAsValidator struct { src evolve.Evaluator extract DiffToCandidate threshold float64 cfg evaluatorAsValidatorConfig } func (a *evaluatorAsValidator) Name() string { return a.cfg.name } func (a *evaluatorAsValidator) Validate(ctx context.Context, diff validator.DiffInput) (validator.Verdict, error) { cand, err := a.extract(diff) if err != nil { return validator.Verdict{}, fmt.Errorf("%w: %v", ErrExtractFailed, err) } fitness, breakdown, err := a.src.Score(ctx, cand) if err != nil { return validator.Verdict{}, err } approved := fitness >= a.threshold v := validator.Verdict{ Approved: approved, Score: fitness, ValidatorName: a.cfg.name, PolicyVersion: a.cfg.policyVersion, Details: breakdownToDetails(breakdown), } if approved { v.Severity = validator.SeverityWarn v.Reason = fmt.Sprintf("fitness %.4f >= threshold %.4f", fitness, a.threshold) } else { v.Severity = a.cfg.belowThresholdSeverity v.Reason = fmt.Sprintf("fitness %.4f < threshold %.4f", fitness, a.threshold) } return v, nil } func breakdownToDetails(b map[string]float64) map[string]any { if len(b) == 0 { return nil } out := make(map[string]any, len(b)) for k, v := range b { out[k] = v } return out } // ---------------------------------------------------------------------------- // ValidatorAsReflector (side-channel) // ---------------------------------------------------------------------------- // ValidatorAsReflector wraps a validator.Validator so it satisfies // evolve.Reflector for side-channel replay observation. extract turns // each ReplayEvent into a DiffInput; sink receives the resulting // Verdict along with any Validate error or ErrExtractFailed. // OnEvent always returns nil (Reflector contract: never halt the // replayer), so extract / Validate errors flow through sink only. // // Panics at construction if src, extract or sink is nil. // // ValidatorAsReflector 把 validator.Validator 包为 evolve.Reflector // 做回放旁路观察. extract 把 ReplayEvent 转为 DiffInput; sink 收到 // 产出的 Verdict 及 Validate error 或 ErrExtractFailed. OnEvent 始终 // 返回 nil (Reflector 契约: 不中断 replayer), extract / Validate 错误 // 仅通过 sink 传出. // // 构造时 src / extract / sink 任一为 nil 直接 panic. func ValidatorAsReflector(src validator.Validator, extract EventToDiff, sink VerdictSink) evolve.Reflector { if src == nil { panic("reflector: ValidatorAsReflector: nil Validator") } if extract == nil { panic("reflector: ValidatorAsReflector: nil extractor") } if sink == nil { panic("reflector: ValidatorAsReflector: nil sink") } return &validatorAsReflector{src: src, extract: extract, sink: sink} } type validatorAsReflector struct { src validator.Validator extract EventToDiff sink VerdictSink } func (a *validatorAsReflector) OnEvent(ctx context.Context, ev evolve.ReplayEvent) error { diff, err := a.extract(ev) if err != nil { a.sink(ev, validator.Verdict{}, fmt.Errorf("%w: %v", ErrExtractFailed, err)) return nil } v, err := a.src.Validate(ctx, diff) a.sink(ev, v, err) return nil } // ---------------------------------------------------------------------------- // EvaluatorAsReflector (side-channel) // ---------------------------------------------------------------------------- // EvaluatorAsReflector wraps an evolve.Evaluator so it satisfies // evolve.Reflector for side-channel replay scoring. extract turns // each ReplayEvent into a Candidate; sink receives the resulting // fitness, breakdown, and any Score error or ErrExtractFailed. // OnEvent always returns nil. // // Panics at construction if src, extract or sink is nil. // // EvaluatorAsReflector 把 evolve.Evaluator 包为 evolve.Reflector 做 // 回放旁路打分. extract 把 ReplayEvent 转为 Candidate; sink 收到产出 // 的 fitness / breakdown / Score error 或 ErrExtractFailed. OnEvent // 始终返回 nil. // // 构造时 src / extract / sink 任一为 nil 直接 panic. func EvaluatorAsReflector(src evolve.Evaluator, extract EventToCandidate, sink FitnessSink) evolve.Reflector { if src == nil { panic("reflector: EvaluatorAsReflector: nil Evaluator") } if extract == nil { panic("reflector: EvaluatorAsReflector: nil extractor") } if sink == nil { panic("reflector: EvaluatorAsReflector: nil sink") } return &evaluatorAsReflector{src: src, extract: extract, sink: sink} } type evaluatorAsReflector struct { src evolve.Evaluator extract EventToCandidate sink FitnessSink } func (a *evaluatorAsReflector) OnEvent(ctx context.Context, ev evolve.ReplayEvent) error { cand, err := a.extract(ev) if err != nil { a.sink(ev, 0, nil, fmt.Errorf("%w: %v", ErrExtractFailed, err)) return nil } fitness, breakdown, err := a.src.Score(ctx, cand) a.sink(ev, fitness, breakdown, err) return nil }