retry

package
v0.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 26, 2026 License: None detected not legal advice Imports: 0 Imported by: 0

Documentation

Overview

Anthropic 特有重试策略 -- 针对 Anthropic API 的优化子集.

其他供应商无需使用,可通过 NewCompositeRetryPolicy 组合通用策略.

包含:

  • SubscriptionAwareRetry: 订阅用户 429 不自动重试(窗口级限制等待时间太长)
  • FastModeCooldown: 快速模式 429/529 降级到标准速度
  • ModelFallback: 连续过载时触发模型降级信号
  • StaleConnectionRecovery: ECONNRESET/EPIPE 时标记需要重建连接
  • NewAnthropicRetryPolicy: 工厂函数组合出最优策略

注意:这些策略对应早期方案 中的 Anthropic 特有逻辑, 在注释中标注了早期方案的行号和对应关系,方便交叉参考和维护.

通用重试策略 -- 跨行业可用的重试策略实现.

包含:

  • ExponentialBackoff: 指数退避 + 抖动(防 thundering herd)
  • ForegroundOnly: 后台请求不重试 529/过载(防容量级联放大)
  • ConsecutiveLimit: 连续 N 次同类错误后放弃
  • ServerDirective: 尊重服务端的 x-should-retry 指令
  • MaxAttemptsLimit: 最大重试次数限制

这些策略不依赖任何特定供应商,仓储/金融/编程场景通用. Anthropic 特有策略在 anthropic.go 中.

Context Overflow 修正 -- 自动调整 max_tokens 避免超出上下文窗口.

精妙之处(CLEVER): API 返回 "input length and `max_tokens` exceed context limit: 188059 + 20000 > 200000" 时,自动计算安全的 max_tokens 值并注入 RetryContext, 让 Retryer 的下次尝试使用修正后的值.用户完全无感知.

升华改进(ELEVATED): 早期方案把这段逻辑混在 822 行的 withRetry 主循环中. 我们提取为独立模块,可以被 Retryer 或其他组件复用.

替代方案:<原方案嵌在 withRetry 的 catch 块中,与重试/退避/降级代码交织>

重试策略系统 -- 可组合的重试决策框架.

升华改进(ELEVATED): 早期方案 withRetry 是 822 行的单函数巨兽, 把前景/后台分流,快速模式降级,退避计算,模型降级,认证恢复, max_tokens 修正 6 个正交关注点全部塞在一个 for 循环里.

本设计:RetryPolicy 接口 + CompositeRetryPolicy 叠加, 每个策略只关心一个维度,通过组合得到复杂行为. 工厂函数(如 NewAnthropicRetryPolicy)针对特定供应商组合最优策略.

替代方案:<原方案 822 行 AsyncGenerator 包含所有逻辑>

Retryer -- 带重试的操作执行器.

精妙之处(CLEVER): 不是 AsyncGenerator(Go 不需要 yield 心跳), 而是带回调的 Do() 方法.Observer 事件代替 yield 做可观测性.

设计要点:

  • 自动维护 RetryContext 的 ConsecutiveCounts
  • 每次重试前通过 Observer 发 "api_retry" 事件
  • 支持 context.Context 取消
  • ContextOverflowHandler 自动修正 max_tokens

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func QuerySourceFromCtx

func QuerySourceFromCtx(ctx context.Context) string

QuerySourceFromCtx reads the call-site source label from ctx. Returns empty string when not injected -- callers should handle this as "source unknown" rather than error out, so an uninstrumented path still works.

QuerySourceFromCtx 从 ctx 读调用方来源标签. 未注入时返回空串 -- 调用 方应按"来源未知"处理而不报错, 让未插桩路径仍能工作.

func WithQuerySource

func WithQuerySource(ctx context.Context, source string) context.Context

WithQuerySource injects a call-site source label into ctx so the downstream transport Client can fill RetryContext.QuerySource. Callers pass labels like "main_thread" / "compact" / "summary"; the field is free-form so third-party providers can add their own labels.

