package engine // DreamEngine 负责自动记忆巩固(AutoDream). // // 在查询循环结束时检查门槛,满足则异步启动 Dream 任务. // Dream 运行完全异步,不阻塞主流程.失败时优雅降级(记录状态,不影响主功能). // // 三层门槛(最廉价的检查先做): // 1. 时间门槛:自上次巩固以来的最小小时数(默认 24h)- 一次 stat 调用 // 2. 扫描节流:两次检查之间至少间隔 10 分钟 - 内存变量 // 3. 会话计数门槛:自上次巩固以来的最小会话数(默认 5)- 内存计数 // 4. 文件锁:防止多进程同时执行 Dream - flock(2) // // 升华改进(ELEVATED): 执行策略切换--旧版用 JSON-ops 格式让模型输出结构化操作, // 现在改为 SubAgent-with-tools(与早期方案 runForkedAgent 一致). // 模型可以自主 grep transcript 文件,读取现有记忆,然后直接 Edit/Write. // 旧 JSON-ops 方案的根本缺陷:模型无法访问文件,Phase 2(Gather)完全失效. // // mtime-as-state:lock 文件的 mtime = lastConsolidatedAt(见 dream_lock.go). // 好处:进程 crash 后重启,mtime 仍然保留上次巩固时间,防止重复触发. // // PeriodicInterval:可选的定时触发(用于 SDK/API 长驻进程). // CLI 每次会话结束触发一次(RecordSession + CheckAndRun)已足够; // SDK/API 嵌入场景 engine 长驻,无"会话结束"事件,需要定时器. import ( "context" "encoding/json" "fmt" "os" "path/filepath" "strings" "sync" "time" "git.flytoex.net/yuanwei/flyto-agent/pkg/config" "git.flytoex.net/yuanwei/flyto-agent/pkg/memory" ) // SessionProvider 提供自上次巩固以来的会话 ID 列表. // // 升华改进(ELEVATED): 早期实现 直接扫描 transcript 目录(listSessionsTouchedSince). // 我们提取为接口,解耦具体存储格式-- // // CLI 场景:FileSessionProvider 扫描 JSONL 文件 // SDK/API 场景:实现者可以从数据库,消息队列或自定义存储查询 // 跨行业:仓储/金融 Agent 的"会话"可能是工单/交易,与 JSONL 完全无关 // // 替代方案: // - 否决原因:绑死了 CLI 场景,SDK/API 嵌入无法定制数据源. type SessionProvider interface { // ListSince 返回自 sinceTime 以来被修改/创建的会话 ID 列表. // 实现者可以过滤当前会话(避免把还没写完的会话计入). ListSince(sinceTime time.Time) ([]string, error) } // FileSessionProvider 扫描指定目录中自 sinceTime 以来被修改的 JSONL 文件. // 文件名(去掉 .jsonl 扩展名)即为 session ID. // // 这是 CLI 场景的标准实现,对应早期方案 listSessionsTouchedSince. // 目录结构:/.jsonl(每个会话一个文件). type FileSessionProvider struct { // Dir 是 transcript 目录,例如 ~/.flyto/projects// Dir string // ExcludeID 是要排除的会话 ID(通常是当前会话,其 mtime 总是很新) ExcludeID string } // ListSince 扫描 Dir 目录,返回 mtime > sinceTime 的 .jsonl 文件名(不含扩展名). func (p *FileSessionProvider) ListSince(sinceTime time.Time) ([]string, error) { entries, err := os.ReadDir(p.Dir) if err != nil { if os.IsNotExist(err) { return nil, nil // 目录不存在 = 无历史 session } return nil, fmt.Errorf("list sessions: %w", err) } var ids []string for _, e := range entries { if e.IsDir() || !strings.HasSuffix(e.Name(), ".jsonl") { continue } info, err := e.Info() if err != nil { continue // stat 失败,跳过该文件 } if !info.ModTime().After(sinceTime) { continue } id := strings.TrimSuffix(e.Name(), ".jsonl") if id == p.ExcludeID { continue // 排除当前会话 } ids = append(ids, id) } return ids, nil } // DreamEngine 是 Dream 巩固系统的核心引擎. type DreamEngine struct { memStore memory.Store modelRole string // 从 ModelRegistry.GetRole(RoleFast) 获取的模型 ID // 升华改进(ELEVATED): parentEngine 引用,仅用于 SubAgent fork 模式. // Dream 巩固改为 fork 子 agent 执行,共享 prompt cache. // 替代方案:Dream 直接调用 API(不共享缓存,独立构建请求). // // L1224 (2026-04-13): 从"万能入口"收窄为"仅 ForkSubAgent". // observer / activity / rootCtx 改为独立字段直接注入,不再通过 parentEngine 间接访问. // 原因:散布 12+ 处 `if de.parentEngine != nil && de.parentEngine.Observer() != nil` // 嵌套 nil 检查难维护,且 L367/L546 曾有无守卫路径,parentEngine==nil 直接 panic. parentEngine EngineRef // L1224: 独立注入的可观测性依赖(不再通过 parentEngine 间接访问). // observer 永不为 nil--NewDreamEngine 在 cfg.Observer==nil 时兜底 &NoopObserver{}, // 所以调用点可以直接 de.observer.Event(...) 无需 nil 检查(NoopObserver 所有方法空操作). // activity 可为 nil(未启用 ActivityTracker 的测试/轻量场景),调用点保留单处 nil 检查. // rootCtx 可为 nil(未长驻的 CLI 场景),periodic check 路径保留 fallback. observer EventObserver activity *ActivityTracker rootCtx context.Context // 三层门槛 minHours float64 // 自上次巩固以来的最小小时数(默认 24) minSessions int // 上次巩固以来的最小会话数(默认 5) // SessionProvider:可选,提供自上次巩固以来的会话列表. // nil = 不传 session hint 给 Dream prompt(用于 SDK/API 无 transcript 的场景). sessionProvider SessionProvider // transcriptDir:transcript 目录路径,用于 Dream prompt 中的 grep 提示. // 空 = prompt 不包含 transcript grep 示例. transcriptDir string // 状态追踪 mu sync.Mutex lastConsolidationAt time.Time // 上次巩固完成时间(内存中;持久化见 lock 文件 mtime) sessionCount int // 自上次巩固以来的会话计数 lastScanAt time.Time // 扫描节流时间戳(10 分钟) // 路径 lockPath string // ~/.flyto/dream.lock(mtime = lastConsolidatedAt) statePath string // ~/.flyto/dream_state.json(只存 sessionCount) // 任务追踪 taskStore *DreamTaskStore // goroutine 生命周期追踪--用于 Close() 等待 Dream goroutine 退出. // 精妙之处(CLEVER): 用 chan struct{} 而非 WaitGroup-- // Close() 需要带超时等待(select + timer),WaitGroup.Wait() 不支持超时. // dreamDone 在 goroutine 启动时创建(make),在 goroutine 退出时 close. // Close() 通过 select + time.After 实现有限等待,避免永久阻塞. dreamDone chan struct{} // goroutine 完成信号(nil 表示未运行) dreamRunning bool // 是否有 Dream goroutine 正在运行(mu 保护) // periodicDone 用于停止定时器 goroutine(EnablePeriodicCheck 启动时设置) periodicDone chan struct{} // periodicCancel 取消 periodicCtx(parentEngine==nil 时使用) periodicCancel context.CancelFunc } // DreamConfig 是 DreamEngine 的配置. type DreamConfig struct { MemStore memory.Store Models *config.ModelRegistry MinHours float64 // 默认 24 MinSess int // 默认 5 TaskStore *DreamTaskStore ParentEngine EngineRef // 用于 fork 模式(可选;L1224 起仅 ForkSubAgent 需要它) // L1224: 独立注入的可观测性依赖.engine.go 构造顺序已调整为先建 observer/activity/rootCtx // 再 buildDreamEngine,所以这三项在 Config 里就可填,不必走 post-fill 回填. // Observer==nil 时 NewDreamEngine 兜底 NoopObserver;Activity/RootCtx 可保持 nil. Observer EventObserver Activity *ActivityTracker RootCtx context.Context // SessionProvider 提供会话列表(可选). // CLI 场景:传 &FileSessionProvider{Dir: transcriptDir}. // SDK/API 长驻:可传自定义实现;nil = 不提供 session hint. SessionProvider SessionProvider // TranscriptDir 是 transcript 目录,用于 Dream prompt 中的 grep 示例(可选). // CLI 场景下通常与 FileSessionProvider.Dir 相同. TranscriptDir string // PeriodicInterval 是定时触发的间隔(0 = 不启用定时器). // CLI per-session 不需要设置(session end 触发已足够). // SDK/API 长驻进程可设置,例如 6 * time.Hour. PeriodicInterval time.Duration } // dreamStateFile 是只持久化 sessionCount 的轻量状态文件. // lastConsolidatedAt 改为从 lock 文件 mtime 读取(见 ReadLockMtime). type dreamStateFile struct { SessionCount int `json:"session_count"` } // NewDreamEngine 初始化 DreamEngine,从磁盘读取上次巩固时间和会话计数. func NewDreamEngine(cfg *DreamConfig) *DreamEngine { home, err := os.UserHomeDir() if err != nil { home = os.TempDir() } flytoDir := filepath.Join(home, ".flyto") minHours := cfg.MinHours if minHours <= 0 { minHours = 24 } minSess := cfg.MinSess if minSess <= 0 { minSess = 5 } modelID := "" if cfg.Models != nil { modelID = cfg.Models.GetRole(config.RoleFast) } if modelID == "" { modelID = config.DefaultRoles[config.RoleFast] } taskStore := cfg.TaskStore if taskStore == nil { taskStore = NewDreamTaskStore() } lockPath := filepath.Join(flytoDir, "dream.lock") // L1224: observer 永不 nil--用 NoopObserver 兜底让调用点省掉 nil 检查. // Activity/RootCtx 允许 nil(对应场景:无 Activity 追踪的测试,无长驻 rootCtx 的 CLI). observer := cfg.Observer if observer == nil { observer = &NoopObserver{} } de := &DreamEngine{ memStore: cfg.MemStore, modelRole: modelID, parentEngine: cfg.ParentEngine, observer: observer, activity: cfg.Activity, rootCtx: cfg.RootCtx, minHours: minHours, minSessions: minSess, sessionProvider: cfg.SessionProvider, transcriptDir: cfg.TranscriptDir, lockPath: lockPath, statePath: filepath.Join(flytoDir, "dream_state.json"), taskStore: taskStore, } // 从磁盘恢复状态: // lastConsolidationAt ← lock 文件 mtime(mtime-as-state 模式) // sessionCount ← dream_state.json de.lastConsolidationAt = ReadLockMtime(lockPath) de.loadSessionCount() // 启动定时器(如果配置了 PeriodicInterval) if cfg.PeriodicInterval > 0 { de.enablePeriodicCheck(cfg.PeriodicInterval) } return de } // RecordSession 记录一次新会话完成. // 在查询循环正常结束时调用(Engine.runLoop 末尾). func (de *DreamEngine) RecordSession() { de.mu.Lock() defer de.mu.Unlock() de.sessionCount++ de.saveSessionCountLocked() } // CheckAndRun 检查三层门槛,满足则异步启动 Dream 巩固. // 调用方应以 go de.CheckAndRun(ctx) 方式调用,本方法本身也不阻塞. func (de *DreamEngine) CheckAndRun(ctx context.Context) { if !de.shouldRun() { return } // 精妙之处(CLEVER): 文件锁 + mtime 回滚机制--多进程安全的 Dream 巩固. // TryAcquire 是非阻塞的(获取不到立即返回),避免多个 flyto 实例同时执行 Dream. // 失败时 Rollback(priorMtime) 恢复锁文件的 mtime,让下次检查时重新触发而非永久跳过. lock := NewFileLock(de.lockPath) priorMtime, ok := lock.TryAcquire() if !ok { return // 另一个进程正在执行 Dream,跳过 } // 更新扫描时间戳,并记录 goroutine 开始状态 de.mu.Lock() de.lastScanAt = time.Now() doneCh := make(chan struct{}) de.dreamDone = doneCh de.dreamRunning = true de.mu.Unlock() // 异步执行 Dream 巩固 go func() { // 精妙之处(CLEVER): 双重清理--先 close doneCh(通知 Close() 等待者), // 再 Release 文件锁(释放多进程锁定).顺序很重要: // 1. doneCh 关闭通知 Engine.Close() 可以继续 // 2. Release 释放文件锁,让下次 Dream 可以运行 // 如果顺序反过来,Engine.Close() 可能在锁释放前就以为 Dream 已干净退出. defer func() { de.mu.Lock() de.dreamRunning = false de.dreamDone = nil de.mu.Unlock() close(doneCh) }() defer lock.Release() // 升华改进(ELEVATED): ActivityDream 追踪--Dream 执行期间引用计数 +1, // ActivityTracker 不会在 Dream 运行时触发空闲回调,不会被 Close() 误判为空闲. // L1224: 从 parentEngine.Activity() 间接访问改为 de.activity 直接字段. if de.activity != nil { de.activity.Start(ActivityDream) defer de.activity.Stop(ActivityDream) } // L1224: observer 永不 nil(NoopObserver 兜底),无需嵌套 nil 检查. de.observer.Event("dream_start", map[string]any{ "session_count": de.getSessionCount(), }) if err := de.runDream(ctx); err != nil { // 失败时恢复锁文件 mtime(让下次会话重新触发) lock.Rollback(priorMtime) de.observer.Error(err, map[string]any{ "phase": "dream", }) } else { de.observer.Event("dream_complete", nil) } }() } // shouldRun 检查是否满足启动门槛(不含文件锁). func (de *DreamEngine) shouldRun() bool { de.mu.Lock() defer de.mu.Unlock() // 门槛 1:时间门槛 // 升华改进(ELEVATED): 早期方案 float64 * time.Duration 有精度截断风险 // (time.Hour 是 int64,float64 乘法有舍入误差). // 改为 time.Duration(hours * 3600) * time.Second,先把 float64 换算成整秒, // 再乘以 time.Second(精确 int64),精度更可预测. if time.Since(de.lastConsolidationAt) < time.Duration(de.minHours*3600)*time.Second { return false } // 门槛 2:扫描节流(10 分钟) // 当时间门槛通过但会话数不够时,lock mtime 不更新,时间门槛下次仍然通过. // 节流防止每轮查询结束都触发一次"只扫了时间但没扫会话"的无效检查. if time.Since(de.lastScanAt) < 10*time.Minute { return false } // 门槛 3:会话计数 if de.sessionCount < de.minSessions { return false } return true } // runDream 执行四阶段巩固任务(通过 SubAgent fork 模式). // 返回 error 以便调用方决定是否回滚. func (de *DreamEngine) runDream(ctx context.Context) error { // 注册任务状态 taskID := fmt.Sprintf("dream_%s", time.Now().Format("20060102_150405")) task := &DreamTaskState{ ID: taskID, Status: DreamStatusStarting, Phase: "starting", StartTime: time.Now(), } de.taskStore.Register(task) de.taskStore.Update(taskID, DreamStatusOrienting, "orienting") // 获取自上次巩固以来的会话列表(用于提示词 hint) var sessionIDs []string if de.sessionProvider != nil { de.mu.Lock() sinceTime := de.lastConsolidationAt de.mu.Unlock() ids, err := de.sessionProvider.ListSince(sinceTime) if err != nil { // 非致命:获取失败时 prompt 不附加 session hint,Dream 仍然运行 // 但必须有观测信号--静默吞掉会让调试完全无从入手. // L1224: 早期方案直接 de.parentEngine.Observer() 无守卫, parentEngine==nil 会 panic. // 改为 de.observer (NoopObserver 兜底) 后彻底消除这一 latent crash 路径. de.observer.Error(err, map[string]any{ "phase": "dream_list_sessions", }) } else { sessionIDs = ids } } task.SessionsReviewed = de.getSessionCount() // 构建记忆目录路径 var memoryRoot string if de.memStore != nil { memoryRoot = de.memStore.Dir() } // 构建叙事式 4 阶段提示词 de.taskStore.Update(taskID, DreamStatusGathering, "building prompt") prompt := BuildConsolidationPrompt(memoryRoot, de.transcriptDir, sessionIDs) // 执行 Dream SubAgent de.taskStore.Update(taskID, DreamStatusConsolidating, "consolidating") if err := de.runSubAgent(ctx, taskID, prompt); err != nil { de.taskStore.SetError(taskID, fmt.Sprintf("subagent: %v", err)) return err } // 完成 de.taskStore.Update(taskID, DreamStatusCompleted, "completed") // 重置计数,更新内存中的巩固时间 // 注意:lastConsolidatedAt 的磁盘持久化已由 lock 文件 mtime 完成(TryAcquire 时设置). // 这里只更新内存中的值(供本进程的 shouldRun 判断). de.mu.Lock() de.lastConsolidationAt = time.Now() de.sessionCount = 0 de.saveSessionCountLocked() de.mu.Unlock() return nil } // runSubAgent 使用 fork 模式运行 Dream SubAgent,通过 onTurn 回调追踪进度. func (de *DreamEngine) runSubAgent(ctx context.Context, taskID, prompt string) error { if de.parentEngine == nil { // 没有父 engine,无法 fork--这是一个配置错误,Dream 应该总是有父 engine return fmt.Errorf("dream: parentEngine is nil, cannot fork SubAgent") } // 升华改进(ELEVATED): EngineRef.ForkSubAgent() 替代类型断言-- // Dream 不再需要知道父引擎是 *Engine,只通过接口调用 fork 能力. var memoryRoot string if de.memStore != nil { memoryRoot = de.memStore.Dir() } sa := de.parentEngine.ForkSubAgent(&SubAgentConfig{ Description: "dream-consolidation", Model: de.modelRole, // Bash(只读命令)+ Read/Grep/Glob(探索)+ Edit/Write(写记忆文件) // 精妙之处(CLEVER): Bash 加入白名单让 Dream 可以 grep transcript. // MemoryDirRestrict 确保 Edit/Write 只能写 memory dir, // 但 Bash 的写操作(重定向等)还需要靠提示词约束(三层防御:指令+参数+兜底). // 参数层:prompt 说明 Bash 只允许只读命令. // 兜底层:memory dir 以外的 Edit/Write 被 MemoryDirRestrict 拦截. AllowedTools: map[string]bool{ "Read": true, "Grep": true, "Glob": true, "Edit": true, "Write": true, "Bash": true, }, MaxTurns: 10, MemoryDirRestrict: memoryRoot, }) // onTurn 回调:每个 assistant 轮次完成后更新 DreamTaskState onTurn := func(text string, toolUses []ToolUseInfo) { // 更新轮次进度(用于 UI 展示) turn := DreamTurn{ Text: strings.TrimSpace(text), ToolUseCount: len(toolUses), } de.taskStore.AddTurn(taskID, turn) // 追踪 Edit/Write 触达的文件 for _, tu := range toolUses { if (tu.Name == "Edit" || tu.Name == "Write") && tu.FilePath != "" { de.taskStore.AddFileTouched(taskID, tu.FilePath) // 首次有文件写入时翻转阶段标记(starting → updating) de.taskStore.Update(taskID, DreamStatusConsolidating, "updating") } } } _, err := sa.RunSyncWithCallback(ctx, prompt, onTurn) return err } // enablePeriodicCheck 启动后台 goroutine,按 interval 定时调用 CheckAndRun. // 仅用于 SDK/API 长驻进程(engine 长生命周期,没有自然的"session end"触发点). // // goroutine 在以下情况退出: // - parentEngine.rootCtx 被取消(Engine.Close() 时) // - de.periodicDone 被关闭(DreamEngine.Close() 时) func (de *DreamEngine) enablePeriodicCheck(interval time.Duration) { done := make(chan struct{}) de.periodicDone = done go func() { ticker := time.NewTicker(interval) defer ticker.Stop() // 精妙之处(CLEVER): 优先使用注入的 rootCtx-- // Engine.Close() 会 cancel rootCtx,定时器 goroutine 应该随之退出. // periodicDone 作为备用(DreamEngine 被独立关闭时). // L1224: 从 de.parentEngine.Context() 间接访问改为 de.rootCtx 直接字段. var rootCtx context.Context if de.rootCtx != nil { rootCtx = de.rootCtx } else { // rootCtx 未注入(测试/独立模式)时使用可取消的 context rootCtx, de.periodicCancel = context.WithCancel(context.Background()) } for { select { case <-ticker.C: de.CheckAndRun(rootCtx) case <-rootCtx.Done(): return case <-done: return } } }() } // loadSessionCount 从 dream_state.json 加载持久化的会话计数. // lastConsolidatedAt 改为从 lock 文件 mtime 读取,不从 state 文件读. func (de *DreamEngine) loadSessionCount() { data, err := os.ReadFile(de.statePath) if err != nil { return // 文件不存在或不可读,使用默认值 0 } // 精妙之处(CLEVER): 手动解析只需要的字段-- // 旧的 dream_state.json 可能包含 "last_consolidation_at" 字段, // 我们静默忽略它(已改为从 lock mtime 读取),只取 session_count. // 用 map[string]any 解析可以容忍旧格式的额外字段. var raw map[string]any if err := json.Unmarshal(data, &raw); err != nil { return } if v, ok := raw["session_count"]; ok { switch n := v.(type) { case float64: de.sessionCount = int(n) case int: de.sessionCount = n } } } // saveSessionCountLocked 持久化当前会话计数. // 调用时必须持有 de.mu 锁. // 只写 session_count,不写 last_consolidation_at(由 lock mtime 存储). func (de *DreamEngine) saveSessionCountLocked() { state := dreamStateFile{SessionCount: de.sessionCount} data, err := json.MarshalIndent(state, "", " ") if err != nil { return } dir := filepath.Dir(de.statePath) if err := os.MkdirAll(dir, 0755); err != nil { return } if err := os.WriteFile(de.statePath, data, 0644); err != nil { // fail-open:写入失败不影响 Dream 运行,但必须有信号(磁盘满/权限问题难排查). // L1224: 早期方案直接 de.parentEngine.Observer() 无守卫, parentEngine==nil 会 panic. // 改为 de.observer (NoopObserver 兜底) 后彻底消除这一 latent crash 路径. de.observer.Error(err, map[string]any{ "phase": "dream_save_state", "path": de.statePath, }) } } // getSessionCount 线程安全地获取当前会话计数. func (de *DreamEngine) getSessionCount() int { de.mu.Lock() defer de.mu.Unlock() return de.sessionCount } // TaskStore 返回 Dream 任务状态存储. func (de *DreamEngine) TaskStore() *DreamTaskStore { return de.taskStore } // Close 停止 DreamEngine,等待正在进行的 Dream goroutine 退出. // // 调用方应先 cancel 传递给 CheckAndRun 的 context(如 Engine.rootCancel()), // 再调用 Close()--context 取消会让 Dream goroutine 感知到停止信号, // Close() 只是等待它干净退出. // // timeout 参数限制等待时长(0 使用默认 3s). // 超时后强制返回(goroutine 仍在后台运行,但文件锁会随进程退出自动释放). // // 精妙之处(CLEVER): 这不是"强制终止"--goroutine 通过 ctx.Done() 感知停止, // Close() 只是礼貌地等一下.强制终止异步 goroutine 在 Go 中没有安全的做法, // 也没有必要:goroutine 泄漏最坏情况是进程退出时 OS 回收资源. func (de *DreamEngine) Close(timeout time.Duration) { if timeout <= 0 { timeout = 3 * time.Second } // 停止定时器 goroutine(如果有). // // 升华改进(ELEVATED): 早期方案直接 close(de.periodicDone) 或用裸 select 检查, // 但没有在锁内原子地 nil 掉引用--两个并发 Close() 都通过 select default 分支, // 都执行 close() → panic. // 改进:在 de.mu 下原子读取并置 nil,保证 Close() 幂等. // close 操作在锁外执行,避免 goroutine 的 CheckAndRun 也持有 de.mu 时死锁. de.mu.Lock() periodicCh := de.periodicDone de.periodicDone = nil // 置 nil 保证后续 Close() 调用不再进入 close de.mu.Unlock() if periodicCh != nil { select { case <-periodicCh: // 已经关闭(goroutine 提前退出) default: if de.periodicCancel != nil { de.periodicCancel() } close(periodicCh) } } // 获取 doneCh(如果有 dream goroutine 在运行) de.mu.Lock() doneCh := de.dreamDone running := de.dreamRunning de.mu.Unlock() if !running || doneCh == nil { return // 没有活跃的 Dream goroutine,立即返回 } // 等待 goroutine 退出,带超时保护 select { case <-doneCh: // goroutine 干净退出 case <-time.After(timeout): // 超时,不阻塞 Engine.Close() 流程 // L1224 (2026-04-13) 修复: 之前因 DreamEngine 不持有 observer 引用而无法记录 // 超时事件(只通过 parentEngine.observer 间接访问,Close 路径下 parentEngine 可能已 nil). // 现在 de.observer 直接注入且 NoopObserver 兜底,可以安全发射超时事件-- // 这对线上排查"Dream goroutine 卡住导致 Close 慢"至关重要. de.observer.Event("dream_close_timeout", map[string]any{ "timeout_ms": timeout.Milliseconds(), }) } }