package engine // subagent_forward_test.go — 父引擎事件转发路径端到端测试. // // 验证 sa.runLoop 经 ctx-embedded EventEmitter 向父引擎 Run channel 转发: // - SubAgentStartEvent 在首个业务事件前 emit, 带正确 ID / Description / // Cwd / Model / StartTime // - 每个业务事件包装为 SubAgentEvent (SubAgentID + Inner) 上送 // - SubAgentEndEvent 在所有退出路径 (cancel / error / maxTurns / normal) // 都 emit, 带正确 Duration / Status / Result / Error // - SilentEvents=true 时不 emit 任何 lifecycle / 包装事件 // - 并发多个 SubAgent 时 SubAgentID 不串 // // 走 "ctx 预取消" 最小路径避免 mock LLM provider: sa.runLoop 进入 for-select // 后立刻命中 ctx.Done 分支, pushEvt ErrorEvent + defer emit End. 这条路径 // 不碰 provider, 用最少依赖验证 forwarding 链路完整性. // // subagent_forward_test.go — end-to-end test for parent-engine event // forwarding via ctx-embedded EventEmitter. // // Verifies sa.runLoop: // - Emits SubAgentStartEvent before any business event, with correct // ID / Description / Cwd / Model / StartTime. // - Wraps each business event as SubAgentEvent (SubAgentID + Inner). // - Emits SubAgentEndEvent from every exit path (cancel / error / // maxTurns / normal) with correct Duration / Status / Result / Error. // - Skips all lifecycle / wrapping events when SilentEvents=true. // - Keeps SubAgentID separate across concurrent sub-agents. // // Uses the "pre-cancelled ctx" minimal path to avoid mocking an LLM // provider: sa.runLoop's for-select hits the ctx.Done branch immediately, // pushEvt sends an ErrorEvent, and the defer emits End. This path never // touches the provider — minimal dependency surface for verifying the // forwarding chain is intact. import ( "context" "encoding/json" "sync" "testing" "time" "git.flytoex.net/yuanwei/flyto-agent/pkg/tools" ) // newForwardTestEngine 构造最小 *Engine 足以让 SpawnSubAgent + sa.runLoop // 跑到 cancel 分支. 不需要 provider / model registry 因为 pre-cancel 路径 // 不 touch API. newForwardTestEngine builds a minimal *Engine just // sufficient to run SpawnSubAgent + sa.runLoop into the cancel branch. No // provider / model registry needed because the pre-cancel path never // touches the API. func newForwardTestEngine(t *testing.T, cwd string) *Engine { t.Helper() cfg := testConfig() cfg.Cwd = cwd return &Engine{ cfg: cfg, tools: tools.NewRegistry(), observer: &NoopObserver{}, } } // captureEmitter 返回一个 EventEmitter + 指向 captured events 切片的指针, // 供测试断言 emit 顺序和内容. 加锁保护跨 goroutine 的并发 append. // captureEmitter returns an EventEmitter and a pointer to the captured // events slice, letting tests assert emit order and content. Locks guard // concurrent append from multiple goroutines. func captureEmitter() (EventEmitter, func() []Event) { var mu sync.Mutex var events []Event emit := func(evt Event) { mu.Lock() events = append(events, evt) mu.Unlock() } snapshot := func() []Event { mu.Lock() defer mu.Unlock() out := make([]Event, len(events)) copy(out, events) return out } return emit, snapshot } // TestSubAgent_Forward_StartEvent_Populates — sub-claim: Start event 字段 // 与 SpawnSubAgent 构造参数对应一致. 断言 SubAgentID 非空, Description / // Cwd / Model 与 cfg 对应, StartTime 非零. func TestSubAgent_Forward_StartEvent_Populates(t *testing.T) { cwd := t.TempDir() parent := newForwardTestEngine(t, cwd) sa := SpawnSubAgent(parent, &SubAgentConfig{ Description: "explore-files", Model: "claude-haiku-4-5-20251001", }) emit, snapshot := captureEmitter() ctx, cancel := context.WithCancel(context.Background()) cancel() // 预取消: runLoop 进入 for-select 后立即走 ctx.Done 分支 ctx = WithEventEmitter(ctx, emit) // 走 sa.Run 让 runLoop 在 goroutine 内跑完 defer events := sa.Run(ctx, "任务提示") for range events { // drain sa 自己的 channel, 但我们只关心 emit 的转发路径 } captured := snapshot() if len(captured) == 0 { t.Fatalf("expected at least SubAgentStart and SubAgentEnd, got 0 events") } startEvt, ok := captured[0].(*SubAgentStartEvent) if !ok { t.Fatalf("expected captured[0] = *SubAgentStartEvent, got %T (%s)", captured[0], captured[0].EventType()) } if startEvt.SubAgentID == "" { t.Errorf("SubAgentStartEvent.SubAgentID 空串") } if startEvt.Description != "explore-files" { t.Errorf("Description=%q, want %q", startEvt.Description, "explore-files") } if startEvt.Cwd != cwd { t.Errorf("Cwd=%q, want %q", startEvt.Cwd, cwd) } if startEvt.Model != "claude-haiku-4-5-20251001" { t.Errorf("Model=%q, want claude-haiku-4-5-20251001", startEvt.Model) } if startEvt.StartTime.IsZero() { t.Errorf("StartTime 未填值") } } // TestSubAgent_Forward_EndEvent_Populates — sub-claim: End event 在取消 // 路径 emit, 带 Cancelled status + Duration > 0 + SubAgentID 与 Start // 匹配. func TestSubAgent_Forward_EndEvent_Populates(t *testing.T) { parent := newForwardTestEngine(t, t.TempDir()) sa := SpawnSubAgent(parent, &SubAgentConfig{Description: "t"}) emit, snapshot := captureEmitter() ctx, cancel := context.WithCancel(context.Background()) cancel() ctx = WithEventEmitter(ctx, emit) events := sa.Run(ctx, "prompt") for range events { } captured := snapshot() if len(captured) < 2 { t.Fatalf("expected at least Start + End, got %d events", len(captured)) } // End 事件应是最后一个 endEvt, ok := captured[len(captured)-1].(*SubAgentEndEvent) if !ok { t.Fatalf("expected last = *SubAgentEndEvent, got %T", captured[len(captured)-1]) } startEvt := captured[0].(*SubAgentStartEvent) if endEvt.SubAgentID != startEvt.SubAgentID { t.Errorf("End.SubAgentID=%q 与 Start.SubAgentID=%q 不一致", endEvt.SubAgentID, startEvt.SubAgentID) } if endEvt.Status != SubAgentStatusCancelled { t.Errorf("Status=%v, want Cancelled (pre-cancel 路径)", endEvt.Status) } if endEvt.Duration <= 0 { t.Errorf("Duration=%v, want > 0", endEvt.Duration) } _ = endEvt.Result // 允许空串 (取消前无文本); 字段存在即可读 _ = endEvt.Error // 允许空串 (取消不一定产生 sa.Error) } // TestSubAgent_Forward_InnerEvents_Wrapped — sub-claim: 中间业务事件 (此 // 路径只有 ErrorEvent 从 ctx.Done 分支 pushEvt) 被包装为 SubAgentEvent, // SubAgentID 与 Start 一致, Inner 是 *ErrorEvent. func TestSubAgent_Forward_InnerEvents_Wrapped(t *testing.T) { parent := newForwardTestEngine(t, t.TempDir()) sa := SpawnSubAgent(parent, &SubAgentConfig{Description: "t"}) emit, snapshot := captureEmitter() ctx, cancel := context.WithCancel(context.Background()) cancel() ctx = WithEventEmitter(ctx, emit) events := sa.Run(ctx, "prompt") for range events { } captured := snapshot() // 寻找 SubAgentEvent 包装器 var wrappers []*SubAgentEvent for _, e := range captured { if w, ok := e.(*SubAgentEvent); ok { wrappers = append(wrappers, w) } } if len(wrappers) == 0 { t.Fatalf("expected at least 1 SubAgentEvent wrapper, got 0; all events: %v", eventTypes(captured)) } // 取消路径应至少有一个 ErrorEvent 包装 var sawError bool for _, w := range wrappers { if w.SubAgentID != sa.ID { t.Errorf("wrapper SubAgentID=%q, want sa.ID=%q", w.SubAgentID, sa.ID) } if _, isErr := w.Inner.(*ErrorEvent); isErr { sawError = true } } if !sawError { t.Errorf("没看到 ErrorEvent 包装 (取消路径应 push 一个)") } } // TestSubAgent_Forward_SilentEvents_NoEmit — sub-claim: SilentEvents=true // 时 emit 不被调用 (记忆提取路径). func TestSubAgent_Forward_SilentEvents_NoEmit(t *testing.T) { parent := newForwardTestEngine(t, t.TempDir()) sa := SpawnSubAgent(parent, &SubAgentConfig{ Description: "silent-memory-extraction", SilentEvents: true, }) emit, snapshot := captureEmitter() ctx, cancel := context.WithCancel(context.Background()) cancel() ctx = WithEventEmitter(ctx, emit) events := sa.Run(ctx, "prompt") for range events { } captured := snapshot() if len(captured) != 0 { t.Errorf("SilentEvents=true 时不该 emit, 实际收到 %d 个: %v", len(captured), eventTypes(captured)) } } // TestSubAgent_Forward_NoEmitter_NoPanic — sub-claim: ctx 无 emitter 时 // runLoop 正常退出 (不 panic). 模拟 runMemoryExtraction 下 rootCtx 场景. func TestSubAgent_Forward_NoEmitter_NoPanic(t *testing.T) { parent := newForwardTestEngine(t, t.TempDir()) sa := SpawnSubAgent(parent, &SubAgentConfig{Description: "no-emitter"}) ctx, cancel := context.WithCancel(context.Background()) cancel() // 不 WithEventEmitter events := sa.Run(ctx, "prompt") for range events { } // 只要没 panic 就 pass } // TestSubAgent_Forward_ConcurrentSubAgents_IDsSeparate — sub-claim: 并发 // 2 个子 agent 时 emit 捕获的 SubAgentID 和 Start/End 字段不串. func TestSubAgent_Forward_ConcurrentSubAgents_IDsSeparate(t *testing.T) { parent := newForwardTestEngine(t, t.TempDir()) emit, snapshot := captureEmitter() ctx, cancel := context.WithCancel(context.Background()) cancel() ctx = WithEventEmitter(ctx, emit) var wg sync.WaitGroup for i := 0; i < 2; i++ { wg.Add(1) go func(idx int) { defer wg.Done() sa := SpawnSubAgent(parent, &SubAgentConfig{ Description: "worker-" + string(rune('0'+idx)), }) events := sa.Run(ctx, "prompt") for range events { } }(i) } wg.Wait() // 等一下让 defer emit 完成 time.Sleep(20 * time.Millisecond) captured := snapshot() // 收集所有 Start 事件 startIDs := map[string]string{} for _, e := range captured { if s, ok := e.(*SubAgentStartEvent); ok { startIDs[s.SubAgentID] = s.Description } } if len(startIDs) != 2 { t.Errorf("expected 2 distinct SubAgentIDs, got %d: %v", len(startIDs), startIDs) } // 验证每个 Start 都有对应 End 且 SubAgentID 匹配 (不错配) endIDs := map[string]bool{} for _, e := range captured { if endEvt, ok := e.(*SubAgentEndEvent); ok { endIDs[endEvt.SubAgentID] = true } } for id := range startIDs { if !endIDs[id] { t.Errorf("SubAgentID %q 有 Start 但无 End (ID 串了或漏 emit)", id) } } } // TestTruncateSubAgentResult — sub-claim: 超出 subAgentResultMaxBytes 后 // 截断 + 加 "..." 后缀; 以内原样返回. func TestTruncateSubAgentResult(t *testing.T) { short := "short result" if got := truncateSubAgentResult(short); got != short { t.Errorf("短串被截断: %q", got) } long := make([]byte, subAgentResultMaxBytes+100) for i := range long { long[i] = 'a' } got := truncateSubAgentResult(string(long)) want := string(long[:subAgentResultMaxBytes]) + "..." if got != want { t.Errorf("长串截断错: len=%d want len=%d", len(got), len(want)) } } // TestEventEmitter_WithAndFromContext — sub-claim: WithEventEmitter 设置 // 的 emitter 能通过 EventEmitterFromContext 取回; nil emit 不污染 ctx; // 未设置时 FromContext 返回 nil. func TestEventEmitter_WithAndFromContext(t *testing.T) { ctx := context.Background() if EventEmitterFromContext(ctx) != nil { t.Errorf("空 ctx 应返回 nil") } // nil emit 保持原 ctx ctx2 := WithEventEmitter(ctx, nil) if EventEmitterFromContext(ctx2) != nil { t.Errorf("nil emit 不该写入 ctx") } var called bool emit := func(evt Event) { called = true } ctx3 := WithEventEmitter(ctx, emit) got := EventEmitterFromContext(ctx3) if got == nil { t.Fatalf("FromContext 返回 nil, 应返回已注册的 emit") } got(&ErrorEvent{}) // 通过闭包验证确实是同一 emit if !called { t.Errorf("取回的 emitter 调用后 called=false, 说明不是同一函数") } } // newForwardTestEngineWithObserver 构造带 MockObserver 的 *Engine, 用于 // 验证 observer fallback 路径 (后台子 agent ctx 无 emitter 时走 observer). // newForwardTestEngineWithObserver builds a *Engine with MockObserver for // testing the observer fallback path (background sub-agents where ctx has // no emitter). func newForwardTestEngineWithObserver(t *testing.T, cwd string) (*Engine, *MockObserver) { t.Helper() cfg := testConfig() cfg.Cwd = cwd obs := &MockObserver{} return &Engine{ cfg: cfg, tools: tools.NewRegistry(), observer: obs, }, obs } // TestSubAgent_Forward_ObserverFallback_Start — sub-claim: ctx 无 emitter // 但父 engine 有 observer 时, Start 走 observer.Event("subagent_start", ...) // 带完整字段 (subagent_id / description / cwd / model / start_time_ms). // 覆盖后台子 agent 路径 (RunBackground 的 bgCtx 无 emitter). func TestSubAgent_Forward_ObserverFallback_Start(t *testing.T) { parent, obs := newForwardTestEngineWithObserver(t, t.TempDir()) sa := SpawnSubAgent(parent, &SubAgentConfig{ Description: "bg-task", Model: "claude-haiku-4-5-20251001", }) ctx, cancel := context.WithCancel(context.Background()) cancel() // 不 WithEventEmitter — 模拟后台路径 events := sa.Run(ctx, "prompt") for range events { } // 找 subagent_start var startEvt *MockEvent for i := range obs.Events { if obs.Events[i].Name == "subagent_start" { startEvt = &obs.Events[i] break } } if startEvt == nil { t.Fatalf("observer 未收到 subagent_start, 实际事件: %v", eventNames(obs.Events)) } if startEvt.Data["subagent_id"] != sa.ID { t.Errorf("subagent_id=%v, want %q", startEvt.Data["subagent_id"], sa.ID) } if startEvt.Data["description"] != "bg-task" { t.Errorf("description=%v, want bg-task", startEvt.Data["description"]) } if startEvt.Data["model"] != "claude-haiku-4-5-20251001" { t.Errorf("model=%v", startEvt.Data["model"]) } if _, ok := startEvt.Data["start_time_ms"]; !ok { t.Errorf("start_time_ms 缺失: %v", startEvt.Data) } } // TestSubAgent_Forward_ObserverFallback_End — sub-claim: ctx 无 emitter // 时 End 走 observer.Event("subagent_end", ...) 带 duration_ms / status / // subagent_id. func TestSubAgent_Forward_ObserverFallback_End(t *testing.T) { parent, obs := newForwardTestEngineWithObserver(t, t.TempDir()) sa := SpawnSubAgent(parent, &SubAgentConfig{Description: "t"}) ctx, cancel := context.WithCancel(context.Background()) cancel() events := sa.Run(ctx, "prompt") for range events { } var endEvt *MockEvent for i := range obs.Events { if obs.Events[i].Name == "subagent_end" { endEvt = &obs.Events[i] } } if endEvt == nil { t.Fatalf("observer 未收到 subagent_end, 实际: %v", eventNames(obs.Events)) } if endEvt.Data["subagent_id"] != sa.ID { t.Errorf("subagent_id=%v, want %q", endEvt.Data["subagent_id"], sa.ID) } if endEvt.Data["status"] != string(SubAgentStatusCancelled) { t.Errorf("status=%v, want cancelled", endEvt.Data["status"]) } if _, ok := endEvt.Data["duration_ms"]; !ok { t.Errorf("duration_ms 缺失") } } // TestSubAgent_Forward_ObserverFallback_InnerEvent — sub-claim: ctx 无 // emitter 时内层业务事件 (ErrorEvent from ctx.Done) 走 observer.Event( // "subagent_event", {subagent_id, event_type}) — lossy metadata-only 形态. func TestSubAgent_Forward_ObserverFallback_InnerEvent(t *testing.T) { parent, obs := newForwardTestEngineWithObserver(t, t.TempDir()) sa := SpawnSubAgent(parent, &SubAgentConfig{Description: "t"}) ctx, cancel := context.WithCancel(context.Background()) cancel() events := sa.Run(ctx, "prompt") for range events { } var innerEvt *MockEvent for i := range obs.Events { if obs.Events[i].Name == "subagent_event" { innerEvt = &obs.Events[i] } } if innerEvt == nil { t.Fatalf("observer 未收到 subagent_event, 实际: %v", eventNames(obs.Events)) } if innerEvt.Data["subagent_id"] != sa.ID { t.Errorf("subagent_id=%v", innerEvt.Data["subagent_id"]) } if innerEvt.Data["event_type"] != "error" { t.Errorf("event_type=%v, want error (ctx.Done 路径应 pushEvt ErrorEvent)", innerEvt.Data["event_type"]) } } // TestSubAgent_Forward_ChannelPreferredOverObserver — sub-claim: 两路都可 // 用时 channel 优先, observer 不被调用 (避免同一事件双发). func TestSubAgent_Forward_ChannelPreferredOverObserver(t *testing.T) { parent, obs := newForwardTestEngineWithObserver(t, t.TempDir()) sa := SpawnSubAgent(parent, &SubAgentConfig{Description: "t"}) emit, snapshot := captureEmitter() ctx, cancel := context.WithCancel(context.Background()) cancel() ctx = WithEventEmitter(ctx, emit) events := sa.Run(ctx, "prompt") for range events { } // Channel 应收到事件 captured := snapshot() if len(captured) == 0 { t.Fatalf("channel 路径没收到事件") } // Observer 不应收到 subagent_* 事件 (可能收到其他引擎内部事件, 但没子 // agent 转发). 只断言 subagent_start / subagent_end / subagent_event // 三个名字完全不出现. for _, e := range obs.Events { if e.Name == "subagent_start" || e.Name == "subagent_end" || e.Name == "subagent_event" { t.Errorf("channel 路径下不该走 observer fallback, 但收到 %q", e.Name) } } } // TestSubAgent_Forward_SilentEvents_ObserverAlsoSkipped — sub-claim: // SilentEvents=true 时即便父 engine 有 observer 也不 emit (记忆提取 // 场景: observer 通常有但这条路径刻意不污染). func TestSubAgent_Forward_SilentEvents_ObserverAlsoSkipped(t *testing.T) { parent, obs := newForwardTestEngineWithObserver(t, t.TempDir()) sa := SpawnSubAgent(parent, &SubAgentConfig{ Description: "silent", SilentEvents: true, }) ctx, cancel := context.WithCancel(context.Background()) cancel() events := sa.Run(ctx, "prompt") for range events { } for _, e := range obs.Events { if e.Name == "subagent_start" || e.Name == "subagent_end" || e.Name == "subagent_event" { t.Errorf("SilentEvents=true 时 observer 不该收到 %q", e.Name) } } } // eventNames 是 MockObserver.Events 的事件名切片, 诊断输出用. // eventNames is the event name slice of MockObserver.Events for diagnostics. func eventNames(events []MockEvent) []string { out := make([]string, len(events)) for i, e := range events { out[i] = e.Name } return out } // eventTypes 提取事件类型名切片, 用于断言失败时的诊断输出. // eventTypes extracts the event type names for diagnostic output on // assertion failure. func eventTypes(events []Event) []string { out := make([]string, len(events)) for i, e := range events { out[i] = e.EventType() } // Encoded via json to make diagnostic string compact (use fmt fallback // if marshalling fails, but types are []string so always succeeds). _, _ = json.Marshal(out) return out }