WithQuerySource 将调用方来源标签注入 ctx, 下游 transport Client 从 中填充 RetryContext.QuerySource. 调用方传 "main_thread" / "compact" / "summary" 等; 字段自由格式, 第三方 provider 可自定义标签.

Types

type AnthropicRetryOpts

type AnthropicRetryOpts struct {
	// MaxRetries 最大重试次数(默认 10)
	MaxRetries int
	// IsSubscriber 是否为订阅用户
	IsSubscriber func() bool
	// IsEnterprise 是否为企业用户
	IsEnterprise func() bool
	// IsFastMode 是否在快速模式
	IsFastMode func() bool
	// OnFastModeCooldown 快速模式冷却回调
	OnFastModeCooldown func(duration time.Duration, reason string)
	// OnFastModeDisable 快速模式永久禁用回调
	OnFastModeDisable func(reason string)
	// FallbackThreshold 触发模型降级的连续 529 次数(默认 3)
	FallbackThreshold int
}

AnthropicRetryOpts 是 Anthropic 重试策略的配置选项.

type CannotRetryError

type CannotRetryError struct {
	// Original 是最后一次尝试的错误
	Original error
	// Attempts 总尝试次数
	Attempts int
	// Reason 放弃重试的原因
	Reason string
}

CannotRetryError 当所有重试耗尽后包装原始错误返回.

func (*CannotRetryError) Error

func (e *CannotRetryError) Error() string

func (*CannotRetryError) Unwrap

func (e *CannotRetryError) Unwrap() error

type CompositeRetryPolicy

type CompositeRetryPolicy struct {
	// contains filtered or unexported fields
}

CompositeRetryPolicy 按顺序组合多个策略. 第一个返回非 nil 决策的策略胜出.

升华改进(ELEVATED): 策略的顺序就是优先级-- 前景/后台分流放在最前(优先级最高),退避兜底放在最后. 运行时可通过 Add 动态追加策略(如仓储场景追加 PLC 重试策略).

func NewCompositeRetryPolicy

func NewCompositeRetryPolicy(policies ...RetryPolicy) *CompositeRetryPolicy

NewCompositeRetryPolicy 创建组合策略.

func (*CompositeRetryPolicy) Add

Add 追加策略(优先级最低).

func (*CompositeRetryPolicy) ShouldRetry

func (c *CompositeRetryPolicy) ShouldRetry(err RetryError, attempt int, ctx *RetryContext) *RetryDecision

ShouldRetry 按顺序评估,第一个明确决策的胜出.

type ConsecutiveLimit

type ConsecutiveLimit struct {
	// Category 要追踪的错误类别字符串(如 "server_overload")
	Category string
	// Limit 连续出现次数上限
	Limit int
}

ConsecutiveLimit 在连续 N 次同类错误后放弃重试.

精妙之处(CLEVER): 早期方案硬编码 MAX_529_RETRIES=3,然后触发模型降级. 我们泛化为任意错误类别 + 可配置上限,模型降级由上层或另一个策略负责.

用法示例:连续 3 次 ErrOverloaded → 放弃(上层可以据此触发降级)

func (*ConsecutiveLimit) ShouldRetry

func (c *ConsecutiveLimit) ShouldRetry(err RetryError, attempt int, ctx *RetryContext) *RetryDecision

ShouldRetry 检查连续同类错误计数.

type ContextOverflowHandler

type ContextOverflowHandler struct {
	// FloorOutputTokens 输出 token 最低保障(默认 3000)
	// 即使上下文快满了,也至少留这么多 token 给输出.
	// 低于此值意味着没有足够空间生成有用回复,不如直接失败.
	FloorOutputTokens int
	// SafetyBuffer 安全余量(默认 1000)
	// 防止因 token 估算误差导致再次溢出.
	SafetyBuffer int
	// ThinkingBudget 当前 thinking token 预算(0=未启用 thinking)
	// max_tokens 必须能容纳 thinking budget + 至少 1 个 output token.
	ThinkingBudget int
}

