package wire import ( "context" "io" "strings" "testing" "time" "git.flytoex.net/yuanwei/flyto-agent/pkg/flyto" ) // sseReadCloser wraps a string as io.ReadCloser for SSE parsing tests. type sseReadCloser struct { io.Reader } func (s *sseReadCloser) Close() error { return nil } func newSSE(data string) io.ReadCloser { return &sseReadCloser{Reader: strings.NewReader(data)} } // collectEvents drains the event channel with a timeout. func collectEvents(ch <-chan flyto.Event) []flyto.Event { var events []flyto.Event timeout := time.After(5 * time.Second) for { select { case evt, ok := <-ch: if !ok { return events } events = append(events, evt) case <-timeout: return events } } } func TestParseAnthropicStream_TextOnly(t *testing.T) { // Simulate a simple text-only Anthropic response. sse := `event: message_start data: {"type":"message_start","message":{"usage":{"input_tokens":100,"output_tokens":0}}} event: content_block_start data: {"type":"content_block_start","index":0,"content_block":{"type":"text"}} event: content_block_delta data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}} event: content_block_delta data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":" world"}} event: content_block_stop data: {"type":"content_block_stop","index":0} event: message_delta data: {"type":"message_delta","delta":{"stop_reason":"end_turn"},"usage":{"output_tokens":5}} event: message_stop data: {"type":"message_stop"} ` ch := ParseAnthropicStream(context.Background(), newSSE(sse)) events := collectEvents(ch) // Expect: TextDelta("Hello"), TextDelta(" world"), Text("Hello world"), Usage var textDeltas []string var fullText string var usage *flyto.UsageEvent for _, evt := range events { switch e := evt.(type) { case *flyto.TextDeltaEvent: textDeltas = append(textDeltas, e.Text) case *flyto.TextEvent: fullText = e.Text case *flyto.UsageEvent: usage = e } } if len(textDeltas) != 2 || textDeltas[0] != "Hello" || textDeltas[1] != " world" { t.Errorf("text deltas = %v, want [Hello, \" world\"]", textDeltas) } if fullText != "Hello world" { t.Errorf("full text = %q, want %q", fullText, "Hello world") } if usage == nil { t.Fatal("no UsageEvent received") } if usage.InputTokens != 100 { t.Errorf("InputTokens = %d, want 100", usage.InputTokens) } if usage.OutputTokens != 5 { t.Errorf("OutputTokens = %d, want 5", usage.OutputTokens) } if usage.StopReason != "end_turn" { t.Errorf("StopReason = %q, want %q", usage.StopReason, "end_turn") } } func TestParseAnthropicStream_ToolUse(t *testing.T) { sse := `event: message_start data: {"type":"message_start","message":{"usage":{"input_tokens":50,"output_tokens":0}}} event: content_block_start data: {"type":"content_block_start","index":0,"content_block":{"type":"tool_use","id":"tool_abc","name":"Bash"}} event: content_block_delta data: {"type":"content_block_delta","index":0,"delta":{"type":"input_json_delta","partial_json":"{\"command\":"}} event: content_block_delta data: {"type":"content_block_delta","index":0,"delta":{"type":"input_json_delta","partial_json":"\"ls\"}"}} event: content_block_stop data: {"type":"content_block_stop","index":0} event: message_delta data: {"type":"message_delta","delta":{"stop_reason":"tool_use"},"usage":{"output_tokens":10}} event: message_stop data: {"type":"message_stop"} ` ch := ParseAnthropicStream(context.Background(), newSSE(sse)) events := collectEvents(ch) var toolEvent *flyto.ToolUseEvent for _, evt := range events { if te, ok := evt.(*flyto.ToolUseEvent); ok { toolEvent = te } } if toolEvent == nil { t.Fatal("no ToolUseEvent received") } if toolEvent.ID != "tool_abc" { t.Errorf("ToolUseEvent.ID = %q, want %q", toolEvent.ID, "tool_abc") } if toolEvent.ToolName != "Bash" { t.Errorf("ToolUseEvent.ToolName = %q, want %q", toolEvent.ToolName, "Bash") } if cmd, ok := toolEvent.Input["command"]; !ok || cmd != "ls" { t.Errorf("ToolUseEvent.Input = %v, want {command: ls}", toolEvent.Input) } } func TestParseAnthropicStream_Thinking(t *testing.T) { sse := `event: message_start data: {"type":"message_start","message":{"usage":{"input_tokens":200,"output_tokens":0}}} event: content_block_start data: {"type":"content_block_start","index":0,"content_block":{"type":"thinking"}} event: content_block_delta data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":"Let me think..."}} event: content_block_stop data: {"type":"content_block_stop","index":0} event: content_block_start data: {"type":"content_block_start","index":1,"content_block":{"type":"text"}} event: content_block_delta data: {"type":"content_block_delta","index":1,"delta":{"type":"text_delta","text":"Answer"}} event: content_block_stop data: {"type":"content_block_stop","index":1} event: message_delta data: {"type":"message_delta","delta":{"stop_reason":"end_turn"},"usage":{"output_tokens":20}} event: message_stop data: {"type":"message_stop"} ` ch := ParseAnthropicStream(context.Background(), newSSE(sse)) events := collectEvents(ch) var thinkingDelta, textDelta string var thinkingFull, textFull string for _, evt := range events { switch e := evt.(type) { case *flyto.ThinkingDeltaEvent: thinkingDelta += e.Text case *flyto.ThinkingEvent: thinkingFull = e.Text case *flyto.TextDeltaEvent: textDelta += e.Text case *flyto.TextEvent: textFull = e.Text } } if thinkingDelta != "Let me think..." { t.Errorf("thinking delta = %q", thinkingDelta) } if thinkingFull != "Let me think..." { t.Errorf("thinking full = %q", thinkingFull) } if textDelta != "Answer" { t.Errorf("text delta = %q", textDelta) } if textFull != "Answer" { t.Errorf("text full = %q", textFull) } } func TestParseAnthropicStream_CacheTokens(t *testing.T) { // cache tokens should only come from message_start, not message_delta. sse := `event: message_start data: {"type":"message_start","message":{"usage":{"input_tokens":1000,"output_tokens":0,"cache_read_input_tokens":500,"cache_creation_input_tokens":200}}} event: message_delta data: {"type":"message_delta","delta":{"stop_reason":"end_turn"},"usage":{"output_tokens":50}} event: message_stop data: {"type":"message_stop"} ` ch := ParseAnthropicStream(context.Background(), newSSE(sse)) events := collectEvents(ch) var usage *flyto.UsageEvent for _, evt := range events { if u, ok := evt.(*flyto.UsageEvent); ok { usage = u } } if usage == nil { t.Fatal("no UsageEvent") } if usage.CacheReadTokens != 500 { t.Errorf("CacheReadTokens = %d, want 500", usage.CacheReadTokens) } if usage.CacheCreationTokens != 200 { t.Errorf("CacheCreationTokens = %d, want 200", usage.CacheCreationTokens) } } func TestParseAnthropicStream_ToolUseInvalidJSON(t *testing.T) { // Truncated JSON in tool_use input should still produce a ToolUseEvent with empty input. sse := `event: message_start data: {"type":"message_start","message":{"usage":{"input_tokens":10,"output_tokens":0}}} event: content_block_start data: {"type":"content_block_start","index":0,"content_block":{"type":"tool_use","id":"t1","name":"Read"}} event: content_block_delta data: {"type":"content_block_delta","index":0,"delta":{"type":"input_json_delta","partial_json":"{\"file\":"}} event: content_block_stop data: {"type":"content_block_stop","index":0} event: message_delta data: {"type":"message_delta","delta":{"stop_reason":"tool_use"},"usage":{"output_tokens":5}} event: message_stop data: {"type":"message_stop"} ` ch := ParseAnthropicStream(context.Background(), newSSE(sse)) events := collectEvents(ch) var toolEvent *flyto.ToolUseEvent for _, evt := range events { if te, ok := evt.(*flyto.ToolUseEvent); ok { toolEvent = te } } if toolEvent == nil { t.Fatal("no ToolUseEvent for invalid JSON") } // Input should be empty map (not nil) due to invalid JSON if len(toolEvent.Input) != 0 { t.Errorf("Input should be empty for invalid JSON, got %v", toolEvent.Input) } } func TestParseAnthropicStream_ErrorEvent(t *testing.T) { sse := `event: error data: {"type":"error","error":{"type":"overloaded_error","message":"Overloaded"}} ` ch := ParseAnthropicStream(context.Background(), newSSE(sse)) events := collectEvents(ch) var errEvent *flyto.ErrorEvent for _, evt := range events { if e, ok := evt.(*flyto.ErrorEvent); ok { errEvent = e } } if errEvent == nil { t.Fatal("no ErrorEvent received") } if !errEvent.Retryable { t.Error("API error events should be retryable") } } func TestParseAnthropicStream_Ping(t *testing.T) { // ping events should be silently ignored. sse := `event: ping data: {} event: message_start data: {"type":"message_start","message":{"usage":{"input_tokens":10,"output_tokens":0}}} event: message_delta data: {"type":"message_delta","delta":{"stop_reason":"end_turn"},"usage":{"output_tokens":1}} event: message_stop data: {"type":"message_stop"} ` ch := ParseAnthropicStream(context.Background(), newSSE(sse)) events := collectEvents(ch) // Should only have UsageEvent, no ping-related events. for _, evt := range events { if evt.EventType() == "ping" { t.Error("ping should not produce an event") } } // At least UsageEvent should be present. if len(events) == 0 { t.Error("expected at least one event") } } func TestParseAnthropicStream_ContextCancel(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) cancel() // cancel immediately ch := ParseAnthropicStream(ctx, newSSE("event: message_start\ndata: {}\n\n")) events := collectEvents(ch) // Channel should close quickly with no or few events. _ = events // just verify no hang } func TestParseAnthropicStream_ParseError(t *testing.T) { // Invalid JSON in data should produce an ErrorEvent. sse := `event: message_start data: {invalid json} ` ch := ParseAnthropicStream(context.Background(), newSSE(sse)) events := collectEvents(ch) var errEvent *flyto.ErrorEvent for _, evt := range events { if e, ok := evt.(*flyto.ErrorEvent); ok { errEvent = e } } if errEvent == nil { t.Fatal("expected ErrorEvent for invalid JSON") } if errEvent.Code != "parse_error" { t.Errorf("error code = %q, want %q", errEvent.Code, "parse_error") } }