package api

import (
	"context"
	"strings"
	"sync/atomic"
	"testing"
	"time"

	"git.flytoex.net/yuanwei/flyto-agent/pkg/flyto"
)

// ============================================================
// StreamStats tests
// ============================================================

// TestStreamStats_IsEmpty checks IsEmpty for three flag combinations:
// zero value, content only, and content plus usage.
func TestStreamStats_IsEmpty(t *testing.T) {
	// Completely empty stream.
	s := &StreamStats{}
	if !s.IsEmpty() {
		t.Error("no content, no usage should be empty")
	}

	// Content but no UsageEvent (not empty; counts as incomplete).
	s2 := &StreamStats{HasContent: true}
	if s2.IsEmpty() {
		t.Error("with content should not be empty")
	}

	// UsageEvent received (a UsageEvent also sets HasContent).
	s3 := &StreamStats{HasContent: true, HasUsage: true}
	if s3.IsEmpty() {
		t.Error("with usage should not be empty")
	}
}

// TestStreamStats_IsIncomplete checks IsIncomplete for the same three flag
// combinations used by TestStreamStats_IsEmpty.
func TestStreamStats_IsIncomplete(t *testing.T) {
	// Content but no UsageEvent.
	s := &StreamStats{HasContent: true}
	if !s.IsIncomplete() {
		t.Error("content without usage should be incomplete")
	}

	// Content + UsageEvent (normal termination).
	s2 := &StreamStats{HasContent: true, HasUsage: true}
	if s2.IsIncomplete() {
		t.Error("with usage should not be incomplete")
	}

	// Completely empty stream (not incomplete; counts as empty).
	s3 := &StreamStats{}
	if s3.IsIncomplete() {
		t.Error("empty stream should not be incomplete (it's empty)")
	}
}

// ============================================================
// StreamGuard normal-flow tests
// ============================================================

// TestStreamGuard_NormalFlow feeds a well-formed stream (two deltas, one
// TextEvent, one UsageEvent) through the guard and expects the four events to
// pass through unchanged with matching stats.
func TestStreamGuard_NormalFlow(t *testing.T) {
	cfg := &StreamGuardConfig{
		IdleTimeout:    5 * time.Second,
		StallThreshold: 1 * time.Second,
	}
	guard := NewStreamGuard(cfg)

	// CLEVER: flyto.Event is an interface and so is the channel element type.
	// The sender uses concrete types such as *flyto.TextDeltaEvent and the
	// receiver tells them apart with type assertions. Per the original
	// author's note, these are the same event types that
	// wire.ParseAnthropicStream produces.
	rawCh := make(chan flyto.Event, 10)
	go func() {
		rawCh <- &flyto.TextDeltaEvent{Text: "hel"}
		rawCh <- &flyto.TextDeltaEvent{Text: "lo"}
		rawCh <- &flyto.TextEvent{Text: "hello"}
		rawCh <- &flyto.UsageEvent{
			InputTokens:  100,
			OutputTokens: 20,
			StopReason:   "end_turn",
		}
		close(rawCh)
	}()

	guardedCh := guard.Watch(context.Background(), rawCh)

	var events []flyto.Event
	for evt := range guardedCh {
		events = append(events, evt)
	}

	// Four events, no extra error event.
	if len(events) != 4 {
		t.Errorf("expected 4 events, got %d", len(events))
		for i, evt := range events {
			t.Logf("  event[%d]: %T", i, evt)
		}
	}

	stats := guard.Stats()
	if !stats.HasContent {
		t.Error("should track HasContent")
	}
	if !stats.HasUsage {
		t.Error("should track HasUsage (UsageEvent received)")
	}
	if stats.StopReason != "end_turn" {
		t.Errorf("StopReason = %q, want 'end_turn'", stats.StopReason)
	}
	if stats.ContentBlockCount != 1 {
		t.Errorf("ContentBlockCount = %d, want 1 (one TextEvent)", stats.ContentBlockCount)
	}
	if stats.EventCount != 4 {
		t.Errorf("EventCount = %d, want 4", stats.EventCount)
	}
	if stats.StallCount != 0 {
		t.Errorf("StallCount = %d, want 0", stats.StallCount)
	}
	if stats.IdleAborted {
		t.Error("should not be idle aborted")
	}
}

