package daemon // session_pool.go - 会话容量控制,带高效的"容量唤醒"原语. // // 解决问题:DaemonManager 同时运行的会话数量必须有上限,防止 OOM 和 LLM API 配额耗尽. // 当容量已满时,新请求应等待(而非立即拒绝),直到有会话完成释放槽位. // // 升华改进(ELEVATED): 早期方案 用 capacityWake: (() => void) | null 实现-- // 一个可空函数指针,被 Release() 调用唤醒单个等待者. // Go 版本用 chan struct{} 替代:close(wakeCh) 一次性唤醒所有等待者, // 然后重建新 channel 供下次使用.这比单个函数指针更优雅, // 且支持多个并发等待者同时被唤醒后各自竞争(只有一个会成功获取槽位). // 替代方案: - 否决:Cond.Wait() 不支持 context,select 无法监听; // 需要额外的 goroutine 做 ctx 取消唤醒,代码复杂度增加. import ( "context" "sync" ) // SessionPool 管理会话槽位,带阻塞的 Acquire 和高效的 Release 唤醒机制. type SessionPool struct { mu sync.Mutex active int // 当前活跃会话数 max int // 最大并发会话数(0 = 无限制) wake chan struct{} // 关闭时唤醒所有 Acquire 等待者 closed bool } // NewSessionPool 创建会话池. // max <= 0 表示无限制(适合开发环境,生产建议设置具体值). func NewSessionPool(max int) *SessionPool { return &SessionPool{ max: max, wake: make(chan struct{}), } } // Acquire 申请一个会话槽位,阻塞直到有槽位可用或 ctx 取消. // // 精妙之处(CLEVER): 获取 wake channel 的快照后释放锁,再在锁外 select-- // 这样 Release() 可以在 Acquire() select 时安全地 close(wake) + 重建. // 如果在持锁状态下 select,Release() 无法获取锁,造成死锁. // 这是 Go 中"锁内取 channel 快照,锁外 select"的标准模式. func (p *SessionPool) Acquire(ctx context.Context) error { for { p.mu.Lock() if p.closed { p.mu.Unlock() return ErrPoolClosed } if p.max <= 0 || p.active < p.max { p.active++ p.mu.Unlock() return nil } // 容量已满:取 wake channel 快照,释放锁,等待唤醒 wake := p.wake p.mu.Unlock() select { case <-wake: // 有槽位释放,重新尝试获取 case <-ctx.Done(): return ctx.Err() } } } // Release 释放一个会话槽位,唤醒所有等待的 Acquire. // // 精妙之处(CLEVER): 用 close(wake) + 重建新 channel 的方式广播唤醒-- // close 会唤醒所有正在 <-wake 等待的 goroutine, // 然后重建新 channel 供下次 Release 使用(不能重复 close 已关闭的 channel). // 所有被唤醒的 goroutine 重试 Acquire,最终只有一个成功(最多空转一圈). // 替代方案:<每次 Release 只唤醒一个等待者> - 否决:需要额外的等待者队列维护, // 且在多 CPU 下广播 + 竞争比手动队列更高效(无单点锁竞争). func (p *SessionPool) Release() { p.mu.Lock() defer p.mu.Unlock() if p.active > 0 { p.active-- } // 广播唤醒所有等待者 old := p.wake p.wake = make(chan struct{}) close(old) } // Close 关闭会话池,让所有阻塞的 Acquire 立即返回 ErrPoolClosed. func (p *SessionPool) Close() { p.mu.Lock() defer p.mu.Unlock() if p.closed { return } p.closed = true close(p.wake) } // Active 返回当前活跃会话数(调试/监控用). func (p *SessionPool) Active() int { p.mu.Lock() defer p.mu.Unlock() return p.active } // Max 返回最大并发会话数(0 = 无限制). func (p *SessionPool) Max() int { return p.max } // ErrPoolClosed 表示会话池已关闭,不接受新请求. var ErrPoolClosed = &daemonError{code: "pool_closed", msg: "会话池已关闭"} // daemonError 是 daemon 包的结构化错误. type daemonError struct { code string msg string } func (e *daemonError) Error() string { return e.code + ": " + e.msg }