ContextOverflowHandler 处理 "input + max_tokens > context limit" 错误, 自动计算安全的 max_tokens 值.

func DefaultOverflowHandler

func DefaultOverflowHandler() *ContextOverflowHandler

DefaultOverflowHandler 返回默认配置.

func (*ContextOverflowHandler) Adjust

func (h *ContextOverflowHandler) Adjust(info *OverflowInfo) (adjustedMax int, ok bool)

Adjust 计算修正后的 max_tokens. 返回修正值和 true,或 0 和 false(表示空间不足,不应重试).

精妙之处(CLEVER): 三层保护--

  1. available = contextLimit - inputTokens - safetyBuffer
  2. 必须 >= FloorOutputTokens(否则回复太短没意义)
  3. 必须 >= thinkingBudget + 1(否则 thinking 无法运行)

type ExponentialBackoff

type ExponentialBackoff struct {
	// BaseDelay 初始退避时间(默认 500ms)
	BaseDelay time.Duration
	// MaxDelay 最大退避时间上限(默认 32s)
	MaxDelay time.Duration
	// MaxRetries 最大重试次数(默认 10)
	MaxRetries int
	// JitterPct 抖动比例(默认 0.25 = 25%)
	JitterPct float64
}

ExponentialBackoff 实现指数退避策略.

精妙之处(CLEVER): 公式 baseDelay * 2^(attempt-1) + random * jitter% * base 经典的退避算法,被所有主流 SDK 采用(AWS/GCP/Azure 都用类似实现). 抖动防止分布式环境下多个客户端同时重试导致 thundering herd.

替代方案:

  • 固定间隔重试:简单但不适应负载波动
  • 线性退避:退避太慢,高负载时效果差
  • 全随机退避:退避不可预测,调试困难

func DefaultExponentialBackoff

func DefaultExponentialBackoff() *ExponentialBackoff

DefaultExponentialBackoff 返回默认配置的指数退避策略.

func (*ExponentialBackoff) ShouldRetry

func (b *ExponentialBackoff) ShouldRetry(err RetryError, attempt int, ctx *RetryContext) *RetryDecision

ShouldRetry 如果错误本身可重试且未超过最大次数,返回带退避的重试决策.

type FallbackTriggeredError

type FallbackTriggeredError struct {
	OriginalModel string
	FallbackModel string
	Original      error
}

FallbackTriggeredError 表示触发了模型降级. 上层捕获后可以用 FallbackModel 重新发起请求.

func (*FallbackTriggeredError) Error

func (e *FallbackTriggeredError) Error() string

func (*FallbackTriggeredError) Unwrap

func (e *FallbackTriggeredError) Unwrap() error

type FastModeCooldown

type FastModeCooldown struct {
	// IsFastMode 判断当前是否在快速模式
	IsFastMode func() bool
	// ShortRetryThreshold 短延迟阈值(低于此值保持快速模式)
	ShortRetryThreshold time.Duration
	// CooldownDuration 默认冷却时间(retry-after 未知时使用)
	CooldownDuration time.Duration
	// MinCooldown 最低冷却时间(防止过快切回导致反复抖动)
	MinCooldown time.Duration
	// OnCooldownStart 进入冷却期回调(上层据此切换模型)
	OnCooldownStart func(duration time.Duration, reason string)
	// OnPermanentDisable overage 被拒绝时永久禁用回调
	OnPermanentDisable func(reason string)
}

FastModeCooldown 处理快速模式遇到限流/过载时的降级逻辑.

对应早期方案 L262-314:

  • 短 retry-after(<20s):保持快速模式重试(保 prompt cache)
  • 长 retry-after 或 529:进入冷却期,降级到标准速度
  • overage 被拒绝:永久禁用快速模式

