// team_test.go 测试 Team 协调器的核心逻辑.
//
// 测试策略:
// - RunWorkers 依赖真实 Engine(需要 API),只测纯逻辑部分
// - enqueueTeamNotification / drainTeamNotifications 是纯内存操作,完整测试
// - PermissionHandler nil = 自动批准,不需要实现接口就能测试
// - WorkerSpec 验证逻辑
//
// 不测:
// - 真实并发 Worker 执行(需要 API key,属于集成测试)
package engine
import (
"context"
"fmt"
"strings"
"sync"
"testing"
"time"
"git.flytoex.net/yuanwei/flyto-agent/pkg/inbox"
"git.flytoex.net/yuanwei/flyto-agent/pkg/permission"
"git.flytoex.net/yuanwei/flyto-agent/pkg/tools"
)
// recordingPermissionHandler captures the request payload from a Leader-side
// HandlePermissionRequest call and returns canned (approve / updatedInput / reason).
// Used by end-to-end permission-bubble tests to assert the full payload reaches
// the consumer-layer handler unmangled.
//
// recordingPermissionHandler 捕获 Leader 侧 HandlePermissionRequest 调用收到的
// 请求 payload, 返回预设的 (approve / updatedInput / reason). 端到端权限冒泡
// test 用它断言完整 payload 不变形地到达消费层 handler.
type recordingPermissionHandler struct {
	// Canned reply: HandlePermissionRequest returns these three values as-is.
	approve      bool
	updatedInput map[string]any
	reason       string

	// mu is held by HandlePermissionRequest for the whole call; tests take it
	// before inspecting the recorded fields below.
	mu      sync.Mutex
	called  bool                           // set to true on every invocation
	payload inbox.PermissionRequestPayload // most recent request received
}
// HandlePermissionRequest marks the handler as called, stores the received
// payload, and answers with the preset approve / updatedInput / reason values.
// The context argument is ignored. The whole call runs under h.mu.
func (h *recordingPermissionHandler) HandlePermissionRequest(_ context.Context, req inbox.PermissionRequestPayload) (bool, map[string]any, string) {
	h.mu.Lock()
	defer h.mu.Unlock()

	h.payload, h.called = req, true
	return h.approve, h.updatedInput, h.reason
}
// --- enqueueTeamNotification / drainTeamNotifications ---
// TestEnqueueDrainTeamNotification_Basic enqueues two notifications and checks
// that a single drain returns both of them, in the order they were enqueued.
func TestEnqueueDrainTeamNotification_Basic(t *testing.T) {
	eng := &Engine{
		observer: &NoopObserver{},
	}
	want := []string{"1", "2"}
	for _, n := range want {
		eng.enqueueTeamNotification(n)
	}

	notifs := eng.drainTeamNotifications()
	if len(notifs) != len(want) {
		t.Fatalf("expected %d notifications, got %d", len(want), len(notifs))
	}
	// Compare every element, not only the first, so that a dropped,
	// duplicated or reordered entry is caught.
	for i := range want {
		if notifs[i] != want[i] {
			t.Errorf("unexpected notification[%d]: got %q, want %q", i, notifs[i], want[i])
		}
	}
}
// TestDrainTeamNotifications_ClearsAfterDrain checks that draining empties the
// queue: a second drain issued right after the first must return nil.
func TestDrainTeamNotifications_ClearsAfterDrain(t *testing.T) {
	eng := &Engine{observer: &NoopObserver{}}
	eng.enqueueTeamNotification("xml-1")

	// The first drain's result is irrelevant; only its emptying effect matters.
	eng.drainTeamNotifications()

	if again := eng.drainTeamNotifications(); again != nil {
		t.Errorf("expected nil after drain, got %v", again)
	}
}
// TestDrainTeamNotifications_EmptyReturnsNil checks that draining an engine on
// which nothing was ever enqueued yields nil.
func TestDrainTeamNotifications_EmptyReturnsNil(t *testing.T) {
	eng := &Engine{observer: &NoopObserver{}}

	if got := eng.drainTeamNotifications(); got != nil {
		t.Errorf("expected nil for empty queue, got %v", got)
	}
}
// TestEnqueueDrainTeamNotification_ThreadSafe has 100 goroutines enqueue one
// notification each and checks that, once they have all finished, a single
// drain returns exactly 100 entries.
func TestEnqueueDrainTeamNotification_ThreadSafe(t *testing.T) {
	const numProducers = 100
	eng := &Engine{observer: &NoopObserver{}}

	// Register every producer up front, then let them write concurrently.
	var producers sync.WaitGroup
	producers.Add(numProducers)
	for p := 0; p < numProducers; p++ {
		go func() {
			defer producers.Done()
			eng.enqueueTeamNotification("")
		}()
	}
	producers.Wait()

	if got := len(eng.drainTeamNotifications()); got != numProducers {
		t.Errorf("expected %d notifications, got %d", numProducers, got)
	}
}
// TestDrainTeamNotifications_AtomicSwap races several drainers against a queue
// that holds a fixed number of notifications. It checks two things:
//   - across all drainers, every notification is handed out exactly once;
//   - each drainer receives either nothing or the whole batch, i.e. a drain
//     never returns a partial slice.
func TestDrainTeamNotifications_AtomicSwap(t *testing.T) {
	eng := &Engine{
		observer: &NoopObserver{},
	}

	const numQueued = 3
	for i := 0; i < numQueued; i++ {
		eng.enqueueTeamNotification("")
	}

	// Drain concurrently; only one goroutine is expected to obtain the batch.
	const numDrainers = 10
	counts := make([]int, numDrainers) // each goroutine writes only its own slot
	var wg sync.WaitGroup
	for i := 0; i < numDrainers; i++ {
		wg.Add(1)
		go func(idx int) {
			defer wg.Done()
			counts[idx] = len(eng.drainTeamNotifications())
		}(i)
	}
	wg.Wait()

	total := 0
	for idx, c := range counts {
		total += c
		// The sum alone would also pass if the batch were split between
		// drainers, so assert the all-or-nothing property per drainer.
		if c != 0 && c != numQueued {
			t.Errorf("concurrent drain: drainer %d got a partial batch of %d (want 0 or %d)", idx, c, numQueued)
		}
	}
	// Nothing may be lost and nothing may be delivered twice.
	if total != numQueued {
		t.Errorf("concurrent drain: expected total %d, got %d", numQueued, total)
	}
}
// --- NewTeam ---
// TestNewTeam_BasicConstruction checks that NewTeam returns a non-nil Team
// whose router is initialised and whose config still points at the leader
// engine that was passed in.
func TestNewTeam_BasicConstruction(t *testing.T) {
	cfg := testConfig()
	cfg.Model = "claude-sonnet-4-6"
	cfg.Cwd = "/tmp/test"
	leader := &Engine{cfg: cfg, tools: tools.NewRegistry(), observer: &NoopObserver{}}

	team := NewTeam(TeamConfig{LeaderEngine: leader})
	if team == nil {
		t.Fatal("NewTeam returned nil")
	}

	if team.router == nil {
		t.Error("Team.router should not be nil")
	}
	if got := team.cfg.LeaderEngine; got != leader {
		t.Error("Team.cfg.LeaderEngine should be the provided engine")
	}
}
func TestNewTeam_NilPermissionHandlerAutoApprove(t *testing.T) {
// nil PermissionHandler = 自动批准,不应 panic
team := NewTeam(TeamConfig{
LeaderEngine: &Engine{
observer: &NoopObserver{},
},
PermissionHandler: nil, // 显式 nil
})
if team.cfg.PermissionHandler != nil {
t.Error("PermissionHandler should remain nil")
}
}
// --- WorkerSpec 验证 ---
// TestWorkerSpec_EmptyAgentTypeDefaultsToGeneralPurpose spells out the
// defaulting rule for an empty WorkerSpec.AgentType: it resolves to
// "general-purpose".
//
// NOTE(review): the rule is re-implemented locally here rather than exercised
// through Team.runWorker, so this test cannot detect a change in runWorker.
func TestWorkerSpec_EmptyAgentTypeDefaultsToGeneralPurpose(t *testing.T) {
	const fallback = "general-purpose"

	spec := WorkerSpec{
		AgentType:   "", // empty means "use the fallback"
		Prompt:      "do something",
		Description: "test task",
	}

	// Same substitution the test's original comment attributes to runWorker.
	resolved := spec.AgentType
	if resolved == "" {
		resolved = fallback
	}

	if resolved != fallback {
		t.Errorf("default agent type should be general-purpose, got %q", resolved)
	}
}
// --- WorkerResult ---
// TestWorkerResult_HasDuration stores a measured elapsed time in
// WorkerResult.Duration and checks that the field reads back as positive.
func TestWorkerResult_HasDuration(t *testing.T) {
	began := time.Now()
	time.Sleep(time.Millisecond) // make sure the measured span is non-zero
	elapsed := time.Since(began)

	res := WorkerResult{
		WorkerID:    "sa-1",
		AgentType:   "Explore",
		Description: "test",
		Result:      "ok",
		Duration:    elapsed,
	}

	if res.Duration <= 0 {
		t.Error("WorkerResult.Duration should be > 0")
	}
}
// TestWorkerResult_ErrorField checks that an error stored in
// WorkerResult.Error reads back as the very same value.
func TestWorkerResult_ErrorField(t *testing.T) {
	res := WorkerResult{WorkerID: "sa-2", Error: context.Canceled}

	if got := res.Error; got != context.Canceled {
		t.Errorf("expected context.Canceled, got %v", got)
	}
}
// --- RunWorkers 边界情况 ---
func TestRunWorkers_EmptySpecsReturnsNil(t *testing.T) {
team := NewTeam(TeamConfig{
LeaderEngine: &Engine{
observer: &NoopObserver{},
},
})
results, err := team.RunWorkers(context.Background(), nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if results != nil {
t.Error("expected nil results for empty specs")
}
}
// TestRunWorkers_NilLeaderReturnsError checks that RunWorkers reports an error
// when the Team was built without a leader engine.
func TestRunWorkers_NilLeaderReturnsError(t *testing.T) {
	team := NewTeam(TeamConfig{LeaderEngine: nil})
	specs := []WorkerSpec{{Prompt: "x"}}

	if _, err := team.RunWorkers(context.Background(), specs); err == nil {
		t.Error("expected error when LeaderEngine is nil")
	}
}
// TestRunWorkers_ForwardsSubAgentEventsToParentChannel — 集成测试:
// Team.runWorker 从工具派发 ctx 继承 EventEmitter (因为父 agent 的 Agent
// tool 会在 WithEventEmitter 包过的 ctx 下调 Team), worker 作为 SubAgent
// 启动时 sa.runLoop 读 emitter 把 Start/End + 每个业务事件转发到父 Run
// channel. 走 "pre-canceled ctx" 最小路径避免 mock LLM provider.
//
// TestRunWorkers_ForwardsSubAgentEventsToParentChannel — integration test:
// Team.runWorker inherits the EventEmitter from the tool-dispatch ctx
// (the parent agent's Agent tool would call Team under a ctx already
// wrapped with WithEventEmitter), so when the worker SubAgent starts,
// sa.runLoop reads the emitter and forwards Start/End + each business
// event up to the parent Run channel. Uses the "pre-canceled ctx" minimal
// path to avoid mocking an LLM provider.
func TestRunWorkers_ForwardsSubAgentEventsToParentChannel(t *testing.T) {
	cfg := testConfig()
	cfg.Cwd = t.TempDir()
	leader := &Engine{cfg: cfg, tools: tools.NewRegistry(), observer: &NoopObserver{}}
	team := NewTeam(TeamConfig{LeaderEngine: leader})

	// Capture emitter: stands in for the ctx under which a parent Agent tool
	// would invoke the Team.
	emit, snapshot := captureEmitter()
	cancelled, cancel := context.WithCancel(context.Background())
	cancel() // cancel up front so the worker hits ctx.Done and exits quickly
	ctx := WithEventEmitter(cancelled, emit)

	// The result and error are deliberately ignored: the run is expected to
	// be cut short by the cancelled ctx, and only the emitted events matter.
	_, _ = team.RunWorkers(ctx, []WorkerSpec{{
		AgentType:   "Explore",
		Prompt:      "survey files",
		Description: "探索路径",
	}})

	events := snapshot()
	if len(events) == 0 {
		t.Fatalf("Team worker 应通过 ctx emitter 转发 Start/End, 但未捕获任何事件")
	}

	// Both lifecycle events must be present and must carry the same ID.
	var (
		sawStart, sawEnd bool
		startID          string
	)
	for _, ev := range events {
		if start, ok := ev.(*SubAgentStartEvent); ok {
			sawStart = true
			startID = start.SubAgentID
			if start.Description != "探索路径" {
				t.Errorf("Worker SubAgentStartEvent.Description=%q, want 探索路径", start.Description)
			}
		} else if end, ok := ev.(*SubAgentEndEvent); ok {
			sawEnd = true
			if end.SubAgentID != startID {
				t.Errorf("End.SubAgentID=%q, Start.SubAgentID=%q — 不串联", end.SubAgentID, startID)
			}
		}
	}

	if !sawStart {
		t.Errorf("未收到 SubAgentStartEvent (Team→Worker 转发链路断)")
	}
	if !sawEnd {
		t.Errorf("未收到 SubAgentEndEvent")
	}
}
// --- inbox.Router 集成(Team 使用 InboxRouter)---
// TestTeam_RouterSendReceive checks the Team's internal router end to end: a
// message sent to the "leader" address is received from the leader inbox with
// the same ID.
func TestTeam_RouterSendReceive(t *testing.T) {
	const leaderID = "leader"

	team := NewTeam(TeamConfig{
		LeaderEngine: &Engine{observer: &NoopObserver{}},
	})

	// Build an idle notification from worker-1 and push it through the router.
	idle := inbox.IdleNotificationPayload{IdleReason: "available"}
	sent, err := inbox.NewMessage("worker-1", leaderID, inbox.MsgIdleNotification, idle)
	if err != nil {
		t.Fatalf("NewMessage: %v", err)
	}
	if sendErr := team.router.Send(leaderID, sent); sendErr != nil {
		t.Fatalf("Send: %v", sendErr)
	}

	leaderBox := team.router.Inbox(leaderID)

	// Bound the receive so a lost message fails the test instead of hanging it.
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	received, err := leaderBox.Recv(ctx)
	if err != nil {
		t.Fatalf("Recv: %v", err)
	}
	if received.ID != sent.ID {
		t.Errorf("ID mismatch: %q != %q", received.ID, sent.ID)
	}
}
// --- TeamConfig.PermissionHandler end-to-end (Worker → Leader bubble) ---
// TestTeam_PermissionBubble_AllowWithUpdatedInput verifies the full Worker→Leader
// permission-bubble round trip: Worker SubAgent's permissionChecker.Check builds
// a PermissionRequestPayload with all 5 fields populated, routes it to the Leader
// inbox, the Leader-side handleWorkerPermissionRequest invokes the consumer
// PermissionHandler, the response (Approved + UpdatedInput + Reason) flows back
// through MsgPermissionResponse, and the bubblingHandler returns a permission.Response
// with Decision=Allow and the consumer-supplied UpdatedInput.
//
// This locks the godoc promise from team.go: "PermissionHandler 消费层实现的
// 权限确认" -- non-nil handler must be invoked per Worker tool exec, with the
// full payload, and its UpdatedInput must reach the SubAgent.
//
// TestTeam_PermissionBubble_AllowWithUpdatedInput 验证完整 Worker→Leader 权限
// 冒泡 round trip: Worker SubAgent 的 permissionChecker.Check 构造 5 字段都
// 填的 PermissionRequestPayload, 路由到 Leader inbox, Leader 侧
// handleWorkerPermissionRequest 调用消费层 PermissionHandler, 响应
// (Approved + UpdatedInput + Reason) 通过 MsgPermissionResponse 流回,
// bubblingHandler 返回 Decision=Allow 且 UpdatedInput 透传的 permission.Response.
//
// 锁住 team.go godoc 承诺: "PermissionHandler 消费层实现的权限确认" --
// 非 nil handler 必须每次 Worker 工具执行时被调用, 收到完整 payload,
// 其 UpdatedInput 必须到达 SubAgent.
func TestTeam_PermissionBubble_AllowWithUpdatedInput(t *testing.T) {
	const (
		toolName   = "Bash"
		toolUseID  = "tool-use-id-1"
		wantReason = "approved by test handler"
	)

	handler := &recordingPermissionHandler{
		approve:      true,
		updatedInput: map[string]any{"command": "echo SAFE"},
		reason:       wantReason,
	}
	leader := &Engine{observer: &NoopObserver{}}
	team := NewTeam(TeamConfig{LeaderEngine: leader, PermissionHandler: handler})

	// Hand-built worker wired into the team's router and bubbling handler.
	sa := &SubAgent{
		ID:                 "test-worker-1",
		ParentEngine:       leader,
		pendingPermissions: make(map[string]chan inbox.PermissionResponsePayload),
		done:               make(chan struct{}),
	}
	sa.incomingInbox = team.router.Inbox(sa.ID)
	sa.permissionChecker = permission.NewEngine(permission.ModeDefault, team.bubblingHandlerFor(sa))

	// The timeout bounds both the Check call and the two pump goroutines.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Worker-side pump: take one message from the worker inbox and, if it is a
	// permission response, dispatch it to the pending channel. Stands in for
	// the corresponding runLoop branch in subagent.go.
	go func() {
		msg, err := sa.incomingInbox.Recv(ctx)
		if err == nil && msg != nil && msg.Type == inbox.MsgPermissionResponse {
			sa.dispatchPermissionResponse(msg)
		}
	}()

	// Leader-side pump: take one message from the leader inbox and, if it is a
	// permission request, resolve it through the consumer handler. Stands in
	// for the corresponding runLoop branch in engine.go.
	go func() {
		msg, err := leader.incomingInbox.Recv(ctx)
		if err == nil && msg != nil && msg.Type == inbox.MsgPermissionRequest {
			leader.handleWorkerPermissionRequest(ctx, msg)
		}
	}()

	request := &permission.Request{
		ToolName: toolName,
		ToolID:   toolUseID,
		Input:    map[string]any{"command": "echo hello"},
	}
	permResp, err := sa.permissionChecker.Check(ctx, request)
	if err != nil {
		t.Fatalf("permissionChecker.Check failed: %v", err)
	}

	// Hold the handler lock while reading what it recorded.
	handler.mu.Lock()
	defer handler.mu.Unlock()

	if !handler.called {
		t.Fatal("consumer PermissionHandler never invoked -- bubble path broken")
	}

	// What the consumer handler saw.
	seen := handler.payload
	if seen.RequestID == "" {
		t.Error("PermissionRequestPayload.RequestID empty -- bubblingHandler must set it")
	}
	if seen.ToolName != toolName {
		t.Errorf("PermissionRequestPayload.ToolName: got %q, want %q", seen.ToolName, toolName)
	}
	if seen.ToolUseID != toolUseID {
		t.Errorf("PermissionRequestPayload.ToolUseID: got %q, want %q", seen.ToolUseID, toolUseID)
	}
	if seen.Description == "" {
		t.Error("PermissionRequestPayload.Description empty -- permission.Engine should fill req.Message via ExplainPermissionRequest")
	}
	if cmd, _ := seen.Input["command"].(string); cmd != "echo hello" {
		t.Errorf("PermissionRequestPayload.Input.command: got %v, want %q", seen.Input["command"], "echo hello")
	}

	// What came back to the worker.
	if permResp.Decision != permission.DecisionAllow {
		t.Errorf("permission.Response.Decision: got %v, want Allow", permResp.Decision)
	}
	if permResp.Reason != wantReason {
		t.Errorf("permission.Response.Reason: got %q, want %q", permResp.Reason, wantReason)
	}
	if cmd, _ := permResp.UpdatedInput["command"].(string); cmd != "echo SAFE" {
		t.Errorf("permission.Response.UpdatedInput.command: got %v, want %q", permResp.UpdatedInput["command"], "echo SAFE")
	}
}
// TestTeam_PermissionBubble_NilHandlerAutoApproves locks the godoc promise from
// team.go TeamConfig.PermissionHandler: "nil = 自动批准所有请求". Without this
// test, a future refactor could change nil-handler semantics (e.g. fail-closed
// deny) and silently break any consumer that relied on "I'll wire it later".
//
// TestTeam_PermissionBubble_NilHandlerAutoApproves 锁住 team.go
// TeamConfig.PermissionHandler godoc 承诺: "nil = 自动批准所有请求". 没这个 test,
// 未来重构可能改 nil handler 语义 (如 fail-closed deny), 默默破坏任何依赖
// "之后再 wire" 的消费层.
func TestTeam_PermissionBubble_NilHandlerAutoApproves(t *testing.T) {
	leader := &Engine{observer: &NoopObserver{}}
	team := NewTeam(TeamConfig{LeaderEngine: leader, PermissionHandler: nil})

	// Hand-built worker wired into the team's router and bubbling handler.
	sa := &SubAgent{
		ID:                 "test-worker-noh",
		ParentEngine:       leader,
		pendingPermissions: make(map[string]chan inbox.PermissionResponsePayload),
		done:               make(chan struct{}),
	}
	sa.incomingInbox = team.router.Inbox(sa.ID)
	sa.permissionChecker = permission.NewEngine(permission.ModeDefault, team.bubblingHandlerFor(sa))

	// The timeout bounds both the Check call and the two pump goroutines.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Worker-side pump: forward a single permission response to the waiter.
	go func() {
		msg, err := sa.incomingInbox.Recv(ctx)
		if err == nil && msg != nil && msg.Type == inbox.MsgPermissionResponse {
			sa.dispatchPermissionResponse(msg)
		}
	}()

	// Leader-side pump: resolve a single permission request.
	go func() {
		msg, err := leader.incomingInbox.Recv(ctx)
		if err == nil && msg != nil && msg.Type == inbox.MsgPermissionRequest {
			leader.handleWorkerPermissionRequest(ctx, msg)
		}
	}()

	request := &permission.Request{
		ToolName: "Read",
		ToolID:   "tool-use-id-2",
		Input:    map[string]any{"file_path": "/tmp/x"},
	}
	permResp, err := sa.permissionChecker.Check(ctx, request)
	if err != nil {
		t.Fatalf("Check failed: %v", err)
	}

	if decision := permResp.Decision; decision != permission.DecisionAllow {
		t.Errorf("nil handler should auto-approve, got Decision=%v Reason=%q", decision, permResp.Reason)
	}
}
// TestTaskNotificationTmpl_FormatOK validates that taskNotificationTmpl renders
// all declared fields — including the description field, which is the Worker
// intent tag the Leader uses to reverse-map a notification back to its business task.
//
// TestTaskNotificationTmpl_FormatOK 验证 taskNotificationTmpl 正确渲染所有字段 --
// 尤其是 description 字段, 它是 Leader 反向把通知映射回业务任务的意图标签.
func TestTaskNotificationTmpl_FormatOK(t *testing.T) {
	// The third template argument; it must survive into the rendered output.
	const description = "南路径探索"

	xml := fmt.Sprintf(taskNotificationTmpl,
		"task-1", "completed", description, "All checks passed", "Verification", int64(1234))

	// Sanity checks on the overall size of the rendered text.
	if xml == "" {
		t.Error("task notification XML should not be empty")
	}
	if len(xml) < 50 {
		t.Errorf("XML too short: %q", xml)
	}

	if !strings.Contains(xml, description) {
		t.Errorf("XML missing field or content mismatch:\n%s", xml)
	}
}
// TestRunWorkers_TaskNotification_CarriesDescription locks the wire:
// runWorker fills WorkerResult.Description from WorkerSpec.Description, and
// RunWorkers propagates it into the task-notification XML queued for the
// Leader. The pre-canceled ctx path lets Worker fail fast without needing a
// scripted provider, but the enqueue-notification branch still fires on the
// failure path (team.go RunWorkers — both completed and failed paths build
// XML and enqueue).
//
// TestRunWorkers_TaskNotification_CarriesDescription 锁定 wire:
// runWorker 把 WorkerSpec.Description 填入 WorkerResult.Description, 然后
// RunWorkers 把它嵌进发给 Leader 的 task-notification XML. 用 pre-canceled
// ctx 让 Worker 快速失败, 无需 scripted provider; enqueue-notification 分支
// 在失败路径也会触发 (team.go RunWorkers completed/failed 两路都会构造 XML
// 并入队).
func TestRunWorkers_TaskNotification_CarriesDescription(t *testing.T) {
	const (
		north = "北路径探索"
		south = "南路径探索"
	)

	cfg := testConfig()
	cfg.Cwd = t.TempDir()
	leader := &Engine{cfg: cfg, tools: tools.NewRegistry(), observer: &NoopObserver{}}
	team := NewTeam(TeamConfig{LeaderEngine: leader})

	// Cancel before running so the workers stop right away.
	ctx, cancel := context.WithCancel(context.Background())
	cancel()

	// Result and error are ignored on purpose: only the notifications queued
	// on the leader are under test.
	_, _ = team.RunWorkers(ctx, []WorkerSpec{
		{AgentType: "Explore", Prompt: "survey north", Description: north},
		{AgentType: "Explore", Prompt: "survey south", Description: south},
	})

	notifs := leader.drainTeamNotifications()
	if len(notifs) != 2 {
		t.Fatalf("drainTeamNotifications = %d notifs, want 2", len(notifs))
	}

	// Each worker's description must appear somewhere in the queued text.
	joined := strings.Join(notifs, "\n")
	for _, desc := range []string{north, south} {
		if !strings.Contains(joined, desc) {
			t.Errorf("notification missing %s description:\n%s", desc, joined)
		}
	}
}