package daemon // heartbeat.go - 会话心跳检测服务. // // 解决问题:客户端连接可能因网络问题"半死"--TCP 连接看似存活但数据无法传输. // HeartbeatService 周期性检查所有会话,发现超时未收到心跳的会话并关闭它. // // 升华改进(ELEVATED): 早期方案 为每个会话独立维护心跳定时器-- // N 个会话 = N 个 setInterval,大量会话时定时器管理开销显著. // 我们用单个 HeartbeatService 统一扫描所有会话,O(n) 遍历一次完成所有检查. // 替代方案:<每会话独立 time.Ticker> - 否决:100 个会话 = 100 个 goroutine, // 统一扫描只需 1 个 goroutine,资源开销降低 100x. import ( "context" "log" "sync" "time" ) // HeartbeatConfig 是心跳服务配置. type HeartbeatConfig struct { // Interval 心跳检测间隔(扫描频率),默认 30s. Interval time.Duration // Timeout 心跳超时阈值.超过此时间无心跳视为连接断开,默认 90s. // 应为 Interval 的 3 倍左右,容忍偶发延迟. Timeout time.Duration } // DefaultHeartbeatConfig 返回生产合理的心跳配置. func DefaultHeartbeatConfig() HeartbeatConfig { return HeartbeatConfig{ Interval: 30 * time.Second, Timeout: 90 * time.Second, } } // HeartbeatSession 是 HeartbeatService 需要的会话接口. // DaemonManager 实现此接口,将会话注册到心跳服务. type HeartbeatSession interface { // SessionID 返回会话 ID. SessionID() string // LastActiveAt 返回会话最后一次活跃的时间. LastActiveAt() time.Time // OnHeartbeatTimeout 是心跳超时回调,由 HeartbeatService 调用. OnHeartbeatTimeout(sessionID string) } // HeartbeatService 统一管理所有会话的心跳检测. // // 精妙之处(CLEVER): HeartbeatService 不持有会话引用,而是持有回调-- // 超时时调用 onTimeout(sessionID),DaemonManager 自己决定如何处理. // 这样 HeartbeatService 与 DaemonManager 解耦,可以独立测试. type HeartbeatService struct { cfg HeartbeatConfig mu sync.RWMutex sessions map[string]time.Time // sessionID → 最后活跃时间 onTimeout func(sessionID string) ctx context.Context cancel context.CancelFunc } // NewHeartbeatService 创建心跳服务. // onTimeout: 当会话心跳超时时调用(通常是 DaemonManager.closeSession). func NewHeartbeatService(cfg HeartbeatConfig, onTimeout func(sessionID string)) *HeartbeatService { if cfg.Interval <= 0 { cfg.Interval = 30 * time.Second } if cfg.Timeout <= 0 { cfg.Timeout = 90 * time.Second } ctx, cancel := context.WithCancel(context.Background()) return &HeartbeatService{ cfg: cfg, sessions: make(map[string]time.Time), onTimeout: onTimeout, ctx: ctx, cancel: cancel, } } // Register 注册一个会话到心跳监控. func (hs *HeartbeatService) Register(sessionID string) { hs.mu.Lock() hs.sessions[sessionID] = time.Now() hs.mu.Unlock() } // Unregister 从心跳监控中移除会话(会话正常关闭时调用). func (hs *HeartbeatService) Unregister(sessionID string) { hs.mu.Lock() delete(hs.sessions, sessionID) hs.mu.Unlock() } // Beat 更新会话的最后活跃时间(收到任何客户端消息时调用). func (hs *HeartbeatService) Beat(sessionID string) { hs.mu.Lock() if _, exists := hs.sessions[sessionID]; exists { hs.sessions[sessionID] = time.Now() } hs.mu.Unlock() } // Start 启动心跳检测循环(非阻塞). func (hs *HeartbeatService) Start() { go hs.scanLoop() } // Stop 停止心跳检测. func (hs *HeartbeatService) Stop() { hs.cancel() } // scanLoop 周期性扫描所有会话,发现超时者调用 onTimeout. func (hs *HeartbeatService) scanLoop() { ticker := time.NewTicker(hs.cfg.Interval) defer ticker.Stop() for { select { case <-hs.ctx.Done(): return case <-ticker.C: hs.scan() } } } // scan 执行一次心跳扫描,O(n) 遍历所有会话. func (hs *HeartbeatService) scan() { now := time.Now() hs.mu.RLock() // 先收集超时会话(不在读锁内调用 onTimeout,避免死锁) var timedOut []string for id, lastActive := range hs.sessions { if now.Sub(lastActive) > hs.cfg.Timeout { timedOut = append(timedOut, id) } } hs.mu.RUnlock() for _, id := range timedOut { log.Printf("heartbeat: session %s timed out (last active > %v ago)", id, hs.cfg.Timeout) hs.Unregister(id) hs.onTimeout(id) } }