// team_test.go 测试 Team 协调器的核心逻辑. // // 测试策略: // - RunWorkers 依赖真实 Engine(需要 API),只测纯逻辑部分 // - enqueueTeamNotification / drainTeamNotifications 是纯内存操作,完整测试 // - PermissionHandler nil = 自动批准,不需要实现接口就能测试 // - WorkerSpec 验证逻辑 // // 不测: // - 真实并发 Worker 执行(需要 API key,属于集成测试) package engine import ( "context" "fmt" "strings" "sync" "testing" "time" "git.flytoex.net/yuanwei/flyto-agent/pkg/inbox" "git.flytoex.net/yuanwei/flyto-agent/pkg/permission" "git.flytoex.net/yuanwei/flyto-agent/pkg/tools" ) // recordingPermissionHandler captures the request payload from a Leader-side // HandlePermissionRequest call and returns canned (approve / updatedInput / reason). // Used by end-to-end permission-bubble tests to assert the full payload reaches // the consumer-layer handler unmangled. // // recordingPermissionHandler 捕获 Leader 侧 HandlePermissionRequest 调用收到的 // 请求 payload, 返回预设的 (approve / updatedInput / reason). 端到端权限冒泡 // test 用它断言完整 payload 不变形地到达消费层 handler. type recordingPermissionHandler struct { mu sync.Mutex called bool payload inbox.PermissionRequestPayload approve bool updatedInput map[string]any reason string } func (h *recordingPermissionHandler) HandlePermissionRequest(_ context.Context, req inbox.PermissionRequestPayload) (bool, map[string]any, string) { h.mu.Lock() defer h.mu.Unlock() h.called = true h.payload = req return h.approve, h.updatedInput, h.reason } // --- enqueueTeamNotification / drainTeamNotifications --- func TestEnqueueDrainTeamNotification_Basic(t *testing.T) { eng := &Engine{ observer: &NoopObserver{}, } eng.enqueueTeamNotification("1") eng.enqueueTeamNotification("2") notifs := eng.drainTeamNotifications() if len(notifs) != 2 { t.Fatalf("expected 2 notifications, got %d", len(notifs)) } if notifs[0] != "1" { t.Errorf("unexpected notification[0]: %q", notifs[0]) } } func TestDrainTeamNotifications_ClearsAfterDrain(t *testing.T) { eng := &Engine{ observer: &NoopObserver{}, } eng.enqueueTeamNotification("xml-1") _ = eng.drainTeamNotifications() // 第二次 drain 应为 nil second := eng.drainTeamNotifications() if second != nil { t.Errorf("expected nil after drain, got %v", second) } } func TestDrainTeamNotifications_EmptyReturnsNil(t *testing.T) { eng := &Engine{ observer: &NoopObserver{}, } result := eng.drainTeamNotifications() if result != nil { t.Errorf("expected nil for empty queue, got %v", result) } } func TestEnqueueDrainTeamNotification_ThreadSafe(t *testing.T) { eng := &Engine{ observer: &NoopObserver{}, } // 100 个 goroutine 并发写入 const numProducers = 100 var wg sync.WaitGroup for i := 0; i < numProducers; i++ { wg.Add(1) go func() { defer wg.Done() eng.enqueueTeamNotification("") }() } wg.Wait() all := eng.drainTeamNotifications() if len(all) != numProducers { t.Errorf("expected %d notifications, got %d", numProducers, len(all)) } } func TestDrainTeamNotifications_AtomicSwap(t *testing.T) { eng := &Engine{ observer: &NoopObserver{}, } // 写入 3 条 for i := 0; i < 3; i++ { eng.enqueueTeamNotification("") } // 并发 drain(只有一个 goroutine 能获取全部消息) const numDrainers = 10 counts := make([]int, numDrainers) var wg sync.WaitGroup for i := 0; i < numDrainers; i++ { wg.Add(1) go func(idx int) { defer wg.Done() counts[idx] = len(eng.drainTeamNotifications()) }(i) } wg.Wait() total := 0 for _, c := range counts { total += c } // 总共应恰好取到 3 条(不多不少) if total != 3 { t.Errorf("concurrent drain: expected total 3, got %d", total) } } // --- NewTeam --- func TestNewTeam_BasicConstruction(t *testing.T) { cfg := testConfig() cfg.Model = "claude-sonnet-4-6" cfg.Cwd = "/tmp/test" eng := &Engine{ cfg: cfg, tools: tools.NewRegistry(), observer: &NoopObserver{}, } team := NewTeam(TeamConfig{ LeaderEngine: eng, }) if team == nil { t.Fatal("NewTeam returned nil") } if team.router == nil { t.Error("Team.router should not be nil") } if team.cfg.LeaderEngine != eng { t.Error("Team.cfg.LeaderEngine should be the provided engine") } } func TestNewTeam_NilPermissionHandlerAutoApprove(t *testing.T) { // nil PermissionHandler = 自动批准,不应 panic team := NewTeam(TeamConfig{ LeaderEngine: &Engine{ observer: &NoopObserver{}, }, PermissionHandler: nil, // 显式 nil }) if team.cfg.PermissionHandler != nil { t.Error("PermissionHandler should remain nil") } } // --- WorkerSpec 验证 --- func TestWorkerSpec_EmptyAgentTypeDefaultsToGeneralPurpose(t *testing.T) { spec := WorkerSpec{ AgentType: "", // 空 = general-purpose Prompt: "do something", Description: "test task", } // Team.runWorker 内部将空 AgentType 替换为 "general-purpose" // 这里直接测试该逻辑 agentType := spec.AgentType if agentType == "" { agentType = "general-purpose" } if agentType != "general-purpose" { t.Errorf("default agent type should be general-purpose, got %q", agentType) } } // --- WorkerResult --- func TestWorkerResult_HasDuration(t *testing.T) { start := time.Now() time.Sleep(1 * time.Millisecond) // 确保 Duration > 0 duration := time.Since(start) r := WorkerResult{ WorkerID: "sa-1", AgentType: "Explore", Description: "test", Result: "ok", Duration: duration, } if r.Duration <= 0 { t.Error("WorkerResult.Duration should be > 0") } } func TestWorkerResult_ErrorField(t *testing.T) { r := WorkerResult{ WorkerID: "sa-2", Error: context.Canceled, } if r.Error != context.Canceled { t.Errorf("expected context.Canceled, got %v", r.Error) } } // --- RunWorkers 边界情况 --- func TestRunWorkers_EmptySpecsReturnsNil(t *testing.T) { team := NewTeam(TeamConfig{ LeaderEngine: &Engine{ observer: &NoopObserver{}, }, }) results, err := team.RunWorkers(context.Background(), nil) if err != nil { t.Fatalf("unexpected error: %v", err) } if results != nil { t.Error("expected nil results for empty specs") } } func TestRunWorkers_NilLeaderReturnsError(t *testing.T) { team := NewTeam(TeamConfig{ LeaderEngine: nil, }) _, err := team.RunWorkers(context.Background(), []WorkerSpec{{Prompt: "x"}}) if err == nil { t.Error("expected error when LeaderEngine is nil") } } // TestRunWorkers_ForwardsSubAgentEventsToParentChannel — 集成测试: // Team.runWorker 从工具派发 ctx 继承 EventEmitter (因为父 agent 的 Agent // tool 会在 WithEventEmitter 包过的 ctx 下调 Team), worker 作为 SubAgent // 启动时 sa.runLoop 读 emitter 把 Start/End + 每个业务事件转发到父 Run // channel. 走 "pre-canceled ctx" 最小路径避免 mock LLM provider. // // TestRunWorkers_ForwardsSubAgentEventsToParentChannel — integration test: // Team.runWorker inherits the EventEmitter from the tool-dispatch ctx // (the parent agent's Agent tool would call Team under a ctx already // wrapped with WithEventEmitter), so when the worker SubAgent starts, // sa.runLoop reads the emitter and forwards Start/End + each business // event up to the parent Run channel. Uses the "pre-canceled ctx" minimal // path to avoid mocking an LLM provider. func TestRunWorkers_ForwardsSubAgentEventsToParentChannel(t *testing.T) { cfg := testConfig() cfg.Cwd = t.TempDir() leader := &Engine{ cfg: cfg, tools: tools.NewRegistry(), observer: &NoopObserver{}, } team := NewTeam(TeamConfig{LeaderEngine: leader}) // 捕获 emitter: 测试模拟"父 Agent 工具在此 ctx 下调用 Team"的场景. emit, snapshot := captureEmitter() ctx, cancel := context.WithCancel(context.Background()) cancel() // 预取消让 worker runLoop 命中 ctx.Done 快速退出 ctx = WithEventEmitter(ctx, emit) specs := []WorkerSpec{ { AgentType: "Explore", Prompt: "survey files", Description: "探索路径", }, } _, _ = team.RunWorkers(ctx, specs) captured := snapshot() if len(captured) == 0 { t.Fatalf("Team worker 应通过 ctx emitter 转发 Start/End, 但未捕获任何事件") } // 必须至少有 SubAgentStartEvent + SubAgentEndEvent 两个生命周期事件 var sawStart, sawEnd bool var startID string for _, e := range captured { switch v := e.(type) { case *SubAgentStartEvent: sawStart = true startID = v.SubAgentID if v.Description != "探索路径" { t.Errorf("Worker SubAgentStartEvent.Description=%q, want 探索路径", v.Description) } case *SubAgentEndEvent: sawEnd = true if v.SubAgentID != startID { t.Errorf("End.SubAgentID=%q, Start.SubAgentID=%q — 不串联", v.SubAgentID, startID) } } } if !sawStart { t.Errorf("未收到 SubAgentStartEvent (Team→Worker 转发链路断)") } if !sawEnd { t.Errorf("未收到 SubAgentEndEvent") } } // --- inbox.Router 集成(Team 使用 InboxRouter)--- func TestTeam_RouterSendReceive(t *testing.T) { // 测试 Team 内部 Router 的正常工作 team := NewTeam(TeamConfig{ LeaderEngine: &Engine{ observer: &NoopObserver{}, }, }) // 通过 router 发消息 msg, err := inbox.NewMessage("worker-1", "leader", inbox.MsgIdleNotification, inbox.IdleNotificationPayload{IdleReason: "available"}) if err != nil { t.Fatalf("NewMessage: %v", err) } if err := team.router.Send("leader", msg); err != nil { t.Fatalf("Send: %v", err) } leaderBox := team.router.Inbox("leader") ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancel() got, err := leaderBox.Recv(ctx) if err != nil { t.Fatalf("Recv: %v", err) } if got.ID != msg.ID { t.Errorf("ID mismatch: %q != %q", got.ID, msg.ID) } } // --- TeamConfig.PermissionHandler end-to-end (Worker → Leader bubble) --- // TestTeam_PermissionBubble_AllowWithUpdatedInput verifies the full Worker→Leader // permission-bubble round trip: Worker SubAgent's permissionChecker.Check builds // a PermissionRequestPayload with all 5 fields populated, routes it to the Leader // inbox, the Leader-side handleWorkerPermissionRequest invokes the consumer // PermissionHandler, the response (Approved + UpdatedInput + Reason) flows back // through MsgPermissionResponse, and the bubblingHandler returns a permission.Response // with Decision=Allow and the consumer-supplied UpdatedInput. // // This locks the godoc promise from team.go: "PermissionHandler 消费层实现的 // 权限确认" -- non-nil handler must be invoked per Worker tool exec, with the // full payload, and its UpdatedInput must reach the SubAgent. // // TestTeam_PermissionBubble_AllowWithUpdatedInput 验证完整 Worker→Leader 权限 // 冒泡 round trip: Worker SubAgent 的 permissionChecker.Check 构造 5 字段都 // 填的 PermissionRequestPayload, 路由到 Leader inbox, Leader 侧 // handleWorkerPermissionRequest 调用消费层 PermissionHandler, 响应 // (Approved + UpdatedInput + Reason) 通过 MsgPermissionResponse 流回, // bubblingHandler 返回 Decision=Allow 且 UpdatedInput 透传的 permission.Response. // // 锁住 team.go godoc 承诺: "PermissionHandler 消费层实现的权限确认" -- // 非 nil handler 必须每次 Worker 工具执行时被调用, 收到完整 payload, // 其 UpdatedInput 必须到达 SubAgent. func TestTeam_PermissionBubble_AllowWithUpdatedInput(t *testing.T) { handler := &recordingPermissionHandler{ approve: true, updatedInput: map[string]any{"command": "echo SAFE"}, reason: "approved by test handler", } leader := &Engine{observer: &NoopObserver{}} team := NewTeam(TeamConfig{ LeaderEngine: leader, PermissionHandler: handler, }) sa := &SubAgent{ ID: "test-worker-1", ParentEngine: leader, pendingPermissions: make(map[string]chan inbox.PermissionResponsePayload), done: make(chan struct{}), } sa.incomingInbox = team.router.Inbox(sa.ID) sa.permissionChecker = permission.NewEngine(permission.ModeDefault, team.bubblingHandlerFor(sa)) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() // Worker poll: receive MsgPermissionResponse and dispatch to pending chan. // Mirrors the runLoop branch added in subagent.go. // // Worker 端 poll: 收 MsgPermissionResponse 后 dispatch 到 pending chan. // 镜像 subagent.go 加的 runLoop 分支. go func() { msg, err := sa.incomingInbox.Recv(ctx) if err != nil || msg == nil { return } if msg.Type == inbox.MsgPermissionResponse { sa.dispatchPermissionResponse(msg) } }() // Leader poll: receive MsgPermissionRequest and resolve via consumer handler. // Mirrors the runLoop branch added in engine.go. // // Leader 端 poll: 收 MsgPermissionRequest 通过消费层 handler 解决. // 镜像 engine.go 加的 runLoop 分支. go func() { msg, err := leader.incomingInbox.Recv(ctx) if err != nil || msg == nil { return } if msg.Type == inbox.MsgPermissionRequest { leader.handleWorkerPermissionRequest(ctx, msg) } }() permResp, err := sa.permissionChecker.Check(ctx, &permission.Request{ ToolName: "Bash", ToolID: "tool-use-id-1", Input: map[string]any{"command": "echo hello"}, }) if err != nil { t.Fatalf("permissionChecker.Check failed: %v", err) } handler.mu.Lock() defer handler.mu.Unlock() if !handler.called { t.Fatal("consumer PermissionHandler never invoked -- bubble path broken") } if handler.payload.RequestID == "" { t.Error("PermissionRequestPayload.RequestID empty -- bubblingHandler must set it") } if handler.payload.ToolName != "Bash" { t.Errorf("PermissionRequestPayload.ToolName: got %q, want %q", handler.payload.ToolName, "Bash") } if handler.payload.ToolUseID != "tool-use-id-1" { t.Errorf("PermissionRequestPayload.ToolUseID: got %q, want %q", handler.payload.ToolUseID, "tool-use-id-1") } if handler.payload.Description == "" { t.Error("PermissionRequestPayload.Description empty -- permission.Engine should fill req.Message via ExplainPermissionRequest") } if got, _ := handler.payload.Input["command"].(string); got != "echo hello" { t.Errorf("PermissionRequestPayload.Input.command: got %v, want %q", handler.payload.Input["command"], "echo hello") } if permResp.Decision != permission.DecisionAllow { t.Errorf("permission.Response.Decision: got %v, want Allow", permResp.Decision) } if permResp.Reason != "approved by test handler" { t.Errorf("permission.Response.Reason: got %q, want %q", permResp.Reason, "approved by test handler") } if got, _ := permResp.UpdatedInput["command"].(string); got != "echo SAFE" { t.Errorf("permission.Response.UpdatedInput.command: got %v, want %q", permResp.UpdatedInput["command"], "echo SAFE") } } // TestTeam_PermissionBubble_NilHandlerAutoApproves locks the godoc promise from // team.go TeamConfig.PermissionHandler: "nil = 自动批准所有请求". Without this // test, a future refactor could change nil-handler semantics (e.g. fail-closed // deny) and silently break any consumer that relied on "I'll wire it later". // // TestTeam_PermissionBubble_NilHandlerAutoApproves 锁住 team.go // TeamConfig.PermissionHandler godoc 承诺: "nil = 自动批准所有请求". 没这个 test, // 未来重构可能改 nil handler 语义 (如 fail-closed deny), 默默破坏任何依赖 // "之后再 wire" 的消费层. func TestTeam_PermissionBubble_NilHandlerAutoApproves(t *testing.T) { leader := &Engine{observer: &NoopObserver{}} team := NewTeam(TeamConfig{ LeaderEngine: leader, PermissionHandler: nil, }) sa := &SubAgent{ ID: "test-worker-noh", ParentEngine: leader, pendingPermissions: make(map[string]chan inbox.PermissionResponsePayload), done: make(chan struct{}), } sa.incomingInbox = team.router.Inbox(sa.ID) sa.permissionChecker = permission.NewEngine(permission.ModeDefault, team.bubblingHandlerFor(sa)) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() go func() { msg, err := sa.incomingInbox.Recv(ctx) if err != nil || msg == nil { return } if msg.Type == inbox.MsgPermissionResponse { sa.dispatchPermissionResponse(msg) } }() go func() { msg, err := leader.incomingInbox.Recv(ctx) if err != nil || msg == nil { return } if msg.Type == inbox.MsgPermissionRequest { leader.handleWorkerPermissionRequest(ctx, msg) } }() permResp, err := sa.permissionChecker.Check(ctx, &permission.Request{ ToolName: "Read", ToolID: "tool-use-id-2", Input: map[string]any{"file_path": "/tmp/x"}, }) if err != nil { t.Fatalf("Check failed: %v", err) } if permResp.Decision != permission.DecisionAllow { t.Errorf("nil handler should auto-approve, got Decision=%v Reason=%q", permResp.Decision, permResp.Reason) } } // TestTaskNotificationTmpl_FormatOK validates taskNotificationTmpl renders // all declared fields — including , which is the Worker intent // tag the Leader uses to reverse-map a notification back to its business task. // // TestTaskNotificationTmpl_FormatOK 验证 taskNotificationTmpl 正确渲染所有字段 -- // 尤其是 , 它是 Leader 反向把通知映射回业务任务的意图标签. func TestTaskNotificationTmpl_FormatOK(t *testing.T) { xml := fmt.Sprintf(taskNotificationTmpl, "task-1", "completed", "南路径探索", "All checks passed", "Verification", int64(1234), ) if xml == "" { t.Error("task notification XML should not be empty") } if len(xml) < 50 { t.Errorf("XML too short: %q", xml) } if !strings.Contains(xml, "南路径探索") { t.Errorf("XML missing field or content mismatch:\n%s", xml) } } // TestRunWorkers_TaskNotification_CarriesDescription locks the wire: // runWorker fills WorkerResult.Description from WorkerSpec.Description, and // RunWorkers propagates it into the task-notification XML queued for the // Leader. The pre-canceled ctx path lets Worker fail fast without needing a // scripted provider, but the enqueue-notification branch still fires on the // failure path (team.go RunWorkers — both completed and failed paths build // XML and enqueue). // // TestRunWorkers_TaskNotification_CarriesDescription 锁定 wire: // runWorker 把 WorkerSpec.Description 填入 WorkerResult.Description, 然后 // RunWorkers 把它嵌进发给 Leader 的 task-notification XML. 用 pre-canceled // ctx 让 Worker 快速失败, 无需 scripted provider; enqueue-notification 分支 // 在失败路径也会触发 (team.go RunWorkers completed/failed 两路都会构造 XML // 并入队). func TestRunWorkers_TaskNotification_CarriesDescription(t *testing.T) { cfg := testConfig() cfg.Cwd = t.TempDir() leader := &Engine{ cfg: cfg, tools: tools.NewRegistry(), observer: &NoopObserver{}, } team := NewTeam(TeamConfig{LeaderEngine: leader}) ctx, cancel := context.WithCancel(context.Background()) cancel() specs := []WorkerSpec{ {AgentType: "Explore", Prompt: "survey north", Description: "北路径探索"}, {AgentType: "Explore", Prompt: "survey south", Description: "南路径探索"}, } _, _ = team.RunWorkers(ctx, specs) notifs := leader.drainTeamNotifications() if len(notifs) != 2 { t.Fatalf("drainTeamNotifications = %d notifs, want 2", len(notifs)) } joined := strings.Join(notifs, "\n") if !strings.Contains(joined, "北路径探索") { t.Errorf("notification missing 北路径探索 description:\n%s", joined) } if !strings.Contains(joined, "南路径探索") { t.Errorf("notification missing 南路径探索 description:\n%s", joined) } }