精妙之处(CLEVER): 短延迟保持快速模式是为了保住 prompt cache-- 切换模型名(快速→标准)会导致 cache 失效,得重新计算. 20 秒以内的等待比重建 cache 便宜.

func DefaultFastModeCooldown

func DefaultFastModeCooldown() *FastModeCooldown

DefaultFastModeCooldown 返回默认配置.

func (*FastModeCooldown) ShouldRetry

func (f *FastModeCooldown) ShouldRetry(err RetryError, attempt int, ctx *RetryContext) *RetryDecision

ShouldRetry 快速模式下的限流/过载处理.

type ForegroundOnly

type ForegroundOnly struct{}

ForegroundOnly 限制只有前景请求才重试 529/过载错误.

精妙之处(CLEVER): 容量级联时,每次重试是 3-10× 网关放大. 后台请求(标题生成,摘要,分类器)失败了用户看不到, 重试只会让过载更严重.

升华改进(ELEVATED): 早期方案用 FOREGROUND_529_RETRY_SOURCES Set 硬编码 15+ 来源, 每加一种查询源要改重试模块.我们让调用方声明 IsForeground=true/false.

替代方案:<原方案在重试模块维护前景来源白名单 Set>

func (*ForegroundOnly) ShouldRetry

func (f *ForegroundOnly) ShouldRetry(err RetryError, attempt int, ctx *RetryContext) *RetryDecision

ShouldRetry 后台请求遇到过载直接放弃.

type MaxAttemptsLimit

type MaxAttemptsLimit struct {
	Max int
}

MaxAttemptsLimit 是独立的最大重试次数策略. 与 ExponentialBackoff.MaxRetries 不同,这是一个独立策略, 可以放在 Composite 的最前面作为硬上限.

func (*MaxAttemptsLimit) ShouldRetry

func (m *MaxAttemptsLimit) ShouldRetry(err RetryError, attempt int, ctx *RetryContext) *RetryDecision

ShouldRetry 超过最大次数直接放弃.

type ModelFallback

type ModelFallback struct {
	// ConsecutiveThreshold 触发降级的连续过载次数
	ConsecutiveThreshold int
}

ModelFallback 在连续过载后触发模型降级信号.

对应早期方案 L327-365:

连续 3 次 529 → throw FallbackTriggeredError → 上层切换到 fallbackModel

精妙之处(CLEVER): 不直接切换模型(那是上层产品逻辑的事), 只返回 Retry=false + 特定 Reason,Retryer.Do 据此返回 FallbackTriggeredError. 但这个策略需要与 ConsecutiveLimit 配合--ConsecutiveLimit 先判断次数到了, 本策略再决定是否降级(而非直接失败).

注意:这个策略应该放在 ConsecutiveLimit 之后, 当 ConsecutiveLimit 拒绝重试时,Retryer 检查 FallbackModel 决定是否降级.

func (*ModelFallback) ShouldRetry

func (m *ModelFallback) ShouldRetry(err RetryError, attempt int, ctx *RetryContext) *RetryDecision

ShouldRetry 检查是否应触发模型降级.

type OverflowInfo

type OverflowInfo struct {
	InputTokens  int // 实际输入 token 数
	MaxTokens    int // 当前设置的 max_tokens
	ContextLimit int // 模型的上下文窗口上限
}

OverflowInfo 是从错误消息中解析出的溢出信息.

func ParseOverflow

func ParseOverflow(message string) *OverflowInfo

ParseOverflow 从错误消息中提取溢出信息. 返回 nil 表示消息不匹配(不是 context overflow 错误).

type RetryContext

type RetryContext struct {
	// QuerySource labels the caller ("main_thread"/"summary"/"compact"/...)
	// so retry-layer diagnostics can attribute a failed retry to its origin.
	// It is a free-form string (not an enum) so third-party providers and
	// call sites outside pkg/engine can add their own source labels without
	// touching retry-layer types.
	//
	// Wire: callers inject via retry.WithQuerySource(ctx, s); transport
	// Client reads it via retry.QuerySourceFromCtx(ctx) into this field at
	// rctx construction (internal/transport/client.go). Retryer.Do embeds
	// it into CannotRetryError.Reason so an operator watching the error
	// stream sees "policy denied (source=compact)" instead of opaque
	// "policy denied". Future retry policies may branch on this field
	// (e.g. be more aggressive about dropping background sources), but the
	// field is deliberately not yet policy-reading -- diagnostics first,
	// behavior changes only after data.
	//
	// QuerySource 标记请求调用方 ("main_thread"/"summary"/"compact" 等)
	// 让重试层诊断把失败重试归因到来源. 自由字符串 (非枚举), 第三方
	// provider 和 pkg/engine 外的 call site 可以加自定义标签, 不需要碰
	// 重试层类型.
	//
	// Wire: 调用方经 retry.WithQuerySource(ctx, s) 注入; transport Client
	// 经 retry.QuerySourceFromCtx(ctx) 在 rctx 构造时填充此字段
	// (internal/transport/client.go). Retryer.Do 将其嵌入
	// CannotRetryError.Reason, 运维在错误流中看到 "policy denied
	// (source=compact)" 而不是含糊的 "policy denied". 未来重试策略可
	// 按此字段分支 (如对后台来源更激进地放弃), 但当前字段**不**被策略
	// 读 -- 先诊断, 行为变更需数据支撑.
	QuerySource string
	// IsForeground 用户是否在等结果.
	// 精妙之处(CLEVER): 调用方声明式标注,而非重试模块维护白名单.
	// 早期方案用 FOREGROUND_529_RETRY_SOURCES Set 硬编码 15+ 来源--每加一种要改重试模块.
	// 声明式方式:调用方设 IsForeground=true,重试模块只读.
	// 替代方案:<原方案在重试模块中维护前景来源白名单>
	IsForeground bool
	// StartTime 首次尝试时间(用于计算总耗时上限)
	StartTime time.Time
	// MaxDuration 最大总重试时长(0=不限制,仅受 MaxRetries 控制)
	// 升华改进(ELEVATED): 仓储 daemon 模式需要 "尽力重试但有上限",
	// 不是早期方案的 "无限重试" 也不是 "固定次数后放弃".
	MaxDuration time.Duration
	// ConsecutiveCounts 按错误类别统计连续出现次数.
	// 精妙之处(CLEVER): 由 Retryer 在每次失败后自动维护--
	// 连续出现同一类错误递增,出现不同类错误清零.
	// ConsecutiveLimit 策略读取此字段做决策.
	ConsecutiveCounts map[string]int
	// Model 当前使用的模型(用于模型降级策略)
	Model string
	// FallbackModel 降级模型(用于模型降级策略)
	FallbackModel string
	// MaxTokensOverride 被溢出修正调整过的 max_tokens(0=未调整)
	MaxTokensOverride int
}

RetryContext 是重试的请求上下文.

type RetryDecision

type RetryDecision struct {
	// Retry 是否重试
	Retry bool
	// Delay 建议的等待时间
	Delay time.Duration
	// Reason 决策原因(可观测性:为什么重试/不重试)
	Reason string
}

RetryDecision 是重试决策结果.

type RetryError

type RetryError interface {
	Error() string
	Category() string
	IsRetryable() bool
	RetryDelay() time.Duration
	Message() string
	Headers() http.Header
	RetryInfo() *RetryInfo
}

RetryError 是 retry 包对 API 错误的抽象. api.APIError 实现了这个接口,使 retry 包可以处理 api 错误而不需要导入 api 包.

type RetryInfo

type RetryInfo = apierror.RetryInfo

RetryInfo 是 apierror.RetryInfo 的别名,用于 retry.RetryError 接口.

type RetryPolicy

type RetryPolicy interface {
	ShouldRetry(err RetryError, attempt int, ctx *RetryContext) *RetryDecision
}

