// SSE 流状态守卫 -- 边界情况检测 + 空闲看门狗 + 停顿诊断. // // 精妙之处(CLEVER): parseAnthropicSSE 是裸解析器(收到什么推什么),StreamGuard 叠加 // 状态追踪和边界防御.两层职责清晰--解析器负责"解析正确", // StreamGuard 负责"检测异常". // // 覆盖的生产边界情况: // 1. 空响应:200 OK 但无 SSE 事件(代理故障,返回 HTML/空体) // 2. 部分流:有内容事件但未收到 UsageEvent(网络中断) // 3. 空闲挂起:流中间长时间无数据(代理/防火墙静默断开) // 4. 停顿:两次事件间隔过长(网络抖动诊断) // 5. Scanner 错误:行过长或 I/O 错误 // // 升华改进(ELEVATED): 从 StreamEvent(Anthropic 专有中间类型)改为 flyto.Event-- // StreamGuard 现在对所有 provider 通用,不绑定 Anthropic 语义. // 替代方案:<每个 provider 实现自己的流守卫> - 否决:重复代码, // 且 Gemini / OpenAI 的可靠性检测逻辑与 Anthropic 完全相同. package api import ( "context" "errors" "sync" "time" "git.flytoex.net/yuanwei/flyto-agent/pkg/flyto" ) // ============================================================ // StreamGuardConfig - 守卫配置 // ============================================================ // StreamGuardConfig 配置流守卫的行为. type StreamGuardConfig struct { // IdleTimeout 空闲超时:流中间多久无事件视为挂起(默认 90s). // 精妙之处(CLEVER): SDK 的 request timeout 只管初始连接, // 不管流中间断了.没有看门狗,静默断开的连接会让进程永远挂起. IdleTimeout time.Duration // IdleWarningAt 空闲警告时间点(默认 IdleTimeout/2). IdleWarningAt time.Duration // StallThreshold 停顿阈值(默认 30s). StallThreshold time.Duration // OnIdleWarning 空闲警告回调(可选) OnIdleWarning func(elapsed time.Duration) // OnIdleTimeout 空闲超时回调(可选) OnIdleTimeout func() // OnStall 停顿检测回调(可选) OnStall func(gap time.Duration, stallCount int, totalStallTime time.Duration) // OnStreamEnd 流结束回调(可选) OnStreamEnd func(stats *StreamStats) } // DefaultStreamGuardConfig 返回默认配置. func DefaultStreamGuardConfig() *StreamGuardConfig { return &StreamGuardConfig{ IdleTimeout: 90 * time.Second, StallThreshold: 30 * time.Second, } } // ============================================================ // StreamStats - 流统计信息 // ============================================================ // StreamStats 是流结束时的统计信息. type StreamStats struct { // HasContent 是否收到过内容事件(TextDelta / ToolUse / ThinkingDelta) HasContent bool // HasUsage 是否收到 UsageEvent(流正常结束的标志) HasUsage bool // StopReason 最终的 stop_reason(来自 UsageEvent) StopReason string // ContentBlockCount 完成的内容块数量(Text + ToolUse) ContentBlockCount int // EventCount 总事件数量 EventCount int // StallCount 停顿次数 StallCount int // TotalStallTime 累计停顿时间 TotalStallTime time.Duration // Duration 流的总持续时间 Duration time.Duration // IdleAborted 是否因空闲超时被中止 IdleAborted bool } // IsEmpty 检查流是否为空响应(200 但无实质事件). func (s *StreamStats) IsEmpty() bool { return !s.HasContent && !s.HasUsage } // IsIncomplete 检查流是否不完整(有内容但未正常结束). func (s *StreamStats) IsIncomplete() bool { return s.HasContent && !s.HasUsage } // ============================================================ // StreamGuard - 流守卫 // ============================================================ // StreamGuard 包装 flyto.Event channel,添加边界检测和空闲看门狗. type StreamGuard struct { config *StreamGuardConfig stats StreamStats mu sync.Mutex lastEventTime time.Time startTime time.Time cancelIdle context.CancelFunc } // NewStreamGuard 创建流守卫. func NewStreamGuard(cfg *StreamGuardConfig) *StreamGuard { if cfg == nil { cfg = DefaultStreamGuardConfig() } if cfg.IdleTimeout <= 0 { cfg.IdleTimeout = 90 * time.Second } if cfg.IdleWarningAt <= 0 { cfg.IdleWarningAt = cfg.IdleTimeout / 2 } if cfg.StallThreshold <= 0 { cfg.StallThreshold = 30 * time.Second } return &StreamGuard{config: cfg} } // Watch 包装原始 flyto.Event channel,返回加守卫的新 channel. func (g *StreamGuard) Watch(ctx context.Context, rawCh <-chan flyto.Event) <-chan flyto.Event { g.startTime = time.Now() g.lastEventTime = g.startTime guardedCh := make(chan flyto.Event, cap(rawCh)) idleCtx, idleCancel := context.WithCancel(ctx) g.cancelIdle = idleCancel var wgIdle sync.WaitGroup wgIdle.Add(1) go func() { defer wgIdle.Done() g.idleWatchdog(idleCtx, guardedCh) }() go func() { defer func() { idleCancel() wgIdle.Wait() close(guardedCh) }() for evt := range rawCh { now := time.Now() g.mu.Lock() gap := now.Sub(g.lastEventTime) g.lastEventTime = now g.mu.Unlock() if g.stats.EventCount > 0 && gap > g.config.StallThreshold { g.stats.StallCount++ g.stats.TotalStallTime += gap if g.config.OnStall != nil { g.config.OnStall(gap, g.stats.StallCount, g.stats.TotalStallTime) } } g.stats.EventCount++ g.trackState(evt) select { case guardedCh <- evt: case <-ctx.Done(): return } } g.stats.Duration = time.Since(g.startTime) // 空响应检测 if g.stats.IsEmpty() { select { case guardedCh <- &flyto.ErrorEvent{ Err: errors.New("stream ended without any events (possible proxy failure or non-SSE response)"), Code: "stream_empty", Retryable: true, }: case <-ctx.Done(): } } // 部分流检测 if g.stats.IsIncomplete() { select { case guardedCh <- &flyto.ErrorEvent{ Err: errors.New("stream ended without UsageEvent (incomplete response, possible network interruption)"), Code: "stream_truncated", Retryable: true, }: case <-ctx.Done(): } } if g.config.OnStreamEnd != nil { g.config.OnStreamEnd(&g.stats) } }() return guardedCh } // Stats 返回流统计信息. func (g *StreamGuard) Stats() *StreamStats { return &g.stats } // trackState 根据 flyto.Event 类型更新流状态. func (g *StreamGuard) trackState(evt flyto.Event) { switch e := evt.(type) { case *flyto.TextDeltaEvent: g.stats.HasContent = true case *flyto.ThinkingDeltaEvent: g.stats.HasContent = true case *flyto.ToolUseEvent: g.stats.HasContent = true g.stats.ContentBlockCount++ case *flyto.TextEvent: g.stats.ContentBlockCount++ case *flyto.UsageEvent: g.stats.HasUsage = true g.stats.HasContent = true // 收到 UsageEvent 说明至少完成了一轮 g.stats.StopReason = e.StopReason } } // idleWatchdog 空闲看门狗 goroutine. func (g *StreamGuard) idleWatchdog(ctx context.Context, ch chan<- flyto.Event) { warningDuration := g.config.IdleWarningAt remainingDuration := g.config.IdleTimeout - warningDuration if remainingDuration <= 0 { remainingDuration = g.config.IdleTimeout warningDuration = 0 } warningFired := false if warningDuration > 0 { timer := time.NewTimer(warningDuration) defer timer.Stop() for { select { case <-ctx.Done(): return case <-timer.C: g.mu.Lock() elapsed := time.Since(g.lastEventTime) g.mu.Unlock() if elapsed >= warningDuration { warningFired = true if g.config.OnIdleWarning != nil { g.config.OnIdleWarning(elapsed) } } else { timer.Reset(warningDuration - elapsed) continue } } if warningFired { break } } } timer2 := time.NewTimer(remainingDuration) defer timer2.Stop() for { select { case <-ctx.Done(): return case <-timer2.C: g.mu.Lock() elapsed := time.Since(g.lastEventTime) g.mu.Unlock() if elapsed >= g.config.IdleTimeout { g.stats.IdleAborted = true if g.config.OnIdleTimeout != nil { g.config.OnIdleTimeout() } select { case ch <- &flyto.ErrorEvent{ Err: errors.New("stream idle timeout: no events received for " + g.config.IdleTimeout.String()), Code: "stream_idle_timeout", Retryable: true, }: case <-ctx.Done(): } return } // elapsed < g.config.IdleTimeout 由上方 if 保证(否则已 return), // 所以 remaining > 0 是不变式,无需防御性检查. // 精妙之处(CLEVER): 在 case <-timer2.C: 分支内调用 Reset 是安全的-- // channel 已在 select 中被接收(drain),符合 Go timer 规范. // 替代方案: - 否决:此处 drain 已完成,额外操作是噪音. remaining := g.config.IdleTimeout - elapsed timer2.Reset(remaining) } } }