// Anthropic 特有重试策略 -- 针对 Anthropic API 的优化子集. // // 其他供应商无需使用,可通过 NewCompositeRetryPolicy 组合通用策略. // // 包含: // - SubscriptionAwareRetry: 订阅用户 429 不自动重试(窗口级限制等待时间太长) // - FastModeCooldown: 快速模式 429/529 降级到标准速度 // - ModelFallback: 连续过载时触发模型降级信号 // - StaleConnectionRecovery: ECONNRESET/EPIPE 时标记需要重建连接 // - NewAnthropicRetryPolicy: 工厂函数组合出最优策略 // // 注意:这些策略对应早期方案 中的 Anthropic 特有逻辑, // 在注释中标注了早期方案的行号和对应关系,方便交叉参考和维护. package retry import ( "time" ) // ============================================================ // SubscriptionAwareRetry - 订阅用户限流策略 // ============================================================ // SubscriptionAwareRetry 根据用户订阅类型调整 429 重试行为. // // 对应早期方案 L737-741, L765-769: // - Pro/Max 用户 429 通常是窗口级限制(5小时/7天),retry-after 可能是几小时 // - Enterprise 用户通常是 PAYG,429 是短暂限流,可以重试 // - API Key 用户(非订阅)可以重试 // // 精妙之处(CLEVER): 不是"所有 429 都不重试",而是根据用户类型判断. // 订阅用户的 429 等待时间太长(小时级),自动重试毫无意义. type SubscriptionAwareRetry struct { // IsSubscriber 判断当前用户是否为订阅用户(Pro/Max) IsSubscriber func() bool // IsEnterprise 判断当前用户是否为企业用户 IsEnterprise func() bool } // ShouldRetry 订阅用户遇到 429 不自动重试(除非是企业用户). func (s *SubscriptionAwareRetry) ShouldRetry(err RetryError, attempt int, ctx *RetryContext) *RetryDecision { if err.Category() != "rate_limit" { return nil // 非限流,不管 } if s.IsSubscriber != nil && s.IsSubscriber() { if s.IsEnterprise != nil && s.IsEnterprise() { return nil // 企业用户可以重试 } return &RetryDecision{ Retry: false, Reason: "subscription rate limit (window-based, wait time too long)", } } return nil // 非订阅用户,让下游决定 } // ============================================================ // FastModeCooldown - 快速模式降级策略 // ============================================================ // FastModeCooldown 处理快速模式遇到限流/过载时的降级逻辑. // // 对应早期方案 L262-314: // - 短 retry-after(<20s):保持快速模式重试(保 prompt cache) // - 长 retry-after 或 529:进入冷却期,降级到标准速度 // - overage 被拒绝:永久禁用快速模式 // // 精妙之处(CLEVER): 短延迟保持快速模式是为了保住 prompt cache-- // 切换模型名(快速→标准)会导致 cache 失效,得重新计算. // 20 秒以内的等待比重建 cache 便宜. type FastModeCooldown struct { // IsFastMode 判断当前是否在快速模式 IsFastMode func() bool // ShortRetryThreshold 短延迟阈值(低于此值保持快速模式) ShortRetryThreshold time.Duration // CooldownDuration 默认冷却时间(retry-after 未知时使用) CooldownDuration time.Duration // MinCooldown 最低冷却时间(防止过快切回导致反复抖动) MinCooldown time.Duration // OnCooldownStart 进入冷却期回调(上层据此切换模型) OnCooldownStart func(duration time.Duration, reason string) // OnPermanentDisable overage 被拒绝时永久禁用回调 OnPermanentDisable func(reason string) } // DefaultFastModeCooldown 返回默认配置. func DefaultFastModeCooldown() *FastModeCooldown { return &FastModeCooldown{ ShortRetryThreshold: 20 * time.Second, CooldownDuration: 30 * time.Minute, MinCooldown: 10 * time.Minute, } } // ShouldRetry 快速模式下的限流/过载处理. func (f *FastModeCooldown) ShouldRetry(err RetryError, attempt int, ctx *RetryContext) *RetryDecision { // 不在快速模式,不管 if f.IsFastMode == nil || !f.IsFastMode() { return nil } // 只处理限流和过载 if err.Category() != "rate_limit" && err.Category() != "server_overload" { return nil } // 检查 overage 被拒绝(永久禁用) headers := err.Headers() if headers != nil { overageReason := headers.Get("anthropic-ratelimit-unified-overage-disabled-reason") if overageReason != "" { if f.OnPermanentDisable != nil { f.OnPermanentDisable(overageReason) } return &RetryDecision{ Retry: true, Delay: 0, // 立即重试(以标准速度) Reason: "fast mode permanently disabled: " + overageReason, } } } // 短 retry-after:保持快速模式重试 retryDelay := err.RetryDelay() if retryDelay > 0 && retryDelay < f.ShortRetryThreshold { return &RetryDecision{ Retry: true, Delay: retryDelay, Reason: "fast mode short retry (preserving prompt cache)", } } // 长 retry-after 或未知:进入冷却期 cooldown := f.CooldownDuration if retryDelay > cooldown { cooldown = retryDelay } if cooldown < f.MinCooldown { cooldown = f.MinCooldown } reason := "overloaded" if err.Category() == "rate_limit" { reason = "rate_limit" } if f.OnCooldownStart != nil { f.OnCooldownStart(cooldown, reason) } return &RetryDecision{ Retry: true, Delay: 0, // 立即重试(以标准速度,上层通过回调已切换) Reason: "fast mode cooldown: " + reason, } } // ============================================================ // ModelFallback - 模型降级策略 // ============================================================ // ModelFallback 在连续过载后触发模型降级信号. // // 对应早期方案 L327-365: // // 连续 3 次 529 → throw FallbackTriggeredError → 上层切换到 fallbackModel // // 精妙之处(CLEVER): 不直接切换模型(那是上层产品逻辑的事), // 只返回 Retry=false + 特定 Reason,Retryer.Do 据此返回 FallbackTriggeredError. // 但这个策略需要与 ConsecutiveLimit 配合--ConsecutiveLimit 先判断次数到了, // 本策略再决定是否降级(而非直接失败). // // 注意:这个策略应该放在 ConsecutiveLimit 之后, // 当 ConsecutiveLimit 拒绝重试时,Retryer 检查 FallbackModel 决定是否降级. type ModelFallback struct { // ConsecutiveThreshold 触发降级的连续过载次数 ConsecutiveThreshold int } // ShouldRetry 检查是否应触发模型降级. func (m *ModelFallback) ShouldRetry(err RetryError, attempt int, ctx *RetryContext) *RetryDecision { if err.Category() != "server_overload" { return nil } threshold := m.ConsecutiveThreshold if threshold <= 0 { threshold = 3 } count := ctx.ConsecutiveCounts["server_overload"] if count >= threshold && ctx.FallbackModel != "" { // 返回特殊 reason,Retryer.Do 据此抛 FallbackTriggeredError return &RetryDecision{ Retry: false, Reason: "model_fallback:" + ctx.FallbackModel, } } return nil } // ============================================================ // NewAnthropicRetryPolicy - Anthropic 工厂函数 // ============================================================ // AnthropicRetryOpts 是 Anthropic 重试策略的配置选项. type AnthropicRetryOpts struct { // MaxRetries 最大重试次数(默认 10) MaxRetries int // IsSubscriber 是否为订阅用户 IsSubscriber func() bool // IsEnterprise 是否为企业用户 IsEnterprise func() bool // IsFastMode 是否在快速模式 IsFastMode func() bool // OnFastModeCooldown 快速模式冷却回调 OnFastModeCooldown func(duration time.Duration, reason string) // OnFastModeDisable 快速模式永久禁用回调 OnFastModeDisable func(reason string) // FallbackThreshold 触发模型降级的连续 529 次数(默认 3) FallbackThreshold int } // NewAnthropicRetryPolicy 创建针对 Anthropic API 优化的组合重试策略. // // 策略优先级(从高到低): // 1. ForegroundOnly - 后台 529 直接失败(防容量级联) // 2. ServerDirective - 服务端说不重试就不重试 // 3. SubscriptionAwareRetry - 订阅用户 429 不重试(等太久) // 4. FastModeCooldown - 快速模式降级逻辑 // 5. ModelFallback - 连续过载触发模型降级 // 6. ExponentialBackoff - 兜底退避 // // 注意顺序很重要:ForegroundOnly 必须在最前面(高优先级), // ExponentialBackoff 必须在最后面(兜底). func NewAnthropicRetryPolicy(opts AnthropicRetryOpts) RetryPolicy { maxRetries := opts.MaxRetries if maxRetries <= 0 { maxRetries = 10 } fallbackThreshold := opts.FallbackThreshold if fallbackThreshold <= 0 { fallbackThreshold = 3 } policies := []RetryPolicy{ // 1. 后台 529 不重试 &ForegroundOnly{}, // 2. 服务端指令 &ServerDirective{}, } // 3. 订阅感知(可选--如果提供了判断函数) if opts.IsSubscriber != nil { policies = append(policies, &SubscriptionAwareRetry{ IsSubscriber: opts.IsSubscriber, IsEnterprise: opts.IsEnterprise, }) } // 4. 快速模式降级(可选--如果提供了判断函数) if opts.IsFastMode != nil { cooldown := DefaultFastModeCooldown() cooldown.IsFastMode = opts.IsFastMode cooldown.OnCooldownStart = opts.OnFastModeCooldown cooldown.OnPermanentDisable = opts.OnFastModeDisable policies = append(policies, cooldown) } // 5. 模型降级 policies = append(policies, &ModelFallback{ ConsecutiveThreshold: fallbackThreshold, }) // 6. 兜底退避 policies = append(policies, &ExponentialBackoff{ MaxRetries: maxRetries, }) return NewCompositeRetryPolicy(policies...) }