daemon

package
v0.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 26, 2026 License: None detected not legal advice Imports: 0 Imported by: 0

Documentation

Overview

Package daemon 实现 Flyto Agent Platform 的多会话守护进程管理器.

DaemonManager 是引擎"服务模式"的核心:

  • 接受来自 BridgeTransport 的客户端连接
  • 管理引擎 Session 的生命周期(创建/恢复/关闭)
  • 控制并发会话数量(SessionPool)
  • 维持会话心跳,检测空闲超时
  • 崩溃恢复(会话异常结束后自动重连)

架构位置:

客户端 → BridgeTransport → DaemonManager → engine.Session → LLM

升华改进(ELEVATED): 早期方案 是 2999 行巨石文件, 会话管理/心跳/容量控制/崩溃恢复全部耦合在一起,无法单独测试. 我们将每个关注点拆分为独立类型(SessionPool/HeartbeatService/IdleTimer/ CrashRecovery),DaemonManager 是它们的组合器,每个子组件独立可测. 替代方案:<直接翻译早期方案的平铺结构> - 否决:2999 行文件在 Go 中同样不可测,同样是技术债.

Index

Constants

This section is empty.

Variables

View Source
var ErrPoolClosed = &daemonError{code: "pool_closed", msg: "会话池已关闭"}

ErrPoolClosed 表示会话池已关闭,不接受新请求.

Functions

This section is empty.

Types

type CrashRecovery

type CrashRecovery struct {
	// contains filtered or unexported fields
}

CrashRecovery 执行带退避的会话重试策略.

func NewCrashRecovery

func NewCrashRecovery(cfg CrashRecoveryConfig) *CrashRecovery

NewCrashRecovery 创建崩溃恢复执行器.

func (*CrashRecovery) RunWithRecovery

func (cr *CrashRecovery) RunWithRecovery(ctx context.Context, sessionID string, fn func() error) error

RunWithRecovery 执行 fn,在崩溃时按配置重试.

fn: 会话运行函数,返回 error 表示异常结束(nil = 正常结束,不重试). sessionID: 用于日志和回调. ctx: 上下文取消时停止重试.

精妙之处(CLEVER): 正常结束(fn 返回 nil)不触发重试-- 只有 fn 返回 non-nil error 才被视为崩溃.这防止了会话"主动结束" (用户说"退出",引擎正常完成任务)后被误重启.

type CrashRecoveryConfig

type CrashRecoveryConfig struct {
	// MaxRetries 最大重试次数,0 = 不重试,-1 = 无限重试(不推荐).
	MaxRetries int

	// InitialDelay 首次重试的等待时间,默认 1s.
	InitialDelay time.Duration

	// MaxDelay 退避延迟上限,默认 60s.
	// 防止指数增长后延迟过长(如第 10 次重试等待 512s).
	MaxDelay time.Duration

	// Multiplier 退避乘数,默认 2.0(标准指数退避).
	Multiplier float64

	// OnCrash 崩溃事件回调(用于接入 AuditSink 或告警系统).
	// sessionID: 崩溃的会话 ID
	// attempt: 第几次崩溃(从 1 开始)
	// err: 崩溃原因
	OnCrash func(sessionID string, attempt int, err error)

	// OnGiveUp 达到最大重试次数时回调.
	OnGiveUp func(sessionID string, totalAttempts int)
}

CrashRecoveryConfig 是崩溃恢复配置.

func DefaultCrashRecoveryConfig

func DefaultCrashRecoveryConfig() CrashRecoveryConfig

DefaultCrashRecoveryConfig 返回生产合理的崩溃恢复配置.

type DaemonConfig

