package engine // ActivityTracker -- 会话活动追踪系统. // // 升华改进(ELEVATED): 早期方案 是全局单例 + 全局 refcount, // 只支持 CLI 模式的心跳.我们的 ActivityTracker 是 Engine 级别的实例, // 通过回调机制支持所有消费模式: // - CLI: OnIdle 回调显示"Agent 已空闲" // - SDK: OnBusy/OnIdle 让嵌入方知道 Agent 是否在工作 // - HTTP API: OnHeartbeat 响应 Kubernetes liveness probe // - 仓储 daemon: OnHeartbeat 向 WMS 报告在线状态 // // 核心机制:引用计数 + 定时器 // - Start("api_call") → refcount++ → 0→1 时启动心跳 + 触发 OnBusy // - Stop("api_call") → refcount-- → 1→0 时停止心跳 + 延迟触发 OnIdle // // 替代方案:<原方案全局单例 + Node.js 单线程无需锁 + CLI-only 心跳> import ( "strconv" "sync" "time" ) // ============================================================ // ActivityReason - 活动原因枚举 // ============================================================ // ActivityReason 标识活动的原因/类型. type ActivityReason string const ( ActivityAPICall ActivityReason = "api_call" ActivityToolExec ActivityReason = "tool_exec" ActivitySubAgent ActivityReason = "subagent" ActivityDream ActivityReason = "dream" ActivityMemoryExtraction ActivityReason = "memory_extraction" ) // ============================================================ // ActivityTrackerConfig - 配置 // ============================================================ // ActivityTrackerConfig 配置活动追踪器. type ActivityTrackerConfig struct { // HeartbeatInterval 心跳间隔(busy 时定时触发).默认 30s. HeartbeatInterval time.Duration // IdleDelay 空闲延迟:refcount 归零后等多久才触发 OnIdle.默认 30s. // 精妙之处(CLEVER): 不是立刻触发 OnIdle-- // 工具执行完到下一次 API 调用之间有短暂间隙,不应算 idle. // 只有真正 30s 无活动才算空闲. IdleDelay time.Duration // OnBusy refcount 0→1 时触发(Agent 从空闲变为繁忙) OnBusy func() // OnIdle refcount 1→0 且延迟到期后触发(Agent 真正空闲) OnIdle func(idleDuration time.Duration) // OnHeartbeat busy 时每 HeartbeatInterval 触发一次 OnHeartbeat func(activeReasons map[ActivityReason]int, duration time.Duration) } // DefaultActivityTrackerConfig 返回默认配置. 
func DefaultActivityTrackerConfig() *ActivityTrackerConfig {
	cfg := new(ActivityTrackerConfig)
	cfg.HeartbeatInterval = defaultActivityTrackerPeriod
	cfg.IdleDelay = defaultActivityTrackerPeriod
	return cfg
}

// defaultActivityTrackerPeriod is the fallback used for both HeartbeatInterval
// and IdleDelay when no (or a non-positive) value is configured.
const defaultActivityTrackerPeriod = 30 * time.Second

// ============================================================
// ActivityTracker - activity tracker
// ============================================================

// ActivityTracker tracks whether the Engine is currently doing work.
//
// CLEVER: reference counting -- every unit of work (API call, tool execution,
// sub-agent, ...) calls Start (+1) when it begins and Stop (-1) when it ends.
// refcount > 0 means busy, refcount == 0 means idle. This is more accurate
// than timestamp heuristics: with parallel tool execution the tracker stays
// busy while any one of them is still running.
//
// Concurrency: mutable state is guarded by mu.
type ActivityTracker struct {
	mu  sync.Mutex
	cfg *ActivityTrackerConfig

	// Reference count plus its per-reason breakdown.
	refcount      int
	activeReasons map[ActivityReason]int

	// Timestamps.
	busySince    time.Time // when the most recent 0->1 transition happened
	lastActivity time.Time // time of the last Start/Stop

	// Timers.
	heartbeatTimer *time.Ticker  // non-nil while the heartbeat is running
	idleTimer      *time.Timer   // pending idle-delay timer, if any
	stopCh         chan struct{} // closed to tell the heartbeat goroutine to exit

	// Lifecycle.
	closed bool

	// observer receives lifecycle events; never nil after construction.
	observer EventObserver
}

// NewActivityTracker builds a tracker from cfg and observer.
//
// A nil cfg selects DefaultActivityTrackerConfig(). Non-positive
// HeartbeatInterval / IdleDelay values are replaced with 30s. Note that this
// normalization is written into the caller's cfg, which the tracker keeps by
// reference. A nil observer is replaced with a NoopObserver.
func NewActivityTracker(cfg *ActivityTrackerConfig, observer EventObserver) *ActivityTracker {
	if cfg == nil {
		cfg = DefaultActivityTrackerConfig()
	}
	for _, d := range [...]*time.Duration{&cfg.HeartbeatInterval, &cfg.IdleDelay} {
		if *d <= 0 {
			*d = defaultActivityTrackerPeriod
		}
	}
	if observer == nil {
		observer = &NoopObserver{}
	}

	tracker := &ActivityTracker{
		cfg:           cfg,
		observer:      observer,
		activeReasons: make(map[ActivityReason]int),
		stopCh:        make(chan struct{}),
	}
	tracker.lastActivity = time.Now()
	return tracker
}

