package engine import ( "fmt" "os" "path/filepath" "strings" "testing" "time" "git.flytoex.net/yuanwei/flyto-agent/pkg/config" "git.flytoex.net/yuanwei/flyto-agent/pkg/memory" ) // --- 门槛检查测试 --- func TestDreamEngine_ShouldRun_TimeNotMet(t *testing.T) { de := newTestDreamEngine(t) de.lastConsolidationAt = time.Now().Add(-1 * time.Hour) de.sessionCount = 10 if de.shouldRun() { t.Error("shouldRun() returned true when time threshold not met") } } func TestDreamEngine_ShouldRun_SessionsNotMet(t *testing.T) { de := newTestDreamEngine(t) de.lastConsolidationAt = time.Now().Add(-48 * time.Hour) de.sessionCount = 2 if de.shouldRun() { t.Error("shouldRun() returned true when session count not met") } } func TestDreamEngine_ShouldRun_AllMet(t *testing.T) { de := newTestDreamEngine(t) de.lastConsolidationAt = time.Now().Add(-48 * time.Hour) de.sessionCount = 10 de.lastScanAt = time.Time{} if !de.shouldRun() { t.Error("shouldRun() returned false when all thresholds met") } } func TestDreamEngine_ShouldRun_ScanThrottle(t *testing.T) { de := newTestDreamEngine(t) de.lastConsolidationAt = time.Now().Add(-48 * time.Hour) de.sessionCount = 10 de.lastScanAt = time.Now().Add(-5 * time.Minute) // 不够 10 分钟 if de.shouldRun() { t.Error("shouldRun() returned true when scan throttle active") } } func TestDreamEngine_ShouldRun_FirstRun(t *testing.T) { de := newTestDreamEngine(t) de.sessionCount = 5 // lastConsolidationAt 零值 = 很久以前 if !de.shouldRun() { t.Error("shouldRun() returned false on first run with enough sessions") } } // --- 会话计数测试 --- func TestDreamEngine_RecordSession(t *testing.T) { de := newTestDreamEngine(t) if de.getSessionCount() != 0 { t.Errorf("initial session count = %d, want 0", de.getSessionCount()) } de.RecordSession() de.RecordSession() de.RecordSession() if de.getSessionCount() != 3 { t.Errorf("session count after 3 records = %d, want 3", de.getSessionCount()) } } // --- mtime-as-state 测试 --- func TestReadLockMtime_FileAbsent(t *testing.T) { // 文件不存在 → 返回零值(视为从未巩固) got := ReadLockMtime(filepath.Join(t.TempDir(), "nonexistent.lock")) if !got.IsZero() { t.Errorf("ReadLockMtime absent file = %v, want zero", got) } } func TestReadLockMtime_FileExists(t *testing.T) { dir := t.TempDir() lockPath := filepath.Join(dir, "test.lock") // 创建锁文件并设置特定 mtime if err := os.WriteFile(lockPath, []byte(""), 0644); err != nil { t.Fatal(err) } want := time.Now().Add(-12 * time.Hour).Truncate(time.Second) if err := os.Chtimes(lockPath, want, want); err != nil { t.Fatal(err) } got := ReadLockMtime(lockPath) if got.Sub(want).Abs() > time.Second { t.Errorf("ReadLockMtime = %v, want %v", got, want) } } func TestDreamEngine_MtimeAsState_LoadFromLock(t *testing.T) { // 验证:loadSessionCount + lastConsolidationAt 从 lock mtime 读取 dir := t.TempDir() lockPath := filepath.Join(dir, "dream.lock") statePath := filepath.Join(dir, "dream_state.json") // 设置 lock mtime 为 "2 天前" twoDAgo := time.Now().Add(-48 * time.Hour).Truncate(time.Second) if err := os.WriteFile(lockPath, []byte(""), 0644); err != nil { t.Fatal(err) } if err := os.Chtimes(lockPath, twoDAgo, twoDAgo); err != nil { t.Fatal(err) } de := newTestDreamEngineWithPaths(t, lockPath, statePath) if de.lastConsolidationAt.Sub(twoDAgo).Abs() > time.Second { t.Errorf("lastConsolidationAt = %v, want ~%v (from lock mtime)", de.lastConsolidationAt, twoDAgo) } } // --- 会话计数持久化测试 --- func TestDreamEngine_SessionCountPersistence(t *testing.T) { dir := t.TempDir() lockPath := filepath.Join(dir, "dream.lock") statePath := filepath.Join(dir, "dream_state.json") de1 := newTestDreamEngineWithPaths(t, lockPath, statePath) de1.RecordSession() de1.RecordSession() de1.RecordSession() if de1.getSessionCount() != 3 { t.Fatalf("de1 session count = %d, want 3", de1.getSessionCount()) } // 重新加载:sessionCount 应从 dream_state.json 恢复 de2 := newTestDreamEngineWithPaths(t, lockPath, statePath) if de2.getSessionCount() != 3 { t.Errorf("de2 session count after load = %d, want 3", de2.getSessionCount()) } } // --- 文件锁测试 --- func TestFileLock_AcquireAndRelease(t *testing.T) { dir := t.TempDir() lockPath := filepath.Join(dir, "test.lock") lock := NewFileLock(lockPath) _, ok := lock.TryAcquire() if !ok { t.Fatal("TryAcquire() failed on first attempt") } if !lock.IsAcquired() { t.Error("IsAcquired() returned false after acquiring") } if _, err := os.Stat(lockPath); os.IsNotExist(err) { t.Error("lock file was not created") } lock.Release() if lock.IsAcquired() { t.Error("IsAcquired() returned true after releasing") } } func TestFileLock_DoubleAcquireFails(t *testing.T) { dir := t.TempDir() lockPath := filepath.Join(dir, "test.lock") lock1 := NewFileLock(lockPath) lock2 := NewFileLock(lockPath) _, ok := lock1.TryAcquire() if !ok { t.Fatal("first TryAcquire() failed") } defer lock1.Release() _, ok = lock2.TryAcquire() if ok { lock2.Release() t.Error("second TryAcquire() succeeded while lock held") } } func TestFileLock_Rollback(t *testing.T) { dir := t.TempDir() lockPath := filepath.Join(dir, "test.lock") oldTime := time.Now().Add(-24 * time.Hour) if err := os.WriteFile(lockPath, []byte(""), 0644); err != nil { t.Fatal(err) } if err := os.Chtimes(lockPath, oldTime, oldTime); err != nil { t.Fatal(err) } lock := NewFileLock(lockPath) priorMtime, ok := lock.TryAcquire() if !ok { t.Fatal("TryAcquire() failed") } if priorMtime.Sub(oldTime).Abs() > time.Second { t.Errorf("priorMtime = %v, want close to %v", priorMtime, oldTime) } lock.Release() lock.Rollback(priorMtime) info, err := os.Stat(lockPath) if err != nil { t.Fatal(err) } if info.ModTime().Sub(oldTime).Abs() > time.Second { t.Errorf("mtime after rollback = %v, want close to %v", info.ModTime(), oldTime) } } func TestFileLock_RollbackZero(t *testing.T) { dir := t.TempDir() lockPath := filepath.Join(dir, "test.lock") if err := os.WriteFile(lockPath, []byte(""), 0644); err != nil { t.Fatal(err) } lock := NewFileLock(lockPath) lock.Rollback(time.Time{}) // 零值 = 删除文件 if _, err := os.Stat(lockPath); !os.IsNotExist(err) { t.Error("lock file was not deleted after Rollback(zero)") } } // --- Dream 提示词测试(新版叙事式格式)--- func TestBuildConsolidationPrompt_Basic(t *testing.T) { prompt := BuildConsolidationPrompt("/mem/root", "", nil) if prompt == "" { t.Error("prompt should not be empty") } for _, phase := range []string{"Phase 1", "Phase 2", "Phase 3", "Phase 4"} { if !strings.Contains(prompt, phase) { t.Errorf("prompt missing %q", phase) } } if !strings.Contains(prompt, "/mem/root") { t.Error("prompt missing memoryRoot path") } // 无 transcript 目录时不应包含 grep 示例 if strings.Contains(prompt, "grep -rn") { t.Error("prompt should not include grep examples when transcriptDir is empty") } } func TestBuildConsolidationPrompt_WithTranscript(t *testing.T) { prompt := BuildConsolidationPrompt("/mem/root", "/transcripts", nil) if !strings.Contains(prompt, "/transcripts") { t.Error("prompt missing transcriptDir") } if !strings.Contains(prompt, "grep -rn") { t.Error("prompt should include grep example when transcriptDir is set") } } func TestBuildConsolidationPrompt_WithSessionIDs(t *testing.T) { ids := []string{"session-abc", "session-xyz"} prompt := BuildConsolidationPrompt("/mem/root", "/transcripts", ids) if !strings.Contains(prompt, "session-abc") { t.Error("prompt missing session ID hint") } if !strings.Contains(prompt, "Tool constraints") { t.Error("prompt missing Bash constraints note") } } // --- FileSessionProvider 测试 --- func TestFileSessionProvider_ListSince_Empty(t *testing.T) { dir := t.TempDir() p := &FileSessionProvider{Dir: dir} ids, err := p.ListSince(time.Now().Add(-1 * time.Hour)) if err != nil { t.Fatalf("ListSince failed: %v", err) } if len(ids) != 0 { t.Errorf("expected 0 sessions, got %d", len(ids)) } } func TestFileSessionProvider_ListSince_FiltersOld(t *testing.T) { dir := t.TempDir() // 创建 3 个 session 文件,2 个新,1 个旧 writeJSONL := func(name string, mtime time.Time) { path := filepath.Join(dir, name+".jsonl") if err := os.WriteFile(path, []byte(`{}`), 0644); err != nil { t.Fatal(err) } if err := os.Chtimes(path, mtime, mtime); err != nil { t.Fatal(err) } } cutoff := time.Now().Add(-24 * time.Hour) writeJSONL("old-session", time.Now().Add(-48*time.Hour)) // 旧,应过滤 writeJSONL("new-session-1", time.Now().Add(-12*time.Hour)) // 新,应包含 writeJSONL("new-session-2", time.Now().Add(-6*time.Hour)) // 新,应包含 p := &FileSessionProvider{Dir: dir} ids, err := p.ListSince(cutoff) if err != nil { t.Fatalf("ListSince failed: %v", err) } if len(ids) != 2 { t.Errorf("expected 2 sessions, got %d: %v", len(ids), ids) } } func TestFileSessionProvider_ListSince_ExcludesCurrentSession(t *testing.T) { dir := t.TempDir() path := filepath.Join(dir, "current-session.jsonl") if err := os.WriteFile(path, []byte(`{}`), 0644); err != nil { t.Fatal(err) } p := &FileSessionProvider{Dir: dir, ExcludeID: "current-session"} ids, err := p.ListSince(time.Now().Add(-1 * time.Hour)) if err != nil { t.Fatalf("ListSince failed: %v", err) } for _, id := range ids { if id == "current-session" { t.Error("current session should be excluded") } } } func TestFileSessionProvider_ListSince_DirAbsent(t *testing.T) { p := &FileSessionProvider{Dir: filepath.Join(t.TempDir(), "nonexistent")} ids, err := p.ListSince(time.Now()) if err != nil { t.Fatalf("ListSince should not error on absent dir, got: %v", err) } if ids != nil && len(ids) > 0 { t.Errorf("expected nil/empty, got %v", ids) } } // --- DreamTaskStore 测试 --- func TestDreamTaskStore_RegisterAndGet(t *testing.T) { store := NewDreamTaskStore() task := &DreamTaskState{ ID: "dream_1", Status: DreamStatusStarting, StartTime: time.Now(), } store.Register(task) got := store.Get("dream_1") if got == nil { t.Fatal("Get returned nil") } if got.Status != DreamStatusStarting { t.Errorf("status = %v, want %v", got.Status, DreamStatusStarting) } } func TestDreamTaskStore_Update(t *testing.T) { store := NewDreamTaskStore() store.Register(&DreamTaskState{ID: "dream_1", Status: DreamStatusStarting, StartTime: time.Now()}) store.Update("dream_1", DreamStatusConsolidating, "phase 3") got := store.Get("dream_1") if got.Status != DreamStatusConsolidating { t.Errorf("status = %v, want %v", got.Status, DreamStatusConsolidating) } if got.Phase != "phase 3" { t.Errorf("phase = %v, want %v", got.Phase, "phase 3") } } func TestDreamTaskStore_SetError(t *testing.T) { store := NewDreamTaskStore() store.Register(&DreamTaskState{ID: "dream_1", Status: DreamStatusStarting, StartTime: time.Now()}) store.SetError("dream_1", "something broke") got := store.Get("dream_1") if got.Status != DreamStatusFailed { t.Errorf("status = %v, want %v", got.Status, DreamStatusFailed) } if got.Error != "something broke" { t.Errorf("error = %v, want 'something broke'", got.Error) } if got.EndTime.IsZero() { t.Error("EndTime should be set on failure") } } func TestDreamTaskStore_AddFileTouched_Dedup(t *testing.T) { store := NewDreamTaskStore() store.Register(&DreamTaskState{ID: "dream_1", Status: DreamStatusStarting, StartTime: time.Now()}) store.AddFileTouched("dream_1", "file_a") store.AddFileTouched("dream_1", "file_a") // 重复,应被去重 store.AddFileTouched("dream_1", "file_b") got := store.Get("dream_1") if len(got.FilesTouched) != 2 { t.Errorf("FilesTouched len = %d, want 2 (dedup)", len(got.FilesTouched)) } } func TestDreamTaskStore_AddTurn_Basic(t *testing.T) { store := NewDreamTaskStore() store.Register(&DreamTaskState{ID: "dream_1", Status: DreamStatusStarting, StartTime: time.Now()}) store.AddTurn("dream_1", DreamTurn{Text: "Reviewing memories...", ToolUseCount: 2}) store.AddTurn("dream_1", DreamTurn{Text: "Updating index.", ToolUseCount: 1}) got := store.Get("dream_1") if len(got.Turns) != 2 { t.Errorf("Turns len = %d, want 2", len(got.Turns)) } if got.Turns[0].Text != "Reviewing memories..." { t.Errorf("Turns[0].Text = %q", got.Turns[0].Text) } } func TestDreamTaskStore_AddTurn_SkipsEmpty(t *testing.T) { store := NewDreamTaskStore() store.Register(&DreamTaskState{ID: "dream_1", Status: DreamStatusStarting, StartTime: time.Now()}) // 空轮应该被跳过 store.AddTurn("dream_1", DreamTurn{Text: "", ToolUseCount: 0}) // 有文本的轮次应该保留 store.AddTurn("dream_1", DreamTurn{Text: "Working...", ToolUseCount: 0}) got := store.Get("dream_1") if len(got.Turns) != 1 { t.Errorf("Turns len = %d, want 1 (empty turn skipped)", len(got.Turns)) } } func TestDreamTaskStore_AddTurn_SlidingWindow(t *testing.T) { store := NewDreamTaskStore() store.Register(&DreamTaskState{ID: "dream_1", Status: DreamStatusStarting, StartTime: time.Now()}) // 插入 maxDreamTurns + 5 条,应该只保留最近 maxDreamTurns 条 for i := 0; i < maxDreamTurns+5; i++ { store.AddTurn("dream_1", DreamTurn{Text: fmt.Sprintf("turn %d", i), ToolUseCount: 1}) } got := store.Get("dream_1") if len(got.Turns) != maxDreamTurns { t.Errorf("Turns len = %d, want %d (sliding window)", len(got.Turns), maxDreamTurns) } // 最后一条应该是最新的 last := got.Turns[len(got.Turns)-1] if last.Text != fmt.Sprintf("turn %d", maxDreamTurns+4) { t.Errorf("last turn text = %q, want latest", last.Text) } } func TestDreamTaskStore_Latest(t *testing.T) { store := NewDreamTaskStore() store.Register(&DreamTaskState{ ID: "dream_old", Status: DreamStatusCompleted, StartTime: time.Now().Add(-1 * time.Hour), }) store.Register(&DreamTaskState{ ID: "dream_new", Status: DreamStatusStarting, StartTime: time.Now(), }) latest := store.Latest() if latest == nil { t.Fatal("Latest returned nil") } if latest.ID != "dream_new" { t.Errorf("Latest ID = %v, want dream_new", latest.ID) } } // --- ToolUseInfo 测试(SubAgent.RunSyncWithCallback 辅助类型)--- func TestToolUseInfo_FilePathExtraction(t *testing.T) { // 验证 ToolUseInfo 的 FilePath 字段由 Edit/Write 的 file_path 参数填充 // 这里直接测试 ToolUseInfo 的使用方式,不依赖 SubAgent 完整调用链 info := ToolUseInfo{Name: "Edit", FilePath: "/mem/foo.md"} if info.Name != "Edit" || info.FilePath != "/mem/foo.md" { t.Errorf("unexpected ToolUseInfo: %+v", info) } // 非文件工具 FilePath 应为空 info2 := ToolUseInfo{Name: "Grep", FilePath: ""} if info2.FilePath != "" { t.Errorf("non-file tool should have empty FilePath") } } // ─── DreamEngine.Close() 测试 ──────────────────────────────────────────────── func TestDreamEngine_Close_WhenNotRunning(t *testing.T) { de := newTestDreamEngine(t) done := make(chan struct{}) go func() { de.Close(1 * time.Second) close(done) }() select { case <-done: case <-time.After(200 * time.Millisecond): t.Error("Close() hung when no goroutine was running") } } func TestDreamEngine_Close_WaitsForRunningGoroutine(t *testing.T) { de := newTestDreamEngine(t) doneCh := make(chan struct{}) de.mu.Lock() de.dreamDone = doneCh de.dreamRunning = true de.mu.Unlock() go func() { time.Sleep(50 * time.Millisecond) de.mu.Lock() de.dreamRunning = false de.dreamDone = nil de.mu.Unlock() close(doneCh) }() start := time.Now() de.Close(1 * time.Second) elapsed := time.Since(start) if elapsed < 30*time.Millisecond { t.Error("Close() should have waited for the goroutine to finish") } } func TestDreamEngine_Close_Timeout(t *testing.T) { de := newTestDreamEngine(t) doneCh := make(chan struct{}) // 永不关闭 de.mu.Lock() de.dreamDone = doneCh de.dreamRunning = true de.mu.Unlock() start := time.Now() de.Close(50 * time.Millisecond) elapsed := time.Since(start) if elapsed > 500*time.Millisecond { t.Errorf("Close() did not respect timeout: took %v", elapsed) } } // --- L1224: Observer 直接注入测试 --- // capturingObserver 是只为本测试服务的 EventObserver 实现--记录所有 Event/Error 调用, // 用于断言 DreamEngine 在不同路径下是否发射了预期的事件. // // 精妙之处(CLEVER): 不复用 BufferedObserver 或其他生产实现-- // 测试应该精准验证"我这次 refactor 关心的接口契约", 而不是借道其他组件间接验证. // 独立的 mock 让失败定位更直接: 事件没收到就是 dream.go 的 call site 出错, 不是 observer 链路. type capturingObserver struct { events []capturedEvent errors []capturedError } type capturedEvent struct { name string data map[string]any } type capturedError struct { err error context map[string]any } func (c *capturingObserver) Event(name string, data map[string]any) { c.events = append(c.events, capturedEvent{name: name, data: data}) } func (c *capturingObserver) Error(err error, context map[string]any) { c.errors = append(c.errors, capturedError{err: err, context: context}) } func (c *capturingObserver) Metric(name string, value float64, tags map[string]string) {} // TestDreamEngine_ObserverInjection 验证 L1224 refactor: DreamEngine 现在持有独立的 observer // 字段 (不再通过 parentEngine.Observer() 间接访问), 关键事件能被注入的 mock observer 捕获. // // 覆盖路径: // 1. NewDreamEngine 在 cfg.Observer==nil 时兜底 NoopObserver (不 panic) // 2. NewDreamEngine 在 cfg.Observer != nil 时使用注入的 observer // 3. Close() 的 timeout 分支发射 dream_close_timeout 事件 (L1224 红利: 早期方案无法记录此事件) func TestDreamEngine_ObserverInjection(t *testing.T) { t.Run("nil observer fallback to NoopObserver", func(t *testing.T) { // cfg.Observer == nil 时 NewDreamEngine 应兜底为 NoopObserver, Close() timeout 路径不 panic. dir := t.TempDir() de := NewDreamEngine(&DreamConfig{ MemStore: memory.NewFileStore(t.TempDir()), MinHours: 24, MinSess: 5, // Observer 故意不设置 }) // 覆盖 lockPath/statePath 到临时目录以隔离测试 de.lockPath = filepath.Join(dir, "dream.lock") de.statePath = filepath.Join(dir, "dream_state.json") if de.observer == nil { t.Fatal("NewDreamEngine should initialize observer to NoopObserver when cfg.Observer is nil") } if _, ok := de.observer.(*NoopObserver); !ok { t.Errorf("expected NoopObserver, got %T", de.observer) } // 构造一个永不完成的 dream goroutine, 触发 Close timeout 路径--不应 panic. doneCh := make(chan struct{}) de.mu.Lock() de.dreamDone = doneCh de.dreamRunning = true de.mu.Unlock() de.Close(20 * time.Millisecond) // NoopObserver.Event 空操作, 不 panic }) t.Run("injected observer captures close_timeout event", func(t *testing.T) { // 注入 capturingObserver, 验证 Close() timeout 分支发射 dream_close_timeout 事件. // 这是 L1224 带来的能力: 早期方案 DreamEngine 无法记录此事件 (只能通过 parentEngine.observer // 访问, 而 Close 路径下 parentEngine 可能已被 Engine.Close() 处理). obs := &capturingObserver{} dir := t.TempDir() de := NewDreamEngine(&DreamConfig{ MemStore: memory.NewFileStore(t.TempDir()), MinHours: 24, MinSess: 5, Observer: obs, }) de.lockPath = filepath.Join(dir, "dream.lock") de.statePath = filepath.Join(dir, "dream_state.json") if de.observer != obs { t.Fatalf("NewDreamEngine should use injected observer, got %T", de.observer) } // 构造永不完成的 dream goroutine, Close timeout 会触发事件发射. doneCh := make(chan struct{}) de.mu.Lock() de.dreamDone = doneCh de.dreamRunning = true de.mu.Unlock() de.Close(20 * time.Millisecond) // 断言: 恰好 1 个 dream_close_timeout 事件, 且 timeout_ms 字段正确. found := false for _, ev := range obs.events { if ev.name == "dream_close_timeout" { found = true if ms, ok := ev.data["timeout_ms"].(int64); !ok || ms != 20 { t.Errorf("dream_close_timeout event data incorrect: got %v, want timeout_ms=20", ev.data) } } } if !found { t.Errorf("expected dream_close_timeout event, got events: %v", obs.events) } }) t.Run("config injects activity and rootCtx", func(t *testing.T) { // 验证 Config 的另外两个字段 (Activity, RootCtx) 也正确落到 struct 字段上. // 这是防御性测试: 如果未来有人删了 NewDreamEngine 里的字段拷贝会立刻被捕获. obs := &capturingObserver{} act := &ActivityTracker{} // 零值 struct 作为 sentinel--测试只关心字段被拷贝而非其行为 // 注: 用 context.TODO() 作为 sentinel 验证字段被拷贝; 下面导入 "context" 于文件顶部无需 de := NewDreamEngine(&DreamConfig{ MemStore: memory.NewFileStore(t.TempDir()), Observer: obs, Activity: act, // RootCtx 留空, 验证保持 nil 的 fallback 分支可用 }) if de.observer != obs { t.Error("observer not injected") } if de.activity != act { t.Error("activity not injected") } if de.rootCtx != nil { t.Error("rootCtx should remain nil when not provided") } }) } // --- 辅助函数 --- func newTestDreamEngine(t *testing.T) *DreamEngine { t.Helper() dir := t.TempDir() return newTestDreamEngineWithPaths(t, filepath.Join(dir, "dream.lock"), filepath.Join(dir, "dream_state.json"), ) } func newTestDreamEngineWithPaths(t *testing.T, lockPath, statePath string) *DreamEngine { t.Helper() memDir := t.TempDir() store := memory.NewFileStore(memDir) de := &DreamEngine{ memStore: store, modelRole: config.DefaultRoles[config.RoleFast], minHours: 24, minSessions: 5, lockPath: lockPath, statePath: statePath, taskStore: NewDreamTaskStore(), // L1224: observer 必须非 nil(生产路径靠 NewDreamEngine 兜底 NoopObserver). // 测试直接 struct 构造绕过了兜底,必须显式补上 - 否则 Close timeout 路径 panic. observer: &NoopObserver{}, } // mtime-as-state:从 lock mtime 读取 lastConsolidationAt de.lastConsolidationAt = ReadLockMtime(lockPath) de.loadSessionCount() return de }