type DaemonConfig struct {
	// MaxSessions 最大并发会话数,0 = 无限制(不推荐生产使用).
	MaxSessions int

	// IdleTimeout 会话空闲超时.超时后会话自动关闭.
	// 0 = 无超时(历史包袱(LEGACY): 早期方案固定 24h,我们改为可配置).
	// 推荐值:30 分钟(多数仓储场景操作员不会中途离开超过 30 分钟).
	IdleTimeout time.Duration

	// HeartbeatInterval 心跳检测间隔,0 = 不检测.
	HeartbeatInterval time.Duration

	// HeartbeatTimeout 心跳超时阈值.超过此时间无心跳视为连接断开.
	HeartbeatTimeout time.Duration

	// Isolation 会话隔离模式,默认 SharedIsolation.
	Isolation SessionIsolation

	// BaseDir 工作目录基础路径(IsolatedIsolation 会在此下创建子目录).
	BaseDir string

	// CrashRecovery 崩溃恢复配置,nil = 不自动恢复.
	CrashRecovery *CrashRecoveryConfig
}

DaemonConfig 是 DaemonManager 的配置.

type DaemonManager

type DaemonManager struct {
	// contains filtered or unexported fields
}

DaemonManager 管理多个并发的 Agent 会话.

func NewDaemonManager

func NewDaemonManager(
	eng *engine.Engine,
	transport bridge.BridgeTransport,
	cfg DaemonConfig,
) *DaemonManager

NewDaemonManager 创建 DaemonManager.

eng: 引擎实例,所有会话共享同一引擎(但每个会话有独立的 engine.Session). transport: 客户端连接来源(SSETransport/WebSocketTransport/...). cfg: Daemon 配置.

func (*DaemonManager) SessionCount

func (dm *DaemonManager) SessionCount() int

SessionCount 返回当前活跃会话数(监控用).

func (*DaemonManager) Shutdown

func (dm *DaemonManager) Shutdown(ctx context.Context) error

Shutdown 优雅关闭 DaemonManager. 停止接受新连接,等待现有会话完成或超时.

func (*DaemonManager) Start

func (dm *DaemonManager) Start()

Start 启动 DaemonManager,开始接受客户端连接. 非阻塞,返回后 DaemonManager 在后台运行.

type HeartbeatConfig

type HeartbeatConfig struct {
	// Interval 心跳检测间隔(扫描频率),默认 30s.
	Interval time.Duration

	// Timeout 心跳超时阈值.超过此时间无心跳视为连接断开,默认 90s.
	// 应为 Interval 的 3 倍左右,容忍偶发延迟.
	Timeout time.Duration
}

HeartbeatConfig 是心跳服务配置.

func DefaultHeartbeatConfig

func DefaultHeartbeatConfig() HeartbeatConfig

DefaultHeartbeatConfig 返回生产合理的心跳配置.

type HeartbeatService

type HeartbeatService struct {
	// contains filtered or unexported fields
}

HeartbeatService 统一管理所有会话的心跳检测.

精妙之处(CLEVER): HeartbeatService 不持有会话引用,而是持有回调-- 超时时调用 onTimeout(sessionID),DaemonManager 自己决定如何处理. 这样 HeartbeatService 与 DaemonManager 解耦,可以独立测试.

func NewHeartbeatService

func NewHeartbeatService(cfg HeartbeatConfig, onTimeout func(sessionID string)) *HeartbeatService

NewHeartbeatService 创建心跳服务. onTimeout: 当会话心跳超时时调用(通常是 DaemonManager.closeSession).

func (*HeartbeatService) Beat

func (hs *HeartbeatService) Beat(sessionID string)

Beat 更新会话的最后活跃时间(收到任何客户端消息时调用).

func (*HeartbeatService) Register

func (hs *HeartbeatService) Register(sessionID string)

Register 注册一个会话到心跳监控.

func (*HeartbeatService) Start

func (hs *HeartbeatService) Start()

Start 启动心跳检测循环(非阻塞).

func (*HeartbeatService) Stop

func (hs *HeartbeatService) Stop()

Stop 停止心跳检测.

func (*HeartbeatService) Unregister

func (hs *HeartbeatService) Unregister(sessionID string)

Unregister 从心跳监控中移除会话(会话正常关闭时调用).

type HeartbeatSession

type HeartbeatSession interface {
	// SessionID 返回会话 ID.
	SessionID() string
	// LastActiveAt 返回会话最后一次活跃的时间.
	LastActiveAt() time.Time
	// OnHeartbeatTimeout 是心跳超时回调,由 HeartbeatService 调用.
	OnHeartbeatTimeout(sessionID string)
}

HeartbeatSession 是 HeartbeatService 需要的会话接口. DaemonManager 实现此接口,将会话注册到心跳服务.

type IdleTimer

type IdleTimer struct {
	// contains filtered or unexported fields
}

IdleTimer 是可重置的空闲超时计时器.

精妙之处(CLEVER): 不使用 time.AfterFunc + 手动 Stop/Reset 的组合-- AfterFunc 的 Reset 有竞争条件(Reset 和回调并发触发时行为不确定). 我们改用独立 goroutine + channel 信号控制:

  • reset channel 接收重置信号,goroutine 重新开始计时
  • stop channel 接收停止信号,goroutine 退出

这是 Go 中实现"可重置定时器"的标准模式(避免 time.Timer Reset race).

func NewIdleTimer

func NewIdleTimer(timeout time.Duration, onIdle func()) *IdleTimer

NewIdleTimer 创建空闲计时器并立即启动.

timeout: 空闲超时时长(无活动多久后触发). onIdle: 超时后的回调(在后台 goroutine 中执行,不可重入).

func (*IdleTimer) Reset

func (t *IdleTimer) Reset()

Reset 重置计时器(有活动时调用). 安全地从任意 goroutine 调用,非阻塞.

精妙之处(CLEVER): resetCh 缓冲为 1--如果上一次 Reset 的信号还没被消费, 新的 Reset 不会再写入(select default 丢弃).这防止了大量并发活动导致 resetCh 堆积,同时保证"有活动"这一语义被正确传达. 两次快速 Reset 和一次 Reset 对计时器的效果是相同的.

func (*IdleTimer) Stop

func (t *IdleTimer) Stop()

Stop 永久停止计时器,回调不会再被触发.幂等.

type IsolatedIsolation

type IsolatedIsolation struct {
	SessionsDir string // 存放所有会话目录的根目录,默认 <baseDir>/sessions
}

IsolatedIsolation 为每个会话创建独立的工作目录(沙箱子目录). 适用场景:多用户 SaaS,飞驼多操作员并发,不同项目的并行会话.

工作目录结构:

baseDir/
  sessions/
    <sessionID>/   ← 每个会话的独立目录
      workdir/     ← Agent 实际工作目录(写操作限制在此范围内)

升华改进(ELEVATED): 早期方案没有工作目录隔离--多用户场景下不同 Agent 会话 可能互相覆盖彼此的文件(同一目录并发写入). 我们为每个隔离会话创建独立的 workdir,配合引擎的 Cwd 配置实现路径限制. 替代方案:<chroot 或 namespace> - 否决:需要 root 权限,跨平台兼容性差; 应用层路径限制(Cwd + 引擎路径校验)安全性足够且零特权.

func (*IsolatedIsolation) Name

func (iso *IsolatedIsolation) Name() string

func (*IsolatedIsolation) Setup

func (iso *IsolatedIsolation) Setup(_ context.Context, sessionID string, baseDir string) (string, error)

func (*IsolatedIsolation) Teardown

func (iso *IsolatedIsolation) Teardown(_ context.Context, sessionID string) error

type IsolationMode

type IsolationMode string

IsolationMode 是隔离模式的枚举类型.

const (
	// IsolationShared 共享模式,不隔离(单用户/开发环境).
	IsolationShared IsolationMode = "shared"
	// IsolationIsolated 隔离模式,独立工作目录(多用户/生产环境).
	IsolationIsolated IsolationMode = "isolated"
)