RetryPolicy 决定一个失败请求是否应该重试.

func NewAnthropicRetryPolicy

func NewAnthropicRetryPolicy(opts AnthropicRetryOpts) RetryPolicy

NewAnthropicRetryPolicy 创建针对 Anthropic API 优化的组合重试策略.

策略优先级(从高到低):

  1. ForegroundOnly - 后台 529 直接失败(防容量级联)
  2. ServerDirective - 服务端说不重试就不重试
  3. SubscriptionAwareRetry - 订阅用户 429 不重试(等太久)
  4. FastModeCooldown - 快速模式降级逻辑
  5. ModelFallback - 连续过载触发模型降级
  6. ExponentialBackoff - 兜底退避

注意顺序很重要:ForegroundOnly 必须在最前面(高优先级), ExponentialBackoff 必须在最后面(兜底).

type RetryableFunc

type RetryableFunc func(attempt int, rctx *RetryContext) error

RetryableFunc 是可重试的操作函数. attempt 从 1 开始,rctx 包含重试上下文(可能被溢出修正更新过).

type Retryer

type Retryer struct {
	// Policy 重试策略
	Policy RetryPolicy
	// OverflowHandler 溢出修正器(可选)
	OverflowHandler *ContextOverflowHandler
	// OnRetry 每次重试前的回调(用于可观测性,可选).
	// 参数:错误,第几次尝试,等待时间,决策原因.
	OnRetry func(err RetryError, attempt int, delay time.Duration, reason string)
	// Clock 时间源(可测试,nil 使用 time.Now)
	Clock func() time.Time
}

Retryer 执行带重试的操作.

func (*Retryer) Do

func (r *Retryer) Do(ctx context.Context, rctx *RetryContext, fn RetryableFunc) error

Do 执行带重试的操作.

精妙之处(CLEVER): 主循环清晰分层--

  1. 执行操作
  2. 成功 → 返回
  3. 失败 → 提取 APIError
  4. 尝试溢出修正
  5. 更新连续计数
  6. 查询策略
  7. 等待 → 下一轮

每一步职责明确,不像早期方案 822 行混在一起.

type ServerDirective

type ServerDirective struct{}

ServerDirective 尊重 APIError.RetryInfo.ServerSaid 字段 (来自 x-should-retry header,在 8.1 的 Classifier 中已解析).

精妙之处(CLEVER): 服务端最清楚"这个错误重试有没有意义". 比如 429 限流,服务端说 should-retry: false 可能是因为 窗口级限制要等几小时--自动重试毫无意义.

func (*ServerDirective) ShouldRetry

func (s *ServerDirective) ShouldRetry(err RetryError, attempt int, ctx *RetryContext) *RetryDecision

ShouldRetry 服务端明确表态时遵从.

type SubscriptionAwareRetry

type SubscriptionAwareRetry struct {
	// IsSubscriber 判断当前用户是否为订阅用户(Pro/Max)
	IsSubscriber func() bool
	// IsEnterprise 判断当前用户是否为企业用户
	IsEnterprise func() bool
}

SubscriptionAwareRetry 根据用户订阅类型调整 429 重试行为.

对应早期方案 L737-741, L765-769:

  • Pro/Max 用户 429 通常是窗口级限制(5小时/7天),retry-after 可能是几小时
  • Enterprise 用户通常是 PAYG,429 是短暂限流,可以重试
  • API Key 用户(非订阅)可以重试

精妙之处(CLEVER): 不是"所有 429 都不重试",而是根据用户类型判断. 订阅用户的 429 等待时间太长(小时级),自动重试毫无意义.

func (*SubscriptionAwareRetry) ShouldRetry

func (s *SubscriptionAwareRetry) ShouldRetry(err RetryError, attempt int, ctx *RetryContext) *RetryDecision

ShouldRetry 订阅用户遇到 429 不自动重试(除非是企业用户).

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL