// session_failloud_test.go - trackEvents 取消路径 fail-loud 单测. // quote-engine-probe r15/r16/r17 silent-close 修复回归. // // session_failloud_test.go - trackEvents cancel-path fail-loud unit // tests. Regression for the silent-close bug seen by quote-engine-probe // runs r15/r16/r17 (sub-agent ctx 600s deadline expired mid-stream, // trackEvents drained DoneEvent silently, orchestrator main agent then // fired "操作已取消" with no usable upstream signal). package engine import ( "context" "errors" "testing" "time" ) // TestFailLoudCancel_EmitsCurrentAndMarker verifies that failLoudCancel // pushes both the in-flight current event and the cancel marker into a // buffered outCh. // // 验 failLoudCancel 在 outCh 有空间时把当前 evt + marker 都推入. func TestFailLoudCancel_EmitsCurrentAndMarker(t *testing.T) { s := &Session{} outCh := make(chan Event, 5) cur := &TextEvent{Text: "in-flight"} marker := &ErrorEvent{Code: "ctx_done_test"} s.failLoudCancel(cur, outCh, marker) if got := len(outCh); got != 2 { t.Fatalf("expected 2 events in outCh, got %d", got) } first := <-outCh if first != Event(cur) { t.Errorf("first event should be the in-flight current evt, got %T", first) } second, ok := (<-outCh).(*ErrorEvent) if !ok { t.Fatalf("second event should be ErrorEvent") } if second.Code != "ctx_done_test" { t.Errorf("second event Code = %q, want %q", second.Code, "ctx_done_test") } } // TestFailLoudCancel_NoBlockOnFullChan verifies failLoudCancel does // NOT block when outCh has no room and no reader. Drop on saturation // is intentional (consumer-not-reading is a separate failure mode). // // 验 failLoudCancel 在 outCh 满 + 无读者时不阻塞 (饱和丢弃是设计行为). func TestFailLoudCancel_NoBlockOnFullChan(t *testing.T) { s := &Session{} outCh := make(chan Event) // unbuffered, no reader done := make(chan struct{}) go func() { s.failLoudCancel(&TextEvent{Text: "x"}, outCh, &ErrorEvent{Code: "y"}) close(done) }() select { case <-done: // expected: returned without blocking case <-time.After(100 * time.Millisecond): t.Fatal("failLoudCancel blocked when outCh has no reader/space") } } // TestTrackEvents_CtxCancel_EmitsErrorEvent verifies that when ctx is // cancelled while events are being forwarded, the consumer sees a // definitive ErrorEvent rather than a silent close. Go select is // pseudo-random when multiple cases are ready (forward to outCh has // room AND ctx.Done is signalled), so we run iterations -- with 50/50 // per-iteration odds, 12 iterations gives < 1/4096 false-pass rate. // // 验 ctx 在事件转发期间取消时, 消费层看到 ErrorEvent 而非 silent close. // Go select 多 case 同时 ready 时伪随机, 12 次迭代单次 false-pass < 1/4096. func TestTrackEvents_CtxCancel_EmitsErrorEvent(t *testing.T) { const iterations = 12 sawError := false for i := 0; i < iterations; i++ { s := newSession("ctx-cancel-test", &fakeSessionEngineRef{}) rawCh := make(chan Event, 1) rawCh <- &TextEvent{Text: "before-cancel"} close(rawCh) outCh := make(chan Event, 5) ctx, cancel := context.WithCancel(context.Background()) cancel() // pre-cancel so select sees ctx.Done ready s.trackEvents(ctx, "p", rawCh, outCh) for ev := range outCh { if errEv, ok := ev.(*ErrorEvent); ok { if !errors.Is(errEv.Err, context.Canceled) { t.Errorf("iter %d: ErrorEvent.Err = %v, want context.Canceled", i, errEv.Err) } sawError = true } } if sawError { break } } if !sawError { t.Errorf("ctx.Done branch never fired ErrorEvent in %d iterations", iterations) } } // TestTrackEvents_SessionClose_EmitsErrorEvent verifies the parallel // fail-loud path on Session.Close(): consumer sees ErrorEvent with // session_closed code rather than silent outCh close. // // 验 Session.Close() 路径同样 fail-loud, ErrorEvent code=session_closed. func TestTrackEvents_SessionClose_EmitsErrorEvent(t *testing.T) { const iterations = 12 sawError := false for i := 0; i < iterations; i++ { s := newSession("close-test", &fakeSessionEngineRef{}) rawCh := make(chan Event, 1) rawCh <- &TextEvent{Text: "before-close"} close(rawCh) outCh := make(chan Event, 5) // pre-close session so select sees s.done ready. We close // s.done directly (rather than s.Close()) to avoid the // observer.Event() call inside Close() -- fakeSessionEngineRef // returns nil for Observer() and the test only needs the // s.done broadcast that trackEvents listens on. close(s.done) s.closed = true s.trackEvents(context.Background(), "p", rawCh, outCh) for ev := range outCh { if errEv, ok := ev.(*ErrorEvent); ok { if errEv.Code != string(ErrSessionClosed) { t.Errorf("iter %d: ErrorEvent.Code = %q, want %q", i, errEv.Code, string(ErrSessionClosed)) } sawError = true } } if sawError { break } } if !sawError { t.Errorf("s.done branch never fired ErrorEvent in %d iterations", iterations) } } // TestDrainRawEvents_DrainsFully verifies drainRawEvents consumes all // remaining events in rawCh until close, allowing the upstream // producer to exit without blocking. // // 验 drainRawEvents 把 rawCh 里 buffer 全消费完直到 close, 让上游 // 生产 goroutine 正常退出. func TestDrainRawEvents_DrainsFully(t *testing.T) { rawCh := make(chan Event, 5) for i := 0; i < 3; i++ { rawCh <- &TextEvent{Text: "x"} } close(rawCh) done := make(chan struct{}) go func() { drainRawEvents(rawCh) close(done) }() select { case <-done: case <-time.After(100 * time.Millisecond): t.Fatal("drainRawEvents did not return after rawCh closed") } }