type ManagedSession

type ManagedSession struct {
	// contains filtered or unexported fields
}

ManagedSession 是被 DaemonManager 管理的会话状态.

type SessionIsolation

type SessionIsolation interface {
	// Setup 准备会话运行环境.
	// 返回实际的工作目录(可能与传入的 baseDir 不同,如 Worktree 模式).
	Setup(ctx context.Context, sessionID string, baseDir string) (workDir string, err error)

	// Teardown 清理会话运行环境.
	// ctx 已超时/取消时 Teardown 仍需执行(使用 context.Background() 或超时).
	Teardown(ctx context.Context, sessionID string) error

	// Name 返回隔离模式名称(用于日志和监控).
	Name() string
}

SessionIsolation 定义会话的环境隔离策略.

Setup 在会话启动前调用,Teardown 在会话结束后调用(无论是否出错). 每个会话独立的 SessionIsolation 实例,不跨会话共享.

func NewIsolation

func NewIsolation(mode IsolationMode) SessionIsolation

NewIsolation 根据模式创建对应的 SessionIsolation 实现.

type SessionPool

type SessionPool struct {
	// contains filtered or unexported fields
}

SessionPool 管理会话槽位,带阻塞的 Acquire 和高效的 Release 唤醒机制.

func NewSessionPool

func NewSessionPool(max int) *SessionPool

NewSessionPool 创建会话池. max <= 0 表示无限制(适合开发环境,生产建议设置具体值).

func (*SessionPool) Acquire

func (p *SessionPool) Acquire(ctx context.Context) error

Acquire 申请一个会话槽位,阻塞直到有槽位可用或 ctx 取消.

精妙之处(CLEVER): 获取 wake channel 的快照后释放锁,再在锁外 select-- 这样 Release() 可以在 Acquire() select 时安全地 close(wake) + 重建. 如果在持锁状态下 select,Release() 无法获取锁,造成死锁. 这是 Go 中"锁内取 channel 快照,锁外 select"的标准模式.

func (*SessionPool) Active

func (p *SessionPool) Active() int

Active 返回当前活跃会话数(调试/监控用).

func (*SessionPool) Close

func (p *SessionPool) Close()

Close 关闭会话池,让所有阻塞的 Acquire 立即返回 ErrPoolClosed.

func (*SessionPool) Max

func (p *SessionPool) Max() int

Max 返回最大并发会话数(0 = 无限制).

func (*SessionPool) Release

func (p *SessionPool) Release()

Release 释放一个会话槽位,唤醒所有等待的 Acquire.

精妙之处(CLEVER): 用 close(wake) + 重建新 channel 的方式广播唤醒-- close 会唤醒所有正在 <-wake 等待的 goroutine, 然后重建新 channel 供下次 Release 使用(不能重复 close 已关闭的 channel). 所有被唤醒的 goroutine 重试 Acquire,最终只有一个成功(最多空转一圈). 替代方案:<每次 Release 只唤醒一个等待者> - 否决:需要额外的等待者队列维护, 且在多 CPU 下广播 + 竞争比手动队列更高效(无单点锁竞争).

type SharedIsolation

type SharedIsolation struct{}

SharedIsolation 不做任何隔离,所有会话共享同一工作目录和进程环境. 适用场景:单用户本地开发,CLI 模式,测试环境.

精妙之处(CLEVER): 即使是"无隔离"也实现 SessionIsolation 接口-- DaemonManager 代码路径统一,不需要 if isolation == nil 的特殊处理. 这是 Null Object Pattern 的应用.

func (SharedIsolation) Name

func (SharedIsolation) Name() string

func (SharedIsolation) Setup

func (SharedIsolation) Setup(_ context.Context, _ string, baseDir string) (string, error)

func (SharedIsolation) Teardown

func (SharedIsolation) Teardown(_ context.Context, _ string) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL