Documentation
¶
Overview ¶
Package daemon 实现 Flyto Agent Platform 的多会话守护进程管理器.
DaemonManager 是引擎"服务模式"的核心:
- 接受来自 BridgeTransport 的客户端连接
- 管理引擎 Session 的生命周期(创建/恢复/关闭)
- 控制并发会话数量(SessionPool)
- 维持会话心跳,检测空闲超时
- 崩溃恢复(会话异常结束后自动重连)
架构位置:
客户端 → BridgeTransport → DaemonManager → engine.Session → LLM
升华改进(ELEVATED): 早期方案 是 2999 行巨石文件, 会话管理/心跳/容量控制/崩溃恢复全部耦合在一起,无法单独测试. 我们将每个关注点拆分为独立类型(SessionPool/HeartbeatService/IdleTimer/ CrashRecovery),DaemonManager 是它们的组合器,每个子组件独立可测. 替代方案:<直接翻译早期方案的平铺结构> - 否决:2999 行文件在 Go 中同样不可测,同样是技术债.
Index ¶
Constants ¶
This section is empty.
Variables ¶
var ErrPoolClosed = &daemonError{code: "pool_closed", msg: "会话池已关闭"}
ErrPoolClosed 表示会话池已关闭,不接受新请求.
Functions ¶
This section is empty.
Types ¶
type CrashRecovery ¶
type CrashRecovery struct {
// contains filtered or unexported fields
}
CrashRecovery 执行带退避的会话重试策略.
func NewCrashRecovery ¶
func NewCrashRecovery(cfg CrashRecoveryConfig) *CrashRecovery
NewCrashRecovery 创建崩溃恢复执行器.
func (*CrashRecovery) RunWithRecovery ¶
func (cr *CrashRecovery) RunWithRecovery(ctx context.Context, sessionID string, fn func() error) error
RunWithRecovery 执行 fn,在崩溃时按配置重试.
fn: 会话运行函数,返回 error 表示异常结束(nil = 正常结束,不重试). sessionID: 用于日志和回调. ctx: 上下文取消时停止重试.
精妙之处(CLEVER): 正常结束(fn 返回 nil)不触发重试-- 只有 fn 返回 non-nil error 才被视为崩溃.这防止了会话"主动结束" (用户说"退出",引擎正常完成任务)后被误重启.
type CrashRecoveryConfig ¶
type CrashRecoveryConfig struct {
// MaxRetries 最大重试次数,0 = 不重试,-1 = 无限重试(不推荐).
MaxRetries int
// InitialDelay 首次重试的等待时间,默认 1s.
InitialDelay time.Duration
// MaxDelay 退避延迟上限,默认 60s.
// 防止指数增长后延迟过长(如第 10 次重试等待 512s).
MaxDelay time.Duration
// Multiplier 退避乘数,默认 2.0(标准指数退避).
Multiplier float64
// OnCrash 崩溃事件回调(用于接入 AuditSink 或告警系统).
// sessionID: 崩溃的会话 ID
// attempt: 第几次崩溃(从 1 开始)
// err: 崩溃原因
OnCrash func(sessionID string, attempt int, err error)
// OnGiveUp 达到最大重试次数时回调.
OnGiveUp func(sessionID string, totalAttempts int)
}
CrashRecoveryConfig 是崩溃恢复配置.
func DefaultCrashRecoveryConfig ¶
func DefaultCrashRecoveryConfig() CrashRecoveryConfig
DefaultCrashRecoveryConfig 返回生产合理的崩溃恢复配置.
type DaemonConfig ¶
type DaemonConfig struct {
// MaxSessions 最大并发会话数,0 = 无限制(不推荐生产使用).
MaxSessions int
// IdleTimeout 会话空闲超时.超时后会话自动关闭.
// 0 = 无超时(历史包袱(LEGACY): 早期方案固定 24h,我们改为可配置).
// 推荐值:30 分钟(多数仓储场景操作员不会中途离开超过 30 分钟).
IdleTimeout time.Duration
// HeartbeatInterval 心跳检测间隔,0 = 不检测.
HeartbeatInterval time.Duration
// HeartbeatTimeout 心跳超时阈值.超过此时间无心跳视为连接断开.
HeartbeatTimeout time.Duration
// Isolation 会话隔离模式,默认 SharedIsolation.
Isolation SessionIsolation
// BaseDir 工作目录基础路径(IsolatedIsolation 会在此下创建子目录).
BaseDir string
// CrashRecovery 崩溃恢复配置,nil = 不自动恢复.
CrashRecovery *CrashRecoveryConfig
}
DaemonConfig 是 DaemonManager 的配置.
type DaemonManager ¶
type DaemonManager struct {
// contains filtered or unexported fields
}
DaemonManager 管理多个并发的 Agent 会话.
func NewDaemonManager ¶
func NewDaemonManager( eng *engine.Engine, transport bridge.BridgeTransport, cfg DaemonConfig, ) *DaemonManager
NewDaemonManager 创建 DaemonManager.
eng: 引擎实例,所有会话共享同一引擎(但每个会话有独立的 engine.Session). transport: 客户端连接来源(SSETransport/WebSocketTransport/...). cfg: Daemon 配置.
func (*DaemonManager) SessionCount ¶
func (dm *DaemonManager) SessionCount() int
SessionCount 返回当前活跃会话数(监控用).
func (*DaemonManager) Shutdown ¶
func (dm *DaemonManager) Shutdown(ctx context.Context) error
Shutdown 优雅关闭 DaemonManager. 停止接受新连接,等待现有会话完成或超时.
func (*DaemonManager) Start ¶
func (dm *DaemonManager) Start()
Start 启动 DaemonManager,开始接受客户端连接. 非阻塞,返回后 DaemonManager 在后台运行.
type HeartbeatConfig ¶
type HeartbeatConfig struct {
// Interval 心跳检测间隔(扫描频率),默认 30s.
Interval time.Duration
// Timeout 心跳超时阈值.超过此时间无心跳视为连接断开,默认 90s.
// 应为 Interval 的 3 倍左右,容忍偶发延迟.
Timeout time.Duration
}
HeartbeatConfig 是心跳服务配置.
func DefaultHeartbeatConfig ¶
func DefaultHeartbeatConfig() HeartbeatConfig
DefaultHeartbeatConfig 返回生产合理的心跳配置.
type HeartbeatService ¶
type HeartbeatService struct {
// contains filtered or unexported fields
}
HeartbeatService 统一管理所有会话的心跳检测.
精妙之处(CLEVER): HeartbeatService 不持有会话引用,而是持有回调-- 超时时调用 onTimeout(sessionID),DaemonManager 自己决定如何处理. 这样 HeartbeatService 与 DaemonManager 解耦,可以独立测试.
func NewHeartbeatService ¶
func NewHeartbeatService(cfg HeartbeatConfig, onTimeout func(sessionID string)) *HeartbeatService
NewHeartbeatService 创建心跳服务. onTimeout: 当会话心跳超时时调用(通常是 DaemonManager.closeSession).
func (*HeartbeatService) Beat ¶
func (hs *HeartbeatService) Beat(sessionID string)
Beat 更新会话的最后活跃时间(收到任何客户端消息时调用).
func (*HeartbeatService) Register ¶
func (hs *HeartbeatService) Register(sessionID string)
Register 注册一个会话到心跳监控.
func (*HeartbeatService) Unregister ¶
func (hs *HeartbeatService) Unregister(sessionID string)
Unregister 从心跳监控中移除会话(会话正常关闭时调用).
type HeartbeatSession ¶
type HeartbeatSession interface {
// SessionID 返回会话 ID.
SessionID() string
// LastActiveAt 返回会话最后一次活跃的时间.
LastActiveAt() time.Time
// OnHeartbeatTimeout 是心跳超时回调,由 HeartbeatService 调用.
OnHeartbeatTimeout(sessionID string)
}
HeartbeatSession 是 HeartbeatService 需要的会话接口. DaemonManager 实现此接口,将会话注册到心跳服务.
type IdleTimer ¶
type IdleTimer struct {
// contains filtered or unexported fields
}
IdleTimer 是可重置的空闲超时计时器.
精妙之处(CLEVER): 不使用 time.AfterFunc + 手动 Stop/Reset 的组合-- AfterFunc 的 Reset 有竞争条件(Reset 和回调并发触发时行为不确定). 我们改用独立 goroutine + channel 信号控制:
- reset channel 接收重置信号,goroutine 重新开始计时
- stop channel 接收停止信号,goroutine 退出
这是 Go 中实现"可重置定时器"的标准模式(避免 time.Timer Reset race).
func NewIdleTimer ¶
NewIdleTimer 创建空闲计时器并立即启动.
timeout: 空闲超时时长(无活动多久后触发). onIdle: 超时后的回调(在后台 goroutine 中执行,不可重入).
type IsolatedIsolation ¶
type IsolatedIsolation struct {
SessionsDir string // 存放所有会话目录的根目录,默认 <baseDir>/sessions
}
IsolatedIsolation 为每个会话创建独立的工作目录(沙箱子目录). 适用场景:多用户 SaaS,飞驼多操作员并发,不同项目的并行会话.
工作目录结构:
baseDir/
sessions/
<sessionID>/ ← 每个会话的独立目录
workdir/ ← Agent 实际工作目录(写操作限制在此范围内)
升华改进(ELEVATED): 早期方案没有工作目录隔离--多用户场景下不同 Agent 会话 可能互相覆盖彼此的文件(同一目录并发写入). 我们为每个隔离会话创建独立的 workdir,配合引擎的 Cwd 配置实现路径限制. 替代方案:<chroot 或 namespace> - 否决:需要 root 权限,跨平台兼容性差; 应用层路径限制(Cwd + 引擎路径校验)安全性足够且零特权.
func (*IsolatedIsolation) Name ¶
func (iso *IsolatedIsolation) Name() string
type IsolationMode ¶
type IsolationMode string
IsolationMode 是隔离模式的枚举类型.
const ( IsolationShared IsolationMode = "shared" // IsolationIsolated 隔离模式,独立工作目录(多用户/生产环境). IsolationIsolated IsolationMode = "isolated" )
type ManagedSession ¶
type ManagedSession struct {
// contains filtered or unexported fields
}
ManagedSession 是被 DaemonManager 管理的会话状态.
type SessionIsolation ¶
type SessionIsolation interface {
// Setup 准备会话运行环境.
// 返回实际的工作目录(可能与传入的 baseDir 不同,如 Worktree 模式).
Setup(ctx context.Context, sessionID string, baseDir string) (workDir string, err error)
// Teardown 清理会话运行环境.
// ctx 已超时/取消时 Teardown 仍需执行(使用 context.Background() 或超时).
Teardown(ctx context.Context, sessionID string) error
// Name 返回隔离模式名称(用于日志和监控).
Name() string
}
SessionIsolation 定义会话的环境隔离策略.
Setup 在会话启动前调用,Teardown 在会话结束后调用(无论是否出错). 每个会话独立的 SessionIsolation 实例,不跨会话共享.
func NewIsolation ¶
func NewIsolation(mode IsolationMode) SessionIsolation
NewIsolation 根据模式创建对应的 SessionIsolation 实现.
type SessionPool ¶
type SessionPool struct {
// contains filtered or unexported fields
}
SessionPool 管理会话槽位,带阻塞的 Acquire 和高效的 Release 唤醒机制.
func NewSessionPool ¶
func NewSessionPool(max int) *SessionPool
NewSessionPool 创建会话池. max <= 0 表示无限制(适合开发环境,生产建议设置具体值).
func (*SessionPool) Acquire ¶
func (p *SessionPool) Acquire(ctx context.Context) error
Acquire 申请一个会话槽位,阻塞直到有槽位可用或 ctx 取消.
精妙之处(CLEVER): 获取 wake channel 的快照后释放锁,再在锁外 select-- 这样 Release() 可以在 Acquire() select 时安全地 close(wake) + 重建. 如果在持锁状态下 select,Release() 无法获取锁,造成死锁. 这是 Go 中"锁内取 channel 快照,锁外 select"的标准模式.
func (*SessionPool) Close ¶
func (p *SessionPool) Close()
Close 关闭会话池,让所有阻塞的 Acquire 立即返回 ErrPoolClosed.
func (*SessionPool) Release ¶
func (p *SessionPool) Release()
Release 释放一个会话槽位,唤醒所有等待的 Acquire.
精妙之处(CLEVER): 用 close(wake) + 重建新 channel 的方式广播唤醒-- close 会唤醒所有正在 <-wake 等待的 goroutine, 然后重建新 channel 供下次 Release 使用(不能重复 close 已关闭的 channel). 所有被唤醒的 goroutine 重试 Acquire,最终只有一个成功(最多空转一圈). 替代方案:<每次 Release 只唤醒一个等待者> - 否决:需要额外的等待者队列维护, 且在多 CPU 下广播 + 竞争比手动队列更高效(无单点锁竞争).
type SharedIsolation ¶
type SharedIsolation struct{}
SharedIsolation 不做任何隔离,所有会话共享同一工作目录和进程环境. 适用场景:单用户本地开发,CLI 模式,测试环境.
精妙之处(CLEVER): 即使是"无隔离"也实现 SessionIsolation 接口-- DaemonManager 代码路径统一,不需要 if isolation == nil 的特殊处理. 这是 Null Object Pattern 的应用.
func (SharedIsolation) Name ¶
func (SharedIsolation) Name() string