// ============================================================
// Empty-response detection tests
// ============================================================

// TestStreamGuard_EmptyResponse closes the raw channel without sending
// anything and expects a single "stream_empty" ErrorEvent.
func TestStreamGuard_EmptyResponse(t *testing.T) {
	guard := NewStreamGuard(DefaultStreamGuardConfig())

	rawCh := make(chan flyto.Event, 10)
	close(rawCh) // Closed immediately: no events at all.

	guardedCh := guard.Watch(context.Background(), rawCh)

	var events []flyto.Event
	for evt := range guardedCh {
		events = append(events, evt)
	}

	// Exactly one stream_empty error event is expected.
	if len(events) != 1 {
		t.Fatalf("expected 1 error event, got %d", len(events))
	}
	errEvt, ok := events[0].(*flyto.ErrorEvent)
	if !ok {
		t.Fatalf("expected *flyto.ErrorEvent, got %T", events[0])
	}
	if errEvt.Code != "stream_empty" {
		t.Errorf("expected code 'stream_empty', got %q", errEvt.Code)
	}
	if errEvt.Err == nil || errEvt.Err.Error() == "" {
		t.Error("error event should have non-empty Err")
	}

	stats := guard.Stats()
	if !stats.IsEmpty() {
		t.Error("stats should report empty")
	}
}

// ============================================================
// Partial-stream detection tests
// ============================================================

// TestStreamGuard_IncompleteStream sends content but no UsageEvent and
// expects the guard to append a "stream_truncated" ErrorEvent.
func TestStreamGuard_IncompleteStream(t *testing.T) {
	guard := NewStreamGuard(DefaultStreamGuardConfig())

	rawCh := make(chan flyto.Event, 10)
	go func() {
		rawCh <- &flyto.TextDeltaEvent{Text: "partial..."}
		// No UsageEvent.
		close(rawCh)
	}()

	guardedCh := guard.Watch(context.Background(), rawCh)

	var events []flyto.Event
	for evt := range guardedCh {
		events = append(events, evt)
	}

	// One normal event + one stream_truncated error event.
	if len(events) != 2 {
		t.Fatalf("expected 2 events (1 normal + 1 error), got %d", len(events))
	}

	lastEvt, ok := events[len(events)-1].(*flyto.ErrorEvent)
	if !ok {
		t.Fatalf("last event should be *flyto.ErrorEvent, got %T", events[len(events)-1])
	}
	if lastEvt.Code != "stream_truncated" {
		t.Errorf("expected code 'stream_truncated', got %q", lastEvt.Code)
	}

	stats := guard.Stats()
	if !stats.IsIncomplete() {
		t.Error("stats should report incomplete")
	}
	if stats.HasUsage {
		t.Error("should not have HasUsage")
	}
}

// ============================================================
// Legitimate empty response (UsageEvent but no content block)
// ============================================================

// TestStreamGuard_LegitEmptyResponse sends only a UsageEvent and expects it
// to pass through with no error event appended.
func TestStreamGuard_LegitEmptyResponse(t *testing.T) {
	guard := NewStreamGuard(DefaultStreamGuardConfig())

	rawCh := make(chan flyto.Event, 10)
	go func() {
		// CLEVER: a UsageEvent sets both HasContent=true and HasUsage=true.
		// It represents a model that finished normally without producing any
		// text (e.g. a bare stop after a tool call).
		// IsEmpty() = false, IsIncomplete() = false: correct!
		rawCh <- &flyto.UsageEvent{StopReason: "end_turn"}
		close(rawCh)
	}()

	guardedCh := guard.Watch(context.Background(), rawCh)

	var events []flyto.Event
	for evt := range guardedCh {
		events = append(events, evt)
	}

	// One event, no error (this is a legitimate response).
	if len(events) != 1 {
		t.Fatalf("expected 1 event (no error for legit empty), got %d", len(events))
	}

	stats := guard.Stats()
	if stats.IsEmpty() {
		t.Error("has UsageEvent, should not be empty")
	}
	if stats.IsIncomplete() {
		t.Error("has UsageEvent, should not be incomplete")
	}
}

// ============================================================
// Idle watchdog tests
// ============================================================

// TestStreamGuard_IdleTimeout sends one event, then stays silent for longer
// than IdleTimeout, and expects an idle-timeout ErrorEvent, exactly one
// OnIdleTimeout callback, and IdleAborted set in the stats.
func TestStreamGuard_IdleTimeout(t *testing.T) {
	var timedOut int32
	cfg := &StreamGuardConfig{
		IdleTimeout:    200 * time.Millisecond,
		IdleWarningAt:  100 * time.Millisecond,
		StallThreshold: 50 * time.Millisecond,
		OnIdleTimeout:  func() { atomic.AddInt32(&timedOut, 1) },
	}
	guard := NewStreamGuard(cfg)

	rawCh := make(chan flyto.Event, 10)
	go func() {
		rawCh <- &flyto.TextDeltaEvent{Text: "x"}
		// Send nothing more and wait for the timeout.
		time.Sleep(500 * time.Millisecond)
		close(rawCh)
	}()

	guardedCh := guard.Watch(context.Background(), rawCh)

	var events []flyto.Event
	for evt := range guardedCh {
		events = append(events, evt)
	}

	// Expect TextDeltaEvent + idle timeout error (possibly also an
	// incomplete error).
	hasIdleError := false
	for _, evt := range events {
		// Guard against a nil Err so a malformed ErrorEvent fails the test
		// instead of panicking the whole test binary.
		if e, ok := evt.(*flyto.ErrorEvent); ok && e.Err != nil && containsStr(e.Err.Error(), "idle timeout") {
			hasIdleError = true
		}
	}
	if !hasIdleError {
		t.Error("should have idle timeout error event")
		for i, evt := range events {
			t.Logf("  event[%d]: %T %v", i, evt, evt)
		}
	}

	if atomic.LoadInt32(&timedOut) != 1 {
		t.Error("OnIdleTimeout callback should have fired")
	}

	stats := guard.Stats()
	if !stats.IdleAborted {
		t.Error("stats should report idle aborted")
	}
}

// TestStreamGuard_IdleResetOnEvent sends events every 100ms against a 300ms
// IdleTimeout and expects no error event and no idle abort.
func TestStreamGuard_IdleResetOnEvent(t *testing.T) {
	cfg := &StreamGuardConfig{
		IdleTimeout:    300 * time.Millisecond,
		StallThreshold: 1 * time.Second,
	}
	guard := NewStreamGuard(cfg)

	rawCh := make(chan flyto.Event, 10)
	go func() {
		rawCh <- &flyto.TextDeltaEvent{Text: "start"}
		// One event every 100ms, five in total; each one resets the watchdog.
		for i := 0; i < 5; i++ {
			time.Sleep(100 * time.Millisecond)
			rawCh <- &flyto.TextDeltaEvent{Text: "chunk"}
		}
		rawCh <- &flyto.UsageEvent{StopReason: "end_turn"}
		close(rawCh)
	}()

	guardedCh := guard.Watch(context.Background(), rawCh)

	var events []flyto.Event
	for evt := range guardedCh {
		events = append(events, evt)
	}

	// There must be no timeout error (every event reset the watchdog).
	for _, evt := range events {
		if e, ok := evt.(*flyto.ErrorEvent); ok {
			t.Errorf("should not have error events, got: %v", e.Err)
		}
	}

	stats := guard.Stats()
	if stats.IdleAborted {
		t.Error("should not be idle aborted")
	}
}

// ============================================================
// Stall detection tests
// ============================================================

// TestStreamGuard_StallDetection inserts a single 200ms gap against a 100ms
// StallThreshold and expects exactly one stall to be reported through both
// the OnStall callback and the stats.
func TestStreamGuard_StallDetection(t *testing.T) {
	var stallCount int32
	cfg := &StreamGuardConfig{
		IdleTimeout:    5 * time.Second,
		StallThreshold: 100 * time.Millisecond,
		OnStall: func(gap time.Duration, count int, total time.Duration) {
			atomic.AddInt32(&stallCount, 1)
		},
	}
	guard := NewStreamGuard(cfg)

	rawCh := make(chan flyto.Event, 10)
	go func() {
		rawCh <- &flyto.TextDeltaEvent{Text: "x"}
		time.Sleep(200 * time.Millisecond) // > 100ms threshold
		rawCh <- &flyto.TextDeltaEvent{Text: "after stall"}
		rawCh <- &flyto.UsageEvent{StopReason: "end_turn"}
		close(rawCh)
	}()

	guardedCh := guard.Watch(context.Background(), rawCh)
	for range guardedCh {
		// Drain; only the stats and callback count matter here.
	}

	if atomic.LoadInt32(&stallCount) != 1 {
		t.Errorf("expected 1 stall, got %d", atomic.LoadInt32(&stallCount))
	}

	stats := guard.Stats()
	if stats.StallCount != 1 {
		t.Errorf("StallCount = %d, want 1", stats.StallCount)
	}
	if stats.TotalStallTime < 100*time.Millisecond {
		t.Errorf("TotalStallTime = %v, want >= 100ms", stats.TotalStallTime)
	}
}

// ============================================================
// OnStreamEnd callback tests
// ============================================================

// TestStreamGuard_OnStreamEnd expects the OnStreamEnd callback to be invoked
// with stats that reflect the two events sent and a positive duration.
func TestStreamGuard_OnStreamEnd(t *testing.T) {
	// NOTE(review): endStats is written inside the callback and read after the
	// drain loop. That is only race-free if the guard invokes OnStreamEnd
	// before closing the guarded channel; confirm by running with -race.
	var endStats *StreamStats
	cfg := &StreamGuardConfig{
		IdleTimeout:    5 * time.Second,
		StallThreshold: 1 * time.Second,
		OnStreamEnd:    func(s *StreamStats) { endStats = s },
	}
	guard := NewStreamGuard(cfg)

	rawCh := make(chan flyto.Event, 10)
	go func() {
		rawCh <- &flyto.TextDeltaEvent{Text: "x"}
		rawCh <- &flyto.UsageEvent{StopReason: "end_turn"}
		close(rawCh)
	}()

	for range guard.Watch(context.Background(), rawCh) {
		// Drain.
	}

	if endStats == nil {
		t.Fatal("OnStreamEnd should have been called")
	}
	if endStats.EventCount != 2 {
		t.Errorf("EventCount = %d, want 2", endStats.EventCount)
	}
	if endStats.Duration <= 0 {
		t.Error("Duration should be positive")
	}
}

// ============================================================
// Context cancellation tests
// ============================================================

// TestStreamGuard_ContextCancel cancels the context mid-stream and expects
// the guarded channel to have delivered at least one event before it closes.
func TestStreamGuard_ContextCancel(t *testing.T) {
	guard := NewStreamGuard(DefaultStreamGuardConfig())

	rawCh := make(chan flyto.Event, 10)
	ctx, cancel := context.WithCancel(context.Background())

	go func() {
		rawCh <- &flyto.TextDeltaEvent{Text: "x"}
		time.Sleep(50 * time.Millisecond)
		cancel() // Cancel the context.
		time.Sleep(100 * time.Millisecond)
		close(rawCh)
	}()

	guardedCh := guard.Watch(ctx, rawCh)

	var count int
	for range guardedCh {
		count++
	}

	// At least the TextDeltaEvent must have been received.
	if count < 1 {
		t.Error("should receive at least 1 event before cancel")
	}
}

// ============================================================
// Multiple content block tracking tests
// ============================================================

// TestStreamGuard_MultipleContentBlocks sends one text block and one tool-use
// block and expects ContentBlockCount to be 2.
func TestStreamGuard_MultipleContentBlocks(t *testing.T) {
	guard := NewStreamGuard(DefaultStreamGuardConfig())

	rawCh := make(chan flyto.Event, 20)
	go func() {
		// Block 0: text (TextDeltaEvent + TextEvent)
		rawCh <- &flyto.TextDeltaEvent{Text: "hello"}
		rawCh <- &flyto.TextEvent{Text: "hello"} // ContentBlockCount++

		// Block 1: tool_use
		rawCh <- &flyto.ToolUseEvent{
			ID:       "toolu_01",
			ToolName: "read_file",
			Input:    map[string]any{"file": "test.go"},
		} // ContentBlockCount++

		rawCh <- &flyto.UsageEvent{StopReason: "tool_use"}
		close(rawCh)
	}()

	for range guard.Watch(context.Background(), rawCh) {
		// Drain.
	}

	stats := guard.Stats()
	if stats.ContentBlockCount != 2 {
		t.Errorf("ContentBlockCount = %d, want 2", stats.ContentBlockCount)
	}
	if stats.StopReason != "tool_use" {
		t.Errorf("StopReason = %q, want 'tool_use'", stats.StopReason)
	}
}

// ============================================================
// Thinking block tests
// ============================================================

// TestStreamGuard_ThinkingBlock sends a thinking block followed by a text
// block and expects only the TextEvent to count toward ContentBlockCount.
func TestStreamGuard_ThinkingBlock(t *testing.T) {
	guard := NewStreamGuard(DefaultStreamGuardConfig())

	rawCh := make(chan flyto.Event, 10)
	go func() {
		rawCh <- &flyto.ThinkingDeltaEvent{Text: "Let me think..."}
		rawCh <- &flyto.ThinkingEvent{Text: "Let me think... OK."}
		rawCh <- &flyto.TextDeltaEvent{Text: "Answer"}
		rawCh <- &flyto.TextEvent{Text: "Answer"} // ContentBlockCount++
		rawCh <- &flyto.UsageEvent{StopReason: "end_turn"}
		close(rawCh)
	}()

	for range guard.Watch(context.Background(), rawCh) {
		// Drain.
	}

	stats := guard.Stats()
	if !stats.HasContent {
		t.Error("should have content (ThinkingDeltaEvent sets HasContent)")
	}
	if !stats.HasUsage {
		t.Error("should have usage")
	}
	// ThinkingEvent does not increase ContentBlockCount (only
	// TextEvent/ToolUseEvent do).
	if stats.ContentBlockCount != 1 {
		t.Errorf("ContentBlockCount = %d, want 1 (only TextEvent counts)", stats.ContentBlockCount)
	}
}

// ============================================================
// DefaultStreamGuardConfig tests
// ============================================================

// TestDefaultStreamGuardConfig pins the default IdleTimeout (90s) and
// StallThreshold (30s).
func TestDefaultStreamGuardConfig(t *testing.T) {
	cfg := DefaultStreamGuardConfig()
	if cfg.IdleTimeout != 90*time.Second {
		t.Errorf("IdleTimeout = %v, want 90s", cfg.IdleTimeout)
	}
	if cfg.StallThreshold != 30*time.Second {
		t.Errorf("StallThreshold = %v, want 30s", cfg.StallThreshold)
	}
}

// ============================================================
// Helpers
// ============================================================

// containsStr reports whether substr occurs within s. It is a thin wrapper
// over strings.Contains, kept so existing call sites keep working.
func containsStr(s, substr string) bool {
	return strings.Contains(s, substr)
}