// Retryer -- 带重试的操作执行器. // // 精妙之处(CLEVER): 不是 AsyncGenerator(Go 不需要 yield 心跳), // 而是带回调的 Do() 方法.Observer 事件代替 yield 做可观测性. // // 设计要点: // - 自动维护 RetryContext 的 ConsecutiveCounts // - 每次重试前通过 Observer 发 "api_retry" 事件 // - 支持 context.Context 取消 // - ContextOverflowHandler 自动修正 max_tokens package retry import ( "context" "errors" "fmt" "time" ) // ============================================================ // Retryer - 执行器 // ============================================================ // Retryer 执行带重试的操作. type Retryer struct { // Policy 重试策略 Policy RetryPolicy // OverflowHandler 溢出修正器(可选) OverflowHandler *ContextOverflowHandler // OnRetry 每次重试前的回调(用于可观测性,可选). // 参数:错误,第几次尝试,等待时间,决策原因. OnRetry func(err RetryError, attempt int, delay time.Duration, reason string) // Clock 时间源(可测试,nil 使用 time.Now) Clock func() time.Time } // now 返回当前时间. func (r *Retryer) now() time.Time { if r.Clock != nil { return r.Clock() } return time.Now() } // RetryableFunc 是可重试的操作函数. // attempt 从 1 开始,rctx 包含重试上下文(可能被溢出修正更新过). type RetryableFunc func(attempt int, rctx *RetryContext) error // CannotRetryError 当所有重试耗尽后包装原始错误返回. type CannotRetryError struct { // Original 是最后一次尝试的错误 Original error // Attempts 总尝试次数 Attempts int // Reason 放弃重试的原因 Reason string } func (e *CannotRetryError) Error() string { return fmt.Sprintf("cannot retry after %d attempts (%s): %v", e.Attempts, e.Reason, e.Original) } func (e *CannotRetryError) Unwrap() error { return e.Original } // FallbackTriggeredError 表示触发了模型降级. // 上层捕获后可以用 FallbackModel 重新发起请求. type FallbackTriggeredError struct { OriginalModel string FallbackModel string Original error } func (e *FallbackTriggeredError) Error() string { return fmt.Sprintf("model fallback triggered: %s -> %s", e.OriginalModel, e.FallbackModel) } func (e *FallbackTriggeredError) Unwrap() error { return e.Original } // Do 执行带重试的操作. // // 精妙之处(CLEVER): 主循环清晰分层-- // 1. 执行操作 // 2. 成功 → 返回 // 3. 失败 → 提取 APIError // 4. 尝试溢出修正 // 5. 更新连续计数 // 6. 查询策略 // 7. 等待 → 下一轮 // // 每一步职责明确,不像早期方案 822 行混在一起. func (r *Retryer) Do(ctx context.Context, rctx *RetryContext, fn RetryableFunc) error { if rctx.StartTime.IsZero() { rctx.StartTime = r.now() } if rctx.ConsecutiveCounts == nil { rctx.ConsecutiveCounts = make(map[string]int) } var lastErr error // 上限保护:最多尝试 maxAttempts 次(防止策略 bug 导致无限循环) maxAttempts := 100 for attempt := 1; attempt <= maxAttempts; attempt++ { // 检查 context 取消 if cancelErr := ctx.Err(); cancelErr != nil { attempted := attempt - 1 if lastErr != nil { return &CannotRetryError{Original: lastErr, Attempts: attempted, Reason: reasonWithSource("context cancelled", rctx)} } // 第一次尝试前 context 就已取消--返回 CannotRetryError 保证一致性 return &CannotRetryError{Original: cancelErr, Attempts: attempted, Reason: reasonWithSource("context cancelled", rctx)} } // 执行操作 err := fn(attempt, rctx) if err == nil { return nil // 成功 } lastErr = err // 提取 RetryError var rErr RetryError if !errors.As(err, &rErr) { // 非 RetryError,不重试 return err } // 尝试溢出修正 if r.OverflowHandler != nil && rErr.Category() == "invalid_request" { info := ParseOverflow(rErr.Message()) if info != nil { adjusted, ok := r.OverflowHandler.Adjust(info) if ok { rctx.MaxTokensOverride = adjusted continue // 直接重试,不计入退避 } // 空间不足,按正常失败处理 } } // 更新连续计数 updateConsecutiveCounts(rctx, rErr.Category()) // 查询策略(nil Policy = 不重试,首次失败即返回) if r.Policy == nil { return err } decision := r.Policy.ShouldRetry(rErr, attempt, rctx) if decision == nil || !decision.Retry { reason := "policy denied" if decision != nil { reason = decision.Reason } // 检查是否是模型降级信号 if decision != nil && len(decision.Reason) > 15 && decision.Reason[:15] == "model_fallback:" { return &FallbackTriggeredError{ OriginalModel: rctx.Model, FallbackModel: decision.Reason[15:], Original: err, } } return &CannotRetryError{Original: err, Attempts: attempt, Reason: reasonWithSource(reason, rctx)} } // 通知观察者 if r.OnRetry != nil { r.OnRetry(rErr, attempt, decision.Delay, decision.Reason) } // 等待 if decision.Delay > 0 { select { case <-ctx.Done(): return &CannotRetryError{Original: err, Attempts: attempt, Reason: reasonWithSource("context cancelled during backoff", rctx)} case <-time.After(decision.Delay): } } } return &CannotRetryError{Original: lastErr, Attempts: maxAttempts, Reason: reasonWithSource("max safety limit reached", rctx)} } // reasonWithSource annotates a CannotRetryError reason with the // QuerySource from the RetryContext so operators reading the error stream // can attribute a failure to its call site ("main_thread" vs "compact" // vs "summary"). If either rctx is nil or the source label is unset, // returns the plain reason. // // This is the read site that makes RetryContext.QuerySource load-bearing // (rather than a silent write-only field) -- runtime diagnostics carry // the source forward to whoever catches the error. // // reasonWithSource 将 RetryContext 里的 QuerySource 附加到 CannotRetryError // 的 reason, 运维看错误流时可把失败归因到 call site ("main_thread" vs // "compact" vs "summary"). rctx 为 nil 或 source 标签为空时返回原始 reason. // // 这是让 RetryContext.QuerySource 承载 (而非静默 write-only 字段) 的读 // 位置 -- 运行时诊断把 source 带给捕获错误的调用方. func reasonWithSource(reason string, rctx *RetryContext) string { if rctx == nil || rctx.QuerySource == "" { return reason } return fmt.Sprintf("%s (source=%s)", reason, rctx.QuerySource) } // updateConsecutiveCounts 更新连续错误计数. // 精妙之处(CLEVER): 同类错误递增,不同类错误清零所有计数. // 这样 "529, 529, 429, 529" 中 529 的连续计数是 1(而非 3), // 因为中间被 429 打断了. func updateConsecutiveCounts(rctx *RetryContext, category string) { // 清零非当前类别的计数 for cat := range rctx.ConsecutiveCounts { if cat != category { delete(rctx.ConsecutiveCounts, cat) } } rctx.ConsecutiveCounts[category]++ }