// response_reflector_emit_test.go — runLoop 6.5 段 ResponseValidatedEvent // emit 实证. ADR-0008 v2 § 引擎层反射器契约 follow-up (2026-05-02): 反射器 // 每次调用都 emit ResponseValidatedEvent (PASS 与 Fail 一视同仁), 让消费 // 者从事件流直接观察反射器行为, 不必从代码路径推断. r31 v4 round 1 实测 // log 0 行反射器痕迹 (PASS 完全静默) 暴露此 gap, 此测试堵 emit 通路. // // response_reflector_emit_test.go — runLoop §6.5 ResponseValidatedEvent // emit verification. package engine import ( "context" "testing" "time" "git.flytoex.net/yuanwei/flyto-agent/pkg/flyto" "git.flytoex.net/yuanwei/flyto-agent/pkg/tools" "git.flytoex.net/yuanwei/flyto-agent/pkg/validator" ) // sequencedReflector returns a verdict from a preset sequence on each // Validate call. Used to drive the engine through PASS / Fail / Fail-at-cap // paths deterministically without needing a real LLM-backed validator. // // sequencedReflector 按预设序列在每次 Validate 调用时返一个 verdict. // 让测试不调真 LLM 也能驱动引擎走 PASS / Fail / 触底三条路径. type sequencedReflector struct { validator.StructuralMarker name string verdicts []validator.Verdict calls int } func (s *sequencedReflector) Name() string { return s.name } func (s *sequencedReflector) Validate(_ context.Context, _ validator.DiffInput) (validator.Verdict, error) { if s.calls >= len(s.verdicts) { // Out-of-script defaults to Approved=true so a misconfigured test // fails loudly via missing event count rather than silent loop. return validator.Verdict{Approved: true, ValidatorName: s.name}, nil } v := s.verdicts[s.calls] s.calls++ return v, nil } // drainEvents reads all events from ch (or until ctx cancels). Used to // collect everything emitted by a single engine.Run for assertion. // // drainEvents 把 ch 上事件读到关 (或 ctx 取消), 给单次 engine.Run 收完 // 全部 emit 给后续断言. func drainEvents(ctx context.Context, ch <-chan Event) []Event { var out []Event for { select { case <-ctx.Done(): return out case ev, ok := <-ch: if !ok { return out } out = append(out, ev) } } } // countResponseValidated returns the slice of *flyto.ResponseValidatedEvent // from a drained event list. // // countResponseValidated 从 drained 事件列里筛 *flyto.ResponseValidatedEvent. func countResponseValidated(events []Event) []*flyto.ResponseValidatedEvent { var out []*flyto.ResponseValidatedEvent for _, e := range events { if rv, ok := e.(*flyto.ResponseValidatedEvent); ok { out = append(out, rv) } } return out } // TestResponseReflector_EmitsValidatedEvent_OnPass verifies the happy // path: reflector returns Approved=true on first try, engine emits // exactly one ResponseValidatedEvent with Approved=true, BlockCount=0. // This is the gap that r31 v4 round 1 surfaced — log 0 lines about the // reflector despite it having run. // // TestResponseReflector_EmitsValidatedEvent_OnPass 验证 happy path: // 反射器一次过 (Approved=true), 引擎 emit 唯一一个 // ResponseValidatedEvent{Approved=true, BlockCount=0}. 这就是 r31 v4 // round 1 没痕迹的反射器实际跑过现在能直接观测的 gap. func TestResponseReflector_EmitsValidatedEvent_OnPass(t *testing.T) { cfg := testConfig() cfg.Toolset = tools.None() cfg.ResponseReflector = &sequencedReflector{ name: "test_reflector", verdicts: []validator.Verdict{{Approved: true, ValidatorName: "test_reflector"}}, } cfg.ResponseReflectorMaxBlocks = 3 cfg.Provider = &scriptedProvider{ events: []flyto.Event{ &flyto.TextEvent{Text: "final answer"}, &flyto.UsageEvent{StopReason: "end_turn", InputTokens: 1, OutputTokens: 1}, }, } eng, err := New(cfg) if err != nil { t.Fatalf("New: %v", err) } ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() events := drainEvents(ctx, eng.Run(ctx, "hi")) rvs := countResponseValidated(events) if len(rvs) != 1 { t.Fatalf("expected 1 ResponseValidatedEvent on PASS, got %d", len(rvs)) } if !rvs[0].Approved { t.Errorf("Approved: got false, want true") } if rvs[0].BlockCount != 0 { t.Errorf("BlockCount: got %d, want 0 (first-try pass)", rvs[0].BlockCount) } if rvs[0].MaxBlocks != 3 { t.Errorf("MaxBlocks: got %d, want 3 (cfg pass-through)", rvs[0].MaxBlocks) } if rvs[0].ValidatorName != "test_reflector" { t.Errorf("ValidatorName: got %q, want %q", rvs[0].ValidatorName, "test_reflector") } if rvs[0].Reason != "" { t.Errorf("Reason: got %q, want empty (PASS has no violations)", rvs[0].Reason) } } // TestResponseReflector_EmitsValidatedEvent_OnFailThenPass verifies // mid-turn correction: reflector rejects first call, accepts second. // Engine should emit two ResponseValidatedEvents (Fail then Pass) with // BlockCount progression 1 → 1 (post-increment in Fail, no increment in // PASS). After Fail, engine injects user message and continues; provider // re-streams, reflector PASS this time, turn loop ends cleanly. // // TestResponseReflector_EmitsValidatedEvent_OnFailThenPass 验证 turn // 内自纠路径: 反射器第一次 reject 第二次 pass. 引擎 emit 两个 // ResponseValidatedEvent (Fail / Pass), BlockCount 进度 1 → 1 // (Fail 时 post-increment, PASS 时不增). Fail 后引擎注 user message // 继续, provider 再次 Stream, 反射器二次 PASS, turn loop 干净退出. func TestResponseReflector_EmitsValidatedEvent_OnFailThenPass(t *testing.T) { cfg := testConfig() cfg.Toolset = tools.None() cfg.ResponseReflector = &sequencedReflector{ name: "test_reflector", verdicts: []validator.Verdict{ {Approved: false, ValidatorName: "test_reflector", Reason: "schema field missing"}, {Approved: true, ValidatorName: "test_reflector"}, }, } cfg.ResponseReflectorMaxBlocks = 3 cfg.Provider = &scriptedProvider{ events: []flyto.Event{ &flyto.TextEvent{Text: "first answer"}, &flyto.UsageEvent{StopReason: "end_turn", InputTokens: 1, OutputTokens: 1}, }, } eng, err := New(cfg) if err != nil { t.Fatalf("New: %v", err) } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() events := drainEvents(ctx, eng.Run(ctx, "hi")) rvs := countResponseValidated(events) if len(rvs) != 2 { t.Fatalf("expected 2 ResponseValidatedEvents (Fail then Pass), got %d", len(rvs)) } // First: Fail with BlockCount=1 (post-increment). if rvs[0].Approved { t.Errorf("event[0].Approved: got true, want false (first call should reject)") } if rvs[0].BlockCount != 1 { t.Errorf("event[0].BlockCount: got %d, want 1 (post-increment)", rvs[0].BlockCount) } if rvs[0].Reason != "schema field missing" { t.Errorf("event[0].Reason: got %q, want %q", rvs[0].Reason, "schema field missing") } // Second: Pass with BlockCount=1 (carries over from Fail, no further increment). if !rvs[1].Approved { t.Errorf("event[1].Approved: got false, want true (second call should pass)") } if rvs[1].BlockCount != 1 { t.Errorf("event[1].BlockCount: got %d, want 1 (no increment on PASS)", rvs[1].BlockCount) } if rvs[1].Reason != "" { t.Errorf("event[1].Reason: got %q, want empty (PASS has no violations)", rvs[1].Reason) } } // TestResponseReflector_EmitsValidatedEvent_AtCap verifies the graceful // break path: reflector rejects every call, engine breaks at MaxBlocks. // Should emit exactly MaxBlocks ResponseValidatedEvents (all Fail) plus // the response_reflector_max_blocks WarningEvent. BlockCount progression // 1 → 2 → 3 (each post-increment). // // TestResponseReflector_EmitsValidatedEvent_AtCap 验证触底 graceful // break: 反射器永远 reject, 引擎在 MaxBlocks 触底 break. 应 emit 恰好 // MaxBlocks 个 ResponseValidatedEvent (全 Fail) + response_reflector_max_blocks // WarningEvent. BlockCount 进度 1 → 2 → 3 (各 post-increment). func TestResponseReflector_EmitsValidatedEvent_AtCap(t *testing.T) { maxBlocks := 3 verdicts := make([]validator.Verdict, maxBlocks) for i := range verdicts { verdicts[i] = validator.Verdict{ Approved: false, ValidatorName: "test_reflector", Reason: "still failing schema", } } cfg := testConfig() cfg.Toolset = tools.None() cfg.ResponseReflector = &sequencedReflector{ name: "test_reflector", verdicts: verdicts, } cfg.ResponseReflectorMaxBlocks = maxBlocks cfg.MaxTurns = maxBlocks + 1 // give turn loop room beyond cap to confirm cap-not-MaxTurns-broke cfg.Provider = &scriptedProvider{ events: []flyto.Event{ &flyto.TextEvent{Text: "still wrong"}, &flyto.UsageEvent{StopReason: "end_turn", InputTokens: 1, OutputTokens: 1}, }, } eng, err := New(cfg) if err != nil { t.Fatalf("New: %v", err) } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() events := drainEvents(ctx, eng.Run(ctx, "hi")) rvs := countResponseValidated(events) if len(rvs) != maxBlocks { t.Fatalf("expected %d ResponseValidatedEvents at cap, got %d", maxBlocks, len(rvs)) } for i, rv := range rvs { if rv.Approved { t.Errorf("event[%d].Approved: got true, want false (all should reject at cap path)", i) } if rv.BlockCount != i+1 { t.Errorf("event[%d].BlockCount: got %d, want %d (1-indexed post-increment)", i, rv.BlockCount, i+1) } } // Cap-hit must also emit response_reflector_max_blocks WarningEvent. var sawMaxBlocks bool for _, e := range events { if w, ok := e.(*WarningEvent); ok && w.Code == "response_reflector_max_blocks" { sawMaxBlocks = true break } } if !sawMaxBlocks { t.Error("expected WarningEvent code=response_reflector_max_blocks at cap, not found") } }