// 重试策略系统 -- 可组合的重试决策框架. // // 升华改进(ELEVATED): 早期方案 withRetry 是 822 行的单函数巨兽, // 把前景/后台分流,快速模式降级,退避计算,模型降级,认证恢复, // max_tokens 修正 6 个正交关注点全部塞在一个 for 循环里. // // 本设计:RetryPolicy 接口 + CompositeRetryPolicy 叠加, // 每个策略只关心一个维度,通过组合得到复杂行为. // 工厂函数(如 NewAnthropicRetryPolicy)针对特定供应商组合最优策略. // // 替代方案:<原方案 822 行 AsyncGenerator 包含所有逻辑> package retry import ( "context" "net/http" "time" "git.flytoex.net/yuanwei/flyto-agent/internal/apierror" ) // RetryInfo 是 apierror.RetryInfo 的别名,用于 retry.RetryError 接口. type RetryInfo = apierror.RetryInfo // ============================================================ // RetryPolicy - 重试决策接口 // ============================================================ // RetryError 是 retry 包对 API 错误的抽象. // api.APIError 实现了这个接口,使 retry 包可以处理 api 错误而不需要导入 api 包. type RetryError interface { Error() string Category() string IsRetryable() bool RetryDelay() time.Duration Message() string Headers() http.Header RetryInfo() *RetryInfo } // RetryPolicy 决定一个失败请求是否应该重试. type RetryPolicy interface { ShouldRetry(err RetryError, attempt int, ctx *RetryContext) *RetryDecision } // RetryDecision 是重试决策结果. type RetryDecision struct { // Retry 是否重试 Retry bool // Delay 建议的等待时间 Delay time.Duration // Reason 决策原因(可观测性:为什么重试/不重试) Reason string } // RetryContext 是重试的请求上下文. type RetryContext struct { // QuerySource labels the caller ("main_thread"/"summary"/"compact"/...) // so retry-layer diagnostics can attribute a failed retry to its origin. // It is a free-form string (not an enum) so third-party providers and // call sites outside pkg/engine can add their own source labels without // touching retry-layer types. // // Wire: callers inject via retry.WithQuerySource(ctx, s); transport // Client reads it via retry.QuerySourceFromCtx(ctx) into this field at // rctx construction (internal/transport/client.go). Retryer.Do embeds // it into CannotRetryError.Reason so an operator watching the error // stream sees "policy denied (source=compact)" instead of opaque // "policy denied". Future retry policies may branch on this field // (e.g. be more aggressive about dropping background sources), but the // field is deliberately not yet policy-reading -- diagnostics first, // behavior changes only after data. // // QuerySource 标记请求调用方 ("main_thread"/"summary"/"compact" 等) // 让重试层诊断把失败重试归因到来源. 自由字符串 (非枚举), 第三方 // provider 和 pkg/engine 外的 call site 可以加自定义标签, 不需要碰 // 重试层类型. // // Wire: 调用方经 retry.WithQuerySource(ctx, s) 注入; transport Client // 经 retry.QuerySourceFromCtx(ctx) 在 rctx 构造时填充此字段 // (internal/transport/client.go). Retryer.Do 将其嵌入 // CannotRetryError.Reason, 运维在错误流中看到 "policy denied // (source=compact)" 而不是含糊的 "policy denied". 未来重试策略可 // 按此字段分支 (如对后台来源更激进地放弃), 但当前字段**不**被策略 // 读 -- 先诊断, 行为变更需数据支撑. QuerySource string // IsForeground 用户是否在等结果. // 精妙之处(CLEVER): 调用方声明式标注,而非重试模块维护白名单. // 早期方案用 FOREGROUND_529_RETRY_SOURCES Set 硬编码 15+ 来源--每加一种要改重试模块. // 声明式方式:调用方设 IsForeground=true,重试模块只读. // 替代方案:<原方案在重试模块中维护前景来源白名单> IsForeground bool // StartTime 首次尝试时间(用于计算总耗时上限) StartTime time.Time // MaxDuration 最大总重试时长(0=不限制,仅受 MaxRetries 控制) // 升华改进(ELEVATED): 仓储 daemon 模式需要 "尽力重试但有上限", // 不是早期方案的 "无限重试" 也不是 "固定次数后放弃". MaxDuration time.Duration // ConsecutiveCounts 按错误类别统计连续出现次数. // 精妙之处(CLEVER): 由 Retryer 在每次失败后自动维护-- // 连续出现同一类错误递增,出现不同类错误清零. // ConsecutiveLimit 策略读取此字段做决策. ConsecutiveCounts map[string]int // Model 当前使用的模型(用于模型降级策略) Model string // FallbackModel 降级模型(用于模型降级策略) FallbackModel string // MaxTokensOverride 被溢出修正调整过的 max_tokens(0=未调整) MaxTokensOverride int } // ============================================================ // CompositeRetryPolicy - 叠加策略(宪法第8条) // ============================================================ // CompositeRetryPolicy 按顺序组合多个策略. // 第一个返回非 nil 决策的策略胜出. // // 升华改进(ELEVATED): 策略的顺序就是优先级-- // 前景/后台分流放在最前(优先级最高),退避兜底放在最后. // 运行时可通过 Add 动态追加策略(如仓储场景追加 PLC 重试策略). type CompositeRetryPolicy struct { policies []RetryPolicy } // NewCompositeRetryPolicy 创建组合策略. func NewCompositeRetryPolicy(policies ...RetryPolicy) *CompositeRetryPolicy { return &CompositeRetryPolicy{policies: policies} } // Add 追加策略(优先级最低). func (c *CompositeRetryPolicy) Add(p RetryPolicy) { c.policies = append(c.policies, p) } // ShouldRetry 按顺序评估,第一个明确决策的胜出. func (c *CompositeRetryPolicy) ShouldRetry(err RetryError, attempt int, ctx *RetryContext) *RetryDecision { for _, p := range c.policies { if d := p.ShouldRetry(err, attempt, ctx); d != nil { return d } } // 所有策略都不表态--默认不重试 return &RetryDecision{Retry: false, Reason: "no policy matched"} } // ============================================================ // Context-scoped QuerySource plumbing // ============================================================ // querySourceCtxKey is the unexported context key for QuerySource. Using a // typed struct{} instead of a string avoids collisions with unrelated // ctx-values (standard Go idiom). // // querySourceCtxKey 是 QuerySource 的不导出 context key. 使用 struct{} // 类型而非 string 避免与无关 ctx-value 冲突 (Go 标准习惯). type querySourceCtxKey struct{} // WithQuerySource injects a call-site source label into ctx so the // downstream transport Client can fill RetryContext.QuerySource. Callers // pass labels like "main_thread" / "compact" / "summary"; the field is // free-form so third-party providers can add their own labels. // // WithQuerySource 将调用方来源标签注入 ctx, 下游 transport Client 从 // 中填充 RetryContext.QuerySource. 调用方传 "main_thread" / "compact" / // "summary" 等; 字段自由格式, 第三方 provider 可自定义标签. func WithQuerySource(ctx context.Context, source string) context.Context { return context.WithValue(ctx, querySourceCtxKey{}, source) } // QuerySourceFromCtx reads the call-site source label from ctx. Returns // empty string when not injected -- callers should handle this as "source // unknown" rather than error out, so an uninstrumented path still works. // // QuerySourceFromCtx 从 ctx 读调用方来源标签. 未注入时返回空串 -- 调用 // 方应按"来源未知"处理而不报错, 让未插桩路径仍能工作. func QuerySourceFromCtx(ctx context.Context) string { v, _ := ctx.Value(querySourceCtxKey{}).(string) return v }