// Start records the beginning of one unit of work for reason.
//
// CLEVER: counts are kept per reason, so at shutdown you can see "2 api_call
// and 1 tool_exec still running" rather than a bare "3 active" -- far more
// informative than a plain refcount. Start is a no-op after Close.
func (t *ActivityTracker) Start(reason ActivityReason) { t.mu.Lock() defer t.mu.Unlock() if t.closed { return } t.refcount++ t.activeReasons[reason]++ t.lastActivity = time.Now() // 0→1 转换:从 idle 变 busy if t.refcount == 1 { t.busySince = time.Now() // 停止空闲定时器 if t.idleTimer != nil { t.idleTimer.Stop() t.idleTimer = nil } // 启动心跳定时器 t.startHeartbeat() // 通知 OnBusy if t.cfg.OnBusy != nil { go t.cfg.OnBusy() } t.observer.Event("engine_busy", map[string]any{ "reason": string(reason), }) } } // Stop 标记一个工作单元结束. func (t *ActivityTracker) Stop(reason ActivityReason) { t.mu.Lock() defer t.mu.Unlock() if t.closed || t.refcount <= 0 { return } t.refcount-- if n := t.activeReasons[reason]; n > 1 { t.activeReasons[reason] = n - 1 } else { delete(t.activeReasons, reason) } t.lastActivity = time.Now() // 1→0 转换:从 busy 变 idle if t.refcount == 0 { // 停止心跳 t.stopHeartbeat() // 启动空闲延迟定时器 t.startIdleTimer() } } // IsBusy 返回是否有活跃工作. func (t *ActivityTracker) IsBusy() bool { t.mu.Lock() defer t.mu.Unlock() return t.refcount > 0 } // ActiveReasons 返回当前活跃的原因和计数. func (t *ActivityTracker) ActiveReasons() map[ActivityReason]int { t.mu.Lock() defer t.mu.Unlock() result := make(map[ActivityReason]int, len(t.activeReasons)) for k, v := range t.activeReasons { result[k] = v } return result } // Refcount 返回当前引用计数. func (t *ActivityTracker) Refcount() int { t.mu.Lock() defer t.mu.Unlock() return t.refcount } // LastActivity 返回最后一次活动时间. func (t *ActivityTracker) LastActivity() time.Time { t.mu.Lock() defer t.mu.Unlock() return t.lastActivity } // BusySince 返回最近一次从 idle→busy 的时间(当前非 busy 时返回零值). func (t *ActivityTracker) BusySince() time.Time { t.mu.Lock() defer t.mu.Unlock() if t.refcount > 0 { return t.busySince } return time.Time{} } // Close 停止所有定时器,释放资源. // 精妙之处(CLEVER): Close 时记录最终统计-- // 如果还有活跃工作,说明 Close 打断了正在进行的操作(可用于排查卡住问题). 
func (t *ActivityTracker) Close() { t.mu.Lock() defer t.mu.Unlock() if t.closed { return } t.closed = true // 停止定时器 t.stopHeartbeat() if t.idleTimer != nil { t.idleTimer.Stop() t.idleTimer = nil } close(t.stopCh) // 记录最终状态 t.observer.Event("activity_tracker_closed", map[string]any{ "refcount": t.refcount, "active_reasons": formatReasons(t.activeReasons), }) } // ============================================================ // 内部方法 // ============================================================ // startHeartbeat 启动心跳定时器(调用者持有锁). func (t *ActivityTracker) startHeartbeat() { if t.heartbeatTimer != nil { return // 已在运行 } t.heartbeatTimer = time.NewTicker(t.cfg.HeartbeatInterval) ticker := t.heartbeatTimer go func() { for { select { case <-t.stopCh: return case _, ok := <-ticker.C: if !ok { return } t.mu.Lock() if t.closed || t.refcount == 0 { t.mu.Unlock() return } reasons := make(map[ActivityReason]int, len(t.activeReasons)) for k, v := range t.activeReasons { reasons[k] = v } duration := time.Since(t.busySince) // 精妙之处(CLEVER): 在锁内捕获 refcount 快照,unlock 后使用快照值-- // observer.Event 可能耗时(如写日志/发 HTTP),不应在锁内调用. // 直接读 t.refcount(lock 外)会导致 race detector 报 DATA RACE. refcount := t.refcount t.mu.Unlock() // 触发回调 if t.cfg.OnHeartbeat != nil { t.cfg.OnHeartbeat(reasons, duration) } t.observer.Event("engine_heartbeat", map[string]any{ "refcount": refcount, "active_reasons": formatReasons(reasons), "busy_duration": duration.String(), }) } } }() } // stopHeartbeat 停止心跳定时器(调用者持有锁). func (t *ActivityTracker) stopHeartbeat() { if t.heartbeatTimer != nil { t.heartbeatTimer.Stop() t.heartbeatTimer = nil } } // startIdleTimer 启动空闲延迟定时器(调用者持有锁). 
func (t *ActivityTracker) startIdleTimer() { if t.idleTimer != nil { t.idleTimer.Stop() } t.idleTimer = time.AfterFunc(t.cfg.IdleDelay, func() { t.mu.Lock() if t.closed || t.refcount > 0 { t.mu.Unlock() return // 在延迟期间又变 busy 了 } idleDuration := time.Since(t.lastActivity) t.mu.Unlock() if t.cfg.OnIdle != nil { t.cfg.OnIdle(idleDuration) } t.observer.Event("engine_idle", map[string]any{ "idle_duration": idleDuration.String(), }) }) } // formatReasons 将 reason map 格式化为字符串. func formatReasons(reasons map[ActivityReason]int) string { if len(reasons) == 0 { return "" } s := "" for k, v := range reasons { if s != "" { s += "," } s += string(k) + ":" + strconv.Itoa(v) } return s }