Documentation
¶
Overview ¶
Anthropic 特有重试策略 -- 针对 Anthropic API 的优化子集.
其他供应商无需使用,可通过 NewCompositeRetryPolicy 组合通用策略.
包含:
- SubscriptionAwareRetry: 订阅用户 429 不自动重试(窗口级限制等待时间太长)
- FastModeCooldown: 快速模式 429/529 降级到标准速度
- ModelFallback: 连续过载时触发模型降级信号
- StaleConnectionRecovery: ECONNRESET/EPIPE 时标记需要重建连接
- NewAnthropicRetryPolicy: 工厂函数组合出最优策略
注意:这些策略对应早期方案 中的 Anthropic 特有逻辑, 在注释中标注了早期方案的行号和对应关系,方便交叉参考和维护.
通用重试策略 -- 跨行业可用的重试策略实现.
包含:
- ExponentialBackoff: 指数退避 + 抖动(防 thundering herd)
- ForegroundOnly: 后台请求不重试 529/过载(防容量级联放大)
- ConsecutiveLimit: 连续 N 次同类错误后放弃
- ServerDirective: 尊重服务端的 x-should-retry 指令
- MaxAttemptsLimit: 最大重试次数限制
这些策略不依赖任何特定供应商,仓储/金融/编程场景通用. Anthropic 特有策略在 anthropic.go 中.
Context Overflow 修正 -- 自动调整 max_tokens 避免超出上下文窗口.
精妙之处(CLEVER): API 返回 "input length and `max_tokens` exceed context limit: 188059 + 20000 > 200000" 时,自动计算安全的 max_tokens 值并注入 RetryContext, 让 Retryer 的下次尝试使用修正后的值.用户完全无感知.
升华改进(ELEVATED): 早期方案把这段逻辑混在 822 行的 withRetry 主循环中. 我们提取为独立模块,可以被 Retryer 或其他组件复用.
替代方案:<原方案嵌在 withRetry 的 catch 块中,与重试/退避/降级代码交织>
重试策略系统 -- 可组合的重试决策框架.
升华改进(ELEVATED): 早期方案 withRetry 是 822 行的单函数巨兽, 把前景/后台分流,快速模式降级,退避计算,模型降级,认证恢复, max_tokens 修正 6 个正交关注点全部塞在一个 for 循环里.
本设计:RetryPolicy 接口 + CompositeRetryPolicy 叠加, 每个策略只关心一个维度,通过组合得到复杂行为. 工厂函数(如 NewAnthropicRetryPolicy)针对特定供应商组合最优策略.
替代方案:<原方案 822 行 AsyncGenerator 包含所有逻辑>
Retryer -- 带重试的操作执行器.
精妙之处(CLEVER): 不是 AsyncGenerator(Go 不需要 yield 心跳), 而是带回调的 Do() 方法.Observer 事件代替 yield 做可观测性.
设计要点:
- 自动维护 RetryContext 的 ConsecutiveCounts
- 每次重试前通过 Observer 发 "api_retry" 事件
- 支持 context.Context 取消
- ContextOverflowHandler 自动修正 max_tokens
Index ¶
- func QuerySourceFromCtx(ctx context.Context) string
- func WithQuerySource(ctx context.Context, source string) context.Context
- type AnthropicRetryOpts
- type CannotRetryError
- type CompositeRetryPolicy
- type ConsecutiveLimit
- type ContextOverflowHandler
- type ExponentialBackoff
- type FallbackTriggeredError
- type FastModeCooldown
- type ForegroundOnly
- type MaxAttemptsLimit
- type ModelFallback
- type OverflowInfo
- type RetryContext
- type RetryDecision
- type RetryError
- type RetryInfo
- type RetryPolicy
- type RetryableFunc
- type Retryer
- type ServerDirective
- type SubscriptionAwareRetry
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func QuerySourceFromCtx ¶
QuerySourceFromCtx reads the call-site source label from ctx. Returns empty string when not injected -- callers should handle this as "source unknown" rather than error out, so an uninstrumented path still works.
QuerySourceFromCtx 从 ctx 读调用方来源标签. 未注入时返回空串 -- 调用 方应按"来源未知"处理而不报错, 让未插桩路径仍能工作.
func WithQuerySource ¶
WithQuerySource injects a call-site source label into ctx so the downstream transport Client can fill RetryContext.QuerySource. Callers pass labels like "main_thread" / "compact" / "summary"; the field is free-form so third-party providers can add their own labels.
WithQuerySource 将调用方来源标签注入 ctx, 下游 transport Client 从 中填充 RetryContext.QuerySource. 调用方传 "main_thread" / "compact" / "summary" 等; 字段自由格式, 第三方 provider 可自定义标签.
Types ¶
type AnthropicRetryOpts ¶
type AnthropicRetryOpts struct {
// MaxRetries 最大重试次数(默认 10)
MaxRetries int
// IsSubscriber 是否为订阅用户
IsSubscriber func() bool
// IsEnterprise 是否为企业用户
IsEnterprise func() bool
// IsFastMode 是否在快速模式
IsFastMode func() bool
// OnFastModeCooldown 快速模式冷却回调
OnFastModeCooldown func(duration time.Duration, reason string)
// OnFastModeDisable 快速模式永久禁用回调
OnFastModeDisable func(reason string)
// FallbackThreshold 触发模型降级的连续 529 次数(默认 3)
FallbackThreshold int
}
AnthropicRetryOpts 是 Anthropic 重试策略的配置选项.
type CannotRetryError ¶
type CannotRetryError struct {
// Original 是最后一次尝试的错误
Original error
// Attempts 总尝试次数
Attempts int
// Reason 放弃重试的原因
Reason string
}
CannotRetryError 当所有重试耗尽后包装原始错误返回.
func (*CannotRetryError) Error ¶
func (e *CannotRetryError) Error() string
func (*CannotRetryError) Unwrap ¶
func (e *CannotRetryError) Unwrap() error
type CompositeRetryPolicy ¶
type CompositeRetryPolicy struct {
// contains filtered or unexported fields
}
CompositeRetryPolicy 按顺序组合多个策略. 第一个返回非 nil 决策的策略胜出.
升华改进(ELEVATED): 策略的顺序就是优先级-- 前景/后台分流放在最前(优先级最高),退避兜底放在最后. 运行时可通过 Add 动态追加策略(如仓储场景追加 PLC 重试策略).
func NewCompositeRetryPolicy ¶
func NewCompositeRetryPolicy(policies ...RetryPolicy) *CompositeRetryPolicy
NewCompositeRetryPolicy 创建组合策略.
func (*CompositeRetryPolicy) Add ¶
func (c *CompositeRetryPolicy) Add(p RetryPolicy)
Add 追加策略(优先级最低).
func (*CompositeRetryPolicy) ShouldRetry ¶
func (c *CompositeRetryPolicy) ShouldRetry(err RetryError, attempt int, ctx *RetryContext) *RetryDecision
ShouldRetry 按顺序评估,第一个明确决策的胜出.
type ConsecutiveLimit ¶
type ConsecutiveLimit struct {
// Category 要追踪的错误类别字符串(如 "server_overload")
Category string
// Limit 连续出现次数上限
Limit int
}
ConsecutiveLimit 在连续 N 次同类错误后放弃重试.
精妙之处(CLEVER): 早期方案硬编码 MAX_529_RETRIES=3,然后触发模型降级. 我们泛化为任意错误类别 + 可配置上限,模型降级由上层或另一个策略负责.
用法示例:连续 3 次 ErrOverloaded → 放弃(上层可以据此触发降级)
func (*ConsecutiveLimit) ShouldRetry ¶
func (c *ConsecutiveLimit) ShouldRetry(err RetryError, attempt int, ctx *RetryContext) *RetryDecision
ShouldRetry 检查连续同类错误计数.
type ContextOverflowHandler ¶
type ContextOverflowHandler struct {
// FloorOutputTokens 输出 token 最低保障(默认 3000)
// 即使上下文快满了,也至少留这么多 token 给输出.
// 低于此值意味着没有足够空间生成有用回复,不如直接失败.
FloorOutputTokens int
// SafetyBuffer 安全余量(默认 1000)
// 防止因 token 估算误差导致再次溢出.
SafetyBuffer int
// ThinkingBudget 当前 thinking token 预算(0=未启用 thinking)
// max_tokens 必须能容纳 thinking budget + 至少 1 个 output token.
ThinkingBudget int
}
ContextOverflowHandler 处理 "input + max_tokens > context limit" 错误, 自动计算安全的 max_tokens 值.
func DefaultOverflowHandler ¶
func DefaultOverflowHandler() *ContextOverflowHandler
DefaultOverflowHandler 返回默认配置.
func (*ContextOverflowHandler) Adjust ¶
func (h *ContextOverflowHandler) Adjust(info *OverflowInfo) (adjustedMax int, ok bool)
Adjust 计算修正后的 max_tokens. 返回修正值和 true,或 0 和 false(表示空间不足,不应重试).
精妙之处(CLEVER): 三层保护--
- available = contextLimit - inputTokens - safetyBuffer
- 必须 >= FloorOutputTokens(否则回复太短没意义)
- 必须 >= thinkingBudget + 1(否则 thinking 无法运行)
type ExponentialBackoff ¶
type ExponentialBackoff struct {
// BaseDelay 初始退避时间(默认 500ms)
BaseDelay time.Duration
// MaxDelay 最大退避时间上限(默认 32s)
MaxDelay time.Duration
// MaxRetries 最大重试次数(默认 10)
MaxRetries int
// JitterPct 抖动比例(默认 0.25 = 25%)
JitterPct float64
}
ExponentialBackoff 实现指数退避策略.
精妙之处(CLEVER): 公式 baseDelay * 2^(attempt-1) + random * jitter% * base 经典的退避算法,被所有主流 SDK 采用(AWS/GCP/Azure 都用类似实现). 抖动防止分布式环境下多个客户端同时重试导致 thundering herd.
替代方案:
- 固定间隔重试:简单但不适应负载波动
- 线性退避:退避太慢,高负载时效果差
- 全随机退避:退避不可预测,调试困难
func DefaultExponentialBackoff ¶
func DefaultExponentialBackoff() *ExponentialBackoff
DefaultExponentialBackoff 返回默认配置的指数退避策略.
func (*ExponentialBackoff) ShouldRetry ¶
func (b *ExponentialBackoff) ShouldRetry(err RetryError, attempt int, ctx *RetryContext) *RetryDecision
ShouldRetry 如果错误本身可重试且未超过最大次数,返回带退避的重试决策.
type FallbackTriggeredError ¶
FallbackTriggeredError 表示触发了模型降级. 上层捕获后可以用 FallbackModel 重新发起请求.
func (*FallbackTriggeredError) Error ¶
func (e *FallbackTriggeredError) Error() string
func (*FallbackTriggeredError) Unwrap ¶
func (e *FallbackTriggeredError) Unwrap() error
type FastModeCooldown ¶
type FastModeCooldown struct {
// IsFastMode 判断当前是否在快速模式
IsFastMode func() bool
// ShortRetryThreshold 短延迟阈值(低于此值保持快速模式)
ShortRetryThreshold time.Duration
// CooldownDuration 默认冷却时间(retry-after 未知时使用)
CooldownDuration time.Duration
// MinCooldown 最低冷却时间(防止过快切回导致反复抖动)
MinCooldown time.Duration
// OnCooldownStart 进入冷却期回调(上层据此切换模型)
OnCooldownStart func(duration time.Duration, reason string)
// OnPermanentDisable overage 被拒绝时永久禁用回调
OnPermanentDisable func(reason string)
}
FastModeCooldown 处理快速模式遇到限流/过载时的降级逻辑.
对应早期方案 L262-314:
- 短 retry-after(<20s):保持快速模式重试(保 prompt cache)
- 长 retry-after 或 529:进入冷却期,降级到标准速度
- overage 被拒绝:永久禁用快速模式
精妙之处(CLEVER): 短延迟保持快速模式是为了保住 prompt cache-- 切换模型名(快速→标准)会导致 cache 失效,得重新计算. 20 秒以内的等待比重建 cache 便宜.
func DefaultFastModeCooldown ¶
func DefaultFastModeCooldown() *FastModeCooldown
DefaultFastModeCooldown 返回默认配置.
func (*FastModeCooldown) ShouldRetry ¶
func (f *FastModeCooldown) ShouldRetry(err RetryError, attempt int, ctx *RetryContext) *RetryDecision
ShouldRetry 快速模式下的限流/过载处理.
type ForegroundOnly ¶
type ForegroundOnly struct{}
ForegroundOnly 限制只有前景请求才重试 529/过载错误.
精妙之处(CLEVER): 容量级联时,每次重试是 3-10× 网关放大. 后台请求(标题生成,摘要,分类器)失败了用户看不到, 重试只会让过载更严重.
升华改进(ELEVATED): 早期方案用 FOREGROUND_529_RETRY_SOURCES Set 硬编码 15+ 来源, 每加一种查询源要改重试模块.我们让调用方声明 IsForeground=true/false.
替代方案:<原方案在重试模块维护前景来源白名单 Set>
func (*ForegroundOnly) ShouldRetry ¶
func (f *ForegroundOnly) ShouldRetry(err RetryError, attempt int, ctx *RetryContext) *RetryDecision
ShouldRetry 后台请求遇到过载直接放弃.
type MaxAttemptsLimit ¶
type MaxAttemptsLimit struct {
Max int
}
MaxAttemptsLimit 是独立的最大重试次数策略. 与 ExponentialBackoff.MaxRetries 不同,这是一个独立策略, 可以放在 Composite 的最前面作为硬上限.
func (*MaxAttemptsLimit) ShouldRetry ¶
func (m *MaxAttemptsLimit) ShouldRetry(err RetryError, attempt int, ctx *RetryContext) *RetryDecision
ShouldRetry 超过最大次数直接放弃.
type ModelFallback ¶
type ModelFallback struct {
// ConsecutiveThreshold 触发降级的连续过载次数
ConsecutiveThreshold int
}
ModelFallback 在连续过载后触发模型降级信号.
对应早期方案 L327-365:
连续 3 次 529 → throw FallbackTriggeredError → 上层切换到 fallbackModel
精妙之处(CLEVER): 不直接切换模型(那是上层产品逻辑的事), 只返回 Retry=false + 特定 Reason,Retryer.Do 据此返回 FallbackTriggeredError. 但这个策略需要与 ConsecutiveLimit 配合--ConsecutiveLimit 先判断次数到了, 本策略再决定是否降级(而非直接失败).
注意:这个策略应该放在 ConsecutiveLimit 之后, 当 ConsecutiveLimit 拒绝重试时,Retryer 检查 FallbackModel 决定是否降级.
func (*ModelFallback) ShouldRetry ¶
func (m *ModelFallback) ShouldRetry(err RetryError, attempt int, ctx *RetryContext) *RetryDecision
ShouldRetry 检查是否应触发模型降级.
type OverflowInfo ¶
type OverflowInfo struct {
InputTokens int // 实际输入 token 数
MaxTokens int // 当前设置的 max_tokens
ContextLimit int // 模型的上下文窗口上限
}
OverflowInfo 是从错误消息中解析出的溢出信息.
func ParseOverflow ¶
func ParseOverflow(message string) *OverflowInfo
ParseOverflow 从错误消息中提取溢出信息. 返回 nil 表示消息不匹配(不是 context overflow 错误).
type RetryContext ¶
type RetryContext struct {
// QuerySource labels the caller ("main_thread"/"summary"/"compact"/...)
// so retry-layer diagnostics can attribute a failed retry to its origin.
// It is a free-form string (not an enum) so third-party providers and
// call sites outside pkg/engine can add their own source labels without
// touching retry-layer types.
//
// Wire: callers inject via retry.WithQuerySource(ctx, s); transport
// Client reads it via retry.QuerySourceFromCtx(ctx) into this field at
// rctx construction (internal/transport/client.go). Retryer.Do embeds
// it into CannotRetryError.Reason so an operator watching the error
// stream sees "policy denied (source=compact)" instead of opaque
// "policy denied". Future retry policies may branch on this field
// (e.g. be more aggressive about dropping background sources), but the
// field is deliberately not yet policy-reading -- diagnostics first,
// behavior changes only after data.
//
// QuerySource 标记请求调用方 ("main_thread"/"summary"/"compact" 等)
// 让重试层诊断把失败重试归因到来源. 自由字符串 (非枚举), 第三方
// provider 和 pkg/engine 外的 call site 可以加自定义标签, 不需要碰
// 重试层类型.
//
// Wire: 调用方经 retry.WithQuerySource(ctx, s) 注入; transport Client
// 经 retry.QuerySourceFromCtx(ctx) 在 rctx 构造时填充此字段
// (internal/transport/client.go). Retryer.Do 将其嵌入
// CannotRetryError.Reason, 运维在错误流中看到 "policy denied
// (source=compact)" 而不是含糊的 "policy denied". 未来重试策略可
// 按此字段分支 (如对后台来源更激进地放弃), 但当前字段**不**被策略
// 读 -- 先诊断, 行为变更需数据支撑.
QuerySource string
// IsForeground 用户是否在等结果.
// 精妙之处(CLEVER): 调用方声明式标注,而非重试模块维护白名单.
// 早期方案用 FOREGROUND_529_RETRY_SOURCES Set 硬编码 15+ 来源--每加一种要改重试模块.
// 声明式方式:调用方设 IsForeground=true,重试模块只读.
// 替代方案:<原方案在重试模块中维护前景来源白名单>
IsForeground bool
// StartTime 首次尝试时间(用于计算总耗时上限)
StartTime time.Time
// MaxDuration 最大总重试时长(0=不限制,仅受 MaxRetries 控制)
// 升华改进(ELEVATED): 仓储 daemon 模式需要 "尽力重试但有上限",
// 不是早期方案的 "无限重试" 也不是 "固定次数后放弃".
MaxDuration time.Duration
// ConsecutiveCounts 按错误类别统计连续出现次数.
// 精妙之处(CLEVER): 由 Retryer 在每次失败后自动维护--
// 连续出现同一类错误递增,出现不同类错误清零.
// ConsecutiveLimit 策略读取此字段做决策.
ConsecutiveCounts map[string]int
// Model 当前使用的模型(用于模型降级策略)
Model string
// FallbackModel 降级模型(用于模型降级策略)
FallbackModel string
// MaxTokensOverride 被溢出修正调整过的 max_tokens(0=未调整)
MaxTokensOverride int
}
RetryContext 是重试的请求上下文.
type RetryDecision ¶
type RetryDecision struct {
// Retry 是否重试
Retry bool
// Delay 建议的等待时间
Delay time.Duration
// Reason 决策原因(可观测性:为什么重试/不重试)
Reason string
}
RetryDecision 是重试决策结果.
type RetryError ¶
type RetryError interface {
Error() string
Category() string
IsRetryable() bool
RetryDelay() time.Duration
Message() string
Headers() http.Header
RetryInfo() *RetryInfo
}
RetryError 是 retry 包对 API 错误的抽象. api.APIError 实现了这个接口,使 retry 包可以处理 api 错误而不需要导入 api 包.
type RetryPolicy ¶
type RetryPolicy interface {
ShouldRetry(err RetryError, attempt int, ctx *RetryContext) *RetryDecision
}
RetryPolicy 决定一个失败请求是否应该重试.
func NewAnthropicRetryPolicy ¶
func NewAnthropicRetryPolicy(opts AnthropicRetryOpts) RetryPolicy
NewAnthropicRetryPolicy 创建针对 Anthropic API 优化的组合重试策略.
策略优先级(从高到低):
- ForegroundOnly - 后台 529 直接失败(防容量级联)
- ServerDirective - 服务端说不重试就不重试
- SubscriptionAwareRetry - 订阅用户 429 不重试(等太久)
- FastModeCooldown - 快速模式降级逻辑
- ModelFallback - 连续过载触发模型降级
- ExponentialBackoff - 兜底退避
注意顺序很重要:ForegroundOnly 必须在最前面(高优先级), ExponentialBackoff 必须在最后面(兜底).
type RetryableFunc ¶
type RetryableFunc func(attempt int, rctx *RetryContext) error
RetryableFunc 是可重试的操作函数. attempt 从 1 开始,rctx 包含重试上下文(可能被溢出修正更新过).
type Retryer ¶
type Retryer struct {
// Policy 重试策略
Policy RetryPolicy
// OverflowHandler 溢出修正器(可选)
OverflowHandler *ContextOverflowHandler
// OnRetry 每次重试前的回调(用于可观测性,可选).
// 参数:错误,第几次尝试,等待时间,决策原因.
OnRetry func(err RetryError, attempt int, delay time.Duration, reason string)
// Clock 时间源(可测试,nil 使用 time.Now)
Clock func() time.Time
}
Retryer 执行带重试的操作.
func (*Retryer) Do ¶
func (r *Retryer) Do(ctx context.Context, rctx *RetryContext, fn RetryableFunc) error
Do 执行带重试的操作.
精妙之处(CLEVER): 主循环清晰分层--
- 执行操作
- 成功 → 返回
- 失败 → 提取 APIError
- 尝试溢出修正
- 更新连续计数
- 查询策略
- 等待 → 下一轮
每一步职责明确,不像早期方案 822 行混在一起.
type ServerDirective ¶
type ServerDirective struct{}
ServerDirective 尊重 APIError.RetryInfo.ServerSaid 字段 (来自 x-should-retry header,在 8.1 的 Classifier 中已解析).
精妙之处(CLEVER): 服务端最清楚"这个错误重试有没有意义". 比如 429 限流,服务端说 should-retry: false 可能是因为 窗口级限制要等几小时--自动重试毫无意义.
func (*ServerDirective) ShouldRetry ¶
func (s *ServerDirective) ShouldRetry(err RetryError, attempt int, ctx *RetryContext) *RetryDecision
ShouldRetry 服务端明确表态时遵从.
type SubscriptionAwareRetry ¶
type SubscriptionAwareRetry struct {
// IsSubscriber 判断当前用户是否为订阅用户(Pro/Max)
IsSubscriber func() bool
// IsEnterprise 判断当前用户是否为企业用户
IsEnterprise func() bool
}
SubscriptionAwareRetry 根据用户订阅类型调整 429 重试行为.
对应早期方案 L737-741, L765-769:
- Pro/Max 用户 429 通常是窗口级限制(5小时/7天),retry-after 可能是几小时
- Enterprise 用户通常是 PAYG,429 是短暂限流,可以重试
- API Key 用户(非订阅)可以重试
精妙之处(CLEVER): 不是"所有 429 都不重试",而是根据用户类型判断. 订阅用户的 429 等待时间太长(小时级),自动重试毫无意义.
func (*SubscriptionAwareRetry) ShouldRetry ¶
func (s *SubscriptionAwareRetry) ShouldRetry(err RetryError, attempt int, ctx *RetryContext) *RetryDecision
ShouldRetry 订阅用户遇到 429 不自动重试(除非是企业用户).