Documentation
¶
Overview ¶
Package engine implements the Flyto Agent engine: runtime loop, session management, tool dispatch, observability, context management, and the consumer-facing integration surfaces that platform / SaaS / CLI layers attach to.
Consumer integration surfaces fall into three shapes. For the full taxonomy (mechanism, when-to-use, quick chooser, why each shape cannot replace another), see `docs/api-reference.md` section "API 消费形态 / API Consumption Patterns".
Push (subscribe) — form 1:
- `<-chan flyto.Event` returned by Engine.Run / Session.Send (typed event stream consumers range and type-switch on)
- engine.Config.Observer (flyto.EventObserver) for internal diagnostic events such as `session_cost_threshold_crossed`, `compact_finished`, `section_cache_break`
Pull — form 2:
- Session.Stats() SessionStats
- Session.Title() / Messages() / Snapshot()
- FileStateCache.Stats() CacheStats
Synchronous callback — form 3:
- engine.Config.PermissionHandler (permission.Handler)
- engine.Config.ApprovalPolicy (ApprovalPolicy interface)
- engine.Config.ElicitationHandler (ElicitationHandler interface)
- engine.Config.AuditSink (security.AuditSink)
- engine.Config.Hooks (pkg/hooks.HookHandler via Manager)
Package engine 实现 Flyto Agent 引擎: 运行循环 / 会话管理 / 工具分发 / 可 观测性 / 上下文管理, 以及平台/SaaS/CLI 接入的消费者接口面.
消费者接入面分三种形态. 完整分类 (机制 / 何时用 / 选型速查 / 三者不互相 替代的原因) 见 `docs/api-reference.md` "API 消费形态 / API Consumption Patterns" 章节.
订阅 (push) —— 形态一:
- Engine.Run / Session.Send 返回的 `<-chan flyto.Event` (强类型事件流, 消费者 range + type-switch)
- engine.Config.Observer (flyto.EventObserver) 内部诊断事件, 如 `session_cost_threshold_crossed` / `compact_finished` / `section_cache_break`
调取 (pull) —— 形态二:
- Session.Stats() SessionStats
- Session.Title() / Messages() / Snapshot()
- FileStateCache.Stats() CacheStats
同步回调 (callback) —— 形态三:
- engine.Config.PermissionHandler (permission.Handler)
- engine.Config.ApprovalPolicy
- engine.Config.ElicitationHandler
- engine.Config.AuditSink (security.AuditSink)
- engine.Config.Hooks (经 Manager 的 pkg/hooks.HookHandler)
Package engine 是 Flyto Agent Engine 的顶层入口包.
这是整个系统的核心:可嵌入的 Agent 引擎. 任何消费层(CLI,SDK,HTTP Server,IDE 插件)都通过这个包与 Agent 交互.
设计原则:
- 零 UI 依赖:引擎不知道自己跑在终端还是 Web 服务里
- 流式优先:所有输出通过 channel 推送,消费者自己决定怎么展示
- 可组合:每个子系统(工具,权限,内存等)都是接口,可替换
Package engine - secret_store.go 实现凭据注册表.
问题背景 ¶
Agent 执行工具时常需要访问外部系统(数据库,API,云服务), 这些凭据(密码,Token,API Key)不应出现在:
- AI 看到的工具输出中(防止模型在后续提示词中泄露)
- 引擎推送给消费层的 Event 流中(防止日志/监控系统记录明文凭据)
- 内存文件中(SecretGuard 已覆盖这一层)
设计决策 ¶
SecretStore 只做两件事:
- 向 Bash 工具子进程注入 env var(工具通过 $NAME 引用凭据)
- 对所有工具输出做 value-level 脱敏(将明文替换为 [SECRET:name])
升华改进(ELEVATED): 早期实现 只有静态的 GHA_SUBPROCESS_SCRUB 白名单, 只覆盖 CI 环境的已知 env var 名,无法处理用户自定义凭据. 我们允许用户动态注册任意 secret,引擎自动在全部工具输出中脱敏. 替代方案:<要求调用方自己过滤 EventChannel 中的输出> - 否决:消费层无法感知所有输出路径(工具结果被送回模型的路径尤其隐蔽).
Package engine - token_budget.go 管理上下文窗口的 token 预算.
模块 7.4: Token 预算精细度.
三个核心问题:
- 当前上下文占了多少 token?(混合估算法)
- 还剩多少空间?(有效窗口多层计算)
- 什么时候该触发压缩/警告?(阈值体系)
精妙之处(CLEVER): 混合估算法--不纯估也不纯用 API 返回值. 找到最后一个有 usage 的 assistant 消息(API 返回的精确值)作为"锚点", 锚点之后的消息(用户输入,工具结果)用粗估(tokenizer.EstimateTokens)补充. 总量 = 锚点精确值 + 新增粗估. 这样永远不会大幅偏离真实值--粗估最多偏差几次工具调用的量.
替代方案:纯粗估(当前实现,4chars/token,误差可达 30%).
维护说明(MAINTENANCE, 2026-04-14): 本文件 535 行曾被 TODO L1225 列为 "6 大功能合体需拆分 token_estimator + token_window + token_warnings". 2026-04-14 audit 后否决,理由记录在此避免下次又被提起:
- 6 个功能组共享两个依赖 (modelRegistry + observer),它们不是 6 个抽象, 是同一个抽象 (TokenBudgetManager = "token 预算知识的持有者") 的 6 个入口.
- 扩展/修改成本和文件行数无关 (IDE 跳转 O(1)),真正成本是跨抽象边界跳转. 拆成 3 个文件反而引入 "新方法放哪个文件" 的决策负担 + 跨文件跳转.
- warnings 组 (CalculateWarningState) 严重依赖 window 组 (AutoCompactThreshold/EffectiveContextWindow),硬拆等于人为切断内部联系.
- "535 行太长" 是美学焦虑不是工程痛点.等到文件真长到 1000+ 行或出现 明显不属于本 struct 的第 7 组功能时,结构自然涌现的重构点才该动手.
真正的抽象债应沿 "同一处修改跨多个概念" 或 "新增场景绕开现有结构" 识别, 而非 "文件行数" 这个表层代理指标.
Index ¶
- Constants
- Variables
- func BuildConsolidationPrompt(memoryRoot, transcriptDir string, sessionIDs []string) string
- func BundleKeyFor(modelFamily, scenario string) agentctx.BundleKey
- func CleanupWorktree(repoRoot, worktreePath, branchName string, executor execenv.Executor) error
- func CreateWorktree(repoRoot, branchName string, executor execenv.Executor) (*WorktreeInfo, WorktreeCleanup, error)
- func DefaultAuditPath() (string, error)
- func DetectGitRepo(cwd string, executor execenv.Executor) (string, bool)
- func FilterSubAgentTools(allTools []tools.Tool) []tools.Tool
- func FormatErrorForDisplay(err error, verbose bool) string
- func FormatReminder(text string) string
- func FormatSkillsPrompt(skills []*Skill, maxChars int) string
- func GetBillingTokens(usage *query.Usage, pricing *config.ModelConfig) float64
- func GetFinalContextTokens(usage *query.Usage) int
- func GetTokenCountFromUsage(usage *query.Usage) int
- func IsRetryable(err error) bool
- func ListTranscripts(cwd string) ([]string, error)
- func NewEnterPlanModeTool(manager *PlanModeManager) tools.Tool
- func NewExitPlanModeTool(manager *PlanModeManager) tools.Tool
- func NormalizeMessagesForAPI(messages []query.Message) []query.Message
- func ParseContextError(errStr string) (actual, max int)
- func PickWarningCode(state *TokenWarningState) string
- func ReadLockMtime(path string) time.Time
- func RegisterBuiltinAgents(r *AgentRegistry)
- func ResumeConversation(ctx context.Context, eng *Engine, store SnapshotStore, convID string) (<-chan Event, error)
- func SaveTranscript(path string, sessionID string, model string, messages []query.Message, ...) error
- func SendPlanCommand(ctx context.Context, sockPath string, req planCmdRequest) (*planCmdResponse, error)
- func ShouldRetryOverloaded(source QuerySource) bool
- func ToolsToAllowedMap(toolList []tools.Tool) map[string]bool
- func TranscriptPath(cwd, sessionID string) string
- func UpdateTranscript(path string, sessionID string, model string, messages []query.Message, ...) error
- func WithEventEmitter(ctx context.Context, emit EventEmitter) context.Context
- type ActivityReason
- type ActivityTracker
- func (t *ActivityTracker) ActiveReasons() map[ActivityReason]int
- func (t *ActivityTracker) BusySince() time.Time
- func (t *ActivityTracker) Close()
- func (t *ActivityTracker) IsBusy() bool
- func (t *ActivityTracker) LastActivity() time.Time
- func (t *ActivityTracker) Refcount() int
- func (t *ActivityTracker) Start(reason ActivityReason)
- func (t *ActivityTracker) Stop(reason ActivityReason)
- type ActivityTrackerConfig
- type AgentDefLoader
- type AgentDefinition
- type AgentRegistry
- func (r *AgentRegistry) Get(agentType string) (*AgentDefinition, bool)
- func (r *AgentRegistry) List() []*AgentDefinition
- func (r *AgentRegistry) Register(def *AgentDefinition) error
- func (r *AgentRegistry) ResolveToolset(def *AgentDefinition, parentTools []string) []string
- func (r *AgentRegistry) SetGlobalDisallowed(toolNames []string)
- type ApprovalPolicy
- type AttachmentReorderer
- type AuditObserver
- type BufferedObserver
- type CacheStats
- type CheckpointEvent
- type CheckpointHandlerFn
- type CheckpointSuggestedEvent
- type CompactEvent
- type Complexity
- type CompositeObserver
- func (c *CompositeObserver) Error(err error, context map[string]any)
- func (c *CompositeObserver) Event(name string, data map[string]any)
- func (c *CompositeObserver) Metric(name string, value float64, tags map[string]string)
- func (c *CompositeObserver) Observers() []EventObserver
- func (c *CompositeObserver) SpanEnd(spanID string, err error)
- func (c *CompositeObserver) SpanStart(name string, tags map[string]string) string
- type Config
- type ConsecutiveRoleMerger
- type ContextWindowCalibrator
- type DoneEvent
- type DreamConfig
- type DreamEngine
- type DreamStatus
- type DreamTaskState
- type DreamTaskStore
- func (s *DreamTaskStore) AddFileTouched(id string, path string)
- func (s *DreamTaskStore) AddTurn(id string, turn DreamTurn)
- func (s *DreamTaskStore) Get(id string) *DreamTaskState
- func (s *DreamTaskStore) Latest() *DreamTaskState
- func (s *DreamTaskStore) Register(task *DreamTaskState)
- func (s *DreamTaskStore) SetError(id string, errMsg string)
- func (s *DreamTaskStore) Update(id string, status DreamStatus, phase string)
- type DreamTurn
- type ElicitationField
- type ElicitationHandler
- type ElicitationHandlerFunc
- type ElicitationRequest
- type ElicitationResponse
- type EmptyMessageFilter
- type Engine
- func (e *Engine) Activity() *ActivityTracker
- func (e *Engine) BuildAndStream(ctx context.Context, model string, maxTokens int, ...) (<-chan flyto.Event, error)
- func (e *Engine) Close() error
- func (e *Engine) Closed() bool
- func (e *Engine) Context() context.Context
- func (e *Engine) Cwd() string
- func (e *Engine) DisablePlugin(name string) error
- func (e *Engine) Dream() *DreamEngine
- func (e *Engine) EnablePlugin(name string) error
- func (e *Engine) ExecuteUndo(ctx context.Context, undo *tools.UndoInfo) error
- func (e *Engine) FileCache() *FileStateCache
- func (e *Engine) FileHistoryRef() *FileHistory
- func (e *Engine) FileHistoryView() FileHistoryView
- func (e *Engine) ForkSubAgent(cfg *SubAgentConfig) *SubAgent
- func (e *Engine) InboxServer() *inbox.UDSServer
- func (e *Engine) LoadPlugin(dir string) error
- func (e *Engine) Observer() EventObserver
- func (e *Engine) OperationLogRef() *OperationLog
- func (e *Engine) Redact(s string) string
- func (e *Engine) RegisterPromptBundle(key agentctx.BundleKey, bundle agentctx.PromptBundle)
- func (e *Engine) ResetSectionCache()
- func (e *Engine) ResultStoreRef() *ResultStore
- func (e *Engine) Rollback(ctx context.Context, messageID string) error
- func (e *Engine) Run(ctx context.Context, prompt string, opts ...RunOption) <-chan Event
- func (e *Engine) Session(id string) *Session
- func (e *Engine) SetSecret(name, value string) error
- func (e *Engine) SkillRegistry() *SkillRegistry
- func (e *Engine) SpawnSkillAgent(ctx context.Context, cfg *SubAgentConfig, prompt string) (string, error)
- func (e *Engine) StrictModeRef() *StrictMode
- func (e *Engine) TokenBudget() *TokenBudgetManager
- func (e *Engine) Tools() *tools.Registry
- type EngineBackend
- type EngineError
- type EngineRef
- type ErrorCode
- type ErrorContentStripper
- type ErrorEvent
- type Event
- type EventEmitter
- type EventObserver
- type ExecutionContext
- type FallbackConfig
- type FallbackTracker
- func (ft *FallbackTracker) CurrentModel() string
- func (ft *FallbackTracker) FallbackCount() int
- func (ft *FallbackTracker) OriginalModel() string
- func (ft *FallbackTracker) RecordFallback()
- func (ft *FallbackTracker) ShouldFallback(err error) (fallbackModel string, should bool)
- func (ft *FallbackTracker) WasFallback() bool
- type FileAgentDefLoader
- type FileBackup
- type FileCacheEntry
- type FileChangeChecker
- type FileHistory
- func (h *FileHistory) Backup(ctx context.Context, filePath string) error
- func (h *FileHistory) BeforeEdit(filePath string, messageID string) error
- func (h *FileHistory) BeforeWrite(filePath string, messageID string) error
- func (h *FileHistory) CanRollback(messageID string) (bool, []string)
- func (h *FileHistory) Prune(ctx context.Context, policy RetentionPolicy) error
- func (h *FileHistory) Rollback(messageID string) error
- func (h *FileHistory) SnapshotCount() int
- type FileHistoryView
- type FileLock
- type FilePlanQueue
- func (q *FilePlanQueue) Cancel(planID string) error
- func (q *FilePlanQueue) Close() error
- func (q *FilePlanQueue) List() ([]*QueuedPlan, error)
- func (q *FilePlanQueue) RecoverPending() error
- func (q *FilePlanQueue) Status(planID string) (*QueuedPlan, error)
- func (q *FilePlanQueue) Submit(steps []PlanStep, opts PlanSubmitOptions) (string, error)
- type FilePlanStore
- type FileScratchpad
- type FileSessionProvider
- type FileSnapshot
- type FileSnapshotStore
- type FileStateCache
- func (c *FileStateCache) Clear()
- func (c *FileStateCache) Get(path string) (*FileCacheEntry, bool)
- func (c *FileStateCache) Info(path string) (FileCacheEntry, bool)
- func (c *FileStateCache) IsModified(path string) bool
- func (c *FileStateCache) Peek(path string) (*FileCacheEntry, bool)
- func (c *FileStateCache) RecentFiles(limit int) []string
- func (c *FileStateCache) Record(path string, content []byte)
- func (c *FileStateCache) Stats() CacheStats
- type FlushGate
- type FuncApprovalPolicy
- type ImageValidator
- type InboxMessageEvent
- type InputProcessor
- type LocalAuditSink
- type ManagedSession
- type MemoryPlanStore
- type MessageNormalizer
- type MetricObserver
- type MigrateFunc
- type NoopApprovalPolicy
- type NoopElicitationHandler
- type NoopObserver
- type NormalizePipeline
- type OperationEntry
- type OperationLog
- func (l *OperationLog) EntryCount() int
- func (l *OperationLog) GetByMessage(messageID string) []*OperationEntry
- func (l *OperationLog) GetByMessageLocked(messageID string) []*OperationEntry
- func (l *OperationLog) Record(entry *OperationEntry)
- func (l *OperationLog) RollbackMessage(ctx context.Context, messageID string, executor UndoExecutor) error
- type OrphanThinkingFilter
- type OrphanToolResultRemover
- type PartialToolUse
- type PermissionHandler
- type PermissionLearnEvent
- type PermissionLearnRule
- type PermissionRequestEvent
- type PlanApprovalEvent
- type PlanCommandServer
- type PlanExecFunc
- type PlanModeManager
- func (m *PlanModeManager) AttachProgress(p *PlanProgress)
- func (m *PlanModeManager) Enter() error
- func (m *PlanModeManager) Exit(ctx context.Context, steps []PlanStep) (plan string, err error)
- func (m *PlanModeManager) IsActive() bool
- func (m *PlanModeManager) Progress() *PlanProgress
- func (m *PlanModeManager) SetSessionID(id string)
- type PlanProgress
- func (p *PlanProgress) FinishStep(stepID string, status StepStatus, errorMsg string) error
- func (p *PlanProgress) RegisterStep(step PlanStep) error
- func (p *PlanProgress) SetOnProgress(fn func(snapshot PlanProgressSnapshot))
- func (p *PlanProgress) SkipDependents(failedStepID string) []string
- func (p *PlanProgress) Snapshot() PlanProgressSnapshot
- func (p *PlanProgress) StartStep(stepID, agentID string) error
- type PlanProgressEvent
- type PlanProgressSnapshot
- func (s PlanProgressSnapshot) Duration() time.Duration
- func (s PlanProgressSnapshot) HasFailed() bool
- func (s PlanProgressSnapshot) IsComplete() bool
- func (s PlanProgressSnapshot) IsSuccess() bool
- func (s PlanProgressSnapshot) ProgressPercent() float64
- func (s PlanProgressSnapshot) ReadySteps() []StepProgress
- type PlanQueue
- type PlanStatus
- type PlanStep
- type PlanStore
- type PlanSubmitOptions
- type ProcessedInput
- type QueryChainTracking
- type QuerySource
- type QueuedPlan
- type ReminderSystem
- func (r *ReminderSystem) CheckDateChange() string
- func (r *ReminderSystem) CheckFileModifications() []string
- func (r *ReminderSystem) CheckMemoryRelevance(ctx context.Context, recentText string) string
- func (r *ReminderSystem) CollectReminders(ctx context.Context, currentMessages []query.Message, turnNumber int) []string
- type ResultStore
- type RetentionPolicy
- type RunOption
- func WithBundleKey(key agentctx.BundleKey) RunOption
- func WithCheckpointHandler(fn CheckpointHandlerFn) RunOption
- func WithMaxTurns(n int) RunOption
- func WithMessages(msgs []query.Message) RunOption
- func WithModel(model string) RunOption
- func WithSecret(name, value string) RunOption
- func WithSessionHooks(m *hooks.Manager) RunOption
- type Scratchpad
- type SecretStore
- type Session
- func (s *Session) Close()
- func (s *Session) CreatedAt() time.Time
- func (s *Session) ID() string
- func (s *Session) IsClosed() bool
- func (s *Session) LastActiveAt() time.Time
- func (s *Session) Messages() []query.Message
- func (s *Session) ResolvePermission(requestID string, allow bool)
- func (s *Session) Send(ctx context.Context, prompt string, opts ...RunOption) <-chan Event
- func (s *Session) SetTitle(title string)
- func (s *Session) Stats() SessionStats
- func (s *Session) Title() string
- func (s *Session) WaitForPermission(ctx context.Context, requestID string) bool
- type SessionInfo
- type SessionInfoEvent
- type SessionManager
- func (sm *SessionManager) Close()
- func (sm *SessionManager) CreateSession(id string) (*Session, error)
- func (sm *SessionManager) GetSession(id string) (*Session, error)
- func (sm *SessionManager) ListSessions() []SessionInfo
- func (sm *SessionManager) RemoveSession(id string)
- func (sm *SessionManager) ResumeSession(transcriptPath string) (*Session, error)
- func (sm *SessionManager) SessionCount() int
- func (sm *SessionManager) SetSessionTitle(id, title string)
- func (sm *SessionManager) UpdateSessionInfo(id string, inputTokens, outputTokens int, costUSD float64)
- type SessionManagerConfig
- type SessionProvider
- type SessionSnapshot
- type SessionStats
- type Skill
- type SkillInvokeResult
- type SkillRegistry
- func (r *SkillRegistry) Get(name string) (*Skill, bool)
- func (r *SkillRegistry) Invoke(ctx context.Context, name, args, sessionID string) (*SkillInvokeResult, error)
- func (r *SkillRegistry) InvokeSkill(ctx context.Context, name, args string) (*builtin.SkillResult, error)
- func (r *SkillRegistry) List(filters ...func(*Skill) bool) []*Skill
- func (r *SkillRegistry) ListForCwd(cwd string, filters ...func(*Skill) bool) []*Skill
- func (r *SkillRegistry) ListSkillEntries() []*builtin.SkillEntryDesc
- func (r *SkillRegistry) Register(s *Skill) error
- func (r *SkillRegistry) RegisterAll(skills []*Skill)
- func (r *SkillRegistry) RegisterBuiltin(s *Skill)
- func (r *SkillRegistry) SetAgentRegistry(ar *AgentRegistry)
- func (r *SkillRegistry) SetObserver(o EventObserver)
- func (r *SkillRegistry) SetParentToolNames(f func() []string)
- type SkillSpawner
- type SlashCommand
- type SlashCommandEvent
- type SnapshotStore
- type StderrObserver
- type StepExecStatus
- type StepProgress
- type StepStatus
- type StrictMode
- func (s *StrictMode) Check(condition string, enabled bool, observer EventObserver, detail string)
- func (s *StrictMode) CheckCompactFailure(observer EventObserver, detail string)
- func (s *StrictMode) CheckNormalizerError(observer EventObserver, detail string)
- func (s *StrictMode) CheckToolResultPairing(observer EventObserver, detail string)
- type SubAgent
- func (sa *SubAgent) Cancel()
- func (sa *SubAgent) GetResult() (string, error)
- func (sa *SubAgent) GetStatus() SubAgentStatus
- func (sa *SubAgent) Run(ctx context.Context, prompt string) <-chan Event
- func (sa *SubAgent) RunBackground(prompt string)
- func (sa *SubAgent) RunSync(ctx context.Context, prompt string) (string, error)
- func (sa *SubAgent) RunSyncWithCallback(ctx context.Context, prompt string, ...) (string, error)
- func (sa *SubAgent) Wait() error
- type SubAgentConfig
- type SubAgentEndEvent
- type SubAgentEvent
- type SubAgentProgress
- type SubAgentRegistry
- func (r *SubAgentRegistry) CancelAll()
- func (r *SubAgentRegistry) Close()
- func (r *SubAgentRegistry) Count() int
- func (r *SubAgentRegistry) Get(id string) (*SubAgent, bool)
- func (r *SubAgentRegistry) List() []*SubAgent
- func (r *SubAgentRegistry) ListByStatus(status SubAgentStatus) []*SubAgent
- func (r *SubAgentRegistry) OnComplete(fn func(sa *SubAgent))
- func (r *SubAgentRegistry) Register(sa *SubAgent) error
- func (r *SubAgentRegistry) Remove(id string) bool
- func (r *SubAgentRegistry) Summary() map[SubAgentStatus]int
- type SubAgentStartEvent
- type SubAgentStatus
- type Team
- type TeamConfig
- type TextDeltaEvent
- type TextEvent
- type ThinkingDeltaEvent
- type ThinkingEvent
- type ThinkingOptions
- type TokenBudgetManager
- func (m *TokenBudgetManager) AutoCompactThreshold(model string) int
- func (m *TokenBudgetManager) AutoCompactThresholdWithThinking(model string, thinkingBudget int) int
- func (m *TokenBudgetManager) CalculateWarningState(tokenUsage int, model string) *TokenWarningState
- func (m *TokenBudgetManager) EffectiveContextWindow(model string) int
- func (m *TokenBudgetManager) EffectiveContextWindowWithThinking(model string, thinkingBudget int) int
- func (m *TokenBudgetManager) EstimateCurrentUsage(messages []query.Message) int
- func (m *TokenBudgetManager) ManualCompactThreshold(model string) int
- func (m *TokenBudgetManager) OnModelSwitch(oldModel, newModel string, currentUsage int) *TokenWarningState
- type TokenWarningState
- type ToolProgressEvent
- type ToolResultEvent
- type ToolResultPairingNormalizer
- type ToolSummaryEvent
- type ToolSummaryGenerator
- type ToolUseEvent
- type ToolUseInfo
- type ToolUseInputNormalizer
- type TraceObserver
- type Transcript
- type TranscriptStats
- type TurnEndEvent
- type TurnStartEvent
- type UndoExecutor
- type WarningEvent
- type WhitespaceAssistantFilter
- type WorkerResult
- type WorkerSpec
- type WorktreeCleanup
- type WorktreeInfo
Constants ¶
const ( WarnTokenUsageHigh = flyto.WarnTokenUsageHigh WarnBudgetNearLimit = flyto.WarnBudgetNearLimit WarnSessionExpiring = flyto.WarnSessionExpiring )
const ( // MaxInlineResultChars 是内联结果的最大字符数. // 超过此阈值的工具输出将被存到磁盘. MaxInlineResultChars = 30000 // TruncatedPreviewChars 是截断时保留的预览字符数. // 模型会看到前 N 个字符 + 磁盘路径提示. TruncatedPreviewChars = 5000 )
const ( // AutoCompactBufferTokens 自动压缩前的缓冲空间(token 数). // 压缩阈值 = 有效窗口 - 此缓冲.留出空间给新的用户消息和模型回复. AutoCompactBufferTokens = 13_000 // WarningThresholdBuffer 黄色警告缓冲(token 数). // 当剩余空间 < 此值时触发黄色警告(提示用户上下文快满了). WarningThresholdBuffer = 20_000 // ErrorThresholdBuffer 红色警告缓冲(token 数). // 当剩余空间 < 此值时触发红色警告(紧急提醒,可能即将溢出). // 历史包袱(LEGACY): 与 WarningThresholdBuffer 相同值看起来没意义, // 但红色警告阈值是在有效窗口扣除 WarningThresholdBuffer 之后再减去此值, // 形成两层递进:黄色(80%)→红色(90%)→阻塞(100%). ErrorThresholdBuffer = 20_000 // ManualCompactBuffer 手动压缩的缓冲(token 数). // 用户手动触发 /compact 时,使用更激进的阈值. ManualCompactBuffer = 3_000 // MaxOutputTokensForSummary 压缩摘要的输出预留(token 数). // 压缩时需要为摘要输出预留空间,从有效窗口中扣除. MaxOutputTokensForSummary = 20_000 // MinEffectiveWindowFloor 有效窗口/阈值的最小下限(token 数). // 即使模型窗口很小或扣减项很多,也保证至少 10K 可用空间,避免阈值坍缩为 0 或负数. // 集中出现在 AutoCompactThreshold,ManualCompactThreshold, // EffectiveContextWindowWithThinking,AutoCompactThresholdWithThinking 四处. MinEffectiveWindowFloor = 10_000 // DefaultMaxOutputTokens 当模型配置缺失 MaxOutputTokens 时的默认输出上限. // 16K 是 Claude 3.5 一代的默认值,对未知模型保守兼容. DefaultMaxOutputTokens = 16_384 )
Token 预算常量.
const DefaultMaxImageSizeBytes int64 = 20 * 1024 * 1024
DefaultMaxImageSizeBytes 默认图片大小限制:20MB. Anthropic API 限制为 20MB(base64 编码后).
const IndexFile = "MEMORY.md"
IndexFile 是记忆目录的入口索引文件名. 与 pkg/memory 包的 UpdateIndex 逻辑保持一致(同文件名).
const MaxIndexLines = 200
MaxIndexLines 是索引文件的行数上限(超过则截断). 与 memory.TruncateIndex 的 maxIndexLines 常量保持一致:200 行 / 25KB.
const MaxSkillForkDepth = 2
MaxSkillForkDepth 最大 Skill fork 嵌套深度.
精妙之处(CLEVER): 限制为 2(而非 1)允许一次合理的嵌套:
review-pr Skill (depth=1) → 调用 run-tests Skill (depth=2) → 不允许再 fork
深度 0 = 顶层 LLM 直接调用 SkillTool. 替代方案:<限制为 1(完全禁止嵌套)> - 否决:会禁止 "review-pr 调 run-tests" 这类合理场景.
Variables ¶
var DefaultErrorPatterns = map[string][]query.ContentType{ "too large": {query.ContentDocument, query.ContentImage}, "password protected": {query.ContentDocument}, "invalid format": {query.ContentDocument}, "request too large": {query.ContentDocument, query.ContentImage}, }
DefaultErrorPatterns 默认错误模式 → 需要剥离的内容块类型.
key: 错误消息中的关键词(小写) value: 匹配时需要剥离的内容块类型列表
var DefaultGlobalDisallowedTools = []string{
"Agent",
"ExitPlanMode",
"EnterPlanMode",
"AskUserQuestion",
"TaskOutput",
"TaskStop",
}
DefaultGlobalDisallowedTools 是生产环境中所有子 Agent 默认禁用的工具集合.
升华改进(ELEVATED): 早期实现的 ALL_AGENT_DISALLOWED_TOOLS 包含 7 个工具,硬编码在 filterToolsForAgent() 函数内. 我们提取为可覆盖的包级变量--NewAgentRegistry() 用它初始化默认值, SDK 消费层可通过 SetGlobalDisallowed() 替换为完全不同的集合. 跨行业影响:测试环境可传入空切片,医疗合规场景可追加私有工具. 替代方案:<硬编码在 NewAgentRegistry() 内部> - 否决:测试/SDK 无法覆盖.
var DefaultInternalInputFields = []string{
"plan",
"caller",
"_internal",
"_debug",
}
DefaultInternalInputFields 默认需要剥离的内部字段名. 这些字段是 engine 内部使用的,不应发送给 API.
var DefaultRetentionPolicy = RetentionPolicy{
MaxAgeDays: 30,
MaxVersions: 50,
}
DefaultRetentionPolicy 是 Engine.Close() 使用的默认保留策略.
精妙之处(CLEVER): 30 天 + 50 版本是经验值-- 30 天覆盖一个迭代周期,足以回滚任何合理的"最近修改"; 50 版本限制单文件备份上限,防止频繁修改的文件(如日志,配置)无限累积. 两个条件 OR:任意一个满足就删除--更激进,磁盘占用更可控.
var ErrPlanNotFound = fmt.Errorf("plan queue: plan not found")
ErrPlanNotFound 计划 ID 不存在.
var ErrPlanQueueFull = fmt.Errorf("plan queue: queue is full")
ErrPlanQueueFull 队列已满,拒绝新提交.
var ErrPlanRejected = fmt.Errorf("plan_mode: plan rejected by user")
ErrPlanRejected 表示用户拒绝了计划(不是系统错误).
var ErrPlanTerminal = fmt.Errorf("plan queue: plan already in terminal state")
ErrPlanTerminal 计划已处于终态,无法取消.
Functions ¶
func BuildConsolidationPrompt ¶
BuildConsolidationPrompt 构建发送给 Dream SubAgent 的四阶段巩固提示词.
参数:
- memoryRoot:记忆目录的绝对路径(SubAgent 的 Edit/Write 限于此目录)
- transcriptDir:会话 transcript 文件目录(空 = 跳过 Grep transcript 提示)
- sessionIDs:自上次巩固以来的会话 ID 列表(可选,用于提示词 hint;nil = 不提供)
SubAgent 使用的工具:
- Bash(只读命令:ls/find/grep/cat/stat/head/tail)
- Read,Glob,Grep(探索记忆和 transcript)
- Edit,Write(只允许写入 memoryRoot 目录)
func BundleKeyFor ¶
BundleKeyFor 是构建 BundleKey 的便捷函数,减少调用方 import agentctx 的需要.
精妙之处(CLEVER): 封装 agentctx.BundleKey 的构造,让 HTTP handler 等消费层 无需显式 import flyto-agent/pkg/context 包--只需 import engine 包即可. 减少消费层的直接依赖深度,降低未来包路径重构的影响范围.
func CleanupWorktree ¶
CleanupWorktree 清理一个 worktree:移除 worktree 引用,删除目录,删除临时分支. executor 不能为 nil (方案 β 严格 DI).
func CreateWorktree ¶
func CreateWorktree(repoRoot, branchName string, executor execenv.Executor) (*WorktreeInfo, WorktreeCleanup, error)
CreateWorktree 在指定仓库中创建一个新的 git worktree.
参数:
- repoRoot: git 仓库根目录
- branchName: 新分支名称(如果为空,自动生成)
- executor: 子进程启动器 (方案 β 严格 DI, nil 即 panic)
返回 worktree 信息,清理函数和错误. 调用者必须在使用完毕后调用清理函数释放资源.
M1: 用 context.Background() 对齐原 exec.Command 无超时语义, 未来加 ctx 是正交 commit. cleanup closure 捕获 executor 透传给 CleanupWorktree.
func DefaultAuditPath ¶
DefaultAuditPath 返回默认审计日志路径(~/.flyto/audit.jsonl). 与项目配置目录统一,避免散落在文件系统各处.
func DetectGitRepo ¶
DetectGitRepo 检测给定目录是否在 git 仓库中. 返回仓库根目录和是否检测到仓库. executor 不能为 nil.
func FilterSubAgentTools ¶
FilterSubAgentTools 过滤出子 Agent 可以使用的工具列表. 移除 Agent 工具(防止递归)和其他管理工具.
历史包袱(LEGACY): 保留此函数用于向后兼容.新代码应使用 SubAgentConfig.AllowedTools 在 SpawnSubAgent 中通过 canUseTool 运行时拦截,而非在 API 层裁剪工具列表.
被排除的工具:
- Agent: 防止无限递归
func FormatErrorForDisplay ¶
FormatErrorForDisplay 将 EngineError 格式化为适合展示给用户的文本.
输出格式:
错误: <人类可读消息> 建议: <恢复建议>
如果有 Detail,在 Verbose 模式下追加技术细节.
func FormatReminder ¶
FormatReminder 将文本格式化为 <system-reminder> 包裹的消息.
func FormatSkillsPrompt ¶
FormatSkillsPrompt 将多个技能格式化为一段系统提示词. maxChars 控制最大字节数(0 = 不限制).
升华改进(ELEVATED): 早期实现 用 1% 上下文 token 预算动态截断. 我们用固定字节上限(默认 8000),更简单且与 Token 计算解耦. 替代方案:<传入 tokenBudget 参数> - P1 改进,当前 P0 用固定值.
func GetBillingTokens ¶
func GetBillingTokens(usage *query.Usage, pricing *config.ModelConfig) float64
GetBillingTokens 计算计费 token(考虑 cache 折扣/溢价). 用于:成本报告.
计费规则:
- input_tokens: 按原价计费
- output_tokens: 按原价计费
- cache_read: 按 10% 计费(缓存命中的折扣)
- cache_creation: 按 125% 计费(首次写入缓存的溢价)
升华改进(ELEVATED): 用 ModelConfig 的实际价格计算,而非硬编码比例. 每个模型的 cache 定价不同(Opus cache_read=$1.5 vs Haiku cache_read=$0.08). 替代方案:硬编码 10%/125% 比例(忽略模型差异,在 Opus 和 Haiku 之间误差显著).
func GetFinalContextTokens ¶
GetFinalContextTokens 计算 task_budget 用的 token. 用于:和服务端对齐的预算倒计时. = input_tokens + output_tokens(不含 cache,和服务端公式一致)
精妙之处(CLEVER): 服务端按 input_tokens + output_tokens 扣预算,不算 cache. 如果我们用含 cache 的值做倒计时,预算会比服务端快耗尽,导致提前终止. 保持和服务端公式一致,避免 client/server 对不上.
func GetTokenCountFromUsage ¶
GetTokenCountFromUsage 计算完整上下文窗口 token(含 cache). 用于:自动压缩阈值检查. = input_tokens + cache_creation + cache_read + output_tokens
历史包袱(LEGACY): cache_creation 和 cache_read 都计入 input 的窗口占用, 因为它们实际上就是 input 的一部分--只是分成了"新写入缓存"和"从缓存读取"两种来源. API 返回时 input_tokens 不含 cache 部分,需要加回来才是真正的上下文占用量.
func ListTranscripts ¶
ListTranscripts 列出指定项目目录下的所有会话记录. 返回文件路径列表,按修改时间倒序排列.
func NewEnterPlanModeTool ¶
func NewEnterPlanModeTool(manager *PlanModeManager) tools.Tool
NewEnterPlanModeTool 创建 EnterPlanMode 工具. manager 是计划模式管理器(必须非 nil).
func NewExitPlanModeTool ¶
func NewExitPlanModeTool(manager *PlanModeManager) tools.Tool
NewExitPlanModeTool 创建 ExitPlanMode 工具.
func NormalizeMessagesForAPI ¶
NormalizeMessagesForAPI 规范化消息列表,确保可以安全发送给 Messages API.
历史包袱(LEGACY): 保持原有函数签名,向后兼容. 内部委托给 DefaultNormalizePipeline().Run(). 新代码应直接使用 NormalizePipeline 以获得更多控制.
func ParseContextError ¶
ParseContextError 从 context-too-long 错误消息中提取 (actual, max) token 数.
actual = 本次请求实际 token 数(> max) max = provider 申报的上下文窗口上限 无法解析时返回 (0, 0)--调用方应退化到使用静态默认值.
func PickWarningCode ¶
func PickWarningCode(state *TokenWarningState) string
PickWarningCode returns the most-severe WarningEvent code warranted by the state, or empty when no event should fire. Callers emit the event with this code and a code-specific message. Severity-first order ensures a blocking state surfaces as "blocked" rather than being swallowed by the weaker "critical" branch — the dead-field audit on IsAtBlockingLimit surfaced the original bug where the else-if chain tested weakest first.
PickWarningCode 返回 state 对应最严重的 WarningEvent code, 空串表示无事件. 调用方据此 code 发 WarningEvent 并填具体消息. 严重优先顺序 (blocking > critical > warning) 确保 blocking 状态真正以 "blocked" code 暴露, 而不是 被较弱的 "critical" 分支吞掉 — 原 else-if 链从最弱判起正是 IsAtBlockingLimit 成 dead 字段的根因.
func ReadLockMtime ¶
ReadLockMtime 读取锁文件的 mtime,用作 lastConsolidatedAt.
精妙之处(CLEVER): mtime 即状态--不需要单独的 JSON 字段记录"上次巩固时间". TryAcquire 成功时会将 mtime 更新为 now,这就是"巩固开始时间". 进程 crash 后,mtime 仍然保留,下次启动能正确判断"24h 内是否已巩固过". 对比:dream_state.json 只有在 saveStateLocked 成功后才更新, crash-between-dream-and-save 会导致下次启动重复触发.
返回零值 time.Time 表示锁文件不存在(从未巩固过,触发首次 Dream).
func RegisterBuiltinAgents ¶
func RegisterBuiltinAgents(r *AgentRegistry)
RegisterBuiltinAgents 注册 4 种内置 Agent 类型到注册表.
内置类型设计原则:
- general-purpose: 全能型,不限制工具(基准线)
- Explore: 只读探索,禁止写操作(安全探索代码库)
- Plan: 只读规划,禁止执行(只允许思考,不允许操作)
- Verification: 验证型,禁止写操作,默认后台(不干扰主流程)
升华改进(ELEVATED): 内置类型是"工厂默认配置"-- 用户/SDK 可以 Register 同名定义覆盖,实现企业级定制. 替代方案:内置类型硬编码为常量(无法覆盖,不支持定制).
func ResumeConversation ¶
func ResumeConversation(ctx context.Context, eng *Engine, store SnapshotStore, convID string) (<-chan Event, error)
ResumeConversation 从快照恢复并继续执行 Agent 对话.
流程:
- 从 store 加载快照(若不存在,返回 ErrSnapshotNotFound)
- 用 maybeInjectResumeSentinel 为断点注入恢复哨兵(处理 tool_result 末尾的情况)
- 调用 engine.Run(ctx, "", WithMessages(msgs))
- 收集 channel 事件,返回 RunResult 摘要
- 执行成功后删除快照(避免重复恢复)
注意:第 5 步"成功后删除"是最佳努力(delete 失败不返回错误), 避免让恢复成功但 delete 失败的情况污染用户体验. 若下次再次 Resume 同一 convID,会重新恢复(幂等)-- 这是期望行为:宁可重复一次恢复也不因 delete 失败丢失进度.
升华改进(ELEVATED): ResumeConversation 是引擎公开 API,消费层直接调用-- 无需了解 WithMessages / maybeInjectResumeSentinel 的内部实现细节. 替代方案:<让消费层自己 Load → WithMessages → Run>
- 否决:哨兵注入逻辑 (maybeInjectResumeSentinel) 是引擎内部知识, 不应泄漏到消费层.每个消费层重新实现会导致不一致.
func SaveTranscript ¶
func SaveTranscript(path string, sessionID string, model string, messages []query.Message, stats TranscriptStats) error
SaveTranscript 将会话消息保存为 JSON 文件.
参数:
- path: 保存路径(如 ~/.flyto/projects/xxx/transcript.json)
- sessionID: 会话 ID
- model: 使用的模型
- messages: 消息历史
- stats: 统计信息
如果文件已存在,会被覆盖. 会自动创建父目录.
func SendPlanCommand ¶
func SendPlanCommand(ctx context.Context, sockPath string, req planCmdRequest) (*planCmdResponse, error)
SendPlanCommand 是连接到 PlanCommandServer 并发送单条命令的辅助函数.
升华改进(ELEVATED): 将 "建连→写请求→读响应→断开" 封装为一步, CLI 脚本和测试可以直接调用,无需手动管理连接生命周期. 替代方案:让调用方自己 net.Dial + json.Encode + json.Decode(模板代码过多).
func ShouldRetryOverloaded ¶
func ShouldRetryOverloaded(source QuerySource) bool
ShouldRetryOverloaded 判断给定来源在遇到 529 Overloaded 时是否应该重试. 只有前台请求才重试 529,后台请求直接失败.
func ToolsToAllowedMap ¶
ToolsToAllowedMap 将工具列表转换为 AllowedTools map. 用于将旧版 []tools.Tool 参数转换为新版 map[string]bool 参数.
func TranscriptPath ¶
TranscriptPath 生成默认的会话记录路径. 格式:~/.flyto/projects/<project-hash>/transcripts/<session-id>.json
参数:
- cwd: 当前工作目录(用于生成项目 hash)
- sessionID: 会话 ID
func UpdateTranscript ¶
func UpdateTranscript(path string, sessionID string, model string, messages []query.Message, stats TranscriptStats) error
UpdateTranscript 更新已有的会话记录文件. 如果文件已存在,保留 CreatedAt,更新其他字段.
func WithEventEmitter ¶
func WithEventEmitter(ctx context.Context, emit EventEmitter) context.Context
WithEventEmitter 派生一个携带 emitter 的 context. nil emitter 视为 "未设置" — 直接返回原 ctx 不污染调用链.
WithEventEmitter derives a context carrying emit. A nil emit is treated as "unset" — the original ctx is returned unchanged.
Types ¶
type ActivityReason ¶
type ActivityReason string
ActivityReason 标识活动的原因/类型.
const ( ActivityAPICall ActivityReason = "api_call" ActivityToolExec ActivityReason = "tool_exec" ActivitySubAgent ActivityReason = "subagent" ActivityDream ActivityReason = "dream" ActivityMemoryExtraction ActivityReason = "memory_extraction" )
type ActivityTracker ¶
type ActivityTracker struct {
// contains filtered or unexported fields
}
ActivityTracker 追踪 Engine 的活动状态.
精妙之处(CLEVER): 引用计数模式--每个"工作单元"(API 调用/工具执行/SubAgent) Start 时 +1,Stop 时 -1.refcount>0 = busy,refcount==0 = idle. 比时间戳判断更准确--并行工具执行时,任何一个还在跑就是 busy.
线程安全:所有状态通过 sync.Mutex 保护.
func NewActivityTracker ¶
func NewActivityTracker(cfg *ActivityTrackerConfig, observer EventObserver) *ActivityTracker
NewActivityTracker 创建活动追踪器.
func (*ActivityTracker) ActiveReasons ¶
func (t *ActivityTracker) ActiveReasons() map[ActivityReason]int
ActiveReasons 返回当前活跃的原因和计数.
func (*ActivityTracker) BusySince ¶
func (t *ActivityTracker) BusySince() time.Time
BusySince 返回最近一次从 idle→busy 的时间(当前非 busy 时返回零值).
func (*ActivityTracker) Close ¶
func (t *ActivityTracker) Close()
Close 停止所有定时器,释放资源. 精妙之处(CLEVER): Close 时记录最终统计-- 如果还有活跃工作,说明 Close 打断了正在进行的操作(可用于排查卡住问题).
func (*ActivityTracker) LastActivity ¶
func (t *ActivityTracker) LastActivity() time.Time
LastActivity 返回最后一次活动时间.
func (*ActivityTracker) Start ¶
func (t *ActivityTracker) Start(reason ActivityReason)
Start 标记一个工作单元开始.
精妙之处(CLEVER): 按 reason 分类计数-- shutdown 时能看到"还有 2 个 api_call 和 1 个 tool_exec 没结束", 比纯 refcount 的"还有 3 个活动"信息量大得多.
func (*ActivityTracker) Stop ¶
func (t *ActivityTracker) Stop(reason ActivityReason)
Stop 标记一个工作单元结束.
type ActivityTrackerConfig ¶
type ActivityTrackerConfig struct {
// HeartbeatInterval 心跳间隔(busy 时定时触发).默认 30s.
HeartbeatInterval time.Duration
// IdleDelay 空闲延迟:refcount 归零后等多久才触发 OnIdle.默认 30s.
// 精妙之处(CLEVER): 不是立刻触发 OnIdle--
// 工具执行完到下一次 API 调用之间有短暂间隙,不应算 idle.
// 只有真正 30s 无活动才算空闲.
IdleDelay time.Duration
// OnBusy refcount 0→1 时触发(Agent 从空闲变为繁忙)
OnBusy func()
// OnIdle refcount 1→0 且延迟到期后触发(Agent 真正空闲)
OnIdle func(idleDuration time.Duration)
// OnHeartbeat busy 时每 HeartbeatInterval 触发一次
OnHeartbeat func(activeReasons map[ActivityReason]int, duration time.Duration)
}
ActivityTrackerConfig 配置活动追踪器.
func DefaultActivityTrackerConfig ¶
func DefaultActivityTrackerConfig() *ActivityTrackerConfig
DefaultActivityTrackerConfig 返回默认配置.
type AgentDefLoader ¶
type AgentDefLoader interface {
// LoadAgentDefs 加载并返回所有 AgentDefinition.
// 如果没有可加载的定义,返回空切片(不是错误).
LoadAgentDefs() ([]*AgentDefinition, error)
}
AgentDefLoader 是从外部来源加载 AgentDefinition 的接口.
升华改进(ELEVATED): 早期实现只有函数,无接口抽象. 我们定义接口--测试中可注入 stub(返回固定列表),生产中用 FileAgentDefLoader. 跨行业扩展:
- 数据库加载:RemoteAgentDefLoader(从 Neon/MySQL 读取)
- 动态注册:PluginAgentDefLoader(插件提供 AgentDef)
- 内存加载:StaticAgentDefLoader(测试用)
替代方案:<直接暴露 ScanAgentDefsDir 函数> - 否决:无法注入替代实现,测试困难.
type AgentDefinition ¶
type AgentDefinition struct {
// AgentType 唯一标识符,如 "Explore","Plan"
AgentType string
// Description 一句话描述,展示给用户/模型
Description string
// WhenToUse 何时使用的指引文案,辅助 AI 做类型选择
WhenToUse string
// AllowedTools 允许使用的工具白名单(nil = 继承父代理所有工具).
// MCP 工具(mcp__ 前缀)自动通过此过滤,无需在此列出.
AllowedTools []string
// DisallowedTools 额外禁用的工具(叠加在全局禁用之上)
DisallowedTools []string
// Model 使用的模型("" = 继承父代理模型)
Model string
// MaxTurns 最大轮数(0 = 默认 10)
MaxTurns int
// Background 是否默认后台运行.
// 当 Background=true 且 BackgroundAllowedTools 非空时,
// 在 AllowedTools 基础上额外收窄到 BackgroundAllowedTools.
Background bool
// BackgroundAllowedTools 后台运行时的附加工具白名单.
//
// 升华改进(ELEVATED): 早期实现 用全局常量 ASYNC_AGENT_ALLOWED_TOOLS(约16个工具),
// 所有 isAsync=true 的 agent 都受此约束,无法针对不同 agent 类型定制.
// 我们改为 per-agent 配置--每种 Agent 类型声明自己的后台工具集,
// 仓储场景可以把 mcp__wms__* 加进去,而默认 Verification agent 只需文件读工具.
// MCP 工具(mcp__ 前缀)仍然自动通过此过滤.
// nil = Background 模式不附加额外限制(仍受 AllowedTools 约束).
// 替代方案:<全局 ASYNC_AGENT_ALLOWED_TOOLS 常量> - 否决:扩展性差,
// 增加新 agent 类型时必须修改全局常量.
BackgroundAllowedTools []string
// AllowedSubAgentTypes 限制此 Agent 可以 spawn 的子 Agent 类型.
//
// 升华改进(ELEVATED): 早期实现 resolveAgentTools() 第191-195行用 allowedAgentTypes
// 元数据约束 Agent 工具只能 spawn 特定类型,但通过字符串解析("Agent(Explore)")实现.
// 我们改为结构化字段--类型安全,易于序列化,不需要解析器.
// 跨行业影响:Explore 类型只能 spawn Plan/Verification 子 agent(禁止 general-purpose),
// 防止只读 agent 通过 spawn 全能 agent 来绕过工具限制.
// nil/空 = 无限制,可 spawn 任何已注册的 AgentType.
// 替代方案:<字符串语法 Agent(Explore, Plan)> - 否决:需要解析器,容易写错.
AllowedSubAgentTypes []string
}
AgentDefinition 定义一种 Agent 类型的行为约束和资源配置.
升华改进(ELEVATED): 不只是"工具白名单",而是完整的 Agent 能力档案. 包含使用指引(WhenToUse),让 AI 能自主选择合适的 Agent 类型. 跨行业扩展:金融场景可定义 "Compliance" 类型(只允许查询工具),
医疗场景可定义 "Diagnosis" 类型(禁用所有写操作).
func LoadAgentDefFile ¶
func LoadAgentDefFile(path string) (*AgentDefinition, error)
LoadAgentDefFile 从单个文件加载并解析 AgentDefinition.
文件格式:YAML frontmatter(---...---)+ 可选正文(作为 WhenToUse 的补充描述). frontmatter 缺失时返回空 AgentDefinition(AgentType 为空,调用方需补充).
func ScanAgentDefsDir ¶
func ScanAgentDefsDir(dir string) ([]*AgentDefinition, error)
ScanAgentDefsDir 扫描目录下所有 Agent 定义文件并解析返回.
支持两种格式(与 ScanSkillsDir 对称):
- 子目录格式:dir/agent-name/AGENT.md
- 扁平格式:dir/agent-name.md
同名时子目录格式优先(新格式覆盖旧格式). 目录不存在时返回空切片(不是错误).
精妙之处(CLEVER): 使用 defMap 去重--同一 agent_type 只保留最新加载的定义. 子目录格式优先:扁平 .md 先加入,子目录 AGENT.md 后覆盖(两遍遍历). 替代方案:单遍遍历 + 跳过已存在 key(顺序依赖,不直观).
type AgentRegistry ¶
type AgentRegistry struct {
// contains filtered or unexported fields
}
AgentRegistry 是 Agent 类型注册表.
线程安全:所有操作通过 sync.RWMutex 保护. 升华改进(ELEVATED): 注册表是全局知识库,工具集解析是运行时决策-- 两个关注点在同一结构体内,但通过 resolveAgentToolset 明确分离. 替代方案:注册表只存储定义,工具集解析独立为包级函数(更分散,不利于封装 globalDisallowed).
func NewAgentRegistry ¶
func NewAgentRegistry() *AgentRegistry
NewAgentRegistry 创建 Agent 类型注册表.
默认使用 DefaultGlobalDisallowedTools 初始化全局禁用集合-- 包含递归防护,计划模式保护,用户交互保护等安全工具集.
升华改进(ELEVATED): 早期实现 ALL_AGENT_DISALLOWED_TOOLS 硬编码在 filterToolsForAgent() 内, 无法被 SDK 消费层覆盖.我们通过 DefaultGlobalDisallowedTools 变量 + SetGlobalDisallowed() 实现"默认安全 + 可覆盖":
- 测试环境:SetGlobalDisallowed([]string{}) 清空,完全开放
- 生产强化:SetGlobalDisallowed(append(DefaultGlobalDisallowedTools, "MyPrivateTool"))
替代方案:<硬编码 map 字面量> - 否决:无法表达 DefaultGlobalDisallowedTools 的来源意图.
func (*AgentRegistry) Get ¶
func (r *AgentRegistry) Get(agentType string) (*AgentDefinition, bool)
Get 按类型名称查找 Agent 定义.
func (*AgentRegistry) List ¶
func (r *AgentRegistry) List() []*AgentDefinition
List 返回所有已注册的 Agent 定义,按 AgentType 字母顺序排序.
精妙之处(CLEVER): 排序确保输出稳定--测试断言,UI 展示都不会因 map 随机遍历而抖动. 替代方案:直接返回 map 值(顺序随机,测试不稳定).
func (*AgentRegistry) Register ¶
func (r *AgentRegistry) Register(def *AgentDefinition) error
Register 注册一个 Agent 类型定义. 重名时覆盖旧定义(允许热更新/插件重载).
func (*AgentRegistry) ResolveToolset ¶
func (r *AgentRegistry) ResolveToolset(def *AgentDefinition, parentTools []string) []string
ResolveToolset 公开版本的 resolveAgentToolset,供外部调用(如 agentExecutor).
线程安全:持有读锁.
func (*AgentRegistry) SetGlobalDisallowed ¶
func (r *AgentRegistry) SetGlobalDisallowed(toolNames []string)
SetGlobalDisallowed 覆盖全局禁用工具列表. SDK 消费层可通过此方法定制安全边界.
升华改进(ELEVATED): "覆盖"而非"追加"--SDK 场景可能需要完全重置禁用列表. 例如:测试环境可传入空切片(完全开放),生产环境可传入更严格的列表. 替代方案:追加模式(无法删除已有禁用项).
type ApprovalPolicy ¶
type ApprovalPolicy interface {
// RequestApproval 请求用户审批计划.阻塞直到用户批准或拒绝.
// 返回:
// - approved: 是否批准
// - editedPlan: 用户编辑后的计划文本(空字符串 = 未编辑,使用早期方案)
// - err: 系统错误(不是用户拒绝)
RequestApproval(ctx context.Context, event PlanApprovalEvent) (approved bool, editedPlan string, err error)
}
ApprovalPolicy 抽象计划审批机制.
升华改进(ELEVATED): 早期实现 ExitPlanMode 直接渲染 React 组件(PermissionRequest) 并等待用户在 TUI 中点击.这完全无法用于 SDK 嵌入或 HTTP API. 我们用 ApprovalPolicy 接口解耦:CLI 实现终端渲染,SDK 注入函数回调, SaaS 实现 WebSocket/webhook 审批,测试用 NoopApprovalPolicy 自动批准. 替代方案:<直接在 ExitPlanMode 工具内渲染 UI> - 否决原因:工具层依赖 UI 框架,无法在 HTTP API 服务中复用.
Shape: synchronous callback. Engine passes a PlanApprovalEvent to the consumer and blocks until the consumer invokes Approve / Reject in the event struct or returns from RequestApproval.
形态: 同步回调. 引擎把 PlanApprovalEvent 交给消费者实现, 阻塞直到消费者 调用 event 里的 Approve / Reject 或 RequestApproval 返回.
type AttachmentReorderer ¶
type AttachmentReorderer struct{}
AttachmentReorderer 将附件消息上浮到同一 user turn 的 tool_result 前面.
精妙之处(CLEVER): 只在同角色连续消息块内重排,不跨角色边界. 这保证了 user/assistant 交替的基本约束不被破坏. 重排只影响 user 消息中 attachment 和 tool_result 的相对顺序.
func (*AttachmentReorderer) Name ¶
func (r *AttachmentReorderer) Name() string
func (*AttachmentReorderer) Normalize ¶
func (r *AttachmentReorderer) Normalize(messages []query.Message) []query.Message
func (*AttachmentReorderer) Priority ¶
func (r *AttachmentReorderer) Priority() int
type AuditObserver ¶
type AuditObserver struct {
// contains filtered or unexported fields
}
AuditObserver 实现 EventObserver,将引擎事件翻译为审计记录. 通过 NewCompositeObserver(engineObserver, auditObserver) 插入事件流.
func NewAuditObserver ¶
func NewAuditObserver(sink security.AuditSink, sessionID string) *AuditObserver
NewAuditObserver 创建 AuditObserver. sink 是审计落地后端;sessionID 可以为空(单用户 CLI 场景通常不需要).
构造后通常立即调用 SetToolRegistry 注入工具注册表, 启用工具自描述的 AuditOperation 查询 (L1191 修复).未调用 SetToolRegistry 时回退到 硬编码启发式, 行为等同重构前, 零回归.
func (*AuditObserver) Error ¶
func (a *AuditObserver) Error(err error, ctx map[string]any)
Error 接收错误事件,记录到审计日志(outcome=error).
func (*AuditObserver) Event ¶
func (a *AuditObserver) Event(name string, data map[string]any)
Event 接收引擎事件,转换为审计记录.
精妙之处(CLEVER): 用 switch 而非 if-else chain-- 随着事件类型增加,switch 可读性优于连续 if,且 Go 编译器对 switch 有优化. 未匹配的事件静默忽略,不影响其他 Observer 的处理.
func (*AuditObserver) SetInputAudit ¶
func (a *AuditObserver) SetInputAudit(include bool, maxBytes int)
SetInputAudit 配置工具 Input 审计策略 (L1223 修复).
include: 是否把工具 Input 写入 Extra["tool_input"].默认 false. maxBytes: 写入上限 (<=0 不截断).超限会截断并加 Extra["tool_input_truncated"]="true".
前置条件: 调用方 (engine.go) 负责在 OperationEntry.Input 进入 OperationLog 前 完成 SecretStore.Redact 脱敏, 与 Output 脱敏走同一路径 (engine.go:3769 precedent). AuditObserver 不重复脱敏, 避免跨包依赖 SecretStore.
替代方案 A: <AuditObserver 持有 SecretStore 引用自行脱敏> - 否决: 与 Output 现有 "入口统一脱敏" 原则冲突, 也让 AuditObserver 跨越单一职责. 替代方案 B: <加 InputRedactor 接口允许多实现叠加> - 否决: SecretStore.Redact 已 覆盖 99% 实际凭据 (value 匹配), 叠 SecretGuard regex 兜底边际收益太低.
线程安全: 与 SetToolRegistry 同约定, 仅在 engine 初始化阶段调用一次.
func (*AuditObserver) SetToolRegistry ¶
func (a *AuditObserver) SetToolRegistry(r *tools.Registry)
SetToolRegistry 后置注入工具注册表, 启用工具自描述的 AuditOperation 查询.
升华改进(ELEVATED): P1 L1191 修复 - 原 operationFromTool 是硬编码 switch, 违反开闭原则, 消费者加新工具 (DatabaseWrite / SlackSend 等) 无法正确生成 审计 operation 标签.改为运行时查询 Registry 的 Metadata.AuditOperation, 消费者在工具注册时声明一次, 审计系统零侵入扩展.
设计选择: 后置注入而非构造参数, 为了保持 NewAuditObserver 签名不变 (避免破坏现有 8 个 call site + doc 例子), 同时让 engine.go 初始化时 在构造后调用一次 SetToolRegistry 完成注入.
替代方案 A: <改 NewAuditObserver 为 3 参数> - 否决: 破坏现有 API. 替代方案 B: <functional options (WithToolRegistry)> - 否决: 过度设计,
单个可选依赖不值得引入 option 类型.
线程安全: 仅在 engine 初始化阶段调用一次, 后续只读.如果未来有运行时 动态替换 registry 的需求, 需要加锁 (但目前没有此需求).
type BufferedObserver ¶
type BufferedObserver struct {
// contains filtered or unexported fields
}
BufferedObserver 缓冲 observer(异步批量发送,不阻塞热路径).
升华改进(ELEVATED): 热路径上的 Observer 调用必须不阻塞. 如果 inner observer 是网络发送(DataDog API,Kafka,gRPC), 同步调用会拖慢每次 API 调用和工具执行. BufferedObserver 用 channel 缓冲,后台 goroutine 批量发送. 替代方案:每次都 go func() 发送(goroutine 爆炸,无背压控制).
func NewBufferedObserver ¶
func NewBufferedObserver(inner EventObserver, batchSize int, interval time.Duration, bufferSize int) *BufferedObserver
NewBufferedObserver 创建缓冲 observer. batchSize: 批量大小(0 默认 100) interval: 刷新间隔(0 默认 1s) bufferSize: channel 缓冲区大小(0 默认 1000)
func (*BufferedObserver) Close ¶
func (b *BufferedObserver) Close()
Close 关闭缓冲 observer,等待所有缓冲事件刷新完毕.
type CacheStats ¶
type CacheStats struct {
Entries int // 当前缓存条目数
MaxSize int // 最大缓存条目数
Hits int // 缓存命中次数(Get 命中)
Misses int // 缓存未命中次数(Get 未命中)
Evictions int // 淘汰次数
}
CacheStats 是缓存命中率和容量统计的值快照.
**消费形态 (调取 pull)**: 由 `FileStateCache.Stats() CacheStats` 返回, 消费者 (CLI 诊断命令 / SaaS 监控面板 / 测试 harness / 外部 SDK) 主动调 读字段. 和 `Session.Stats()` / `DenialTracker.Stats()` 同构 -- scanner 视野内无内部 reader 是正常的: 统计快照本就是暴露给**消费层**用的, 不 该强加内部 reader 来"过扫描器". 见 docs/api-reference.md "API 消费形态" 章节的 pull 形态清单.
字段解读:
- Entries: 当前缓存条目数, 判断缓存占用率 (Entries/MaxSize).
- MaxSize: 配置上限, 容量规划输入.
- Hits/Misses: 命中率, `HitRate()` 方法的加工结果也由此推导.
- Evictions: 淘汰次数, 过高说明 MaxSize 偏小或工作集过大.
CacheStats is a value snapshot of cache hit-rate and capacity stats.
Consumption shape (pull): returned by FileStateCache.Stats(); consumers (CLI diagnostic commands / SaaS dashboards / test harness / external SDK) actively read the fields. Structurally identical to Session.Stats() / DenialTracker.Stats() -- the absence of an internal reader within the scanner's view is expected. Stats snapshots are meant for the consumption layer; forcing an internal reader just to satisfy a linter would be dishonest. See docs/api-reference.md "API consumption shapes" for the pull-shape catalogue.
func (CacheStats) HitRate ¶
func (s CacheStats) HitRate() float64
HitRate 返回缓存命中率(0.0 ~ 1.0). 如果没有任何请求,返回 0.
type CheckpointEvent ¶
type CheckpointEvent = flyto.CheckpointEvent
type CheckpointHandlerFn ¶
type CheckpointHandlerFn func(evt CheckpointEvent) bool
CheckpointHandlerFn 是 CheckpointEvent 的处理回调.
引擎在执行声明了 RequiresCheckpoint=true 的工具前同步调用此函数. 返回 true = 允许执行;返回 false = 拒绝执行(工具调用中止,模型收到拒绝消息).
实现约定:
- 必须线程安全(可能被并发工具调用触发)
- 应快速返回(阻塞会影响 runLoop 响应性)
- panic 会被引擎 recover,视为返回 false(deny-safe)
默认行为(未注册时):拒绝(false). deny-safe 原则:宁可误拒绝一个操作,也不可在用户不知情的情况下执行不可逆操作.
type CheckpointSuggestedEvent ¶
type CheckpointSuggestedEvent = flyto.CheckpointSuggestedEvent
CheckpointSuggestedEvent 引擎静态分析高风险操作时推送的建议事件(不阻塞执行). 详见 flyto.CheckpointSuggestedEvent 文档.
type CompactEvent ¶
type CompactEvent = flyto.CompactEvent
type Complexity ¶
type Complexity string
Complexity 表示步骤的预估复杂度.
const ( ComplexityLow Complexity = "low" ComplexityMedium Complexity = "medium" ComplexityHigh Complexity = "high" )
type CompositeObserver ¶
type CompositeObserver struct {
// contains filtered or unexported fields
}
CompositeObserver 多 observer 叠加.
升华改进(ELEVATED): 同时发到 DataDog + 审计日志 + stderr. 生产环境通常需要多路输出:实时告警走 PagerDuty,指标走 Prometheus, 审计走合规日志,调试走 stderr.CompositeObserver 让它们各司其职. 替代方案:让每个 Observer 内部做多路分发(每个实现都要做,重复劳动).
func NewCompositeObserver ¶
func NewCompositeObserver(observers ...EventObserver) *CompositeObserver
NewCompositeObserver 创建多路复合 Observer.
func (*CompositeObserver) Error ¶
func (c *CompositeObserver) Error(err error, context map[string]any)
func (*CompositeObserver) Event ¶
func (c *CompositeObserver) Event(name string, data map[string]any)
func (*CompositeObserver) Metric ¶
func (c *CompositeObserver) Metric(name string, value float64, tags map[string]string)
Metric 转发指标到所有实现了 MetricObserver 的子 observer.
精妙之处(CLEVER): 只有实现了 MetricObserver 的子 observer 才会收到指标, 其他的静默跳过.这样 CompositeObserver 可以混合不同能力的 observer.
func (*CompositeObserver) Observers ¶
func (c *CompositeObserver) Observers() []EventObserver
Observers 返回内部 observer 列表(用于 Close 时遍历刷新 BufferedObserver).
func (*CompositeObserver) SpanEnd ¶
func (c *CompositeObserver) SpanEnd(spanID string, err error)
SpanEnd 转发到所有实现了 TraceObserver 的子 observer.
type Config ¶
type Config struct {
// Model 指定默认使用的模型 ID(向后兼容,设置 RoleMain)
Model string
// Models 模型注册表(如果为 nil,使用默认)
// 通过角色系统管理不同用途的模型选择
Models *config.ModelRegistry
// Cwd 是工作目录,工具执行的根目录
Cwd string
// Executor 是子进程启动器, 必填. engine.New 校验 nil 返回 error.
//
// 本地模式: 消费方传 execenv.DefaultExecutor{} (零开销包装 os/exec).
// 云端模式: platform 层传 sandbox.Backend{} 实现 microVM 隔离.
//
// 方案 β 严格 DI: 引擎对执行环境零假设, 所有 exec 子进程走此抽象.
// 详见 platform/common/internal/sandbox/DESIGN.md section 7 + 7.2.
//
// 2026-04-15 M1 commit 4b-1 引入.
Executor execenv.Executor
// Tools 指定启用的工具列表,为空则启用所有内置工具
Tools []string
// MaxTurns 限制单次 Run 的最大对话轮次,0 表示无限制
MaxTurns int
// MaxBudgetUSD 限制单次 Run 的最大花费(美元),0 表示无限制
MaxBudgetUSD float64
// PermissionMode 权限模式:default / accept_edits / bypass / plan
PermissionMode permission.Mode
// PermissionHandler 自定义权限处理器,消费层实现
// 当工具需要用户批准时调用此函数
// 如果为 nil,则根据 PermissionMode 自动决策
PermissionHandler permission.Handler
// PermissionTimeout 权限 Handler 调用超时(默认 5 分钟).
// 0 表示使用 permission.DefaultPermissionTimeout.
//
// 升华改进(ELEVATED): 暴露到 Config 而非硬编码在引擎内部--
// 自动化测试场景可以设为 100ms(快速失败),
// 企业审批流程可以设为 30min(等人工审批).
// 替代方案:硬编码 DefaultPermissionTimeout(无法按场景覆盖).
PermissionTimeout time.Duration
// CompactionPolicies 压缩策略列表(叠加为 CompositePolicy).
// 为空时使用 DefaultCodePolicy.
// 如果只有一个策略,不会包装 CompositePolicy,直接使用.
CompactionPolicies []agentctx.CompactionPolicy
// PermissionHandlers 权限处理器列表(叠加为 CompositeHandler).
// 为空时使用 Config.PermissionHandler(向后兼容).
PermissionHandlers []permission.NamedHandler
// SystemPrompt 自定义系统提示词(覆盖默认)
SystemPrompt string
// AppendSystemPrompt 追加到默认系统提示词之后
AppendSystemPrompt string
// Scenario 场景标识符,用于 PromptBundle 选择("programming"/"warehouse"/"medical" 等).
// 空字符串 = "programming"(默认).
// 引擎内置 claude+programming Bundle;其他场景需通过 RegisterPromptBundle 注册.
Scenario string
// ModelFamily 模型族标识符("claude"/"gpt"/"gemini"/"local" 等).
// 空字符串时从 Model 字段推断(前缀匹配 "claude-" → "claude").
// 用于 PromptBundle 选择.
ModelFamily string
// Plugins declares plugin definitions that the engine registers at
// construction time (engine.New) via plugin.Host.Load. Each definition
// is a metadata triple (Name / Description / Source); it does NOT carry
// a filesystem path to a plugin.json, so the registration is a
// metadata-only placeholder -- skills, hooks, tools, and MCP servers
// are NOT parsed from this path. For those, the sibling paths are:
// - engine.LoadPlugin(dir): reads a plugin.json on disk and registers
// all declared skills/hooks/tools/MCPServers. This is the primary
// path for "I have a packaged plugin on the filesystem".
// - plugin.Host.RegisterBuiltin: SDK consumers that want to ship a
// plugin in-binary (builtin) call this before engine.New and the
// host-wide LoadAll picks it up.
// Config.Plugins is the narrow niche of "declaratively announce this
// plugin's existence to the host registry without packaging it" --
// useful for SDK tests, bookkeeping, or consumers that want the host to
// know about a plugin before a later LoadFromDir call materializes it.
//
// Lifecycle: definitions are consumed once in engine.New
// (loadConfigPlugins), before syncPluginHooks / syncPluginTools /
// syncPluginMCPServers so any subsequent plugin operation observes a
// stable registry. The slice is read once and not watched for changes
// after construction.
//
// Failure strategy: an empty Name produces a config_plugin_skipped
// observer event and is discarded. A Host.Load failure (rare -- the
// current implementation only writes to a map under lock) produces a
// config_plugin_load_failed event and is not fatal -- a broken entry
// must not prevent the engine from starting or other entries from
// loading.
//
// Plugins 声明 engine.New 时要往 plugin.Host 注册的插件定义. 每个
// Definition 只是 metadata 三元组 (Name / Description / Source), 不
// 带 plugin.json 的文件系统路径, 因此这条路径只做 metadata 占位登记
// -- skills / hooks / tools / MCPServers 不会从这里解析. 要解析走
// 两条姊妹路径:
// - engine.LoadPlugin(dir): 读磁盘上的 plugin.json 并注册里面声明
// 的 skills / hooks / tools / MCPServers. 文件系统里有打包 plugin
// 时用这条.
// - plugin.Host.RegisterBuiltin: SDK 消费方想把 plugin 内嵌进二进制
// (builtin), 在 engine.New 之前调这个, 随后 host 的 LoadAll 合并
// 进 registry.
// Config.Plugins 是个窄场景: "声明式地让 host registry 知道有这个
// plugin, 但不打包". 用于 SDK 测试、bookkeeping, 或消费方希望在后续
// LoadFromDir 真正物化之前 host 就知道 plugin 存在的场合.
//
// 生命周期: engine.New 里 loadConfigPlugins 消费一次, 位置在
// syncPluginHooks / syncPluginTools / syncPluginMCPServers 之前,
// 让随后任何 plugin 操作观察到一份稳定的 registry. slice 读一次,
// 构造后不再 watch 变化.
//
// 失败策略: 空 Name 触发 config_plugin_skipped 事件并丢弃. Host.Load
// 失败 (罕见, 当前实现仅是带锁 map 写入) 触发 config_plugin_load_failed
// 事件且不 fatal -- 坏 entry 不能挡引擎启动或其他 entry 加载.
Plugins []plugin.Definition
// MCPServers declares MCP servers that the engine launches at construction
// time (engine.New). Each entry becomes an mcp.Manager client under the
// "config." namespace (see configMCPServerKey); its tools are wrapped as
// mcpProxyTool and registered in the engine's tool registry with names of
// the form "config:<ServerName>/<toolName>".
//
// This is the SDK-main-process path for "I want to plug in an external
// MCP server subprocess without packaging it as a plugin". The two
// sibling paths with different scopes:
// - plugin.Manifest.MCPServers: same subprocess style but declared
// via plugin.json, lifecycle tied to plugin enable/disable.
// - ExtraTools: same-process Go tools (tools.Tool interface) with no
// IPC round-trip, no subprocess, no MCP client -- preferable when
// the tool can be expressed in Go.
//
// Lifecycle: servers start once in engine.New (startConfigMCPServers)
// and stop in engine.Close via mcpMgr.CloseAll. The slice is read once
// and not watched for changes after construction.
//
// Failure strategy: a single server's connect/list-tools failure is an
// observer event, not fatal -- a broken server must not prevent the
// engine from starting or other servers from loading.
//
// MCPServers 声明 engine.New 时要拉起的 MCP server. 每个 entry 在
// mcp.Manager 里以 "config." 前缀建 client (见 configMCPServerKey),
// 发现的 tool 包成 mcpProxyTool 注册到引擎 tool registry, 名字形如
// "config:<ServerName>/<toolName>".
//
// 这是 SDK 主进程侧 "我想接一个 external MCP server subprocess, 但不
// 愿意打成 plugin 包" 的路径. 两条姊妹路径:
// - plugin.Manifest.MCPServers: 同样是 subprocess, 但经 plugin.json
// 声明, 生命周期随 plugin enable/disable.
// - ExtraTools: same-process Go 工具 (tools.Tool 接口), 无 IPC 往返,
// 无 subprocess, 无 MCP client -- 能用 Go 表达的工具首选这条路.
//
// 生命周期: engine.New 里 startConfigMCPServers 拉起一次,
// engine.Close 里 mcpMgr.CloseAll 一并关闭. slice 读一次, 构造后不再
// watch 变化.
//
// 失败策略: 单个 server connect / list-tools 失败是 observer 事件, 不
// fatal -- 坏 server 不能挡引擎启动或其他 server 加载.
MCPServers []config.MCPServerConfig
// HooksConfig Hook 配置
HooksConfig *hooks.Config
// EnableCaching 启用 prompt caching(静态系统提示会被标记为可缓存)
// EnableCaching 启用 prompt caching(静态系统提示会被标记为可缓存)
// 当 Provider==nil 时用于创建默认 AnthropicProvider;
// 当 Provider!=nil 时由 Provider 自己实现(如 anthropic.New(Config{EnableCaching: true})).
EnableCaching bool
// Thinking extended thinking 配置
// 当 Provider==nil 时用于创建默认 AnthropicProvider;
// 当 Provider!=nil 时通过 flyto.Request.NeedsThinking 控制.
Thinking *ThinkingOptions
// Effort 努力级别 ("low"/"medium"/"high"),空字符串表示不设置
// 通过 flyto.Request.Effort 传递给 Provider.
Effort string
// FastMode 启用快速模式(调整 max_tokens 默认值)
// 通过 flyto.Request.FastMode 传递给 Provider.
FastMode bool
// JSONSchema 结构化输出的 JSON Schema(如果设置,约束模型输出)
// 通过 flyto.Request.ResponseFormat 传递给 Provider.
JSONSchema json.RawMessage
// Fallback 模型降级配置(可选).
// 当主模型 API 调用失败时,自动降级到备用模型.
Fallback *FallbackConfig
// MemoryExtractor 记忆提取器(可选).
// 设置后,查询循环结束时会异步 fork SubAgent 执行记忆提取.
MemoryExtractor memory.MemoryExtractor
// Verbose 详细日志模式
Verbose bool
// Observer 可观测性接口(可选).
// 接收引擎运行时的结构化事件,指标和调用链.
// 如果为 nil,使用 NoopObserver(零开销空实现).
//
// 升华改进(ELEVATED): Observer 是结构化事件流,与 Verbose 日志互补.
// Verbose 是给开发者看的文本日志,Observer 是给监控系统消费的数据.
// 替代方案:只用 Verbose + fmt.Fprintf(黑盒,无法被外部系统消费).
Observer EventObserver
// StrictMode 严格模式(可选).
// 安全评估/测试环境开启,让引擎在检测到异常时 panic 而不是静默修复.
// 如果为 nil,所有异常静默修复+记录.
StrictMode *StrictMode
// ActivityTracker 活动追踪器配置(可选).
// 设置后启用心跳/空闲检测.nil 时不追踪.
ActivityConfig *ActivityTrackerConfig
// CloseTimeout 优雅关闭超时(默认 10s).
// Close() 在 cancel context 后等待此时间让 goroutine 退出,
// 然后强制清理剩余资源.
// 仓储 daemon 场景可能需要更长(如 30s).
CloseTimeout time.Duration
// SecretGuard 秘密扫描器(可选).
// FileWrite / FileEdit 工具写入前调用此扫描器,检测 API key,密码等敏感信息.
//
// 升华改进(ELEVATED): 默认行为"默认开,全路径保护"--
// 若调用方不设置此字段,New() 自动注入 DefaultSecretGuard(45条内置规则).
// 若要关闭扫描(测试场景,已知安全的场景),显式传入 security.NoopSecretGuard{}.
// 若要自定义规则(行业特有 key 格式),传入 NewSecretGuardWithRules(...).
//
// 精妙之处(CLEVER): nil 与 NoopSecretGuard 的区别--
// nil 触发默认注入(保护性行为),NoopSecretGuard 是显式关闭(必须有意为之).
// 不可能因为"忘记设置"而静默绕过安全检查.
// 替代方案:<nil = 不扫描> - 否决原因:遗漏配置等价于静默关闭安全,风险太高.
SecretGuard security.SecretGuard
// AuditSink 审计落地后端(可选).
// 设置后,engine.New() 自动创建 AuditObserver 并与 Config.Observer 叠加,
// 将 operation_recorded / secret_scan_blocked 事件翻译为审计条目写入此 sink.
//
// 升华改进(ELEVATED): 审计通过 Observer 模式接入,工具层无感知--
// FileWrite/FileEdit 只需触发事件,AuditObserver 在 Observer 链中接收并写入.
// 零侵入性:不需要修改任何工具代码就能开启/关闭审计.
//
// nil 表示不开启审计.想开启本地审计(JSONL 文件),使用 NewLocalAuditSink().
// 替代方案:<在 OperationLog.Record 内部直接写 AuditSink>
// - 否决原因:将审计落地耦合进操作日志,违反单一职责,且跨包导致循环导入.
AuditSink security.AuditSink
// AuditSessionID 审计会话 ID(可选).
// 注入到每条 AuditEntry.SessionID 字段,用于多用户/多会话场景区分来源.
// CLI 单用户场景可以为空(audit.jsonl 里会显示为空字符串).
AuditSessionID string
// AuditIncludeToolInput 决定是否把工具调用的 Input 参数写入 AuditEntry.Extra["tool_input"].
//
// 默认 false (早期实现的 beta 也默认关, 要求双 env 才开).
// 开启前提: Input 已由 engine 层经 SecretStore.Redact 脱敏, 所以已注册的 secret 值
// 会替换为 [SECRET:NAME]. 未注册的明文密码/PII 仍会进入审计日志, 调用方自担风险.
//
// 场景: 合规审计 (HIPAA/SOC2) 要求留存完整请求载荷做 forensics.
// 单用户 CLI 场景保持默认关闭更安全.
//
// 替代方案: <引入可插拔 InputRedactor 接口 + SecretGuard regex 兜底> -
// 否决原因: SecretStore.Redact 已覆盖用户注册的所有凭据, 再叠一层 regex 脱敏边际收益
// 很低, 且违反 CLAUDE.md 原则 10 "叠加而非替换" (不应另造 redact 层, 应复用).
AuditIncludeToolInput bool
// AuditInputMaxBytes 限制 Extra["tool_input"] 单条字节数上限, 超出会截断并加
// Extra["tool_input_truncated"]="true". <=0 表示不截断 (不推荐, 单条审计条目可能爆表).
// 仅当 AuditIncludeToolInput=true 时生效. 建议值 4096 (早期方案 truncateContent 默认).
AuditInputMaxBytes int
// FreshnessConfig 记忆新鲜度警告配置(可选).
// nil = 不启用新鲜度功能(UpdateIndex 回落到早期方案 30 天硬编码,CheckMemoryRelevance 不注入警告).
// 启用:传 &memory.FreshnessConfig{GlobalThreshold: 24 * time.Hour}
// 或使用便捷函数:memory.DefaultFreshnessConfig() 然后取地址.
//
// 升华改进(ELEVATED): 早期实现 无任何新鲜度提示,只有 MEMORY.md 里的静态年龄注记.
// 模型无法根据静态注记判断"这条记忆当前对话是否仍然有效".
// 我们在 CheckMemoryRelevance 注入时加 FreshnessNote,让模型在看到记忆内容的
// 同时收到"此记忆已 N 天未更新,请核实"的提醒.
// 替代方案:<在 Entry.Content 里追加警告文本> -
// 否决原因:污染记忆内容本身,影响提取/检索评分.
FreshnessConfig *memory.FreshnessConfig
// MemoryScorer 自定义记忆相关性评分器(可选).
// nil 时走默认 TextScorer (Jaccard 文本相似度).
// 替代算法场景(嵌入向量/BM25)传入自定义实现.
MemoryScorer memory.RelevanceScorer
// MemoryTypeRegistry 自定义记忆类型注册表(可选).
// nil 时用 memory.DefaultTypeRegistry (user/feedback/project/reference 四类型).
// 多租户或行业特化场景可注册自定义类型(如 medical/legal/warehouse).
MemoryTypeRegistry *memory.MemoryTypeRegistry
// MemoryStrictSymlink 启用记忆文件严格符号链接保护(可选).
// false(默认): 检测到符号链接记日志但放行(兼容 NFS/Docker/WSL).
// true: 检测到符号链接拒绝操作(服务端部署/CI 环境推荐).
MemoryStrictSymlink bool
// MemorySyncAdapter 记忆远程同步适配器(可选).
// nil 时不同步(纯本地). 团队协作场景传 memory.NewGitSyncAdapter(...).
MemorySyncAdapter memory.SyncAdapter
// MemorySyncConfig 同步策略配置(仅 MemorySyncAdapter 非 nil 时生效).
MemorySyncConfig memory.SyncConfig
// ElicitationHandler 处理 MCP 服务器发出的用户输入请求(可选).
// 对应 MCP 2025-03-26 规范的 elicitation/create server-to-client 请求.
//
// 升华改进(ELEVATED): 早期实现 无此接口--遇到服务器主动请求时直接挂起.
// 设置后,MCP Manager 会在 Initialize 时将此 handler 注入到每个 Client,
// dispatchLoop 识别到 server-to-client 请求时路由到此 handler 处理.
//
// nil 时自动使用 NoopElicitationHandler(返回 cancel,不阻塞,不 panic).
// 替代方案:<全局变量注册> - 否决:多 Engine 实例下每个引擎可以有不同的 UI 处理器.
ElicitationHandler ElicitationHandler
// EnableUDSInbox 启用 Unix Domain Socket Inbox(默认 false).
// 启用后,引擎创建 UDS 服务端(路径由 resolveSockPath 决定,优先用户私有目录),
// 通过 FLYTO_SESSION_SOCK 环境变量注入到所有工具子进程.
// 工具子进程可向此 socket 发送 JSON 消息(进度,日志等),
// 引擎接收后转为 InboxMessageEvent 推送给观察者.
//
// CLI 模式下建议设为 true;SDK 嵌入/无 shell 环境下可保持 false.
//
// 精妙之处(CLEVER): 默认 false 确保向后兼容--
// 现有 SDK 用户不受影响,不会多出意外的 socket 文件.
// 只有明确需要工具进度报告的场景才开启.
// 替代方案:默认 true(更多功能但有副作用,/tmp 多文件,老用户可能困惑).
EnableUDSInbox bool
// UDSInboxSessionID 是 UDS socket 文件名的 sessionID 部分(可选).
// 空字符串时自动生成(基于时间戳+随机数).
// 显式设置:测试中可固定 socket 路径;多进程场景可通过外部协调分配路径.
//
// 升华改进(ELEVATED): sessionID 与 Engine 生命周期绑定(不是 Session.ID)--
// 一个 Engine 实例有且仅有一个 UDS 路径,与会话数量无关.
// 替代方案:每个 Session 独立 UDS(socket 文件爆炸,管理复杂).
UDSInboxSessionID string
// EnablePlanQueue 启用异步计划队列(默认 false).
// 启用后:
// 1. 引擎在 ~/.flyto/plans/ 创建 FilePlanQueue,daemon 重启时自动 RecoverPending.
// 2. 引擎启动 PlanCommandServer(UDS 路径由 resolvePlanSockPath 决定),
// 通过 FLYTO_PLAN_SOCK 环境变量注入,供外部进程提交/查询/取消计划.
//
// 适用场景:daemon 模式(长期驻留),需要 fire-and-forget 执行超大计划.
// CLI 单次调用模式无需启用(每次调用都是独立进程).
//
// 精妙之处(CLEVER): 与 EnableUDSInbox 分开控制--
// UDSInbox 是工具进度上报(从工具子进程 → 引擎),
// PlanQueue 是计划异步执行(从外部客户端 → daemon 引擎).
// 两者虽然都用 UDS,但方向和语义完全不同,应独立开关.
EnablePlanQueue bool
// PlanQueueDir 是计划状态文件的存储目录(可选).
// 空字符串使用默认路径 ~/.flyto/plans/.
// 显式设置主要用于测试隔离或自定义数据目录.
PlanQueueDir string
// PlanQueueSessionID 是 PlanCommandServer socket 文件名的 sessionID 部分(可选).
// 空字符串时自动生成(基于时间戳).与 UDSInboxSessionID 分开是因为两个服务独立.
PlanQueueSessionID string
//
// 精妙之处(CLEVER): Anthropic Prompt Cache 按内容字节哈希命中--
// 父子 Agent 如果分别渲染系统提示词,哪怕逻辑相同,
// 任何微小差异(时间戳注入,随机 ID,空格)都会导致哈希不同,无法命中同一缓存 slot.
// 父 Engine 将已发送给 API 的字节直接传给 SubAgent,
// SubAgent 用完全相同的字节发送请求,100% 命中缓存.
//
// 设置规则:
// nil = 不共享(SubAgent 独立渲染,向后兼容旧行为).
// 非nil = 优先使用此字节,跳过 buildSystemPromptWithContext 渲染.
//
// 注意:此字段由引擎内部(SpawnSubAgent)设置,SDK 用户通常不需要手动填写.
// 替代方案:<每次都独立渲染>
// - 否决:父子 Agent 分别渲染导致缓存 miss,每个 SubAgent 首轮都要付完整系统提示费用.
SharedSystemPromptBytes []byte
// IsOrchestrator 标记此 Engine 是否运行在协调器(Orchestrator)模式.
//
// 升华改进(ELEVATED): 协调器模式是"多 Agent 拓扑"的中间层角色--
// 它不直接完成叶子任务,而是分解任务,分派 SubAgent,合并结果.
// 开启此标志后,系统提示词会注入协调器专用指导段落,
// 告知模型如何并行分派任务,如何检查一致性,如何处理子代理失败.
//
// 精妙之处(CLEVER): bool 标志而非字符串角色--
// IsOrchestrator=true 精确表达"我是协调器",消除歧义.
// 如果将来需要更多角色("peer"/"leaf"),可扩展为 AgentRole string,
// 当前阶段 bool 足够且最简单.
// 替代方案:<在 SystemPrompt 里手动追加协调器指导文字>
// - 否决:手动追加无法被 SectionRegistry 缓存,每轮都重算;
// 且散落在调用方代码里难以统一维护.
IsOrchestrator bool
// ScratchpadDir 指定文件持久化 Scratchpad 的目录路径(模块 18.3).
//
// 升华改进(ELEVATED): 早期实现 无持久化 Scratchpad--所有暂存数据绑定单个进程.
// 我们通过此字段启用跨进程共享(FileScratchpad 替换 in-memory Scratchpad):
// - ScratchpadDir=""(默认):使用 in-memory Scratchpad,生命周期绑定 Engine
// - ScratchpadDir 非空:使用 FileScratchpad,数据持久化到目录,支持跨进程共享
// 跨行业应用:
// 仓储调度:多个 picking-robot Worker 进程共享"已分配库位"暂存数据
// 金融对账:并发 reconcile Worker 共享"已处理批次 ID 集合"
// 替代方案:<Redis/共享内存> - 否决:破坏零外部依赖;文件系统已满足最终一致性需求.
ScratchpadDir string
// ExtraTools 是额外注册的工具(通过 --tools-schema 或 SDK 注入).
//
// 升华改进(ELEVATED): 叠加而非替换--ExtraTools 在内置工具注册完成后追加,
// 支持"内置工具 + 外部工具"并存.SDK 用户可通过此字段注入任何实现了 tools.Tool 接口的工具,
// 无需 fork 引擎源码.
//
// 跨行业扩展:
// ExecTool(外部进程工具)是最常见的注入方式--将任意 CLI 程序包装为 Agent 工具.
// 也可以注入纯 Go 实现的定制工具(如特定行业 SDK 封装).
//
// 与 cfg.Tools 的交互:
// cfg.Tools 非空时会过滤内置工具,但 ExtraTools 不受此过滤--
// 调用方既然显式注入了,就一定是想启用的.
//
// 替代方案:<通过 Plugin 系统注入> - Plugin 机制更重(需要实现 plugin.Definition 接口 + 注册),
// ExtraTools 对简单场景(注入少量工具)代价更低.
ExtraTools []tools.Tool
// Provider 指定模型提供者(可选).
//
// 升华改进(ELEVATED): 当 Provider 非 nil 时,引擎通过 flyto.ModelProvider 接口
// 而非 internal/api/client.go 直接调用 API--
// - 支持任何实现 flyto.ModelProvider 的 provider:Gemini/OpenAI/Ollama/本地模型等
// - provider 自己处理鉴权,SSE 解析,思考模式等细节,引擎与 API 格式解耦
// - nil 时回退到 internal/api 路径(保持对 Anthropic 原生 API 的完整向后兼容)
//
// 使用示例:
//
// cfg := &engine.Config{
// Provider: gemini.New(gemini.Config{APIKey: os.Getenv("GEMINI_API_KEY")}),
// }
//
// 注意:Provider 路径目前不支持 Anthropic 专有功能(cache_control 分块,ExtendedThinking 配置).
// 需要这些功能时,使用 nil(默认 api.Client 路径)+ Anthropic provider 的对应配置.
// 替代方案:<强制所有 provider 都走 flyto.ModelProvider 路径> -
// 否决:Anthropic 路径有独特的 beta headers,cache_control 分块等,
// 需要 api.Client 特定功能,无法通过 flyto.Request 表达.
Provider flyto.ModelProvider
// AgentName 是此 Engine 在 Agent Teams 通讯中的名称 (peer-to-peer 消息的 from 字段).
// 空字符串时 = 不参与 Teams 通讯 (独立运行 / Leader 单机模式).
//
// 升华改进(ELEVATED): AgentName 解耦"Engine 实例"与"Team 成员身份"--
// 同一个 Engine 实例在不同 Team 中可用不同名字 (通过 Config 新建 Engine),
// Team 内成员名字由消费层决定 (code-reviewer / analyst / diagnoser 等跨行业语义).
//
// 跨行业扩展: 金融场景 "risk-analyst-1", 医疗场景 "triage-doctor", 仓储场景 "wave-planner"
// 都是合法值, 引擎不解释此字段的含义, 只作为消息路由 key.
//
// 替代方案: <把 name 硬绑到 session_id> - 否决: 一个 session 可能涉及多个 agent 身份.
AgentName string
// IncomingInbox 是 Agent Teams peer-to-peer 通讯的收件入口.
// nil = 不参与 Teams 通讯 (Engine 仍可正常运行, 只是无同伴消息).
//
// 升华改进(ELEVATED): 接口而非具体实现--
// MemoryInbox 用于单进程 dogfood / 测试, 跨行业客户可注入自己的 Inbox 实现
// (医疗合规加密存储 / 金融审计表 / SaaS 中间件桥接等).
//
// 消息由 Engine.runLoop 每轮开始时 Poll (非阻塞), 包装成 <teammate-message> XML
// 注入到对话流, 模型下一轮可自然读到同伴消息.
//
// 设计权衡: Poll 而非 goroutine + Recv 阻塞--
// Recv 需要独立 goroutine 和 channel 桥接到 runLoop, 复杂度高且死锁面大;
// Poll 和 runLoop 同步, 无额外 goroutine, 延迟上限 = 一轮对话时间 (可接受).
//
// 替代方案: <每条消息起独立 goroutine enqueueTeamNotification> -
// 否决: 额外协程和锁, 对 MVP 场景不必要.
IncomingInbox inbox.Inbox
}
Config 是引擎的配置. 统一收敛所有可配置项到一个结构体.
func (*Config) ModelForRole ¶
ModelForRole 获取指定角色对应的模型 ID. 如果 Models 未初始化,使用默认注册表. 所有角色 fallback 到 c.Model(主模型),不再区分 RoleMain 特殊处理. 历史包袱(LEGACY): 早期方案对 RoleMain 有特殊分支--现在统一 fallback 到 c.Model, 这意味着未配置 fast/thinking 角色时,它们默认使用主模型而非空字符串.
func (*Config) ModelRegistry ¶
func (c *Config) ModelRegistry() *config.ModelRegistry
ModelRegistry 获取模型注册表. 如果 Models 未初始化,创建默认注册表并应用 Model 字段.
type ConsecutiveRoleMerger ¶
type ConsecutiveRoleMerger struct{}
ConsecutiveRoleMerger 合并连续的同角色消息.
func (*ConsecutiveRoleMerger) Name ¶
func (m *ConsecutiveRoleMerger) Name() string
func (*ConsecutiveRoleMerger) Normalize ¶
func (m *ConsecutiveRoleMerger) Normalize(messages []query.Message) []query.Message
func (*ConsecutiveRoleMerger) Priority ¶
func (m *ConsecutiveRoleMerger) Priority() int
type ContextWindowCalibrator ¶
type ContextWindowCalibrator struct {
// contains filtered or unexported fields
}
ContextWindowCalibrator 记录并校准各模型的有效上下文窗口.
func NewContextWindowCalibrator ¶
func NewContextWindowCalibrator(path string) *ContextWindowCalibrator
NewContextWindowCalibrator 创建校准器并从磁盘加载历史记录(fail-open). path 是持久化文件路径,通常为 {cwd}/.flyto/context_calibration.json.
func (*ContextWindowCalibrator) EffectiveWindow ¶
func (c *ContextWindowCalibrator) EffectiveWindow(model string, staticDefault int) int
EffectiveWindow 返回模型的有效上下文窗口. 有校准记录则返回校准值,否则返回 staticDefault(不修改任何状态).
升华改进(ELEVATED): staticDefault 由调用方传入而非内部查表-- 校准器不依赖 ModelRegistry,可被任意来源的默认值覆盖, 测试时可传入任意值而无需 mock registry.
func (*ContextWindowCalibrator) RecordFailure ¶
func (c *ContextWindowCalibrator) RecordFailure(model string, actualTokens, maxTokens int)
RecordFailure 记录一次 context_too_long 失败并更新有效窗口.
actualTokens = 本次请求的实际 token 数(触发失败的值) maxTokens = provider 申报的上限(0 表示无法从错误消息中提取)
保守策略:多次失败取历史最小有效窗口,防止单次异常拉高阈值后复发.
func (*ContextWindowCalibrator) Records ¶
func (c *ContextWindowCalibrator) Records() map[string]calibrationRecord
Records 返回所有校准记录的快照(只读,用于日志/诊断).
type DreamConfig ¶
type DreamConfig struct {
MemStore memory.Store
Models *config.ModelRegistry
MinHours float64 // 默认 24
MinSess int // 默认 5
TaskStore *DreamTaskStore
ParentEngine EngineRef // 用于 fork 模式(可选;L1224 起仅 ForkSubAgent 需要它)
// L1224: 独立注入的可观测性依赖.engine.go 构造顺序已调整为先建 observer/activity/rootCtx
// 再 buildDreamEngine,所以这三项在 Config 里就可填,不必走 post-fill 回填.
// Observer==nil 时 NewDreamEngine 兜底 NoopObserver;Activity/RootCtx 可保持 nil.
Observer EventObserver
Activity *ActivityTracker
RootCtx context.Context
// SessionProvider 提供会话列表(可选).
// CLI 场景:传 &FileSessionProvider{Dir: transcriptDir}.
// SDK/API 长驻:可传自定义实现;nil = 不提供 session hint.
SessionProvider SessionProvider
// TranscriptDir 是 transcript 目录,用于 Dream prompt 中的 grep 示例(可选).
// CLI 场景下通常与 FileSessionProvider.Dir 相同.
TranscriptDir string
// PeriodicInterval 是定时触发的间隔(0 = 不启用定时器).
// CLI per-session 不需要设置(session end 触发已足够).
// SDK/API 长驻进程可设置,例如 6 * time.Hour.
PeriodicInterval time.Duration
}
DreamConfig 是 DreamEngine 的配置.
type DreamEngine ¶
type DreamEngine struct {
// contains filtered or unexported fields
}
DreamEngine 是 Dream 巩固系统的核心引擎.
func NewDreamEngine ¶
func NewDreamEngine(cfg *DreamConfig) *DreamEngine
NewDreamEngine 初始化 DreamEngine,从磁盘读取上次巩固时间和会话计数.
func (*DreamEngine) CheckAndRun ¶
func (de *DreamEngine) CheckAndRun(ctx context.Context)
CheckAndRun 检查三层门槛,满足则异步启动 Dream 巩固. 调用方应以 go de.CheckAndRun(ctx) 方式调用,本方法本身也不阻塞.
func (*DreamEngine) Close ¶
func (de *DreamEngine) Close(timeout time.Duration)
Close 停止 DreamEngine,等待正在进行的 Dream goroutine 退出.
调用方应先 cancel 传递给 CheckAndRun 的 context(如 Engine.rootCancel()), 再调用 Close()--context 取消会让 Dream goroutine 感知到停止信号, Close() 只是等待它干净退出.
timeout 参数限制等待时长(0 使用默认 3s). 超时后强制返回(goroutine 仍在后台运行,但文件锁会随进程退出自动释放).
精妙之处(CLEVER): 这不是"强制终止"--goroutine 通过 ctx.Done() 感知停止, Close() 只是礼貌地等一下.强制终止异步 goroutine 在 Go 中没有安全的做法, 也没有必要:goroutine 泄漏最坏情况是进程退出时 OS 回收资源.
func (*DreamEngine) RecordSession ¶
func (de *DreamEngine) RecordSession()
RecordSession 记录一次新会话完成. 在查询循环正常结束时调用(Engine.runLoop 末尾).
func (*DreamEngine) TaskStore ¶
func (de *DreamEngine) TaskStore() *DreamTaskStore
TaskStore 返回 Dream 任务状态存储.
type DreamStatus ¶
type DreamStatus string
DreamStatus 是 Dream 任务状态枚举.
const ( DreamStatusStarting DreamStatus = "starting" DreamStatusOrienting DreamStatus = "orienting" DreamStatusGathering DreamStatus = "gathering" DreamStatusConsolidating DreamStatus = "consolidating" DreamStatusPruning DreamStatus = "pruning" DreamStatusCompleted DreamStatus = "completed" DreamStatusFailed DreamStatus = "failed" )
type DreamTaskState ¶
type DreamTaskState struct {
ID string `json:"id"`
Status DreamStatus `json:"status"`
Phase string `json:"phase"`
SessionsReviewed int `json:"sessions_reviewed"`
// FilesTouched 是 Dream agent 通过 Edit/Write 工具触达的文件路径列表.
//
// 精妙之处(CLEVER): 通过 onMessage 回调从 tool_use 事件提取 file_path,
// 而不是在 Dream 完成后扫描 memory dir 的 mtime--
// 事件流提取:精确,不受并发写入干扰.
// mtime 扫描:有竞态,其他进程写文件会误报.
// filesTouched 用于完成提示"Improved N memories",精确比完整更重要.
FilesTouched []string `json:"files_touched"`
// Turns 是最近 maxDreamTurns 个 assistant 轮次的进度摘要(滚动窗口).
Turns []DreamTurn `json:"turns,omitempty"`
StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time,omitempty"`
Error string `json:"error,omitempty"`
// contains filtered or unexported fields
}
DreamTaskState 是一次 Dream 巩固任务的状态.
type DreamTaskStore ¶
type DreamTaskStore struct {
// contains filtered or unexported fields
}
DreamTaskStore 管理 Dream 任务状态,线程安全.
func NewDreamTaskStore ¶
func NewDreamTaskStore() *DreamTaskStore
NewDreamTaskStore 创建一个新的 Dream 任务状态存储.
func (*DreamTaskStore) AddFileTouched ¶
func (s *DreamTaskStore) AddFileTouched(id string, path string)
AddFileTouched 记录任务修改了一个文件(去重).
func (*DreamTaskStore) AddTurn ¶
func (s *DreamTaskStore) AddTurn(id string, turn DreamTurn)
AddTurn 追加一个 assistant 轮次进度摘要.
精妙之处(CLEVER): 空轮跳过优化--text=="" && toolUseCount==0 时不写入, 避免纯噪声更新(如模型返回空文本)触发消费层不必要的刷新. 滚动窗口:超过 maxDreamTurns 时丢弃最老的一条,保持内存可控.
func (*DreamTaskStore) Get ¶
func (s *DreamTaskStore) Get(id string) *DreamTaskState
Get 获取指定任务的状态(返回深度副本).
func (*DreamTaskStore) Latest ¶
func (s *DreamTaskStore) Latest() *DreamTaskState
Latest 获取最近一次 Dream 任务状态(按开始时间倒序).
func (*DreamTaskStore) Register ¶
func (s *DreamTaskStore) Register(task *DreamTaskState)
Register 注册一个新的 Dream 任务.
func (*DreamTaskStore) SetError ¶
func (s *DreamTaskStore) SetError(id string, errMsg string)
SetError 标记任务失败并记录错误.
func (*DreamTaskStore) Update ¶
func (s *DreamTaskStore) Update(id string, status DreamStatus, phase string)
Update 更新指定任务的状态和阶段.
type DreamTurn ¶
type DreamTurn struct {
Text string // assistant 回复的文本内容(摘要/推理)
ToolUseCount int // 本轮工具调用次数(折叠,不枚举)
}
DreamTurn 是 Dream agent 一个 assistant 轮次的进度摘要.
升华改进(ELEVATED): 早期实现 中 DreamTurn 包含 text + toolUseCount, toolUse 折叠为计数而非枚举--因为 Dream 的工具调用细节对用户无意义(读取了哪些文件), 只有"改动了几次"和"改动了哪些记忆文件"才有意义. 我们保持相同设计.
type ElicitationField ¶
type ElicitationField struct {
// Name 字段名称(对应 JSON schema property key)
Name string
// Type 字段类型("string" / "number" / "boolean")
Type string
// Title 显示给用户的标题(可选)
Title string
// Description 字段描述(可选,展示为输入框 placeholder 或 tooltip)
Description string
// Required 是否必填
Required bool
// Default 默认值(字符串表示,消费层负责类型转换)
Default string
}
ElicitationField 描述用户需要填写的单个字段. 对应 MCP 规范中 elicitation/create 请求的 schema.properties 条目.
type ElicitationHandler ¶
type ElicitationHandler interface {
HandleElicitation(req ElicitationRequest) (ElicitationResponse, error)
}
ElicitationHandler 处理服务器发出的用户输入请求.
实现要求:
- 必须是线程安全的(可能被多个 MCP 服务器并发调用)
- 必须在合理时间内返回(不应无限阻塞)
- 若无法显示 UI,可返回 Action=="cancel"
精妙之处(CLEVER): 接口只有一个方法,消费层实现极简-- CLI 层 4 行代码,HTTP 层 10 行代码,测试桩 1 行代码. 叠加而非替换:多个 ElicitationHandler 可通过 CompositeElicitationHandler 组合.
Shape: synchronous callback. Engine passes ElicitationRequest to the consumer and blocks for an ElicitationResponse (accept / decline / cancel).
形态: 同步回调. 引擎把 ElicitationRequest 交给消费者, 阻塞等 ElicitationResponse (accept / decline / cancel).
type ElicitationHandlerFunc ¶
type ElicitationHandlerFunc func(req ElicitationRequest) (ElicitationResponse, error)
ElicitationHandlerFunc 是 ElicitationHandler 的函数适配器. 方便消费层用闭包实现简单的处理逻辑,无需定义新类型.
func (ElicitationHandlerFunc) HandleElicitation ¶
func (f ElicitationHandlerFunc) HandleElicitation(req ElicitationRequest) (ElicitationResponse, error)
HandleElicitation 实现 ElicitationHandler 接口.
type ElicitationRequest ¶
type ElicitationRequest struct {
// ServerName 发起请求的 MCP 服务器名称
ServerName string
// Message 展示给用户的问题或说明文字(Markdown 格式)
Message string
// Fields 需要用户填写的字段列表(按声明顺序)
Fields []ElicitationField
}
ElicitationRequest 是服务器发出的用户输入请求. 对应 MCP 规范 elicitation/create 请求的完整 params.
type ElicitationResponse ¶
type ElicitationResponse struct {
// Action 用户操作:
// "accept" - 用户确认并提交了数据
// "decline" - 用户明确拒绝(服务器应该放弃该操作)
// "cancel" - 用户取消(服务器可以重试或使用默认值)
Action string
// Values 用户填写的字段值(仅 Action=="accept" 时有意义)
// key 为 ElicitationField.Name,value 为用户输入的字符串
// 消费层负责类型校验;服务器负责最终语义解析
Values map[string]string
}
ElicitationResponse 是用户对 elicitation 请求的回复.
type EmptyMessageFilter ¶
type EmptyMessageFilter struct{}
EmptyMessageFilter 移除没有实质内容的消息. 只包含空文本块的消息也被视为空消息.
func (*EmptyMessageFilter) Name ¶
func (f *EmptyMessageFilter) Name() string
func (*EmptyMessageFilter) Normalize ¶
func (f *EmptyMessageFilter) Normalize(messages []query.Message) []query.Message
func (*EmptyMessageFilter) Priority ¶
func (f *EmptyMessageFilter) Priority() int
type Engine ¶
type Engine struct {
// contains filtered or unexported fields
}
Engine 是 Agent 引擎的主入口. 核心查询引擎 + 会话管理 + 工具编排, 剥离了所有 UI 和终端依赖.
func (*Engine) BuildAndStream ¶
func (e *Engine) BuildAndStream(ctx context.Context, model string, maxTokens int, promptBlocks []agentctx.SystemPromptBlock, messages []query.Message, toolDefs any, responseFormat *flyto.ResponseFormat, attempt int) (<-chan flyto.Event, error)
BuildAndStream 实现 EngineBackend 接口,封装双路径的请求构建和流式调用.
升华改进(ELEVATED): 替代主循环中的 Provider==nil 判断. 主循环不再感知"直连 Anthropic API"和"通过 flyto.Provider 接口"两条路径.
func (*Engine) Close ¶
Close 优雅关闭引擎,释放所有资源.
精妙之处(CLEVER): 分层关闭,每层有独立超时保护--
- 防重入(atomic CAS)
- Cancel root context(通知所有 goroutine 停止)
- 短暂等待(让 goroutine 有机会干净退出)
- 按依赖顺序关闭资源(先消费者后生产者)
- Observer 最后刷新(等前面的清理事件都发完)
整个 Close 在 CloseTimeout(默认 10s)内完成, failsafe 超时保护确保不会永远挂起.
升华改进(ELEVATED): 早期方案 gracefulShutdown 混合了终端清理/UI/分析刷新, 我们只做引擎资源清理--终端和 UI 是消费层的事. 替代方案:<原方案 530 行混合引擎+终端+UI+分析>
func (*Engine) Context ¶
Context 返回引擎的生命周期 context. 后台 goroutine(Dream,Memory extraction 等)应使用此 context 而非 background. 精妙之处(CLEVER): 暴露 rootCtx 让内部代码能够感知引擎生命周期, 而不需要每个方法都传递 context 参数.
func (*Engine) DisablePlugin ¶
DisablePlugin 禁用指定名称的插件并移除其所有 hooks.
精妙之处(CLEVER): 先 UnregisterAllBySource 再 Disable-- 顺序很重要:如果先 Disable 再 UnregisterAllBySource, 其他 goroutine 可能在 Disable 和 Unregister 的空窗期内 以为插件已禁用但 hooks 还在触发.先移除 hooks 再标记禁用, 保证"hooks 消失"早于"插件状态变化",fail-safe 方向.
func (*Engine) Dream ¶
func (e *Engine) Dream() *DreamEngine
Dream 返回 AutoDream 记忆巩固引擎,消费层可用于查询巩固任务状态.
func (*Engine) EnablePlugin ¶
EnablePlugin 启用指定名称的插件并重新注册其 hooks.
如果插件之前被 Disable,调用此方法重新激活其 hooks. 幂等:多次调用无副作用.
func (*Engine) ExecuteUndo ¶
ExecuteUndo 执行撤销操作. 实现 UndoExecutor 接口,供 OperationLog 回调.
func (*Engine) FileCache ¶
func (e *Engine) FileCache() *FileStateCache
FileCache 返回文件状态缓存,允许消费层查询文件读取历史.
func (*Engine) FileHistoryRef ¶
func (e *Engine) FileHistoryRef() *FileHistory
FileHistoryRef 返回文件历史管理器.
func (*Engine) FileHistoryView ¶
func (e *Engine) FileHistoryView() FileHistoryView
FileHistoryView 返回文件历史的只读查询接口.
升华改进(ELEVATED): L1186 修复 - 比 FileHistoryRef() 更窄的契约. 新代码优先用本访问器, 消费者只能调用 CanRollback / SnapshotCount 两个 只读查询方法, 无法直接调用 Rollback/Prune (后者应通过 Engine.Rollback 走完整路径).
FileHistoryRef() 保留用于向后兼容 (docs/configuration.md 示例 + 已有消费者), 但新代码/新文档应引用 FileHistoryView().
精妙之处(CLEVER): Go 结构化 typing 让 *FileHistory 自动实现 FileHistoryView, 无需任何显式 "implements" 声明或 adapter 类型, 零运行时成本.
func (*Engine) ForkSubAgent ¶
func (e *Engine) ForkSubAgent(cfg *SubAgentConfig) *SubAgent
ForkSubAgent 实现 EngineRef 接口. 将 SpawnSubAgent(*Engine, cfg) 封装为接口方法,消除 agentExecutor 中的类型断言.
精妙之处(CLEVER): 一行封装,让 EngineRef 从"有例外的接口"变为"完整接口"-- agentExecutor 只看到 ForkSubAgent,不需要知道内部调用了 SpawnSubAgent(*Engine, cfg), 也不需要类型断言.EngineRef mock 只需实现此方法即可覆盖 fork 路径.
func (*Engine) InboxServer ¶
InboxServer 返回 UDS Inbox 服务端(可能为 nil,未启用时). 消费层可用于查询 socket 路径或直接监听消息 channel.
func (*Engine) LoadPlugin ¶
LoadPlugin 从目录加载插件并立即同步其 hooks 到 hooks.Manager.
升华改进(ELEVATED): 早期实现 的 loadPluginHooks() 是全局函数,依赖全局 STATE, 并发调用需要手动保证顺序 (早期实现注释 "atomic clear-then-register"). 我们在 Engine 方法上操作,锁保护由 plugin.Host 和 hooks.Manager 各自负责, 无需外部协调--两个子系统的锁是独立的,不会死锁. 替代方案:<Engine.New() 时一次性加载所有插件,之后不支持动态加载> - 否决:SDK 用户需要在运行时根据会话上下文决定加载哪些插件.
func (*Engine) Observer ¶
func (e *Engine) Observer() EventObserver
Observer 返回引擎的可观测性接口. 消费层可以用它查询当前 observer 或在子系统中复用.
func (*Engine) OperationLogRef ¶
func (e *Engine) OperationLogRef() *OperationLog
OperationLogRef 返回统一操作日志.
func (*Engine) Redact ¶
Redact 将字符串中所有已注入的凭据值替换为 `[credential:name]`.
消费层自定义日志组件应在写出任何可能含凭据的字符串前调用此方法. 引擎内部日志路径已自动调用,此方法供消费层在引擎外部的日志代码使用.
func (*Engine) RegisterPromptBundle ¶
func (e *Engine) RegisterPromptBundle(key agentctx.BundleKey, bundle agentctx.PromptBundle)
RegisterPromptBundle 注册自定义 PromptBundle.
升华改进(ELEVATED): 这是平台层和 SDK 用户扩展提示词的标准入口-- 不需要修改引擎源码,在 New() 之后,Run() 之前注册行业特化 Bundle 即可. 引擎内置的 claude+programming Bundle 不受影响.
典型用法(仓储场景):
eng, _ := engine.New(cfg)
eng.RegisterPromptBundle(
engine.BundleKey{ModelFamily: "claude", Scenario: "warehouse"},
myWarehouseBundle,
)
func (*Engine) ResetSectionCache ¶
func (e *Engine) ResetSectionCache()
ResetSectionCache 清空 Section 计算缓存. 在 /clear 或 /compact 后调用,让动态 sections(FLYTO.md,环境信息等)下一轮重新计算. 内部调用 maybeCompact 时自动触发,外部调用方一般不需要显式调用.
升华改进(ELEVATED): 同时重置 PromptHashTracker-- /clear,/compact,Bundle 切换等场景下,SectionCache 被显式清空, 意味着下一次渲染内容可能与上次相同(如 /compact 后会话内容不变), 但 Anthropic 缓存 slot 已不再有效(会话 ID 变化或上下文位置偏移). 重置 tracker 确保下次 runLoop 检测到"变化"并触发重渲,建立新缓存 slot. 替代方案:<只重置 sectionRegistry,不重置 tracker> - 否决:tracker 的哈希还指向旧内容,下一轮 buildSystemPromptWithContext 后 内容可能相同但缓存 slot 已失效,tracker 不重置会错误判定为"未变化"跳过重渲.
func (*Engine) ResultStoreRef ¶
func (e *Engine) ResultStoreRef() *ResultStore
ResultStoreRef 返回大结果存储实例.
func (*Engine) Rollback ¶
Rollback 按消息 ID 回滚所有操作. 升华改进(ELEVATED): 同时回滚文件历史和操作日志-- 文件历史负责物理恢复,操作日志负责逻辑编排. 替代方案:只回滚文件历史(无法回滚数据库/API 操作).
func (*Engine) Run ¶
Run 执行一次完整的 Agent 交互. 返回一个 channel,消费层从中读取流式事件. 用 Go channel 实现流式输出,消费层用 for range 读取即可.
用法:
events := agent.Run(ctx, "修复 src/app.ts 中的类型错误")
for event := range events {
switch e := event.(type) {
case *TextEvent:
fmt.Print(e.Text)
case *ToolUseEvent:
fmt.Printf("调用工具: %s\n", e.ToolName)
}
}
func (*Engine) SetSecret ¶
SetSecret 注册引擎级别的凭据.
注册后,凭据在所有后续 Run() 中生效:
- 注入 Bash 工具子进程的 env var(工具脚本可通过 $name 引用)
- 对所有工具输出进行 value-level 脱敏(将明文替换为 [SECRET:name])
name 建议使用大写(如 "DB_PASSWORD")以符合 env var 命名惯例. value 必须至少 8 字节(防止超短 value 引发大量误替换).
升华改进(ELEVATED): 早期实现 只有静态的 GHA_SUBPROCESS_SCRUB 白名单, 无法处理用户自定义凭据.我们允许动态注册任意 secret, 引擎自动在全部工具输出和 Bash 子进程 env 中处理,消费层无需感知.
典型用法(SDK 嵌入):
eng.SetSecret("DB_PASSWORD", os.Getenv("DB_PASSWORD"))
eng.SetSecret("MY_API_KEY", os.Getenv("MY_API_KEY"))
for ev := range eng.Run(ctx, prompt) { ... }
若需要 per-request 隔离(HTTP API / 多租户),使用 WithSecret RunOption 代替.
func (*Engine) SkillRegistry ¶
func (e *Engine) SkillRegistry() *SkillRegistry
SkillRegistry 返回 Engine 的 Skill 注册表. 消费层可在 New() 之后,Run() 之前调用 engine.SkillRegistry() 注册自定义 Skill.
func (*Engine) SpawnSkillAgent ¶
func (e *Engine) SpawnSkillAgent(ctx context.Context, cfg *SubAgentConfig, prompt string) (string, error)
SpawnSkillAgent 实现 SkillSpawner 接口. 创建 SubAgent 并同步等待结果,供 SkillRegistry.invokeFork 调用.
精妙之处(CLEVER): Engine 实现 SkillSpawner,而不是 SkillRegistry 直接持有 Engine-- 依赖方向从双向变单向(Engine → SkillRegistry,SkillRegistry → SkillSpawner 接口). SkillRegistry 不再"知道"自己运行在哪个 Engine 里,只知道"能 spawn"这件事.
func (*Engine) StrictModeRef ¶
func (e *Engine) StrictModeRef() *StrictMode
StrictModeRef 返回严格模式配置(可能为 nil).
func (*Engine) TokenBudget ¶
func (e *Engine) TokenBudget() *TokenBudgetManager
TokenBudget 返回 Token 预算管理器. 消费层可用于查询上下文窗口状态,计算警告阈值.
type EngineBackend ¶
type EngineBackend interface {
// BuildAndStream 构建请求并发起流式调用.
// attempt 用于可观测性(记录在重试日志中).
BuildAndStream(ctx context.Context, model string, maxTokens int, promptBlocks []agentctx.SystemPromptBlock, messages []query.Message, toolDefs any, responseFormat *flyto.ResponseFormat, attempt int) (<-chan flyto.Event, error)
}
EngineBackend 封装"构建请求 + 流式调用"两个操作.
主循环只调此接口,不判断 Provider==nil.
type EngineError ¶
type EngineError struct {
// Code 是错误码,用于程序化处理
Code ErrorCode
// Message 是人类可读的错误描述
Message string
// Detail 是技术细节,用于调试和日志
Detail string
// Suggestion 是恢复建议,帮助用户解决问题
Suggestion string
// Cause 是导致此错误的原始错误
Cause error
// Retryable 指示此错误是否可以通过重试解决
Retryable bool
}
EngineError 是引擎统一错误类型.
所有引擎内部产生的错误都应包装为此类型, 消费层可以通过类型断言获取结构化的错误信息.
func NewEngineError ¶
func NewEngineError(code ErrorCode, message string, cause error) *EngineError
NewEngineError 创建一个新的引擎错误.
使用默认的 Suggestion 和 Retryable 值. 如果需要自定义,可以在创建后修改字段.
type EngineRef ¶
type EngineRef interface {
// Run 启动一次对话轮次,返回事件流.
Run(ctx context.Context, prompt string, opts ...RunOption) <-chan Event
// Session 获取或创建指定 ID 的会话.
Session(id string) *Session
// Observer 返回事件观察者,用于埋点和错误上报.
Observer() EventObserver
// Activity 返回活动追踪器,用于引用计数(防止 Close() 误判空闲).
Activity() *ActivityTracker
// Context 返回引擎生命周期 context(引擎关闭时 cancel).
Context() context.Context
// Cwd 返回工作目录路径.
Cwd() string
// ForkSubAgent 从当前引擎 fork 一个子 Agent 实例.
//
// 升华改进(ELEVATED): 将 SpawnSubAgent(*Engine, cfg) 包进接口方法--
// agentExecutor 不再需要类型断言(*Engine),EngineRef 成为真正的完整接口.
// 原文件注释说 SpawnSubAgent 是"例外"(需访问大量内部成员),
// 通过此方法封装后,内部成员访问移到 Engine.ForkSubAgent 实现里,
// 对 agentExecutor 完全透明.
// 替代方案:<保留类型断言,维持现状> --否决:三处重复断言,EngineRef 形同虚设.
ForkSubAgent(cfg *SubAgentConfig) *SubAgent
}
EngineRef 是各子系统对 Engine 的最小依赖接口.
只暴露子系统实际需要的能力--子系统不应通过此接口访问 Engine 的 工具注册,会话管理,配置等其他职责.
type ErrorCode ¶
type ErrorCode string
ErrorCode 是引擎错误码枚举. 消费层可以根据错误码进行程序化处理.
const ( ErrAPIAuth ErrorCode = "api_auth_error" // API key 无效或过期 ErrAPIRateLimit ErrorCode = "api_rate_limit" // API 速率限制 ErrAPIOverloaded ErrorCode = "api_overloaded" // API 服务过载 ErrAPIBadRequest ErrorCode = "api_bad_request" // API 请求格式错误 ErrContextTooLong ErrorCode = "context_too_long" // 超出模型上下文窗口(触发自适应校准) ErrToolNotFound ErrorCode = "tool_not_found" // 工具不存在 ErrToolExecution ErrorCode = "tool_execution_error" // 工具执行失败 ErrPermissionDenied ErrorCode = "permission_denied" // 权限被拒绝 ErrContextOverflow ErrorCode = "context_overflow" // 上下文溢出 ErrBudgetExceeded ErrorCode = "budget_exceeded" // 预算超限 ErrMaxTurns ErrorCode = "max_turns_reached" // 轮次超限 ErrSessionNotFound ErrorCode = "session_not_found" // 会话不存在 ErrSessionClosed ErrorCode = "session_closed" // 会话已关闭 ErrMCPConnection ErrorCode = "mcp_connection_error" // MCP 连接失败 ErrPluginLoad ErrorCode = "plugin_load_error" // 插件加载失败 ErrInternal ErrorCode = "internal_error" // 内部错误 ErrStreamTruncated ErrorCode = "stream_truncated" // 流式响应被代理截断(partial-stream) )
func ClassifyAPIError ¶
ClassifyAPIError 将 API 错误字符串分类为对应的 ErrorCode.
根据 HTTP 状态码和错误消息内容判断错误类型. 升华改进(ELEVATED): ErrContextTooLong 在 ErrAPIBadRequest 之前检测-- 大多数 provider 以 HTTP 400 返回 context-too-long, 若顺序颠倒,HTTP 400 会先命中,engine 会当成格式错误报错而非触发自适应校准.
type ErrorContentStripper ¶
type ErrorContentStripper struct {
// Patterns 错误模式映射.如果为 nil,使用 DefaultErrorPatterns.
Patterns map[string][]query.ContentType
}
ErrorContentStripper 根据错误模式自动剥离导致错误的内容块.
func (*ErrorContentStripper) Name ¶
func (s *ErrorContentStripper) Name() string
func (*ErrorContentStripper) Normalize ¶
func (s *ErrorContentStripper) Normalize(messages []query.Message) []query.Message
func (*ErrorContentStripper) Priority ¶
func (s *ErrorContentStripper) Priority() int
type ErrorEvent ¶
type ErrorEvent = flyto.ErrorEvent
type EventEmitter ¶
type EventEmitter func(evt Event)
EventEmitter is the callback signature for forwarding an Event into the parent engine's active Run channel. Implementations wrap the underlying send with backpressure handling (non-blocking select with default drop, or full-block — up to the caller).
EventEmitter 是把 Event 转发到父引擎当前 Run channel 的回调签名. 实现 方自己封装 backpressure (非阻塞 select 兜底丢弃 / 全阻塞, 由 caller 决定).
func EventEmitterFromContext ¶
func EventEmitterFromContext(ctx context.Context) EventEmitter
EventEmitterFromContext 返回由 WithEventEmitter 设置的 emitter, 未设置 返回 nil. 使用模式: if emit := EventEmitterFromContext(ctx); emit != nil { emit(evt) }.
EventEmitterFromContext returns the emitter set by WithEventEmitter, or nil if absent. Usage pattern: if emit := EventEmitterFromContext(ctx); emit != nil { emit(evt) }.
type EventObserver ¶
type EventObserver = flyto.EventObserver
EventObserver 是引擎可观测性核心接口(flyto.EventObserver 的类型别名).
升华改进(ELEVATED): 早期方案在 pkg/engine/observer.go 重复定义了与 pkg/flyto/observer.go 完全相同的接口--消费层要实现 Observer 就必须 import engine 包,产生反向依赖. 改为类型别名后:消费层只需 import flyto(轻量契约包),不需要 import engine. 替代方案:<保留重复定义,靠 Go 结构化类型兼容> - 否决:维护两份接口定义, 任何方法变更都要改两处,且 Config.Observer 字段类型不明确(engine 还是 flyto?).
type ExecutionContext ¶
type ExecutionContext string
ExecutionContext 定义 Skill 的执行方式.
精妙之处(CLEVER): 用具名字符串类型而非 bool 标志-- 未来可以扩展为 "inline" | "fork" | "background" | "stream" 而不需要修改调用方代码(open-closed 原则).
const ( // ExecutionContextInline 在当前对话中内联展开(默认). // Skill 提示词作为工具结果返回,LLM 在同一轮 context 中处理. ExecutionContextInline ExecutionContext = "inline" // ExecutionContextFork 在独立子 Agent 中运行. // 子 Agent 有独立的消息历史和 token 预算,完成后返回结果. ExecutionContextFork ExecutionContext = "fork" )
type FallbackConfig ¶
type FallbackConfig struct {
// FallbackModel 是备用模型 ID(从 ModelRegistry 获取).
// 如果为空,不做降级.
FallbackModel string
// MaxFallbacks 单次 Run 最大降级次数(防止无限循环).
// 默认值为 1.
MaxFallbacks int
// FallbackOnErrors 触发降级的错误类型.
// 例如 ["model_not_found", "quota_exceeded", "overloaded"].
FallbackOnErrors []string
}
FallbackConfig 是模型降级配置.
func DefaultFallbackConfig ¶
func DefaultFallbackConfig() *FallbackConfig
DefaultFallbackConfig 返回默认的降级配置(不启用降级).
type FallbackTracker ¶
type FallbackTracker struct {
// contains filtered or unexported fields
}
FallbackTracker 追踪模型降级状态. 在单次 Run 中使用,记录当前模型和降级历史.
func NewFallbackTracker ¶
func NewFallbackTracker(config *FallbackConfig, originalModel string) *FallbackTracker
NewFallbackTracker 创建一个新的降级追踪器.
config 为降级配置,originalModel 为初始使用的模型 ID. 如果 config 为 nil,使用默认配置(不启用降级).
func (*FallbackTracker) CurrentModel ¶
func (ft *FallbackTracker) CurrentModel() string
CurrentModel 返回当前使用的模型 ID. 如果已降级,返回备用模型;否则返回原始模型.
func (*FallbackTracker) FallbackCount ¶
func (ft *FallbackTracker) FallbackCount() int
FallbackCount 返回已降级次数.
func (*FallbackTracker) OriginalModel ¶
func (ft *FallbackTracker) OriginalModel() string
OriginalModel 返回原始模型 ID.
func (*FallbackTracker) RecordFallback ¶
func (ft *FallbackTracker) RecordFallback()
RecordFallback 记录一次降级事件. 将当前模型切换为备用模型,并增加降级计数.
func (*FallbackTracker) ShouldFallback ¶
func (ft *FallbackTracker) ShouldFallback(err error) (fallbackModel string, should bool)
ShouldFallback 判断在遇到指定错误时是否应该降级.
返回备用模型 ID 和是否应该降级. 以下情况不降级:
- 未配置备用模型
- 已达到最大降级次数
- 错误类型不在触发列表中
func (*FallbackTracker) WasFallback ¶
func (ft *FallbackTracker) WasFallback() bool
WasFallback 返回是否发生过降级.
type FileAgentDefLoader ¶
type FileAgentDefLoader struct {
// Dir 是要扫描的目录路径
Dir string
}
FileAgentDefLoader 从文件系统目录加载 AgentDefinition.
实现 AgentDefLoader 接口. 支持两种目录格式(与 ScanSkillsDir 一致):
- 子目录格式(推荐):dir/agent-name/AGENT.md
- 扁平格式(向后兼容):dir/agent-name.md
func (*FileAgentDefLoader) LoadAgentDefs ¶
func (l *FileAgentDefLoader) LoadAgentDefs() ([]*AgentDefinition, error)
LoadAgentDefs 扫描 Dir 目录,加载所有 AgentDefinition 文件.
type FileBackup ¶
type FileBackup struct {
ContentHash string // SHA256 前 16 字符
BackupPath string // 备份文件的磁盘路径
OriginalExists bool // 原文件是否存在(新建文件为 false)
FileMode fs.FileMode // 原文件的权限
}
FileBackup 单个文件的备份信息.
type FileCacheEntry ¶
type FileCacheEntry struct {
Path string // 绝对路径
ContentHash string // SHA256 前 16 字符
Size int64 // 文件大小(字节)
LineCount int // 行数
ReadAt time.Time // 最后读取时间
ModTime time.Time // 文件修改时间(读取时的)
WasModified bool // 自上次读取后是否被外部修改
}
FileCacheEntry 是文件缓存的单个条目.
**消费形态**: pull API (外部调取). 字段供以下 getter 返回的 快照 / 指针消费:
- `FileStateCache.Info(path) (FileCacheEntry, bool)` -- 值副本, 外部消费者 (SDK / platform / 诊断) 安全读字段
- `FileStateCache.Get/Peek(path) (*FileCacheEntry, bool)` -- 指针, 内部消费路径 (reminders.go CheckFileModifications 已接)
字段活点:
- Path: RecentFiles() 直接读 (消除 lruEntry.path 双重存后 single source of truth)
- ContentHash / Size / LineCount / ReadAt / WasModified: reminders.go CheckFileModifications 消费 -- reminder 文本里展示给模型 "Agent 当时读到的 N bytes / M lines / hash XX / T 前 读过"
- ModTime: IsModified(path) 内部读做 mtime 比对
FileCacheEntry is a single entry in the file state cache.
Consumption shape: pull API. Fields surface via:
- FileStateCache.Info(path) (FileCacheEntry, bool) -- value copy, safe for external consumers (SDK / platform / diagnostics)
- FileStateCache.Get/Peek(path) (*FileCacheEntry, bool) -- pointer, for internal consumers (reminders.go CheckFileModifications)
All six fields (Path / ContentHash / Size / LineCount / ReadAt / WasModified / ModTime) are read by real production code paths; none are formal-only.
type FileChangeChecker ¶
type FileChangeChecker interface {
RecentFiles(n int) []string
IsModified(path string) bool
// Info returns a value snapshot of the cache entry for path so
// reminders can tell the model what specifically was read (size,
// hash, line count, read timestamp, prior-modified flag) -- not
// just "this file changed". Implementations that cannot provide
// the snapshot should return (FileCacheEntry{}, false); the
// consumer (CheckFileModifications) falls back to path-only output.
//
// Info 返回 path 对应缓存条目的值快照, 让 reminder 能告诉模型"读
// 的是什么"(大小 / 哈希 / 行数 / 读取时间 / 是否已被标记过) --
// 而不仅仅是"这个文件变了". 实现若无法提供快照应返回
// (FileCacheEntry{}, false); 消费者 (CheckFileModifications)
// 会降级为只输出路径.
Info(path string) (FileCacheEntry, bool)
}
FileChangeChecker 是文件变更检测接口.
升华改进(ELEVATED): 从 *FileStateCache 具体类型提升为接口-- ReminderSystem 只需要"列出最近文件"和"检查是否变更"两个能力, 不需要 FileStateCache 的全部实现(LRU 缓存,SHA256 哈希等). 接口化后:
- 测试可以注入 mock 而无需构造真实 FileStateCache;
- 未来可以替换为基于 inotify/FSEvents 的实时监听实现;
- 其他模块实现相同接口即可复用 ReminderSystem 的文件变更检测.
替代方案:<保留 *FileStateCache 具体类型> -- 否决:测试困难,扩展受限. 对比:memStore memory.Store 已经是接口,与之风格一致.
type FileHistory ¶
type FileHistory struct {
// contains filtered or unexported fields
}
FileHistory 文件操作历史管理器.
func NewFileHistory ¶
func NewFileHistory(cwd string, observer EventObserver) *FileHistory
NewFileHistory 创建文件历史管理器. cwd 是当前工作目录,用于生成项目级的备份目录.
func NewFileHistoryWithDir ¶
func NewFileHistoryWithDir(baseDir string, observer EventObserver) *FileHistory
NewFileHistoryWithDir 创建指定备份目录的文件历史管理器(主要用于测试).
func (*FileHistory) Backup ¶
func (h *FileHistory) Backup(ctx context.Context, filePath string) error
Backup 实现 memory.Backupper 接口. 在写入记忆文件前调用,无需绑定消息 ID--使用固定的 "memory-autosave" 消息 ID.
精妙之处(CLEVER): memory.Save() 不知道当前消息 ID(记忆写入是异步的, 可能发生在对话结束后),所以用固定的 "memory-autosave" 作为分组键. 这意味着所有记忆文件备份归入同一个快照组--这是合理的, 因为记忆文件的变化是增量的,用一个"记忆自动保存"组统一管理比每次生成随机 ID 更可预测. 替代方案:让 Save() 传入 messageID(需要 Store 接口改动,破坏封装). 替代方案:每次生成 UUID 作为 messageID(快照爆炸,每次 Save 都新建一个组).
func (*FileHistory) BeforeEdit ¶
func (h *FileHistory) BeforeEdit(filePath string, messageID string) error
BeforeEdit 编辑前调用,备份原文件. 如果文件不存在,返回错误(FileEdit 要求文件已存在).
func (*FileHistory) BeforeWrite ¶
func (h *FileHistory) BeforeWrite(filePath string, messageID string) error
BeforeWrite 写入前调用,备份原文件(如果存在). 如果文件不存在(新建),记录一个 OriginalExists=false 的条目.
func (*FileHistory) CanRollback ¶
func (h *FileHistory) CanRollback(messageID string) (bool, []string)
CanRollback 检查是否可以回滚指定消息的文件修改. 返回是否可回滚,以及会影响的文件列表.
func (*FileHistory) Prune ¶
func (h *FileHistory) Prune(ctx context.Context, policy RetentionPolicy) error
Prune 清理超出策略的备份文件. 在 Engine.Close() 时自动调用,也可以手动调用(如定期维护).
清理逻辑:
- 遍历 baseDir 目录下的所有备份文件
- 按 mtime 排序,删除超过 MaxAgeDays 的文件
- 备份文件是内容寻址的(hash 为文件名),无法直接按"原始文件"分组; 因此 MaxVersions 按全局备份文件总数控制--超过上限时删除最旧的.
- 删除空目录
精妙之处(CLEVER): 内容寻址备份的 MaxVersions 语义是"全局版本数上限"而非 "每文件版本数上限"--因为 hash 文件名无法反向映射原始文件. 这是内容寻址存储的固有权衡:节省空间(去重)换取无法按文件分组. 如果需要"每文件版本数",需要额外的索引文件记录 hash→原始路径的映射. 替代方案:维护 hash→原始路径的 JSON 索引(复杂度高,失去内容寻址的简洁性).
历史包袱(LEGACY): 第一版不处理 hash→原始路径映射,MaxVersions 语义是全局总数. 未来版本可以加索引支持真正的"每文件版本数".
func (*FileHistory) Rollback ¶
func (h *FileHistory) Rollback(messageID string) error
Rollback 按消息 ID 回滚所有文件修改. 倒序恢复该消息期间修改的所有文件到修改前的状态.
func (*FileHistory) SnapshotCount ¶
func (h *FileHistory) SnapshotCount() int
SnapshotCount 返回当前快照数量(主要用于测试).
type FileHistoryView ¶
type FileHistoryView interface {
// CanRollback 检查指定 messageID 是否存在可回滚的快照.
// 返回: (是否可回滚, 该次提交涉及的文件路径列表)
CanRollback(messageID string) (bool, []string)
// SnapshotCount 返回当前持有的快照总数.
// 用于监控 / 诊断 / 容量告警场景.
SnapshotCount() int
}
FileHistoryView 是文件历史的只读查询接口, 供消费者绑定 Engine.FileHistoryView() 返回值.
升华改进(ELEVATED): L1186 修复 - 窄接口提取.
历史背景: Engine.FileHistoryRef() 返回具体 *FileHistory 指针, 消费者查询 "能否回滚这轮对话" 时必须绑定具体类型, 不利于测试 mock 和 API 稳定性声明. 此接口明确声明 "我们对消费者承诺这几个只读查询方法稳定" -- 未来 FileHistory 内部重构时可以在不破坏该契约的前提下自由演化.
为什么不放 Rollback/Prune: 这两个是执行/管理动作, 应该通过 Engine.Rollback() 走完整的 context + 事件发射 + 错误处理路径, 不应让消费者通过 FileHistoryView 绕过引擎直接调用.View 的命名本身就暗示 "只读观察" 语义.
同源 precedent:
- memory.Backupper (1 方法, 写侧, for memory 消费者)
- tools/builtin.FileHistoryRecorder (2 方法, 写侧, for tool 消费者)
- FileHistoryView (2 方法, 读侧, 本接口, for 外部消费者的 "查询" 场景)
三个接口构成 FileHistory 面对不同消费场景的 role interface 分层, 每个消费者只绑定他需要的窄契约, 不暴露完整的 FileHistory 结构.
替代方案 A: 改 FileHistoryRef() 签名为 FileHistoryView 返回类型 - 否决:
破坏现有 API (docs/configuration.md 有示例), 成本高于收益.
替代方案 B: 把 Rollback/Prune 也放进来 - 否决:
执行类方法应该只通过 Engine 层调用, 不给消费者 "半侧门" 绕过上下文管理.
Shape: pull. Consumer calls Engine.FileHistoryView() to obtain the read-only view, then queries CanRollback / SnapshotCount on demand for UI rendering, monitoring, or audit.
形态: 调取 (pull). 消费者调 Engine.FileHistoryView() 拿只读视图, 按需 查 CanRollback / SnapshotCount 供 UI 渲染 / 监控 / 审计.
type FileLock ¶
type FileLock struct {
// contains filtered or unexported fields
}
FileLock 是基于文件系统的分布式锁.
type FilePlanQueue ¶
type FilePlanQueue struct {
// contains filtered or unexported fields
}
FilePlanQueue 将计划状态持久化为 JSON 文件,后台 goroutine 串行执行计划.
文件布局:
{dir}/
plan-{ts}-{rand}.json ← 每个计划一个文件
并发安全:mu 保护内存中的 cancelFuncs 映射;文件操作使用原子写入.
func NewFilePlanQueue ¶
func NewFilePlanQueue(dir string, execFunc PlanExecFunc) (*FilePlanQueue, error)
NewFilePlanQueue 创建 FilePlanQueue.
dir:计划文件目录,空字符串使用默认路径 ~/.flyto/plans/. execFunc:执行函数,nil 表示仅管理状态(适合测试或只做状态服务的 replica).
func (*FilePlanQueue) Cancel ¶
func (q *FilePlanQueue) Cancel(planID string) error
Cancel 取消计划.
pending 状态:直接改为 cancelled 并写文件. running 状态:调用 context.CancelFunc 中断执行,runLoop 会写 cancelled 状态.
func (*FilePlanQueue) Close ¶
func (q *FilePlanQueue) Close() error
Close 关闭队列,等待当前执行的计划完成(或超时 30 秒).
func (*FilePlanQueue) List ¶
func (q *FilePlanQueue) List() ([]*QueuedPlan, error)
List 返回所有计划,按提交时间倒序.
func (*FilePlanQueue) RecoverPending ¶
func (q *FilePlanQueue) RecoverPending() error
RecoverPending 在 daemon 启动时调用,恢复崩溃前处于 running 状态的计划.
升华改进(ELEVATED): daemon 崩溃时 running 状态的计划是"进行中但结果未知", 早期实现 无此机制(无 daemon 模式). 我们将其重置为 pending 重新入队,保证 at-least-once 执行语义. 替代方案:标记为 failed(更保守,但浪费了已完成的工作).
func (*FilePlanQueue) Status ¶
func (q *FilePlanQueue) Status(planID string) (*QueuedPlan, error)
Status 查询计划状态(直接读文件,无锁).
func (*FilePlanQueue) Submit ¶
func (q *FilePlanQueue) Submit(steps []PlanStep, opts PlanSubmitOptions) (string, error)
Submit 提交计划,立即返回 planID.
type FilePlanStore ¶
type FilePlanStore struct {
// Dir 是计划文件目录(通常 ~/.flyto/plans/).
// 若为空,使用 os.TempDir()/flyto-plans/.
Dir string
// contains filtered or unexported fields
}
FilePlanStore 将计划存储为本地文件,使用 word-slug 作为文件名. 对应早期方案 的 getPlanFilePath / getPlan / writePlan 组合.
func (*FilePlanStore) ClearSession ¶
func (s *FilePlanStore) ClearSession(sessionID string)
ClearSession 删除给定会话的 slug 缓存条目(用于 /clear 命令). 对应早期方案 clearPlanSlug().
func (*FilePlanStore) PlanPath ¶
func (s *FilePlanStore) PlanPath(sessionID string) string
PlanPath 返回计划文件的完整路径(用于展示).
func (*FilePlanStore) ReadPlan ¶
func (s *FilePlanStore) ReadPlan(sessionID string) (string, error)
ReadPlan 读取计划内容.文件不存在时返回 ("", nil).
func (*FilePlanStore) WritePlan ¶
func (s *FilePlanStore) WritePlan(sessionID, content string) error
WritePlan 将 content 原子写入 {Dir}/{slug}.md.
L1205 (2026-04-13): 早期方案直接 os.WriteFile, 注释声称"原子写入"但实际不是-- 中途崩溃会留下截断文件, 并发 Read 也可能读到半写状态.
升华改进(ELEVATED): 改为 tmp file + os.Rename 真正原子写入. POSIX 保证同文件系统 rename 原子, Read 永远看到"某次完整 Write 的结果", 不会撞上半写.同时跨进程安全--另一个进程 (daemon / UI) 在 Read 时不需要锁.
替代方案 1: 加 sync.Mutex 锁 Write/Read -- 否决, 只能保护同进程, 解决不了跨进程场景, 是"形似 MemoryPlanStore"的肤浅一致性, 不是"读到的内容总是某次完整 Write"的语义一致性. 替代方案 2: 用文件锁 (flock) -- 过度工程, POSIX rename 足够且更简单.
精妙之处(CLEVER): tmp 文件与目标文件同目录 - 跨目录 rename 在 ext4/xfs 等主流文件 系统上也原子, 但"同目录 rename"是跨所有 POSIX 文件系统的最强保证 (EXDEV 错误排除).
type FileScratchpad ¶
type FileScratchpad struct {
// contains filtered or unexported fields
}
FileScratchpad 是基于文件系统的 Scratchpad 实现.
实现 builtin.ScratchpadStore 接口,可作为 in-memory Scratchpad 的直接替代. 数据持久化到目录,支持跨进程共享.
func NewFileScratchpad ¶
func NewFileScratchpad(dir string) (*FileScratchpad, error)
NewFileScratchpad 创建文件系统持久化的 Scratchpad.
参数 dir 是存储目录路径(会自动创建,如果不存在). 调用方应使用会话级别的专用目录,不同会话不应共享同一目录 (除非有意共享状态,如 Team 协调器场景).
func (*FileScratchpad) Get ¶
func (s *FileScratchpad) Get(key string) (string, bool)
Get 读取指定 key 的值.
返回 (value, true) 如果存在且未过期;返回 ("", false) 否则. 过期条目在 Get 时惰性删除(与 in-memory 实现一致).
func (*FileScratchpad) Keys ¶
func (s *FileScratchpad) Keys() []string
Keys 返回所有未过期 key 的有序列表.
扫描目录中的所有 .json 文件,读取原始 key,过滤已过期的条目. 过期文件在扫描时惰性删除.
精妙之处(CLEVER): 目录扫描 O(n) 是可接受的-- Scratchpad 用于 Agent 的工作内存,通常只有数十个条目; n=100 条时扫描 + 读文件的延迟远低于一次 LLM API 调用. 替代方案:<维护内存索引> - 否决:增加内存-磁盘同步的复杂性,
进程重启后索引丢失(需要从磁盘重建,等价于扫描).
type FileSessionProvider ¶
type FileSessionProvider struct {
// Dir 是 transcript 目录,例如 ~/.flyto/projects/<hash>/
Dir string
// ExcludeID 是要排除的会话 ID(通常是当前会话,其 mtime 总是很新)
ExcludeID string
}
FileSessionProvider 扫描指定目录中自 sinceTime 以来被修改的 JSONL 文件. 文件名(去掉 .jsonl 扩展名)即为 session ID.
这是 CLI 场景的标准实现,对应早期方案 listSessionsTouchedSince. 目录结构:<dir>/<session_id>.jsonl(每个会话一个文件).
type FileSnapshot ¶
type FileSnapshot struct {
MessageID string // 关联的消息 ID
Timestamp time.Time // 快照创建时间
FileBackups map[string]*FileBackup // filePath → backup
}
FileSnapshot 一次消息期间的文件快照.
type FileSnapshotStore ¶
type FileSnapshotStore struct {
// Dir 快照文件目录.
// 若为空,使用 defaultSnapshotDir()(~/.flyto/snapshots/).
Dir string
}
FileSnapshotStore 是基于文件系统的快照存储实现.
快照文件路径:<dir>/<conversation-id>.json 默认目录:~/.flyto/snapshots/
原子写入策略:
- 写入 <path>.tmp 临时文件
- os.Rename 原子替换(Unix 保证原子性,Windows 近似原子)
- Rename 失败时删除临时文件,返回错误
精妙之处(CLEVER): Rename 在同一文件系统内是原子操作(单个 inode 替换)-- 即使进程在 WriteFile 完成后 Rename 前崩溃,也不会留下损坏的快照文件(旧文件完好保留). 只有 Rename 成功后,新快照才"生效". 替代方案:直接写目标文件(崩溃时可能留下半截 JSON,Load 会失败).
func NewFileSnapshotStore ¶
func NewFileSnapshotStore(dir string) *FileSnapshotStore
NewFileSnapshotStore 创建一个使用指定目录的文件快照存储. dir 为空时使用 ~/.flyto/snapshots/.
func (*FileSnapshotStore) Delete ¶
func (s *FileSnapshotStore) Delete(_ context.Context, conversationID string) error
Delete 删除快照文件.幂等:文件不存在时静默返回 nil.
func (*FileSnapshotStore) Load ¶
func (s *FileSnapshotStore) Load(_ context.Context, conversationID string) (SessionSnapshot, bool, error)
Load 从文件加载快照. 若文件不存在,返回 (zero, false, nil).
func (*FileSnapshotStore) Save ¶
func (s *FileSnapshotStore) Save(_ context.Context, snap SessionSnapshot) error
Save 将快照原子写入文件. 若目录不存在,自动创建(MkdirAll).
type FileStateCache ¶
type FileStateCache struct {
// contains filtered or unexported fields
}
FileStateCache 跟踪 Agent 读过哪些文件,它们的内容哈希和大小.
精妙之处(CLEVER): O(1) LRU 实现--container/list(双向链表)+ map[string]*list.Element 双结构. 链表维护时序(头=最新,尾=最旧),map 维护 path→节点 的 O(1) 查找. 移动节点到头部:list.MoveToFront(elem),O(1). 淘汰最旧节点:list.Back() + list.Remove(),O(1).
对比早期方案 O(n) 实现:每次 Get/Record 都要 O(n) 线性扫描 []string lru 队列找旧位置. 在 maxSize=1000 时差距不大(≈1μs),但如果 maxSize 增大到 10K+ 时早期方案会明显变慢. 更重要的是:O(1) 实现是更正确的 LRU,不是过度工程.
func NewFileStateCache ¶
func NewFileStateCache(maxSize int) *FileStateCache
NewFileStateCache 创建一个文件状态缓存. maxSize 指定最大缓存条目数,<= 0 时使用默认值 1000.
func (*FileStateCache) Get ¶
func (c *FileStateCache) Get(path string) (*FileCacheEntry, bool)
Get 查询指定路径的缓存条目,并将其标记为"最近使用"(更新 LRU 顺序). 返回缓存条目和是否存在.命中/未命中会计入统计.
精妙之处(CLEVER): Get 主动更新 LRU 顺序--每次访问都把条目移到链表头部. 这实现了经典 LRU 语义:最近访问的最后被淘汰. 如果你只想查询存在性而不想影响淘汰顺序(如权限检查),请用 Peek.
func (*FileStateCache) Info ¶
func (c *FileStateCache) Info(path string) (FileCacheEntry, bool)
Info 返回 path 对应缓存条目的完整快照 (值类型, 非指针), 不影响 LRU 顺序, 不计入命中统计. 和 Peek 的差别: Peek 返回 *FileCacheEntry 指针供内部读, 外部拿到指针可能无意 mutate cache 状态; Info 返回值 副本, 安全暴露给外部消费者 (SDK / platform / 诊断面板).
这是 FileCacheEntry 元数据的**真 pull API**: 此前 FileCacheEntry 声明在 pkg/engine 是 exported 类型, 却没有任何 exported 出口让外部 拿到实例, 导致 Path / ContentHash / Size / LineCount / ReadAt / WasModified 6 字段长期"声明未读". Info 打通这条路径, 让外部能读:
- 内容哈希: 诊断"Agent 看到的版本 vs 当前磁盘版本"
- 大小 / 行数: 审计"Agent 读了多大文件"
- 读取时间: 审计"上次读这个文件是何时"
- 被修改标记: 展示"此文件自读后被外部改过"
Info returns a value snapshot of the cache entry for path, with no LRU reordering and no hit-stat impact. Differs from Peek (which returns *FileCacheEntry for internal reads and allows accidental mutation) by returning a copy -- safe to expose to external consumers. This is the real pull API for FileCacheEntry metadata; previously the struct was exported but unreachable from outside, leaving the six fields formally defined yet never read.
func (*FileStateCache) IsModified ¶
func (c *FileStateCache) IsModified(path string) bool
IsModified 检查文件自上次读取后是否被外部修改. 通过比较当前文件的 mtime 和缓存中记录的 mtime 判断. 如果文件不在缓存中或无法 stat,返回 true(保守策略).
精妙之处(CLEVER): IsModified 使用 Peek 语义--不更新 LRU 顺序. 修改检测是"观察"行为,不是"使用"行为. 如果用 Get,每次检测外部修改都会把文件蹭到 LRU 顶端, 导致"被检测但未被使用"的文件挤占缓存空间.
func (*FileStateCache) Peek ¶
func (c *FileStateCache) Peek(path string) (*FileCacheEntry, bool)
Peek 查询缓存条目但**不更新 LRU 顺序,不计入命中统计**.
升华改进(ELEVATED): 早期方案无 Peek,所有查询都用 Get 更新 LRU 顺序. 权限系统,后台审计,秘密扫描等"只想观察"的场景用 Get 会污染 LRU-- 被扫描的文件蹭到顶端,真正频繁使用的文件反而可能被驱逐. 我们提供 Peek 实现"观察不污染"语义,用于:
- 权限分类器检查"此文件是否被读过"
- SecretGuard 后台扫描已缓存文件
- DreamEngine / FreshnessChecker 检查文件缓存状态
替代方案:<统一用 Get> - 否决原因:后台周期性扫描会系统性地把扫描集里的所有文件推到 LRU 顶端, 导致工作集外的文件占满缓存,把工作集内的文件驱逐出去.
func (*FileStateCache) RecentFiles ¶
func (c *FileStateCache) RecentFiles(limit int) []string
RecentFiles 返回最近读过的文件列表(按最近使用排序,最近的在前). limit 指定最多返回多少个,<= 0 时返回全部.
func (*FileStateCache) Record ¶
func (c *FileStateCache) Record(path string, content []byte)
Record 在读取文件后记录其状态到缓存. content 是文件的完整内容(用于计算哈希和行数).
type FlushGate ¶
type FlushGate[T any] struct { // contains filtered or unexported fields }
FlushGate 是一个泛型的 flush 排队门. 在 flush 期间,通过 Enqueue 提交的项会被暂存; flush 结束后通过 End 取回所有暂存项.
func (*FlushGate[T]) Deactivate ¶
func (g *FlushGate[T]) Deactivate()
Deactivate 清除激活状态但保留 pending 项. 用于 transport 替换场景:旧 transport 的 flush 不再等待, 但排队的项需要由新 transport 处理. 精妙之处(CLEVER): Deactivate 只清除 active 标记但保留 pending 队列-- 这是为 transport 热替换设计的:旧 transport 的 flush 中断时, 排队的消息不能丢(可能包含工具结果),需要由新 transport 接手处理. 如果用 End() 会取走 pending 数据但可能没有消费者处理.
func (*FlushGate[T]) End ¶
func (g *FlushGate[T]) End() []T
End flush 结束,返回所有排队的项并清空队列. 调用后 active 变为 false,后续 Enqueue 将返回 false.
func (*FlushGate[T]) Enqueue ¶
Enqueue 在 flush 期间将项加入排队. 返回 true 表示项已排队(flush 进行中); 返回 false 表示当前不在 flush 中,调用者应直接处理该项.
func (*FlushGate[T]) PendingCount ¶
PendingCount 返回当前排队的项数量.
type FuncApprovalPolicy ¶
type FuncApprovalPolicy struct {
Fn func(ctx context.Context, event PlanApprovalEvent) (approved bool, editedPlan string, err error)
}
FuncApprovalPolicy 用函数实现审批策略(SDK 嵌入最常用).
升华改进(ELEVATED): 函数式审批策略让 SDK 用户不需要实现接口, 只需要传一个 func 就能定制审批逻辑. 对比:需要实现接口的方式在 Go 里更正式但更繁琐,不适合快速集成.
func (FuncApprovalPolicy) RequestApproval ¶
func (p FuncApprovalPolicy) RequestApproval(ctx context.Context, event PlanApprovalEvent) (bool, string, error)
type ImageValidator ¶
type ImageValidator struct {
// MaxSizeBytes 图片大小限制(字节).如果为 0,使用默认值.
MaxSizeBytes int64
}
ImageValidator 验证并处理超大图片块.
精妙之处(CLEVER): 不直接删除超大图片,而是替换为文本提示. 这样模型知道"这里原来有一张图",可以据此做出合理回应 (比如让用户重新上传小一点的版本),而不是莫名其妙地丢失信息.
func (*ImageValidator) Name ¶
func (v *ImageValidator) Name() string
func (*ImageValidator) Normalize ¶
func (v *ImageValidator) Normalize(messages []query.Message) []query.Message
func (*ImageValidator) Priority ¶
func (v *ImageValidator) Priority() int
type InboxMessageEvent ¶
type InboxMessageEvent = flyto.InboxMessageEvent
type InputProcessor ¶
type InputProcessor struct {
// contains filtered or unexported fields
}
InputProcessor 处理用户输入的预处理器.
func NewInputProcessor ¶
func NewInputProcessor(cwd string, fileCache *FileStateCache) *InputProcessor
NewInputProcessor 创建一个输入预处理器.
参数:
- cwd: 当前工作目录,用于解析相对路径
- fileCache: 文件状态缓存,引用的文件会被记录到缓存中.可为 nil.
func (*InputProcessor) Process ¶
func (p *InputProcessor) Process(input string) (*ProcessedInput, error)
Process 预处理用户输入. 按以下顺序处理:
- 检测是否为空输入
- 检测斜杠命令(如果是斜杠命令,跳过其他处理)
- 检测并展开文件引用
- 检测图片引用
- 检测 URL
type LocalAuditSink ¶
type LocalAuditSink struct {
// contains filtered or unexported fields
}
LocalAuditSink 将审计记录以 JSONL 格式追加写入本地文件. 线程安全:内部用 sync.Mutex 保证并发写入不交叉.
func NewLocalAuditSink ¶
func NewLocalAuditSink(path string) (*LocalAuditSink, error)
NewLocalAuditSink 创建本地文件 AuditSink. path 是目标文件路径(如 ~/.flyto/audit.jsonl). 如果父目录不存在,自动创建(类似 mkdir -p). 文件以追加模式打开,已有记录不会被覆盖.
func (*LocalAuditSink) Close ¶
func (s *LocalAuditSink) Close() error
Close 刷新缓冲区并关闭文件. 调用 Close 后不应再调用 Write.
func (*LocalAuditSink) Write ¶
func (s *LocalAuditSink) Write(entry security.AuditEntry) error
Write 将一条审计记录以 JSON 序列化后追加到文件(换行结尾). 线程安全.
type ManagedSession ¶
type ManagedSession struct {
// Session 是底层会话对象
Session *Session
// Info 是会话摘要信息
Info SessionInfo
// contains filtered or unexported fields
}
ManagedSession 是被管理器管理的会话,包含额外的元数据.
type MemoryPlanStore ¶
type MemoryPlanStore struct {
// contains filtered or unexported fields
}
MemoryPlanStore 将计划存储在内存 map 中,适合 SDK 嵌入服务端或单元测试.
升华改进(ELEVATED): 早期方案没有内存实现--测试时必须 mock fs 或用真实文件系统. 我们提供 MemoryPlanStore 让测试不需要 tmp 目录,SDK 嵌入 Web Server 时也避免 文件系统权限问题(容器 read-only rootfs 场景).
func NewMemoryPlanStore ¶
func NewMemoryPlanStore(prefix string) *MemoryPlanStore
NewMemoryPlanStore 创建内存计划存储. prefix 用于 PlanPath 展示,例如 "memory://plans".
func (*MemoryPlanStore) PlanPath ¶
func (m *MemoryPlanStore) PlanPath(sessionID string) string
PlanPath 返回逻辑路径,格式 "{prefix}/{sessionID}".
func (*MemoryPlanStore) ReadPlan ¶
func (m *MemoryPlanStore) ReadPlan(sessionID string) (string, error)
func (*MemoryPlanStore) WritePlan ¶
func (m *MemoryPlanStore) WritePlan(sessionID, content string) error
type MessageNormalizer ¶
type MessageNormalizer interface {
Name() string
Priority() int // 执行优先级(越小越先执行)
Normalize(messages []query.Message) []query.Message
}
MessageNormalizer 单个消息规范化步骤.
升华改进(ELEVATED): 从硬编码 3 步升级为可组合管道. 早期方案有 12 个 pass 串联(互相依赖,顺序脆弱). Pipeline 模式让每个步骤独立,可测试,可替换. 场景可以注册自己的步骤(仓储:传感器数据格式化,法律:脱敏). 替代方案:硬编码所有步骤在一个大函数里(原始设计,400 行,12 个 pass 互相依赖).
type MetricObserver ¶
type MetricObserver = flyto.MetricObserver
type MigrateFunc ¶
type MigrateFunc func(*Transcript) error
MigrateFunc 是一个 Transcript 迁移函数的类型. 输入是版本 N 的 Transcript,函数就地修改为版本 N+1 的格式. 返回 error 时迁移中止,不会继续执行更高版本的迁移.
迁移函数必须满足:
- 幂等:对已经是 N+1 格式的数据调用不产生副作用(防御性检查)
- 无损:迁移后原有字段语义不变,只新增或规范化
- 可回滚(尽力):能记录变更内容,方便问题排查
type NoopApprovalPolicy ¶
type NoopApprovalPolicy struct{}
NoopApprovalPolicy 自动批准所有计划,用于测试和 bypass 场景.
func (NoopApprovalPolicy) RequestApproval ¶
func (NoopApprovalPolicy) RequestApproval(_ context.Context, _ PlanApprovalEvent) (bool, string, error)
type NoopElicitationHandler ¶
type NoopElicitationHandler struct{}
NoopElicitationHandler 是默认的空操作处理器. 引擎未配置 ElicitationHandler 时使用,直接返回 cancel-- 让服务器知道客户端无法收集用户输入,服务器自行决策.
精妙之处(CLEVER): cancel 而非 decline,语义上"无法响应"而非"用户拒绝"-- 服务器可以据此走默认值路径,而不是中止操作.
func (NoopElicitationHandler) HandleElicitation ¶
func (NoopElicitationHandler) HandleElicitation(_ ElicitationRequest) (ElicitationResponse, error)
HandleElicitation 永远返回 cancel(无 UI 消费层的通用兜底).
type NoopObserver ¶
type NoopObserver struct{}
NoopObserver 空实现(不做任何事,生产环境未配置时用).
精妙之处(CLEVER): 引擎内部不需要 nil 检查 observer, 未配置时统一用 NoopObserver--所有调用都是空操作但不会 panic. 替代方案:每次调用前 `if observer != nil`(散布在几十个地方,容易遗漏).
type NormalizePipeline ¶
type NormalizePipeline struct {
// contains filtered or unexported fields
}
NormalizePipeline 可组合的规范化管道.
精妙之处(CLEVER): 步骤通过 Priority 隐式排序,不显式声明依赖. 早期方案的 3 步有显式的顺序依赖("先 strip orphan 再 filter empty 再 merge"), 改一步可能破坏另一步.Priority 让每个步骤只关心自己排在第几, 新增步骤不需要知道其他步骤的存在.
使用示例:
pipeline := NewNormalizePipeline()
// 内置步骤自动注册
// 场景可以追加:
pipeline.Add(&SensorDataNormalizer{}) // 仓储
pipeline.Add(&RedactionNormalizer{}) // 法律
normalized := pipeline.Run(messages)
func DefaultNormalizePipeline ¶
func DefaultNormalizePipeline() *NormalizePipeline
DefaultNormalizePipeline 返回内置的规范化管道.
步骤按 Priority 排序执行 (L1187 修复: 每行后加依赖原因):
5: AttachmentReorderer - 附件消息上浮 (暂不启用, 需先引入附件概念)
8: ToolResultPairingNormalizer - tool_use/tool_result 完整配对修复 (7.3)
[必须早于 orphan_tool_result - 成对后才能检出孤儿]
10: OrphanToolResultRemover - 孤立 tool_result 移除
[依赖: tool_result_pairing 先运行]
15: ErrorContentStripper - 错误内容剥离 [独立]
18: OrphanThinkingFilter - 孤立 thinking 过滤 [独立]
20: EmptyMessageFilter - 空消息过滤
[必须早于 whitespace_assistant - 空消息是空白特例]
22: WhitespaceAssistantFilter - 空白 assistant 过滤
[依赖: empty_message 先运行]
25: ToolUseInputNormalizer - tool_use 输入规范化 [独立]
30: ConsecutiveRoleMerger - 连续同角色合并
[必须尾部 - 合并只在所有结构化过滤完成后才有意义]
50: ImageValidator - 图片验证
[必须最后 - 验证 pass 不应看到被清理前的脏数据]
L1187 修复说明 (2026-04-13): Priority 系统本质是**隐式依赖** - 数值定顺序而非 显式声明 DependsOn.改成完整 DependsOn + 拓扑排序是过度设计 (没有活跃外部 消费者注入自定义 normalizer).改用**文档化依赖图 + 契约测试**作为最小 viable 方案: 文档告诉读者"为什么这个数值", 测试保证未来改 Priority 不会静默破坏依赖.
依赖图 (有向边 = "必须早于"):
tool_result_pairing (8) ─────────► orphan_tool_result (10) empty_message (20) ─────────► whitespace_assistant (22) (所有结构化 normalizer) ─────────► consecutive_role (30) (所有 transformation) ─────────► image_validator (50)
其余 normalizer (error_content_strip / orphan_thinking / tool_input) 独立, 可以插入任意位置而不破坏语义.未来改 Priority 值必须保持上述 4 条有向边的 偏序关系, 否则会静默破坏规范化语义.
契约测试: `TestDefaultNormalizePipeline_OrderingContract` 锁定精确顺序, `TestDefaultNormalizePipeline_DependencyInvariants` 锁定核心偏序 (允许新增 normalizer 只要保持偏序).两个互补, 破坏任一条偏序会 fail CI.
消费者插入自定义 normalizer 的指南:
- 只读分析 → 任意位置, 不影响语义
- 结构化过滤 → 插在 empty_message (20) 之前, 或 consecutive_role (30) 之前
- 验证 pass → 优先级 > 30, 即后于 consecutive_role
- **绝不允许** 插在 tool_result_pairing (8) 和 orphan_tool_result (10) 之间
精妙之处(CLEVER): 最后一道防线--即使上层 runLoop 逻辑完美, 压缩,恢复,消息合并等操作都可能产生不合法的消息序列. Pipeline 在发送 API 请求前做最终清理,保证 API 永远不会收到畸形数据.
func DefaultNormalizePipelineWithObserver ¶
func DefaultNormalizePipelineWithObserver(observer EventObserver, strict *StrictMode) *NormalizePipeline
DefaultNormalizePipelineWithObserver 返回内置的规范化管道,并注入 Observer 和 StrictMode.
升华改进(ELEVATED): Observer 注入让规范化管道的每次修复都可观测. 生产中消息配对错误是最常见的 API 400 根因--没有可观测性就无法定位. 替代方案:规范化管道内部用 fmt.Println 调试(开发时有用,生产中无法消费).
func NewNormalizePipeline ¶
func NewNormalizePipeline(steps ...MessageNormalizer) *NormalizePipeline
NewNormalizePipeline 创建管道,可选传入初始步骤.
func (*NormalizePipeline) Add ¶
func (p *NormalizePipeline) Add(step MessageNormalizer)
Add 向管道追加一个步骤.
func (*NormalizePipeline) Remove ¶
func (p *NormalizePipeline) Remove(name string) bool
Remove 按名称移除一个步骤.返回是否成功移除.
type OperationEntry ¶
type OperationEntry struct {
ID string // 操作唯一 ID(通常等于 tool_use ID)
MessageID string // 关联的消息 ID
TurnNumber int // 对话轮次号 (pull API, 外部审计经 GetByMessage 读)
ToolName string // 工具名称
Input json.RawMessage // 工具输入参数
Output string // 工具输出 (pull API, 可能是 orchestrator 截断后的短摘要, 见 Truncated)
UndoInfo *tools.UndoInfo // 撤销信息(可选)
Timestamp time.Time // 操作时间
Status string // opStatusSuccess / opStatusFailed / opStatusRolledBack
// Truncated marks that Output is a short summary -- the full tool
// result was persisted to StoredPath by the orchestrator. Audit
// consumers walking OperationLog need this flag to avoid treating
// summary as the authoritative record.
//
// Truncated 标记 Output 是短摘要 -- 完整工具结果已由 orchestrator 落盘到
// StoredPath. 走 OperationLog 的审计消费者据此避免把摘要当作权威记录.
Truncated bool
// StoredPath is the persistence location of the full tool result.
// Path shape is consumer-defined (local path / sandbox path / object
// key). Empty when Truncated is false. SECURITY: same constraints as
// flyto.ToolResultEvent.StoredPath -- treat as caller-specific data.
//
// StoredPath 是完整工具结果的持久化位置. Path shape 由消费层定义 (本地
// 路径 / sandbox 路径 / 对象 key). Truncated=false 时为空串. 安全: 约束
// 同 flyto.ToolResultEvent.StoredPath -- 当调用方特定数据对待.
StoredPath string
}
OperationEntry is a single operation record in OperationLog.
Field consumption forms:
- ID / MessageID / ToolName / Input / UndoInfo / Timestamp / Status are consumed by the engine rollback path (RollbackMessage walks entries, dispatches UndoInfo via UndoExecutor) AND emitted in the opEventRecorded observer event.
- Output / TurnNumber are external pull-API fields: consumed by audit consumers via OperationLog.GetByMessage() / .GetByMessageLocked(). The engine's rollback path itself does not read them. Note the observer event opEventRecorded also carries a parallel turn_number key (read by AuditObserver in audit_observer.go) -- the observer path and the OperationEntry pull path are two disjoint sinks, not redundant: observer is push, OperationEntry is pull-on-demand.
- Truncated / StoredPath see field-level godoc below.
OperationEntry 是 OperationLog 中一条操作记录.
字段消费形态:
- ID / MessageID / ToolName / Input / UndoInfo / Timestamp / Status 由引擎 rollback 路径消费 (RollbackMessage 遍历 entries, 经 UndoExecutor 派发 UndoInfo), 同时在 opEventRecorded observer 事件中 emit.
- Output / TurnNumber 是外部调取 (pull) API 字段: 审计消费者经 OperationLog.GetByMessage() / .GetByMessageLocked() 读取. 引擎自身 rollback 路径不读. 注意 observer 事件 opEventRecorded 同时携带 parallel turn_number key (由 audit_observer.go 的 AuditObserver 读), observer 路径和 OperationEntry 调取路径是两个互不相交的 sink, 非 冗余: observer 是推送 (push), OperationEntry 是按需调取 (pull).
- Truncated / StoredPath 见字段级 godoc.
type OperationLog ¶
type OperationLog struct {
// contains filtered or unexported fields
}
OperationLog 统一操作日志.
func NewOperationLog ¶
func NewOperationLog(observer EventObserver) *OperationLog
NewOperationLog 创建统一操作日志.
func (*OperationLog) EntryCount ¶
func (l *OperationLog) EntryCount() int
EntryCount 返回操作记录总数(主要用于测试).
func (*OperationLog) GetByMessage ¶
func (l *OperationLog) GetByMessage(messageID string) []*OperationEntry
GetByMessage 获取某消息的所有操作.
func (*OperationLog) GetByMessageLocked ¶
func (l *OperationLog) GetByMessageLocked(messageID string) []*OperationEntry
GetByMessageLocked 获取某消息的所有操作(必须在持有锁的情况下调用或无锁场景). 注意:RollbackMessage 内部使用,避免死锁.
func (*OperationLog) RollbackMessage ¶
func (l *OperationLog) RollbackMessage(ctx context.Context, messageID string, executor UndoExecutor) error
RollbackMessage 按消息 ID 回滚(倒序执行 UndoInfo). 精妙之处(CLEVER): Saga 补偿模式--倒序执行补偿操作,保证逆序一致性. 如果某个补偿失败,继续执行后续补偿(best-effort),不中断.
type OrphanThinkingFilter ¶
type OrphanThinkingFilter struct{}
OrphanThinkingFilter 过滤只包含 thinking 块的 assistant 消息.
精妙之处(CLEVER): 只过滤 assistant 消息中的纯 thinking. user 消息不可能包含 thinking 块(API 约束), 所以只检查 assistant 角色,避免误伤.
func (*OrphanThinkingFilter) Name ¶
func (f *OrphanThinkingFilter) Name() string
func (*OrphanThinkingFilter) Normalize ¶
func (f *OrphanThinkingFilter) Normalize(messages []query.Message) []query.Message
func (*OrphanThinkingFilter) Priority ¶
func (f *OrphanThinkingFilter) Priority() int
type OrphanToolResultRemover ¶
type OrphanToolResultRemover struct{}
OrphanToolResultRemover 移除没有对应 tool_use 的 tool_result 块.
精妙之处(CLEVER): 先收集所有 tool_use ID 再过滤,保证即使 tool_use 出现在 tool_result 之后(理论上不该发生,但压缩可能打乱顺序)也能正确匹配.
func (*OrphanToolResultRemover) Name ¶
func (r *OrphanToolResultRemover) Name() string
func (*OrphanToolResultRemover) Normalize ¶
func (r *OrphanToolResultRemover) Normalize(messages []query.Message) []query.Message
func (*OrphanToolResultRemover) Priority ¶
func (r *OrphanToolResultRemover) Priority() int
type PartialToolUse ¶
type PartialToolUse struct {
// ID 工具调用的 ID(来自 content_block_start 事件)
ID string `json:"id"`
// Name 工具名称
Name string `json:"name"`
// Input 已收到的部分 JSON 字符串(可能不完整)
Input string `json:"input"`
}
PartialToolUse 记录流截断时的不完整工具调用片段.
历史包袱(LEGACY): 当前引擎在 partial-stream 时用 WarningEvent 报告截断, 不记录 PartialToolUse 到快照(因为截断时消息历史尚未更新). 此结构体为未来扩展预留--如果将来实现更精细的"逐 block 保存", PartialToolUse 可以帮助恢复工具调用的输入 JSON 拼接状态.
type PermissionHandler ¶
type PermissionHandler interface {
// HandlePermissionRequest handles a Worker's permission request.
// Returns approved=true to allow, false to deny.
// updatedInput, when non-nil, replaces the original tool input (consumer-layer
// sanitization, e.g. rewriting Bash commands or restricting file paths).
// reason is a human-readable explanation surfaced in logs and tool-result errors.
//
// HandlePermissionRequest 处理来自 Worker 的权限请求.
// approved=true 批准, false 拒绝.
// updatedInput 非 nil 时替换原始 tool input (消费层 sanitize, 如改写 Bash 命令
// 或限制文件路径).
// reason 是人类可读说明, 写入日志并显示在 tool-result 错误里.
HandlePermissionRequest(ctx context.Context, req inbox.PermissionRequestPayload) (approved bool, updatedInput map[string]any, reason string)
}
PermissionHandler is the consumer-layer permission confirmation interface. CLI consumers pop a terminal dialog; SDK consumers can auto-approve / deny; HTTP consumers can push to a frontend over WebSocket.
PermissionHandler 是消费层实现的权限确认接口. CLI 消费层弹出确认框; SDK 消费层可以自动批准或拒绝; HTTP 消费层可通过 WebSocket 推到前端.
升华改进(ELEVATED): 接口而非函数指针-- 消费层可以实现有状态的 PermissionHandler(如记录审批历史,UI 对话框). 替代方案:func(ctx, req) (bool, string)(无状态,无法携带 UI 上下文).
Shape: synchronous callback. Team Leader calls HandlePermissionRequest synchronously when a Worker SubAgent bubbles up a tool permission ask; Leader blocks for approval/deny + optional updatedInput rewrite.
形态: 同步回调. Worker SubAgent 冒泡权限请求时, Team Leader 同步调 HandlePermissionRequest, 阻塞等批准/拒绝 + 可选 updatedInput 改写.
type PermissionLearnEvent ¶
type PermissionLearnEvent = flyto.PermissionLearnEvent
type PermissionLearnRule ¶
type PermissionLearnRule = flyto.PermissionLearnRule
type PermissionRequestEvent ¶
type PermissionRequestEvent = flyto.PermissionRequestEvent
type PlanApprovalEvent ¶
type PlanApprovalEvent struct {
// SessionID 标识是哪个会话的计划.
SessionID string
// Plan 计划的完整文本内容(从 PlanStore 读取).
Plan string
// FilePath 计划文件的逻辑路径(用于展示给用户).
FilePath string
// Steps 可选的结构化步骤列表(如果计划包含 YAML/JSON 步骤块则解析).
Steps []PlanStep
// Approve 批准计划(允许执行).editedPlan 非空时用编辑后的版本覆盖原计划.
// 必须调用 Approve 或 Reject 之一,否则 ExitPlanMode 会一直等待.
Approve func(editedPlan string) error
// Reject 拒绝计划(保持 plan 模式,让模型修改计划).
Reject func(reason string) error
}
PlanApprovalEvent 是计划审批事件,由 ExitPlanMode 工具触发.
精妙之处(CLEVER): 审批事件携带 Approve/Reject 函数而不是返回 bool-- 这让 CLI 实现可以在异步渲染审批 UI 后回调,而不需要阻塞工具执行线程. SDK 实现可以把事件发送给外部系统(Slack 审批机器人,审批工作流)然后回调.
func (*PlanApprovalEvent) EventType ¶
func (e *PlanApprovalEvent) EventType() string
EventType lets PlanApprovalEvent satisfy engine.Event so it can flow through the Run channel and be serialized by bridge.EventSerializer. Consumers subscribing to Run events can receive the approval event and invoke Approve / Reject asynchronously (push mode).
EventType 让 PlanApprovalEvent 实现 engine.Event, 能走 Run channel 并 被 bridge.EventSerializer 序列化. 订阅 Run 事件的消费者可收到审批事件 并异步调 Approve / Reject (push 模式).
type PlanCommandServer ¶
type PlanCommandServer struct {
// contains filtered or unexported fields
}
PlanCommandServer 是 PlanQueue 的 Unix Domain Socket 请求-响应服务端.
生命周期:
- NewPlanCommandServer(sessionID, queue) → 创建实例
- Start() → 开始监听(后台 acceptLoop goroutine)
- SockPath() → 获取 socket 路径(注入到客户端环境变量)
- Close() → 停止监听,清理 socket 文件
func NewPlanCommandServer ¶
func NewPlanCommandServer(sessionID string, queue PlanQueue) (*PlanCommandServer, error)
NewPlanCommandServer 创建计划命令服务器.
sessionID 用于生成唯一的 socket 文件名, 路径由 resolvePlanSockPath 决定 (优先用户私有 cache 目录, fallback 到 os.TempDir). 与 UDSServer 使用不同前缀(flyto-plan- vs flyto-), 避免路径冲突. queue 是底层计划队列, nil 会导致所有命令返回 "queue not available".
func (*PlanCommandServer) Close ¶
func (s *PlanCommandServer) Close() error
Close 停止服务器,清理 socket 文件.幂等,多次调用安全.
func (*PlanCommandServer) SockPath ¶
func (s *PlanCommandServer) SockPath() string
SockPath 返回 socket 文件路径. 客户端通过 FLYTO_PLAN_SOCK 环境变量获取此路径并连接.
func (*PlanCommandServer) Start ¶
func (s *PlanCommandServer) Start() error
Start 开始监听 UDS.最多调用一次,必须在 Close() 之前.
type PlanExecFunc ¶
type PlanExecFunc func(ctx context.Context, plan *QueuedPlan, onStepDone func(stepID string, err error)) error
PlanExecFunc 是计划执行函数的类型.
调用方(通常是 Engine)注入此函数,实现具体的步骤执行逻辑. FilePlanQueue 不感知执行细节(只管队列和状态).
参数:
- ctx:带超时的 context,execFunc 必须响应取消信号
- plan:待执行的计划(含 Steps)
- onStepDone:每个步骤完成时回调,stepID 是完成的步骤 ID,err 非 nil 表示步骤失败
升华改进(ELEVATED): onStepDone 回调让 FilePlanQueue 实时更新状态文件, 客户端轮询时能看到逐步进度,而非只有最终结果. 替代方案:execFunc 只返回整体 error(无中间状态,轮询看不到进度).
type PlanModeManager ¶
type PlanModeManager struct {
// contains filtered or unexported fields
}
PlanModeManager 管理 plan 模式的进入/退出生命周期. 它是 EnterPlanModeTool 和 ExitPlanModeTool 的共享状态.
精妙之处(CLEVER): Manager 通过构造函数注入,没有全局变量-- 同一个进程中可以有多个 Engine 实例(多租户/测试),各自独立管理计划状态. 早期实现 用全局 AppState 单例,在多实例场景下会互相污染.
func NewPlanModeManager ¶
func NewPlanModeManager(store PlanStore, approval ApprovalPolicy, perms permission.Checker) *PlanModeManager
NewPlanModeManager 创建计划模式管理器.
store 控制计划文件存储位置(FilePlanStore for CLI,MemoryPlanStore for SDK/测试). approval 控制审批机制(NoopApprovalPolicy for 测试,CLI 实现用 FuncApprovalPolicy). perms 是权限引擎引用,进入/退出时更新模式.
func (*PlanModeManager) AttachProgress ¶
func (m *PlanModeManager) AttachProgress(p *PlanProgress)
AttachProgress 绑定步骤进度追踪器.
通常在 Exit 返回批准的计划(含 Steps)后由消费方构建 PlanProgress 并调用此方法绑定, 之后消费方通过 PlanProgress 追踪每个步骤的执行状态.
精妙之处(CLEVER): 消费方主导进度追踪的创建-- 消费方知道"要不要追踪进度"(简单 SDK 场景不需要),知道"用什么 observer", 知道"步骤列表"(来自 PlanApprovalEvent.Steps 或自行解析计划文本). 引擎只提供存储挂载点,不强制创建. 替代方案:<Exit 内部自动创建 PlanProgress> - 否决原因:自动创建需要 Exit 知道 Observer 配置,把可观测性配置耦合到审批流程,关注点混杂.
func (*PlanModeManager) Enter ¶
func (m *PlanModeManager) Enter() error
Enter 进入 plan 模式.保存当前权限模式,切换到 ModePlan. 如果已经在 plan 模式,返回错误(防止嵌套进入).
func (*PlanModeManager) Exit ¶
Exit 退出 plan 模式, 读取计划, 触发审批 (push + pull 双路), 恢复权限模式.
审批决策有两种来源:
pull: policy.RequestApproval 同步返回 (approved/edited/err) push: event.Approve(edited) / event.Reject(reason) 被订阅者异步调用
Exit 用 resolveCh (容量 1) 收第一条决策, first-to-resolve 胜; 另一路的 后续写入被 tryResolve 的 select default 丢弃. 两种模式消费者选一个实现 即可 — pull 向后兼容 (policy 实现者照旧填返回值); push 激活 (订阅 Run channel 的 TUI/SDK 收到 PlanApprovalEvent 后异步调 Approve/Reject).
push 通道用上层 ctx EventEmitter (event_emitter.go) 把 event 送到父 Run channel; emitter 由 engine 主工具派发时注入, 外部代码无法伪造 (私有 key).
steps 从工具 input 解析的结构化步骤列表 (optional); 填入 event.Steps 供 消费方做并行调度. nil 表示模型没给结构化步骤 (仅 markdown 计划).
goroutine 生命周期: 用派生 ctx + defer cancel 确保 Exit 返回时 policy 的 goroutine 收到取消信号, policy 实现者有责任听 ctx.Done 退出 (Go ctx 标准 协议). 若 policy 不听, 仅影响其自身 goroutine, 不阻塞 Exit.
返回:
- plan: 批准后的计划文本
- err: ErrPlanRejected 表示用户拒绝; ctx.Err 表示超时/取消; 其他为系统错误
Exit exits plan mode, reads the plan, triggers approval via push + pull dual paths, and restores the permission mode.
Approval decisions come from two sources:
pull: policy.RequestApproval's synchronous return (approved/edited/err) push: event.Approve(edited) / event.Reject(reason) invoked async by subscribers
Exit uses a capacity-1 resolveCh to receive the first decision; first-to-resolve wins, and the later writer is dropped by tryResolve's select default. Consumers pick one mode — pull stays backward compatible (policy implementors still return values); push is now active (TUI/SDK subscribers receive PlanApprovalEvent on Run channel and call Approve/Reject asynchronously).
The push channel uses the ctx EventEmitter (event_emitter.go) to send the event into the parent Run channel. The emitter is injected by the engine main dispatch path and cannot be spoofed by external code (private key).
steps are the structured step list parsed from the tool input (optional); filled into event.Steps for downstream schedulers. nil means the model did not supply structured steps (markdown-only plan).
Goroutine lifecycle: a derived ctx + defer cancel ensures the policy's goroutine gets a cancel signal when Exit returns. Policy implementors are responsible for honoring ctx.Done (standard Go ctx protocol); if they don't, only their own goroutine stalls, not Exit.
func (*PlanModeManager) IsActive ¶
func (m *PlanModeManager) IsActive() bool
IsActive 返回当前是否处于 plan 模式.
func (*PlanModeManager) Progress ¶
func (m *PlanModeManager) Progress() *PlanProgress
Progress 返回当前绑定的进度追踪器(可能为 nil). 消费方用此方法获取追踪器后调用 StartStep / FinishStep 等方法.
func (*PlanModeManager) SetSessionID ¶
func (m *PlanModeManager) SetSessionID(id string)
SetSessionID 设置当前会话 ID(Engine.Run 开始时调用). 会话 ID 用于路由到正确的计划文件.
type PlanProgress ¶
type PlanProgress struct {
// contains filtered or unexported fields
}
PlanProgress 追踪整个计划的步骤执行进度.
精妙之处(CLEVER): PlanProgress 独立于 PlanModeManager 生存-- PlanModeManager.active 变为 false 之后(计划已批准,开始执行), PlanProgress 继续追踪每个步骤直到全部完成. 这符合"生命周期分离"原则:审批 ≠ 执行.
线程安全:所有状态通过 sync.RWMutex 保护.
func NewPlanProgress ¶
func NewPlanProgress(sessionID string, steps []PlanStep, observer EventObserver) *PlanProgress
NewPlanProgress 创建计划进度追踪器.
steps 是初始步骤列表(通常来自 PlanApprovalEvent.Steps). observer 用于结构化事件上报,传 nil 则使用 NoopObserver.
func (*PlanProgress) FinishStep ¶
func (p *PlanProgress) FinishStep(stepID string, status StepStatus, errorMsg string) error
FinishStep 将步骤标记为完成(done/failed/skipped).
errorMsg 仅 statusFailed 时有意义.
func (*PlanProgress) RegisterStep ¶
func (p *PlanProgress) RegisterStep(step PlanStep) error
RegisterStep 动态添加步骤(重规划时追加新步骤).
升华改进(ELEVATED): 早期方案没有动态添加步骤的概念(计划是静态文件). 我们允许"步骤失败时的重规划"场景:模型生成补救步骤,消费方调用 RegisterStep 追加, 然后 Start/Finish 继续执行. 替代方案:<重规划时创建新 PlanProgress> - 否决原因:丢失之前步骤的历史状态, 消费方无法展示"原计划 5 步,失败后增加了 2 个补救步骤"的完整视图.
func (*PlanProgress) SetOnProgress ¶
func (p *PlanProgress) SetOnProgress(fn func(snapshot PlanProgressSnapshot))
SetOnProgress 设置进度变更回调. 每次步骤状态变更时同步调用(在锁外调用,避免死锁).
精妙之处(CLEVER): 允许事后设置,而不要求构造时就传入-- SDK 嵌入场景常见模式:先创建 PlanProgress,再绑定 UI 回调.
func (*PlanProgress) SkipDependents ¶
func (p *PlanProgress) SkipDependents(failedStepID string) []string
SkipDependents 将所有依赖 failedStepID 的步骤递归标记为 skipped.
精妙之处(CLEVER): 拓扑传播--步骤 A 失败后,所有直接/间接依赖 A 的步骤 都应该 skip,而不是等待然后超时. 消费方不需要自己遍历 Deps 图,调用一次 SkipDependents 即可. 时间复杂度:O(n²) 最坏情况(全链依赖),但计划步骤数通常 < 20,可接受. 替代方案:<消费方自己遍历 Deps> - 否决原因:每个消费层都要实现一遍图遍历, 且"什么时候 skip"的业务语义理应在引擎层统一,不散落到各消费层.
func (*PlanProgress) Snapshot ¶
func (p *PlanProgress) Snapshot() PlanProgressSnapshot
Snapshot 返回当前进度的完整快照(线程安全).
func (*PlanProgress) StartStep ¶
func (p *PlanProgress) StartStep(stepID, agentID string) error
StartStep 将步骤标记为运行中.
agentID 是执行此步骤的 Agent ID(可选,传空字符串表示未知). 返回 error 当步骤 ID 不存在或步骤不在 Pending 状态.
type PlanProgressEvent ¶
type PlanProgressEvent struct {
// Snapshot 当前进度快照.
Snapshot PlanProgressSnapshot
}
PlanProgressEvent 是计划进度变更事件,通过 Engine 事件流广播.
升华改进(ELEVATED): 早期方案无此事件--计划执行完全黑盒. 我们把进度变更接入 Engine 事件系统,CLI 可以渲染进度列表, SDK 可以推送 SSE 事件,监控系统可以记录执行轨迹. 替代方案:<只用 OnProgress 回调> - 否决原因:回调是 push 模式, 事件流是 pull 模式(消费方按需订阅),两种消费模式满足不同场景.
func (*PlanProgressEvent) EventType ¶
func (e *PlanProgressEvent) EventType() string
type PlanProgressSnapshot ¶
type PlanProgressSnapshot struct {
// SessionID 关联的会话 ID.
SessionID string
// Steps 步骤进度列表(有序,与注册顺序一致).
Steps []StepProgress
// 聚合计数(冗余但高频使用,快照生成时预计算)
TotalCount int
PendingCount int
RunningCount int
DoneCount int
FailedCount int
SkippedCount int
// StartedAt 第一个步骤开始的时间(所有步骤未开始时为零值).
StartedAt time.Time
// FinishedAt 所有步骤进入终态的时间(尚未完成时为零值).
FinishedAt time.Time
}
PlanProgressSnapshot 是某一时刻的计划进度快照. 不可变:创建后不可修改,消费方可以安全地并发读取.
精妙之处(CLEVER): 快照携带所有聚合计数,消费方不需要自己遍历 Steps 统计-- 在热路径 UI 刷新时(每个步骤状态变更都可能触发重渲染),省去 O(n) 遍历.
func (PlanProgressSnapshot) Duration ¶
func (s PlanProgressSnapshot) Duration() time.Duration
Duration 返回整体执行耗时. 尚未开始时返回 0;执行中返回已经过时间;完成后返回总耗时.
func (PlanProgressSnapshot) HasFailed ¶
func (s PlanProgressSnapshot) HasFailed() bool
HasFailed 返回是否有步骤失败.
func (PlanProgressSnapshot) IsComplete ¶
func (s PlanProgressSnapshot) IsComplete() bool
IsComplete 返回所有步骤是否均已进入终态(done/failed/skipped).
func (PlanProgressSnapshot) IsSuccess ¶
func (s PlanProgressSnapshot) IsSuccess() bool
IsSuccess 返回计划是否成功完成(所有步骤 done 或 skipped,无失败步骤).
func (PlanProgressSnapshot) ProgressPercent ¶
func (s PlanProgressSnapshot) ProgressPercent() float64
ProgressPercent 返回已完成(含 failed/skipped)的步骤百分比(0-100).
func (PlanProgressSnapshot) ReadySteps ¶
func (s PlanProgressSnapshot) ReadySteps() []StepProgress
ReadySteps 返回依赖已满足,可以立即执行的 pending 步骤列表.
精妙之处(CLEVER): 消费方不需要自己做拓扑排序-- ReadySteps 每次返回当前可以并行启动的步骤集合. 消费方只需循环:获取 ReadySteps → 并行 Start → 等待完成 → 再次调用 ReadySteps. 这就是 Kahn 算法的消费者端封装.
type PlanQueue ¶
type PlanQueue interface {
// Submit 提交一个计划进入队列,立即返回 planID.
// 如果队列已满返回 ErrPlanQueueFull.
Submit(steps []PlanStep, opts PlanSubmitOptions) (planID string, err error)
// Status 查询计划当前状态.planID 不存在返回 ErrPlanNotFound.
Status(planID string) (*QueuedPlan, error)
// Cancel 取消一个 pending 或 running 的计划.
// 已是终态的计划返回 ErrPlanTerminal.
Cancel(planID string) error
// List 返回所有计划的列表(按提交时间倒序).
List() ([]*QueuedPlan, error)
// RecoverPending 在 daemon 启动时调用:扫描文件目录,
// 将状态为 running 的计划(daemon 崩溃前的遗留)重置为 pending 并重新入队.
RecoverPending() error
// Close 关闭队列,等待当前执行中的计划完成(或超时).
Close() error
}
PlanQueue 是异步计划队列接口.
实现:FilePlanQueue(文件持久化). 扩展:可实现 RedisPlanQueue(多副本共享),MemoryPlanQueue(测试).
type PlanStatus ¶
type PlanStatus string
PlanStatus 是异步计划的生命周期状态.
状态转换:
pending → running → done pending → cancelled running → done running → failed running → cancelled
精妙之处(CLEVER): 状态机只有单向转换--done/failed/cancelled 是终态, 不能回退.RecoverPending 将 running 重置为 pending(唯一的"逆向"操作), 仅在 daemon 启动时执行,代表"崩溃恢复"而非正常状态转换.
const ( PlanStatusPending PlanStatus = "pending" PlanStatusRunning PlanStatus = "running" PlanStatusDone PlanStatus = "done" PlanStatusFailed PlanStatus = "failed" PlanStatusCancelled PlanStatus = "cancelled" )
type PlanStep ¶
type PlanStep struct {
// ID 步骤唯一标识符(模型生成,通常是顺序编号如 "step-1").
ID string
// Description 步骤的人类可读描述.
Description string
// Tools 该步骤预期用到的工具列表(提示性,不做强制限制).
Tools []string
// Complexity 预估复杂度(low/medium/high).
Complexity Complexity
// Deps 依赖的步骤 ID 列表.空表示无依赖(可并行执行).
// 精妙之处(CLEVER): Deps 是拓扑排序的输入,而不是"顺序"--
// 消费方可以 Kahn 算法找出可并行的步骤层.
// 如果用 "after: step-2" 的字符串指令,消费方要解析 Markdown,容易错.
Deps []string
}
PlanStep 是计划中的一个可执行步骤.
升华改进(ELEVATED): 早期实现 计划是纯 Markdown 文件,没有结构化步骤. 我们增加 PlanStep 允许消费方解析依赖关系做并行调度. 引擎只暴露结构--CLI/SDK/Coordinator 决定调度策略,不同场景可不同. 替代方案:<计划只是 string,步骤全靠模型在 Markdown 里描述> - 否决原因:消费方无法机器解析并行性,只能串行执行,浪费多 Agent 机会.
type PlanStore ¶
type PlanStore interface {
// WritePlan 将计划内容写入持久化存储.sessionID 用于路由到正确的文件/记录.
WritePlan(sessionID, content string) error
// ReadPlan 读取计划内容.若不存在返回 ("", nil),不是 error.
ReadPlan(sessionID string) (string, error)
// PlanPath 返回计划的逻辑路径(用于展示给用户,不保证是真实文件系统路径).
PlanPath(sessionID string) string
}
PlanStore 抽象计划文件的存储.
精妙之处(CLEVER): 接口只有 3 个方法--Write/Read/Path. 故意不暴露 List/Delete,计划文件生命周期由调用方(Engine)管理, 不在存储层增加可观测的副作用.
type PlanSubmitOptions ¶
type PlanSubmitOptions struct {
// TimeoutSecs 单次执行超时(秒),0 使用默认值 planDefaultTimeoutSecs.
TimeoutSecs int
}
PlanSubmitOptions 是计划提交的可选参数.
type ProcessedInput ¶
type ProcessedInput struct {
// Text 处理后的文本(文件引用已展开)
Text string
// ContentBlocks 额外的内容块(图片等)
ContentBlocks []query.Content
// ReferencedFiles 引用的文件列表
ReferencedFiles []string
// DetectedURLs 检测到的 URL
DetectedURLs []string
// SlashCommand 如果是斜杠命令,返回命令信息
SlashCommand *SlashCommand
// IsEmpty 输入是否为空
IsEmpty bool
}
ProcessedInput 是预处理后的输入结果.
type QueryChainTracking ¶
type QueryChainTracking struct {
// ChainId 整条链的唯一 ID(从用户请求到所有子 agent 共享同一个)
ChainId string
// Depth 当前深度(0=主查询, 1=子agent, 2=子子agent)
Depth int
// ParentAgentId 父 agent 的 ID(主查询为空)
ParentAgentId string
}
QueryChainTracking 查询链追踪结构体.
func (*QueryChainTracking) EventFields ¶
func (t *QueryChainTracking) EventFields() map[string]any
EventFields 返回要注入到每个 Observer Event 的字段.
func (*QueryChainTracking) Fork ¶
func (t *QueryChainTracking) Fork(parentAgentId string) *QueryChainTracking
Fork 创建子链(SubAgent spawn 时调用). 继承 ChainId,Depth+1,记录 parentAgentId.
type QuerySource ¶
type QuerySource string
QuerySource 标识一次 API 查询的来源. 不同来源在重试策略,优先级,超时等方面有不同行为.
const ( // SourceMainThread 用户直接交互(最高优先级,必须重试) SourceMainThread QuerySource = "main_thread" // SourceSubAgent 子代理调用(用户间接等待) SourceSubAgent QuerySource = "sub_agent" // SourceCompact 上下文压缩(用户间接等待,但可降级) SourceCompact QuerySource = "compact" // SourceSummary 工具摘要生成(后台,可丢弃) SourceSummary QuerySource = "summary" // SourceDream Dream 巩固(纯后台) SourceDream QuerySource = "dream" // SourceClassifier 权限分类器(后台,可降级为默认策略) SourceClassifier QuerySource = "classifier" // SourceBackground 后台任务(最低优先级) SourceBackground QuerySource = "background" )
func (QuerySource) IsForeground ¶
func (s QuerySource) IsForeground() bool
IsForeground 判断是否为前台请求(用户正在等待结果). 前台请求在遇到 529 时应该重试;后台请求应直接返回错误,防止级联雪崩.
type QueuedPlan ¶
type QueuedPlan struct {
// ID 全局唯一标识符,格式 "plan-{timestamp_ns}-{random_hex}".
// 精妙之处(CLEVER): 时间戳前缀保证按提交顺序排序(ls 即可看到顺序).
ID string `json:"id"`
// Steps 计划的结构化步骤列表(来自 PlanStep,复用模块 17 的类型).
Steps []PlanStep `json:"steps"`
// Status 当前状态(状态机见上方注释).
Status PlanStatus `json:"status"`
// SubmittedAt 提交时间(UTC).
SubmittedAt time.Time `json:"submitted_at"`
// StartedAt 开始执行时间,nil 表示尚未开始.
StartedAt *time.Time `json:"started_at,omitempty"`
// FinishedAt 完成时间(成功/失败/取消),nil 表示尚未完成.
FinishedAt *time.Time `json:"finished_at,omitempty"`
// ErrorMsg 失败原因(仅 failed 状态有值).
ErrorMsg string `json:"error,omitempty"`
// TimeoutSecs 单次执行超时秒数(默认 planDefaultTimeoutSecs).
TimeoutSecs int `json:"timeout_secs"`
// StepStatuses 各步骤的执行状态,key = PlanStep.ID.
// 升华改进(ELEVATED): 早期实现 PlanMode 没有 per-step 状态追踪,
// 只有整体进度.我们按步骤粒度记录,客户端可以显示精确进度条.
StepStatuses map[string]StepExecStatus `json:"step_statuses,omitempty"`
// CurrentStepID 当前正在执行的步骤 ID.
CurrentStepID string `json:"current_step_id,omitempty"`
}
QueuedPlan 是一个已提交等待异步执行的计划. 此结构体序列化为 JSON 文件(~/.flyto/plans/{id}.json),是跨进程通信的唯一载体.
func (*QueuedPlan) IsTerminal ¶
func (p *QueuedPlan) IsTerminal() bool
IsTerminal 返回该计划是否处于终态(不可再变更).
type ReminderSystem ¶
type ReminderSystem struct {
// contains filtered or unexported fields
}
ReminderSystem 管理系统提醒的收集与注入.
func NewReminderSystem ¶
func NewReminderSystem(fileCache FileChangeChecker, memStore memory.Store) *ReminderSystem
NewReminderSystem 创建一个新的系统提醒管理器.
参数:
- fileCache: 文件变更检测器,用于检测外部文件修改.可为 nil(跳过文件检测).
- memStore: 记忆存储,用于查找相关记忆.可为 nil(跳过记忆检测).
func (*ReminderSystem) CheckDateChange ¶
func (r *ReminderSystem) CheckDateChange() string
CheckDateChange 检查日期是否变化. 如果日期发生了变化,返回格式化的提醒文本;否则返回空字符串.
func (*ReminderSystem) CheckFileModifications ¶
func (r *ReminderSystem) CheckFileModifications() []string
CheckFileModifications 检查 Agent 读过的文件是否被外部修改. 返回所有检测到修改的文件的提醒列表.
func (*ReminderSystem) CheckMemoryRelevance ¶
func (r *ReminderSystem) CheckMemoryRelevance(ctx context.Context, recentText string) string
CheckMemoryRelevance 检查是否有相关记忆需要提醒. 根据最近的对话文本查找相关记忆,如果找到则格式化为提醒.
参数:
- ctx: 上下文
- recentText: 最近的用户输入文本(用于相似度匹配)
func (*ReminderSystem) CollectReminders ¶
func (r *ReminderSystem) CollectReminders(ctx context.Context, currentMessages []query.Message, turnNumber int) []string
CollectReminders 收集当前需要注入的所有系统提醒. 返回需要注入的提醒列表,每个都是用 <system-reminder> 包裹的文本.
参数:
- ctx: 上下文,用于可取消操作
- currentMessages: 当前的消息历史(用于提取最近文本做记忆匹配)
- turnNumber: 当前轮次编号(用于决定是否推送统计提醒)
升华改进(ELEVATED): 增加轮次间隔提醒--在非编程场景(如长时间咨询,教学辅导), 定期提醒进度和资源消耗有助于用户做出"继续/停止"的决策, 而不是在预算用完时才突然中断.每 10 轮注入一次轮次统计提醒. 替代方案:<原方案 turnNumber 参数未使用,无轮次统计提醒>
type ResultStore ¶
type ResultStore struct {
// contains filtered or unexported fields
}
ResultStore 管理大结果的磁盘持久化.
func NewResultStore ¶
func NewResultStore(baseDir string, sessionID string) *ResultStore
NewResultStore 创建一个结果存储实例. baseDir 是存储基础目录,sessionID 是当前会话 ID. 如果目录不存在会在首次写入时自动创建.
func (*ResultStore) Cleanup ¶
func (s *ResultStore) Cleanup(olderThan time.Duration) int
Cleanup 清理指定时间之前的旧结果文件. olderThan 指定清理多久之前的文件(例如 24 * time.Hour 清理一天前的). 返回清理的文件数量.
func (*ResultStore) ProcessResult ¶
func (s *ResultStore) ProcessResult(toolUseID, toolName, output string) (processedOutput string, storedPath string)
ProcessResult 处理工具输出. 如果输出长度 <= MaxInlineResultChars,原样返回. 否则存到磁盘,返回截断预览 + 磁盘路径.
返回值:
- processedOutput: 处理后的输出(可能被截断)
- storedPath: 如果存储到磁盘,返回文件路径;否则为空字符串
func (*ResultStore) ReadStoredResult ¶
func (s *ResultStore) ReadStoredResult(path string) (string, error)
ReadStoredResult 从磁盘读取之前存储的完整工具输出.
type RetentionPolicy ¶
type RetentionPolicy struct {
// MaxAgeDays 超过 N 天的备份文件删除,0 表示不按时间清理.
MaxAgeDays int
// MaxVersions 每个原始文件保留最近 N 个版本,0 表示不按版本数清理.
MaxVersions int
}
RetentionPolicy 定义备份文件的保留策略. 满足任一条件的备份都会被删除(OR 逻辑).
升华改进(ELEVATED): 双维度保留策略(时间 + 版本数)满足不同场景需求-- 研发机:MaxVersions=50 防止磁盘爆满; 审计场景:MaxAgeDays=90 合规保留; 资源受限设备:MaxAgeDays=7 + MaxVersions=20 双重限制. 0 表示不限制该维度,可以灵活组合. 替代方案:只支持 MaxAgeDays(无法控制高频修改文件的磁盘占用). 替代方案:只支持 MaxVersions(时间无限延伸,审计场景不可用).
type RunOption ¶
type RunOption func(*runConfig)
RunOption 是 Run 方法的选项函数.
func WithBundleKey ¶
WithBundleKey 为本次 Run 覆盖使用的 PromptBundle key.
升华改进(ELEVATED): 早期实现 没有任何运行时切换提示词 Bundle 的机制-- 系统提示词完全由编译时确定,无法 per-request 切换. 我们允许 SaaS 场景下同一个 Engine 实例在不同请求间使用不同 Bundle:
- 编程工作台请求 → WithBundleKey({ModelFamily: "claude", Scenario: "programming"})
- 仓储调度请求 → WithBundleKey({ModelFamily: "claude", Scenario: "warehouse"})
- 通用助手请求 → WithBundleKey({ModelFamily: "claude", Scenario: "general"})
不传此选项时使用 Config.Scenario + Config.ModelFamily 推断(原有行为不变). 对应 Bundle 未注册时,BundleRegistry.Resolve 自动回退到默认 Bundle(claude+programming), 行为安全,不会 panic.
典型用法(HTTP API 服务):
http.HandleFunc("/run", func(w http.ResponseWriter, r *http.Request) {
scenario := r.Header.Get("X-Flyto-Scenario") // "warehouse"
key := engine.BundleKeyFor("claude", scenario)
for ev := range eng.Run(ctx, prompt, engine.WithBundleKey(key)) {
// ...
}
})
替代方案:<为每个场景创建独立的 Engine 实例> - 否决原因:每个 Engine 持有独立的 sectionRegistry / sessionManager / tools 等, 内存开销随场景数线性增长;per-request RunOption 开销近乎零.
func WithCheckpointHandler ¶
func WithCheckpointHandler(fn CheckpointHandlerFn) RunOption
WithCheckpointHandler 为本次 Run 注册不可逆操作确认回调.
升华改进(ELEVATED): 早期实现 的权限确认依赖 React UI 组件,无法在 SDK/API 模式使用. 我们将确认逻辑抽象为函数回调,任何消费层都可以提供实现:
- TUI:阻塞读取用户键盘输入
- HTTP API:通过外部信号量或 webhook 异步收集 ACK
- 测试:直接返回 true/false
- 自动化 CI:策略引擎(根据工具名和参数决策)
fn 为 nil 时静默忽略(等同于未注册,保持 deny-safe 默认行为).
典型用法(SDK 嵌入):
eng.Run(ctx, prompt, engine.WithCheckpointHandler(func(evt engine.CheckpointEvent) bool {
fmt.Printf("⚠️ 工具 %q 请求执行不可逆操作,参数:%v\n继续?[y/N] ", evt.ToolName, evt.Input)
var s string
fmt.Scan(&s)
return s == "y" || s == "Y"
}))
func WithMaxTurns ¶
WithMaxTurns 覆盖本次 Run 的最大对话轮次. 0 表示使用引擎配置的默认值(Config.MaxTurns). 适用于 HTTP API 模式:前端可以按请求指定轮次上限,而无需重启服务器.
func WithSecret ¶
WithSecret 为本次 Run 注入 per-request 凭据.
升华改进(ELEVATED): HTTP API 多租户场景下,不同租户的凭据必须请求级隔离-- 用 WithSecret 注入的 secret 只在本次 Run 期间的工具调用中有效, 不写入引擎级 SecretStore,不影响其他并发请求.
与 SetSecret 的区别:
- SetSecret:引擎级,所有 Run 共享,适合单租户 SDK 嵌入
- WithSecret:请求级,本次 Run 独占,适合多租户 HTTP API
典型用法(HTTP API 服务):
http.HandleFunc("/run", func(w http.ResponseWriter, r *http.Request) {
tenantToken := r.Header.Get("X-Tenant-Token") // 从请求头取租户凭据
for ev := range eng.Run(ctx, prompt, engine.WithSecret("TENANT_TOKEN", tenantToken)) {
// ...
}
})
func WithSessionHooks ¶
WithSessionHooks 为本次 Run 注册会话级 Hook(14.P1-C Session-scoped Hook Registry).
升华改进(ELEVATED): 多租户安全隔离--session hooks 只在当前 Run 中生效, 与 Engine 级全局 hooks 组成 effectiveHooks,session 级优先触发. nil 参数静默忽略(向后兼容).
典型用法(HTTP API / SDK 嵌入):
perRequestHooks := hooks.NewManager(nil)
perRequestHooks.Register(hooks.HookPreToolUse, hooks.HookDef{
Handler: hooks.NewCallbackHandler(myTenantAuditFn),
})
engine.Run(ctx, prompt, engine.WithSessionHooks(perRequestHooks))
type Scratchpad ¶
type Scratchpad struct {
// contains filtered or unexported fields
}
Scratchpad 是引擎级别的键值暂存区.
用于跨轮次保存中间结果,Agent 可通过 scratchpad_write/scratchpad_read 工具访问. 线程安全,支持 TTL 过期(可选).
生命周期:与 Engine 实例绑定--引擎创建时初始化,引擎关闭时自动丢弃. 不持久化到磁盘,不跨引擎实例共享.
func (*Scratchpad) Clear ¶
func (s *Scratchpad) Clear()
Clear 清空所有条目(包括未过期的).
用于 Agent 主动重置暂存区,或 Engine 关闭清理(GC 也会自动回收,调用 Clear 是可选的).
func (*Scratchpad) Get ¶
func (s *Scratchpad) Get(key string) (string, bool)
Get 获取键对应的值.
返回 (value, true) 如果 key 存在且未过期; 返回 ("", false) 如果 key 不存在或已过期.
精妙之处(CLEVER): 惰性删除(lazy eviction)--过期条目在首次 Get 时才清除, 不启动后台清理 goroutine(避免 Engine 生命周期管理复杂化). 代价:过期条目会短暂占用内存,直到被 Get/Keys 触发清理. 对 Scratchpad 的典型规模(数十条)来说,内存影响可忽略. 替代方案:<后台 goroutine 定时清理> - 否决:增加 Engine 关闭协调复杂度.
func (*Scratchpad) Keys ¶
func (s *Scratchpad) Keys() []string
Keys 返回所有未过期 key 的有序列表.
升华改进(ELEVATED): 返回有序列表而非 map 的随机顺序-- Agent 通过 scratchpad_list 工具获取 key 列表时, 有序结果让模型更容易理解"当前暂存了什么", 也让测试可重复(不依赖 map 迭代顺序). 替代方案:<直接返回 []string 无序> - 否决:模型推理受输入顺序影响,无序时容易遗漏.
type SecretStore ¶
type SecretStore struct {
// contains filtered or unexported fields
}
SecretStore 是线程安全的凭据注册表.
设计约束:
- value 不对外暴露(无 Get(name) 方法),防止调用方误写日志
- Redact() 只替换,不缓存替换结果(输入随 tool output 变化,缓存无意义)
- Environ() 返回的 slice 是快照,调用方修改不影响 store
func (*SecretStore) Add ¶
func (s *SecretStore) Add(name, value string) error
Add 注册一个 secret.
name 用作 env var 名称(建议大写,如 "DB_PASSWORD")和脱敏标签([SECRET:DB_PASSWORD]). value 不得短于 minSecretLen 字节(防止超短 value 引发大量误替换).
同名 secret 允许覆盖(支持 token rotation / re-roll 场景). 返回 error 而非 panic,让调用方决定如何处理(测试或 CLI 可 log.Fatal,SDK 可返回给调用方).
func (*SecretStore) Environ ¶
func (s *SecretStore) Environ() []string
Environ 返回所有 secret 以 "NAME=VALUE" 格式的 env var slice.
用途:注入 Bash 工具子进程环境变量,让工具脚本能通过 $NAME 引用凭据, 同时不需要调用方手动管理 env var 列表.
返回的 slice 是快照,调用方修改不影响 store.
func (*SecretStore) Len ¶
func (s *SecretStore) Len() int
Len 返回已注册的 secret 数量. 主要用于测试断言和监控(不暴露具体 name/value).
func (*SecretStore) Redact ¶
func (s *SecretStore) Redact(input string) string
Redact 将字符串中所有已注册 secret 的 value 替换为 [SECRET:name].
精妙之处(CLEVER): 按注册顺序替换,而非按 value 长度降序-- 简化实现,且实践中 secret value 几乎不会互相包含(短 value 被最小长度约束过滤). 若确实需要"长 value 优先"(防止子串被短 value 替换后长 value 匹配失败), 调用方可以在注册时控制顺序,或使用带前缀的 secret name 区分.
替代方案:<Aho-Corasick 多模式匹配,O(n) 单次扫描> - 否决:secret 数量通常 <20,strings.ReplaceAll 性能足够,无需引入复杂算法.
type Session ¶
type Session struct {
// contains filtered or unexported fields
}
Session 是一个有状态的多轮对话会话.
对应原项目中 REPL 的会话概念,但完全剥离了终端 UI. Session 维护消息历史,支持多轮对话,上下文压缩,会话恢复.
增强功能:
- 自动追踪消息:Send() 完成后自动将用户消息和助手回复追加到历史
- Token 统计:维护 inputTokens, outputTokens, costUSD 累计统计
- 会话元数据:标题,创建时间,最后活跃时间
- 权限回复:通过 pendingPermissions channel map 实现异步权限决策
- Compact 集成:内置 compact 触发检查
将会话状态集中管理, 和 QueryEngine 的闭包变量里,耦合严重. Go 版本将会话状态集中管理,干净独立.
func (*Session) Close ¶
func (s *Session) Close()
Close 关闭会话,释放资源.
幂等:多次调用安全,内部通过 closeOnce 保证只执行一次.
关闭顺序:
- 设置 s.closed = true(Send/WaitForPermission 快速失败)
- 关闭 s.done(广播给 trackEvents goroutine 和所有 WaitForPermission goroutine)
- 遍历 pendingPermissions,向每个 channel 发送 false 并清空 map (唤醒任何在 WaitForPermission 中已过 closed 检查,但尚未收到 s.done 的 goroutine)
- 发送 observer 事件
替代方案:<只 close(s.done),不向 pendingPermissions 发送 false>
- 否决:WaitForPermission 的 select 已监听 s.done,理论上够用; 但向 ch 发送 false 是防御性设计--万一有竞争窗口(如 WaitForPermission 在 s.done 关闭 之前刚把 ch 写入 map 但还没进 select),额外的 false 发送能确保其最终被唤醒.
func (*Session) ResolvePermission ¶
ResolvePermission 回复一个权限请求. 消费层收到 PermissionRequestEvent 后,通过此方法告知引擎用户的决策.
工作机制:
- Engine 的 runLoop 在需要权限时调用 WaitForPermission(requestID), 它创建一个 channel 放入 pendingPermissions map,然后阻塞等待.
- 消费层(HTTP Server,CLI 等)收到 PermissionRequestEvent 后, 调用 ResolvePermission 向 channel 发送决策,唤醒阻塞的 runLoop.
对应原项目中 control_request / control_response 协议.
func (*Session) Send ¶
Send 在会话中发送一条消息,返回流式事件 channel. 自动携带历史消息上下文.
增强:事件流完成后自动将用户消息和助手回复追加到 session.messages, 并更新 token 统计和最后活跃时间.
可选 opts 透传给底层 Engine.Run,用于 per-Send 运行时覆盖: 典型场景是 WithModel (运行时模型切换) 和 WithCheckpointHandler (per-Send 不可逆操作确认回调).
ELEVATED: opts 顺序 invariant. 原方案: Send 不接受 opts, 内部硬编码 WithMessages(history), 把所有 Run-level 可选能力都封死. 新方案: Send 接受 opts ...RunOption, 但在调 engine.Run 时把 opts 放前面, Session 自己的 WithMessages(history) append 在**最后**. 由于 RunOption 是 函数式 runConfig 应用,最后一个 option 覆盖前面的,因此:
- caller 传的 WithMessages 会被 Session 的 history 快照强制覆盖, 锁死 Session "自动追踪历史" 的核心语义 -- 这是刻意的安全兜底.
- 其他所有 option (WithModel / WithMaxTurns / WithSecret 等) 正常生效.
替代方案: <在入口处扫描 opts 拒绝 WithMessages> -- 否决: RunOption 是 不透明的函数值,无法反射识别,只能靠应用顺序约定.
用法:
session := agent.Session("my-session")
events := session.Send(ctx, "先看看项目结构")
// ... 处理事件 ...
events = session.Send(ctx, "然后修复那个 bug", WithModel("claude-sonnet-4-6"))
// 第二次调用自动包含第一轮的上下文,并切换到指定模型
func (*Session) Stats ¶
func (s *Session) Stats() SessionStats
Stats returns the session's cumulative statistics snapshot.
Shape: pull. Complementary to the push-side `session_cost_threshold_crossed` observer event — pull for on-demand snapshots, push for cost-tier alerting; the two are orthogonal.
Stats 返回会话的累计统计信息快照.
形态: 调取 (pull). 和 push 侧 `session_cost_threshold_crossed` observer 事件正交互补 -- 要按需快照走 pull, 要成本跨档告警走 push.
func (*Session) WaitForPermission ¶
WaitForPermission 等待消费层对指定权限请求的回复. 被 Engine 的 runLoop 调用 -- 当 PermissionHandler 为 nil(HTTP Server 模式)时, 引擎通过此方法异步等待消费层的权限决策.
返回 true 表示允许,false 表示拒绝. ctx 取消或 session 关闭时返回 false(视为拒绝).
精妙之处(CLEVER): 三路 select:ch(正常回复)/ ctx.Done()(请求取消)/ s.done(会话关闭). 后两路都执行 delete(pendingPermissions, requestID) 清理 map entry, 防止 Close() 再次尝试向已无消费者的 channel 发送(double-send 死代码路径). Close() 自己也会清理 map,但两处 delete 同一 key 是幂等的,不会 panic.
type SessionInfo ¶
type SessionInfo struct {
// ID 会话 ID
ID string
// Title 会话标题(从第一次对话自动生成或用户指定)
Title string
// CreatedAt 创建时间
CreatedAt time.Time
// LastActiveAt 最后活跃时间
LastActiveAt time.Time
// TurnCount 总轮次数
TurnCount int
// InputTokens 累计输入 token 数
InputTokens int
// OutputTokens 累计输出 token 数
OutputTokens int
// CostUSD 累计花费(美元)
CostUSD float64
// MessageCount 消息数量
MessageCount int
// Closed 会话是否已关闭
Closed bool
}
SessionInfo 是会话的摘要信息,用于列表展示.
type SessionInfoEvent ¶
type SessionInfoEvent = flyto.SessionInfoEvent
type SessionManager ¶
type SessionManager struct {
// contains filtered or unexported fields
}
SessionManager 管理所有会话的生命周期.
func NewSessionManager ¶
func NewSessionManager(engine EngineRef, cfg *SessionManagerConfig) *SessionManager
NewSessionManager 创建一个新的会话管理器.
func (*SessionManager) Close ¶
func (sm *SessionManager) Close()
Close 关闭会话管理器,保存所有未保存的会话,停止后台清理.
升华改进(ELEVATED): 等待 cleanupLoop goroutine 真正退出后再清空 sessions map, 避免 cleanupLoop 的最后一次 cleanupExpired() 与 Close() 的清空操作发生竞争. 原方案:close(stopCh) 后立刻清空 map,cleanupLoop 若刚好持有 sm.mu 等待则在
Close() 释放锁后会访问一个空 map(语义错误,虽不 panic 但日志可能产生噪音).
新方案:release mu → 等待 cleanupDone → 重新加锁 → 清空 map. 替代方案:<在 cleanupExpired 中检查 sm.stopped,有则直接返回>
- 否决:仍有极短窗口期(close(stopCh) 之前 ticker 触发),且需要在 cleanupExpired 里增加额外读锁判断,不如直接 drain goroutine 更干净.
func (*SessionManager) CreateSession ¶
func (sm *SessionManager) CreateSession(id string) (*Session, error)
CreateSession 创建一个新的会话.
如果已达到最大会话数限制,会先清理最不活跃的会话. 如果指定 ID 的会话已存在,返回已有会话.
func (*SessionManager) GetSession ¶
func (sm *SessionManager) GetSession(id string) (*Session, error)
GetSession 获取已有的会话. 如果会话不存在,返回 nil 和错误.
func (*SessionManager) ListSessions ¶
func (sm *SessionManager) ListSessions() []SessionInfo
ListSessions 列出所有活跃会话的摘要信息.
返回按最后活跃时间倒序排列的列表.
func (*SessionManager) RemoveSession ¶
func (sm *SessionManager) RemoveSession(id string)
RemoveSession 移除并关闭一个会话.
func (*SessionManager) ResumeSession ¶
func (sm *SessionManager) ResumeSession(transcriptPath string) (*Session, error)
ResumeSession 从磁盘恢复一个会话.
读取指定路径的 transcript 文件,恢复消息历史和统计信息.
func (*SessionManager) SessionCount ¶
func (sm *SessionManager) SessionCount() int
SessionCount 返回当前活跃会话数.
func (*SessionManager) SetSessionTitle ¶
func (sm *SessionManager) SetSessionTitle(id, title string)
SetSessionTitle 设置会话标题.
func (*SessionManager) UpdateSessionInfo ¶
func (sm *SessionManager) UpdateSessionInfo(id string, inputTokens, outputTokens int, costUSD float64)
UpdateSessionInfo 更新会话的统计信息.
每轮对话结束后由引擎调用,更新 token 计数,花费等信息.
type SessionManagerConfig ¶
type SessionManagerConfig struct {
// MaxSessions 最大并发会话数,0 使用默认值 (100)
MaxSessions int
// AutoSaveEvery 每多少轮自动保存一次 transcript,0 表示不自动保存
AutoSaveEvery int
// ExpireAfter 会话过期时间,超过此时间未活跃的会话将被清理.
// 0 表示不过期
ExpireAfter time.Duration
// Cwd 工作目录,用于生成 transcript 路径
Cwd string
// Model 模型名称,用于保存 transcript 元数据
Model string
}
SessionManagerConfig 是会话管理器的配置.
type SessionProvider ¶
type SessionProvider interface {
// ListSince 返回自 sinceTime 以来被修改/创建的会话 ID 列表.
// 实现者可以过滤当前会话(避免把还没写完的会话计入).
ListSince(sinceTime time.Time) ([]string, error)
}
SessionProvider 提供自上次巩固以来的会话 ID 列表.
升华改进(ELEVATED): 早期实现 直接扫描 transcript 目录(listSessionsTouchedSince). 我们提取为接口,解耦具体存储格式--
CLI 场景:FileSessionProvider 扫描 JSONL 文件 SDK/API 场景:实现者可以从数据库,消息队列或自定义存储查询 跨行业:仓储/金融 Agent 的"会话"可能是工单/交易,与 JSONL 完全无关
替代方案:<TranscriptDir string 直接传路径,在引擎内硬编码 JSONL 扫描> - 否决原因:绑死了 CLI 场景,SDK/API 嵌入无法定制数据源.
type SessionSnapshot ¶
type SessionSnapshot struct {
// ConversationID 会话唯一标识,与 SnapshotStore.Save/Load/Delete 的 key 对应.
// 由消费层分配(UUID,时间戳+随机数等).
ConversationID string `json:"conversation_id"`
// Messages 是中断点时的完整消息历史(含已发送的 user/assistant/tool_result 消息).
// 恢复时作为 WithMessages 的参数传入 Engine.Run().
//
// 升华改进(ELEVATED): 使用 query.Message 而非 internal/api.RequestMessage--
// query.Message 是引擎公开 API 层的类型(带 Metadata 字段),
// 序列化更稳定,且消费层可以检查/修改 Messages(例如注入安全过滤).
// 内部执行前 convertQueryMessage 会转换为 api.RequestMessage.
Messages []query.Message `json:"messages"`
// TurnIndex 中断时完成的轮次数(从 0 开始,已完成的最后一轮的索引).
// 仅供调试和日志使用;恢复执行时不用此字段控制流程.
TurnIndex int `json:"turn_index"`
// PartialToolUse 记录流被截断时已收到的不完整工具调用(可选).
// 非 nil 表示中断发生在工具调用流式传输过程中.
// 恢复时 maybeInjectResumeSentinel 会根据消息列表末尾的工具结果状态
// 自动注入哨兵,PartialToolUse 作为附加信息供调试使用.
PartialToolUse *PartialToolUse `json:"partial_tool_use,omitempty"`
// SavedAt 快照创建时间(UTC).
// 用于判断快照是否过期(例如超过 24 小时不再恢复,从头开始).
SavedAt time.Time `json:"saved_at"`
}
SessionSnapshot 是会话断点的完整状态快照.
快照包含恢复对话所需的最小数据集--不是完整 Transcript, 只保存"恢复执行需要什么",删除后不影响审计(Transcript 独立记录审计数据).
精妙之处(CLEVER): TurnIndex 与 Messages 配对存储-- 恢复时 Engine.Run() 通过 WithMessages(snap.Messages) 注入历史, TurnIndex 仅用于日志/调试,不影响执行路径. 若 Messages 已包含完整轮次,TurnIndex 也可从 Messages 推断-- 独立存储是为了"快速判断恢复点在第几轮"而无需遍历消息列表.
func BuildSnapshot ¶
func BuildSnapshot(convID string, messages []query.Message, turnIndex int) SessionSnapshot
BuildSnapshot 基于当前对话状态构造一个会话快照, 供消费层保存到 SnapshotStore.
典型用法(在 runLoop 外部监听 channel 的消费层):
// 消费层在收到 TurnEndEvent 后调用
if store != nil && convID != "" {
snap := engine.BuildSnapshot(convID, messages, turnIndex)
store.Save(ctx, snap)
}
注意:引擎本身不主动调用此函数--快照是消费层的责任. 引擎内部 runLoop 不感知 SnapshotStore,保持引擎核心的简洁.
升华改进(ELEVATED): 将"何时保存快照"的决策权留给消费层-- 有些场景只需每 N 轮保存一次(降低 I/O 开销), 有些场景需要每轮都保存(高可靠性要求). 替代方案:<引擎内部每轮保存> - 否决:增加引擎核心复杂度,且 I/O 策略因场景不同无法统一.
type SessionStats ¶
type SessionStats struct {
TurnCount int // Total turn count. 总轮次数.
InputTokens int // Cumulative input tokens. 累计输入 token 数.
OutputTokens int // Cumulative output tokens. 累计输出 token 数.
CostUSD float64 // Cumulative cost in USD. 累计花费 (美元).
MessageCount int // Total message count in session history. 会话历史消息数量.
}
SessionStats is the cumulative statistics of a session, exposed on both a pull API and a push event, so consumers can pick the shape that fits:
- Pull: call Session.Stats() at any time for a current snapshot (cheap, synchronous, holds the session mutex briefly). Suitable for UIs that render a "session summary" on demand or CLIs that print at exit.
- Push: subscribe to observer event `session_cost_threshold_crossed`. The engine emits this exactly once per crossed threshold from the preset ladder ($1/$5/$10/$50/$100) the first time cumulative CostUSD passes it in this session. The event payload carries the full 5 fields plus `threshold_usd` + `session_id`, so downstream cost-alert wiring (Slack/PagerDuty/dashboard) needs no additional pull.
These two paths are same-source: the snapshot shipped in the event payload is constructed inside the same mutex region as the field accumulators, so push payloads and a concurrent Stats() pull cannot diverge for a given turn. If you need "current full snapshot" use pull; if you need "somebody just crossed a cost tier" use push; the two do not overlap by design.
SessionStats 是会话的累计统计信息, 同时暴露 pull API 和 push 事件两条消费 路径, 消费者按场景选形:
- Pull: 任意时刻调 Session.Stats() 拿当前快照 (便宜, 同步, 仅短暂持会话 锁). 适合 UI 按需渲染"会话摘要"或 CLI 退出时打印.
- Push: 订阅 observer 事件 `session_cost_threshold_crossed`. 当会话累计 CostUSD 首次跨过预设档位 ($1/$5/$10/$50/$100) 任一档时, 引擎对每个 跨过的档位各 emit 一次. payload 含完整 5 字段 + `threshold_usd` + `session_id`, 下游成本告警接线 (Slack/PagerDuty/dashboard) 无需再 pull.
两条路径同源: 事件 payload 的快照在 mutex 保护区内和字段累加器同一区域 构造, 因此同一轮的 push payload 和并发 pull 的 Stats() 不会偏离. 要"当前 完整快照"走 pull, 要"成本跨档告警"走 push, 两者刻意不重叠.
type Skill ¶
type Skill struct {
// Name 技能名称(调用时使用的标识符)
Name string
// Description 技能描述(用于 LLM 决策"何时调用此 Skill")
Description string
// WhenToUse 详细的使用场景说明
WhenToUse string
// AllowedTools 此技能限制使用的工具列表(空 = 不限制)
AllowedTools []string
// Content 技能正文(Markdown,兼作提示词模板)
Content string
// Source 技能来源:"bundled" | "user" | "project" | "plugin"
Source string
// FilePath is the absolute path of the skill source file ("" = in-memory
// registration). Wire: SkillRegistry.Invoke emits a skill_invoked observer
// event whose payload carries this value verbatim (empty string is kept,
// NOT omitted) so audit pipelines can distinguish in-memory Skills ("") from
// disk-loaded ones.
//
// FilePath 是 skill 源文件的绝对路径 ("" = 内存注册). Wire 路径:
// SkillRegistry.Invoke 发 skill_invoked 观测事件, payload 原样携带本值 (空串
// 显式保留, **非** omitempty), 审计管道据此区分 in-memory Skill ("") 和磁盘
// 加载的 skill.
FilePath string
// SkillDir 技能目录路径(用于 ${FLYTO_SKILL_DIR} 替换)
SkillDir string
// Context 执行模式(默认 inline)
Context ExecutionContext
// AgentType selects the Agent type used for fork-mode execution ("" =
// engine's general-purpose default). Wire: invokeFork resolves this name
// via SkillRegistry.agentRegistry and delivers the FULL promise of
// "SubAgent behaves as this Agent type":
// - cfg.AllowedTools = ResolveToolset(def, parentTools) four-layer filter
// (parent ∩ def.AllowedTools, Background narrow, minus DisallowedTools
// and globalDisallowed). MCP tools auto-pass the whitelist.
// - If skill itself declares AllowedTools, intersect with the resolved
// set — skill cannot widen beyond what the Agent type permits.
// - cfg.Model falls back to def.Model when skill.Model is "".
// - cfg.MaxTurns falls back to def.MaxTurns when skill.MaxTurns is 0;
// both zero → default 10.
// - cfg.AllowedSubAgentTypes = def.AllowedSubAgentTypes.
// On miss (typo): emit skill_fork_unknown_agent_type diagnostic then
// fall back to the empty-AgentType branch (no resolution). Silent
// fallback would hide typos, hard error would break UX.
//
// AgentType 指定 fork 模式下使用的 Agent 类型 ("" = 引擎 general-purpose
// 默认). Wire 路径: invokeFork 经 SkillRegistry.agentRegistry 解析该名称,
// 命中后完整兑现 "SubAgent 表现成该 Agent 类型" 承诺:
// - cfg.AllowedTools = ResolveToolset(def, parentTools) 四层过滤
// (父工具 ∩ def.AllowedTools → Background 收窄 → 去
// DisallowedTools ∪ globalDisallowed). MCP 工具自动通过白名单.
// - skill 自己声明了 AllowedTools 时, 与上述结果取交集 -- skill 不能
// 扩张到 Agent 类型允许之外的工具.
// - cfg.Model 在 skill.Model 为 "" 时 fallback 到 def.Model.
// - cfg.MaxTurns 在 skill.MaxTurns 为 0 时 fallback 到 def.MaxTurns;
// 两者都 0 → 默认 10.
// - cfg.AllowedSubAgentTypes = def.AllowedSubAgentTypes.
// 未命中 (拼写错): 发 skill_fork_unknown_agent_type 诊断事件后 fallback
// 到空串分支 (不解析). 静默 fallback 会隐藏拼写错误, 硬报错又会破坏 UX.
AgentType string
// Model 模型覆盖(空 = 继承父 Engine 的模型)
Model string
// MaxTurns fork 模式下的最大轮数(0 = 默认 10)
MaxTurns int
// Paths glob 激活过滤(P1):仅当 CWD 前缀匹配或 glob 匹配时激活此 Skill.
// 例如:["src/*.ts", "src/components/**"] 表示只在匹配目录下激活.
//
// 升华改进(ELEVATED): 早期实现 paths 对照的是"已读过的文件";
// 我们的实现对照 CWD(更实时,更轻量,不需要查历史 I/O 记录).
// CWD 前缀匹配 + filepath.Match glob 双模式:
// 前缀匹配:Paths = ["src/"] → CWD 以 src/ 打头时激活.
// glob 匹配:Paths = ["*/frontend/**"] → filepath.Match 判断.
// 两种模式 OR 关系--任意一条命中即激活.
// 替代方案:<原 LEGACY 方案:字段已解析但暂不生效> - 已废弃.
Paths []string
// Version is the semantic-version string declared in the skill's YAML
// frontmatter (e.g. "1.0.0"), free-form (no parse-time validation). Wire:
// SkillRegistry.Invoke carries this value in the skill_invoked observer
// event so audit / telemetry can correlate each invocation with the
// skill revision that was live at the time. Future uses (cache
// invalidation, version gating) are intentionally NOT implemented here —
// the minimal commitment is "every invocation records which version ran".
//
// Version 是 skill YAML frontmatter 声明的语义化版本字符串 (如 "1.0.0"),
// 自由格式不在加载期校验. Wire 路径: SkillRegistry.Invoke 把本值写进
// skill_invoked 观测事件, 审计 / telemetry 据此把每次调用和当时在册的 skill
// 修订对齐. 缓存失效 / 版本 gate 等进一步用途故意暂不实现 -- 当前最小承诺
// 是"每次调用都记录了跑的是哪一版".
Version string
// UserInvocable 是否允许用户通过 /skill-name 直接调用
UserInvocable bool
// ArgumentHint 参数提示(在 slash 命令补全中显示)
ArgumentHint string
}
Skill 是一个已加载的技能定义.
精妙之处(CLEVER): Content 字段同时用于两种场景-- (1) FormatAsPrompt() 用于生成系统提示词片段(向 LLM 描述 Skill 的用途) (2) ExpandPrompt() 用于运行时模板展开(调用时作为提示词注入) 这样一个字段覆盖两种用途,避免 Content 与 PromptTemplate 的冗余维护.
func ScanSkillsDir ¶
ScanSkillsDir 扫描指定目录中的所有 Skill 文件.
支持两种格式:
- 子目录格式:dir/skill-name/SKILL.md
- 扁平格式:dir/skill-name.md
参数:
- dir: 技能目录路径(不存在则返回空列表,不报错)
- source: 来源标识("bundled" | "user" | "project" | "plugin")
精妙之处(CLEVER): 扁平格式和子目录格式可以共存于同一目录. 若同名(skill-name.md 和 skill-name/SKILL.md),子目录格式优先. 原因:子目录格式是新格式(与早期实现 兼容),扁平是遗留格式.
func (*Skill) ExpandPrompt ¶
ExpandPrompt 展开提示词模板,替换变量.
支持的模板变量:
$ARGUMENTS → 传入的参数字符串
${FLYTO_SKILL_DIR} → Skill 文件目录路径
${FLYTO_SESSION_ID} → 会话 ID
精妙之处(CLEVER): 顺序替换而非正则-- $ARGUMENTS 先替换,避免参数内容被错误地识别为其他变量名. 如果参数值包含 "${FLYTO_SKILL_DIR}",也只是文本,不会二次展开.
func (*Skill) FormatAsPrompt ¶
FormatAsPrompt 将技能格式化为系统提示词片段(用于描述 Skill).
func (*Skill) MatchesCwd ¶
MatchesCwd 判断此 Skill 是否在给定的工作目录下激活.
规则:
- Paths 为空 → 无限制,任何目录都激活(返回 true).
- 逐条检查 Paths 元素: a. 前缀匹配:如果 cwd 以 path(CleanPath 后)开头 → 激活. b. glob 匹配:filepath.Match(pattern, cwd) == true → 激活. c. 任意一条命中即返回 true(OR 关系).
精妙之处(CLEVER): 前缀 + glob 双模式-- 前缀模式直观(paths: ["src/"]),glob 模式强大(paths: ["*/frontend/**"]). filepath.Match 已处理通配符,前缀检查覆盖目录树匹配,两者互补. 替代方案:只用 glob("src/**" 比 "src/" 冗长,用户体验差).
type SkillInvokeResult ¶
type SkillInvokeResult struct {
Mode ExecutionContext // "inline" | "fork"
Content string // 展开的提示词(inline)或子 Agent 结果(fork)
AllowedTools []string // 工具限制(来自 Skill.AllowedTools)
Model string // 模型建议(来自 Skill.Model)
}
SkillInvokeResult 是 engine 内部 Invoke 方法的返回结果. 对外(SkillTool)通过 builtin.SkillResult 暴露(见 InvokeSkill 方法).
精妙之处(CLEVER): 保留引擎内部类型 + 暴露 builtin 兼容类型-- Go SDK 用户调用 registry.Invoke() 得到完整类型(含 ExecutionContext 枚举), SkillTool 调用 registry.InvokeSkill() 得到 builtin.SkillResult(接口兼容). 两个 API 并存,不同场景各取所需. 替代方案:直接暴露 builtin.SkillResult(损失 ExecutionContext 类型安全性).
type SkillRegistry ¶
type SkillRegistry struct {
// contains filtered or unexported fields
}
SkillRegistry 管理所有已注册的 Skill.
线程安全(RWMutex). 通过 Engine.skillRegistry 访问,不暴露为全局变量.
func SetupSkillTool ¶
func SetupSkillTool(engine *Engine, extraDirs []string) *SkillRegistry
SetupSkillTool 为 Engine 扫描并加载文件系统 Skill,绑定 SkillTool 执行器.
默认扫描路径(按优先级,后加载覆盖先加载):
- ~/.flyto/skills/ 和 ~/.claude/skills/ (用户级)
- <cwd>/.flyto/skills/ 和 <cwd>/.claude/skills/ (项目级)
- extraDirs (额外目录)
返回 SkillRegistry 供外部继续注册内置 Skill.
func (*SkillRegistry) Get ¶
func (r *SkillRegistry) Get(name string) (*Skill, bool)
Get 按名称查找 Skill.
func (*SkillRegistry) Invoke ¶
func (r *SkillRegistry) Invoke(ctx context.Context, name, args, sessionID string) (*SkillInvokeResult, error)
Invoke 执行指定 Skill.
根据 Skill.Context 决定执行方式:
- ExecutionContextInline(默认):展开提示词模板,直接返回文本
- ExecutionContextFork:spawn SubAgent,同步运行,返回结果
参数:
- ctx: 调用上下文(包含 Skill 嵌套深度)
- name: Skill 名称
- args: 用户传入的参数字符串(替换 $ARGUMENTS)
- sessionID: 当前会话 ID(替换 ${FLYTO_SESSION_ID})
func (*SkillRegistry) InvokeSkill ¶
func (r *SkillRegistry) InvokeSkill(ctx context.Context, name, args string) (*builtin.SkillResult, error)
InvokeSkill 实现 builtin.SkillExecutor 接口. 供 SkillTool 通过接口调用(避免 builtin 包直接依赖 engine 包).
精妙之处(CLEVER): 将引擎内部的 *SkillInvokeResult 转换为 *builtin.SkillResult-- 保持引擎 API 类型安全(ExecutionContext 枚举)的同时满足 builtin 包接口要求. 这与 agentExecutor 把 engine.SubAgent 结果转换为字符串返回的做法一致.
func (*SkillRegistry) List ¶
func (r *SkillRegistry) List(filters ...func(*Skill) bool) []*Skill
List 返回所有已注册的 Skill 列表. 可选传入过滤函数(返回 false 则排除).
func (*SkillRegistry) ListForCwd ¶
func (r *SkillRegistry) ListForCwd(cwd string, filters ...func(*Skill) bool) []*Skill
ListForCwd 返回在给定工作目录下激活的 Skill 列表.
升华改进(ELEVATED): 14.P1-A paths 激活-- 过滤出 Skill.Paths 匹配当前 CWD 的 Skill,用于系统提示词生成和 UI 补全. 与 List() 不同,此方法按 CWD 激活语义过滤,而非返回全量注册表. Paths 为空的 Skill 视为"全局激活",始终包含在结果中.
精妙之处(CLEVER): 委托给 Skill.MatchesCwd,逻辑单一责任-- 激活判断逻辑只在 Skill.MatchesCwd 里,ListForCwd 只做 filter, 测试可以单独测 MatchesCwd 而无需构造 Registry. 替代方案:在 ListForCwd 内部内联路径匹配逻辑(逻辑分散,难维护).
func (*SkillRegistry) ListSkillEntries ¶
func (r *SkillRegistry) ListSkillEntries() []*builtin.SkillEntryDesc
ListSkillEntries 实现 builtin.SkillExecutor 接口.
func (*SkillRegistry) Register ¶
func (r *SkillRegistry) Register(s *Skill) error
Register 注册一个 Skill.若已存在同名 Skill,返回错误. 用于文件加载的 Skill(允许冲突报错,提示用户命名重复).
func (*SkillRegistry) RegisterAll ¶
func (r *SkillRegistry) RegisterAll(skills []*Skill)
RegisterAll 批量注册 Skill 列表(覆盖同名,后加载优先). 用于 ScanSkillsDir 返回的结果批量加载.
func (*SkillRegistry) RegisterBuiltin ¶
func (r *SkillRegistry) RegisterBuiltin(s *Skill)
RegisterBuiltin 注册一个内置 Skill(无条件覆盖同名). 用于代码内置的 Skill(RegisterBuiltin 模式).
精妙之处(CLEVER): 内置 Skill 总是胜过文件 Skill(后注册 vs 先加载)-- 这与 Plugin.RegisterBuiltin 的语义一致:内置实现是权威定义, 文件 Skill 是用户自定义扩展,不能覆盖核心功能. 替代方案:文件优先(会让 discuss.md 覆盖内置 discuss Skill,难以调试).
func (*SkillRegistry) SetAgentRegistry ¶
func (r *SkillRegistry) SetAgentRegistry(ar *AgentRegistry)
SetAgentRegistry wires the AgentRegistry used by invokeFork to resolve skill.AgentType. Idempotent; nil clears. Engine.New calls this in the post-construction wiring block to avoid breaking the 16 existing newSkillRegistry(nil) test call sites with a signature change.
SetAgentRegistry 注入供 invokeFork 解析 skill.AgentType 使用的 AgentRegistry. 幂等, 传 nil 清空. Engine.New 在后置回填块里调用, 避免改动 newSkillRegistry 签名 打破现有 16 处 newSkillRegistry(nil) 测试调用.
func (*SkillRegistry) SetObserver ¶
func (r *SkillRegistry) SetObserver(o EventObserver)
SetObserver wires the EventObserver that receives skill_invoked / skill_fork_unknown_agent_type events. Idempotent; nil disables emission.
SetObserver 注入接收 skill_invoked / skill_fork_unknown_agent_type 事件的 EventObserver. 幂等, 传 nil 关闭发射.
func (*SkillRegistry) SetParentToolNames ¶
func (r *SkillRegistry) SetParentToolNames(f func() []string)
SetParentToolNames wires the provider function for parent tool name list, used by invokeFork to call AgentRegistry.ResolveToolset. Engine.New injects `func() []string { return eng.tools.Names() }`. nil clears.
SetParentToolNames 注入父工具名列表的 provider, 供 invokeFork 调用 AgentRegistry.ResolveToolset. Engine.New 传入 `func() []string { return eng.tools.Names() }`. 传 nil 清空.
type SkillSpawner ¶
type SkillSpawner interface {
// SpawnSkillAgent 创建并同步运行一个 SubAgent,返回其结果文本.
// cfg 描述 SubAgent 的工具限制和模型配置,prompt 是传入的提示词.
SpawnSkillAgent(ctx context.Context, cfg *SubAgentConfig, prompt string) (string, error)
}
SkillSpawner 是 SkillRegistry 执行 fork 类型 Skill 所需的最小接口.
升华改进(ELEVATED): 从 *Engine 具体类型提取为最小接口-- SkillRegistry 不再持有整个 Engine 引用,只持有"能启动子Agent并运行"这一个能力. 好处:
- 打破循环依赖:SkillRegistry ↛ Engine(单向:Engine → SkillRegistry)
- 测试可注入 mock,无需构造真实 Engine
- 未来可以替换实现(如远程 Agent,受限沙箱 Agent)
替代方案:<保留 *Engine>--否决:循环依赖,测试困难,职责错配.
type SlashCommand ¶
SlashCommand 表示一个斜杠命令.
type SlashCommandEvent ¶
type SlashCommandEvent = flyto.SlashCommandEvent
type SnapshotStore ¶
type SnapshotStore interface {
// Save 保存(或覆盖)快照.返回写入错误.
Save(ctx context.Context, snap SessionSnapshot) error
// Load 加载快照.若不存在返回 (zero, false, nil).
Load(ctx context.Context, conversationID string) (SessionSnapshot, bool, error)
// Delete 删除快照(成功恢复后调用,清理存储).幂等,不存在时不报错.
Delete(ctx context.Context, conversationID string) error
}
SnapshotStore 是会话快照的读写接口.
实现约定:
- Save 必须是原子性的(不能写一半)
- Load 若 key 不存在,返回 (zero, false, nil)
- Delete 若 key 不存在,静默返回 nil(幂等)
升华改进(ELEVATED): 接口只有 3 个方法(Save/Load/Delete)-- 简单的接口更容易实现,Redis 接入只需 SET/GET/DEL,数据库只需 UPSERT/SELECT/DELETE. 如果需要列出所有快照(管理界面),可以在接口外部通过 ListSnapshots(dir) 工具函数实现, 不污染核心接口. 替代方案:<更丰富的接口(List/Expire/CAS)> - 否决:YAGNI,过度设计的接口让实现方负担重,且 80% 场景不需要这些方法.
type StderrObserver ¶
type StderrObserver struct {
MinLevel string // "debug" / "info" / "warn" / "error",默认 "info"
Output io.Writer // 输出目标,默认 os.Stderr
// contains filtered or unexported fields
}
StderrObserver 开发调试用的 stderr 输出(默认).
升华改进(ELEVATED): 与 internal/logger 互补而非替代. logger 是给开发者看的文本日志(调试用),Observer 是给系统消费的结构化事件流. StderrObserver 在两者之间架桥--开发时可以在 stderr 看到 Observer 事件. 替代方案:直接用 logger.Info()(丢失结构化数据,无法被监控系统消费).
type StepExecStatus ¶
type StepExecStatus string
StepExecStatus 是单个步骤的执行状态.
const ( StepExecPending StepExecStatus = "pending" StepExecRunning StepExecStatus = "running" StepExecDone StepExecStatus = "done" StepExecSkipped StepExecStatus = "skipped" // 因依赖失败而跳过 StepExecFailed StepExecStatus = "failed" )
type StepProgress ¶
type StepProgress struct {
// Step 原始步骤定义(来自 PlanApprovalEvent.Steps 或手动注册).
Step PlanStep
// Status 当前执行状态.
Status StepStatus
// StartedAt 步骤开始时间(Status 变为 Running 时记录,零值表示未开始).
StartedAt time.Time
// FinishedAt 步骤结束时间(Status 变为 Done/Failed/Skipped 时记录).
FinishedAt time.Time
// ErrorMessage 失败原因(仅 StatusFailed 时非空).
ErrorMessage string
// AgentID 执行此步骤的 Agent ID(可选,SubAgent 场景中有意义).
AgentID string
}
StepProgress 是单个计划步骤的执行进度快照.
升华改进(ELEVATED): StepProgress 是不可变快照(每次状态变更产生新快照)-- 消费方可以对比前后两个快照来决定 UI 怎么刷新,也可以把快照序列存入数据库做审计. 替代方案:<步骤直接持有可变状态> - 否决原因:并发时消费方读到中间态,需要额外加锁.
func (StepProgress) Duration ¶
func (s StepProgress) Duration() time.Duration
Duration 返回步骤已执行的时间. 未开始时返回 0;运行中时返回已经过的时间;完成后返回总耗时.
func (StepProgress) IsTerminal ¶
func (s StepProgress) IsTerminal() bool
IsTerminal 返回步骤是否处于终态(done/failed/skipped).
type StepStatus ¶
type StepStatus string
StepStatus 是计划步骤的执行状态.
精妙之处(CLEVER): 只有 5 个状态,不引入"paused"等复杂中间态-- 引擎不做调度(那是消费方的责任),所以不需要暂停概念. 消费方可以在 pending 状态的步骤上"不调用 Start"来实现暂停语义.
const ( // StepStatusPending 步骤尚未开始(初始状态). StepStatusPending StepStatus = "pending" // StepStatusRunning 步骤正在执行(至少有一个 agent 在处理). StepStatusRunning StepStatus = "running" // StepStatusDone 步骤成功完成. StepStatusDone StepStatus = "done" // StepStatusFailed 步骤执行失败(触发重规划或上报错误). StepStatusFailed StepStatus = "failed" // StepStatusSkipped 步骤因依赖失败而跳过(不是执行失败,是路径规划放弃). StepStatusSkipped StepStatus = "skipped" )
type StrictMode ¶
type StrictMode struct {
ToolResultPairing bool // 消息配对异常:true=panic false=修复
CompactFailure bool // 压缩失败:true=panic false=降级
NormalizerError bool // 规范化异常:true=panic false=跳过
}
StrictMode 严格模式配置.
func (*StrictMode) Check ¶
func (s *StrictMode) Check(condition string, enabled bool, observer EventObserver, detail string)
Check 在严格模式开启时 panic,否则记录 observer 事件.
精妙之处(CLEVER): 一行代码同时处理严格模式和可观测性. 调用点只需要 `strictMode.Check("condition", enabled, observer, "detail")`, 不需要自己写 if/else + observer.Event(). 替代方案:每个调用点自己写 if strict { panic } else { observer.Event() }(重复代码 20+ 处).
func (*StrictMode) CheckCompactFailure ¶
func (s *StrictMode) CheckCompactFailure(observer EventObserver, detail string)
CheckCompactFailure 检查压缩失败.
func (*StrictMode) CheckNormalizerError ¶
func (s *StrictMode) CheckNormalizerError(observer EventObserver, detail string)
CheckNormalizerError 检查规范化异常.
func (*StrictMode) CheckToolResultPairing ¶
func (s *StrictMode) CheckToolResultPairing(observer EventObserver, detail string)
CheckToolResultPairing 检查 tool_result 配对异常.
type SubAgent ¶
type SubAgent struct {
// ID 是子 Agent 的唯一标识符
ID string
// Description 是子 Agent 的任务描述
Description string
// Model 是子 Agent 使用的模型
Model string
// ParentEngine 是创建此子 Agent 的父引擎(fork 的源头).
// 使用 EngineRef 接口而非 *Engine,使 SubAgent 可在测试中接受 mock.
// 注:SpawnSubAgent 函数仍接受 *Engine;此处字段仅用于 activity/observer/context 访问.
ParentEngine EngineRef
// Status 是当前运行状态
Status SubAgentStatus
// Progress 是运行进度跟踪器
Progress *SubAgentProgress
// StartTime 是启动时间
StartTime time.Time
// EndTime 是结束时间
EndTime time.Time
// Result 是运行结果文本
Result string
// Error 是运行错误(如果有)
Error error
// Cwd 是子 Agent 的工作目录(可能与父 Agent 不同,如 worktree 模式)
Cwd string
// contains filtered or unexported fields
}
SubAgent 是一个子 Agent 实例(Fork 模式).
精妙之处(CLEVER): 不再创建独立的 Engine 实例,而是 fork 父 Engine 的关键资源. 共享:API 客户端,系统提示,完整工具列表(缓存命中) 独立:消息历史,权限限制,轮数限制,abort controller
func SpawnSubAgent ¶
func SpawnSubAgent(parentEngine *Engine, cfg *SubAgentConfig) *SubAgent
SpawnSubAgent 从父 Engine fork 一个子 agent.
精妙之处(CLEVER): 共享父 engine 的 API 客户端,系统提示,完整工具列表(缓存命中), 但独立消息历史和权限限制.这样每个子 agent 的首轮 API 调用都能命中 父 engine 已建立的 prompt cache,省下 10K+ 系统提示 token 的重新传输费用.
参数:
- parentEngine: 父 Engine 实例,fork 的源头
- cfg: 子 Agent 的配置
返回创建的子 Agent(尚未启动,需要调用 Run 或 RunBackground).
func (*SubAgent) Run ¶
Run 执行子 agent 的查询循环. 返回事件 channel,调用者可以从中读取子 Agent 产生的所有事件. 运行完成后 channel 会被关闭.
和父 engine 的 runLoop 类似,但:
- 系统提示复用父 engine 的(cache 命中)
- API 请求传完整工具列表(cache key 一致)
- 工具执行前用 canUseTool 过滤(运行时拦截)
- 独立的消息历史和轮数限制
- 不执行 Dream,Memory 提取等后台任务(精简版 runLoop)
Channel contract (2026-04-20 行为变更) ¶
返回的 channel 发送**裸类型事件** (*TextEvent / *ToolUseEvent / *TurnEndEvent / *ErrorEvent / *DoneEvent / *TextDeltaEvent / *ToolResultEvent / *WarningEvent), 对齐父 Engine.Run() 的 channel shape. 消费端可以直接 `for evt := range ch; switch v := evt.(type) { case *TextEvent: ... }` 无需 unwrap.
**Breaking**: 2026-04-20 之前本 channel 发送的是 `*SubAgentEvent` 包装 (SubAgentID + Inner). 直接消费 sa.Run() channel 的外部 SDK 若 在 type switch 上 `case *SubAgentEvent:` 现在将收不到任何事件; 请 改为 type-switch 裸事件类型. SubAgentID 归属现在走父引擎 Run channel 的 `*SubAgentEvent` 包装 (在父 engine 视角) 或 observer 路径 (后台子 agent), 不在 sa.Run() 自己的 channel 上重复暴露.
Channel contract (2026-04-20 behavior change) ¶
The returned channel sends **bare-typed events** (*TextEvent, *ToolUseEvent, *TurnEndEvent, *ErrorEvent, *DoneEvent, *TextDeltaEvent, *ToolResultEvent, *WarningEvent), aligning with the parent Engine.Run() channel shape. Consumers can directly `for evt := range ch; switch v := evt.(type) { case *TextEvent: ... }` without unwrapping.
**Breaking**: prior to 2026-04-20 this channel sent `*SubAgentEvent` wrappers (SubAgentID + Inner). External SDKs that consumed sa.Run() channel and type-switched on `case *SubAgentEvent:` will now receive nothing; update to type-switch the bare event types. SubAgentID attribution is exposed via `*SubAgentEvent` wrappers on the parent engine's Run channel (from the parent engine's viewpoint) or via the observer path (background sub-agents), not re-emitted on sa.Run()'s own channel.
func (*SubAgent) RunBackground ¶
RunBackground 在后台 goroutine 中运行子 Agent. 立即返回,不阻塞调用者. 运行完成后可以通过 Wait() 等待结果,或通过 Status/Progress 查询进度.
func (*SubAgent) RunSync ¶
RunSync 同步运行子 Agent,阻塞直到完成,返回最终结果文本. 这是最常用的同步模式--父 Agent 等待子 Agent 完成后使用结果.
func (*SubAgent) RunSyncWithCallback ¶
func (sa *SubAgent) RunSyncWithCallback( ctx context.Context, prompt string, onTurn func(text string, toolUses []ToolUseInfo), ) (string, error)
RunSyncWithCallback 同步运行子 Agent,每个 assistant 轮次完成后调用 onTurn 回调.
onTurn 接收:
- text:本轮 assistant 的文本内容(可能为空,如纯工具调用轮次)
- toolUses:本轮工具调用列表(含 file_path 提取,用于 filesTouched 追踪)
升华改进(ELEVATED): 早期实现 runForkedAgent 接受 onMessage callback, 但只能拿到完整 Message 对象(包含原始 content blocks). 我们提供更高级别的抽象:text + []ToolUseInfo,消费方(Dream)不需要解析 blocks. 替代方案:<暴露原始 []Event>--否决原因:消费方需要自己处理轮次边界(TurnEndEvent), 增加了不必要的复杂度.onTurn 的语义更清晰:每个 assistant 轮次调用一次.
onTurn 为 nil 时退化为 RunSync,无额外开销.
type SubAgentConfig ¶
type SubAgentConfig struct {
// Description 任务描述
Description string
// Model 使用的模型(为空则继承父 Agent 的模型)
Model string
// AllowedTools 实际允许使用的工具(权限层限制).
// nil = 不限制(可使用所有工具),非 nil = 白名单.
AllowedTools map[string]bool
// MaxTurns 最大轮数(0 = 默认 10)
MaxTurns int
// Cwd 工作目录(为空则继承父 Agent 的工作目录)
Cwd string
// Chain 父查询链(用于 fork 子链追踪)
Chain *QueryChainTracking
// HistoryMessages 预置的对话历史消息(在任务 prompt 之前追加到消息列表).
//
// 升华改进(ELEVATED): 早期实现 runForkedAgent 通过 cacheSafeParams 传入父对话历史,
// SubAgent 能看到完整的父对话消息,记忆提取才能分析"发生了什么".
// 早期方案 Go 的 runMemoryExtraction 只传了一条 prompt 消息,SubAgent 没有任何历史可分析.
// 我们把历史消息作为显式参数传给 SubAgent,修复这个核心缺口.
// 替代方案:在 prompt 里把历史消息拼成文本(丢失结构,模型难以理解轮次边界).
HistoryMessages []query.Message
// MemoryDirRestrict 非空时,Edit/Write 工具只允许写入此目录下的路径.
//
// 升华改进(ELEVATED): 早期实现 createAutoMemCanUseTool 用 isAutoMemPath(filePath)
// 在 canUseTool 中检查路径.我们把"允许写入哪个目录"提升到 SubAgentConfig 层,
// 不是硬编码在 canUseTool 里,而是配置驱动--记忆提取传 memDir,
// 其他场景传空字符串(不限制路径).
// 替代方案:在 SubAgent 内部硬编码特殊规则(不通用,无法用于其他场景).
MemoryDirRestrict string
//
// 精妙之处(CLEVER): Prompt Cache 命中依赖内容字节完全一致--
// 父 Engine 把实际发送的系统提示字节直接传给 SubAgent,
// SubAgent 用相同字节发请求,哈希必然一致,100% 命中缓存 slot.
//
// nil 时退化为独立渲染(向后兼容,SubAgent 自己调 buildSystemPrompt()).
// 非 nil 时,SubAgent 的 runLoop 优先使用此字节,跳过渲染.
//
// 替代方案:<SubAgent 独立渲染系统提示>
// - 否决:时间戳,随机值等动态内容会导致字节不一致,缓存 miss.
SharedSystemPromptBytes []byte
// AllowedSubAgentTypes 限制此子 Agent 可以通过 Agent 工具 spawn 的子 Agent 类型.
//
// 升华改进(ELEVATED): 对应 AgentDefinition.AllowedSubAgentTypes 字段--
// 在 SpawnSubAgent 时从 AgentDefinition 传入,在 canUseTool 中运行时检查.
// 例如:Explore agent 的 AllowedSubAgentTypes=["Plan","Verification"],
// 防止只读 agent 通过 spawn general-purpose 子 agent 来绕过工具限制.
// nil/空 = 无限制(可 spawn 任何 AgentType).
// 注意:Agent 工具本身可能被 globalDisallowed 禁用(见 DefaultGlobalDisallowedTools),
// 此字段仅在 Agent 工具可用时生效.
AllowedSubAgentTypes []string
// SilentEvents 非 true 时子 agent 不 emit SubAgentStart/End 生命周期事件
// 也不向父引擎 Run channel 转发业务事件 (sa.Run() 自己的 channel 仍收
// 业务事件供本地消费者 — RunSync 的 resultTexts 收集等). 用于后台静默
// 任务 (runMemoryExtraction) — 不污染用户可见的事件流. 默认 false
// (正常子 agent 要可见).
//
// When SilentEvents is true, the sub-agent skips SubAgentStart/End
// lifecycle events and does not forward business events to the parent
// engine's Run channel (sa.Run()'s own channel still receives business
// events for local consumers — e.g. RunSync's resultTexts collector).
// Used for background silent tasks (runMemoryExtraction) to avoid
// polluting user-visible event streams. Default false (normal
// sub-agents should be visible).
SilentEvents bool
}
SubAgentConfig 是创建子 Agent 的配置.
type SubAgentEndEvent ¶
type SubAgentEndEvent struct {
// SubAgentID 是子 agent 的唯一标识符, 与对应 SubAgentStartEvent 配对.
SubAgentID string
// Duration 是子 agent 总运行时长 (EndTime - StartTime).
Duration time.Duration
// Status 是子 agent 退出时的最终状态 (Completed / Failed / Cancelled).
Status SubAgentStatus
// Result 是子 agent 的最终文本结果, 截断到 subAgentResultMaxBytes.
Result string
// Error 是子 agent 错误消息 (error.Error()); nil 错误为空串.
Error string
}
SubAgentEndEvent 是子 agent 生命周期结束事件, 由 sa.runLoop 在所有业务 事件之后 (包括取消 / 错误 / maxTurns 等各种退出路径) emit 到父引擎 Run channel. Duration = EndTime - StartTime, Status 反映最终状态 (Completed/ Failed/Cancelled), Result 是最终文本结果 (截断到 2KB 避免 SSE payload 膨胀), Error 是错误消息字符串 (用 error.Error() 扁平化, 不序列化原始 error 类型, 让 SSE JSON 稳定).
SubAgentEndEvent is the sub-agent lifecycle end event emitted by sa.runLoop after all business events (through any exit path: cancel / error / maxTurns). Duration = EndTime - StartTime; Status reflects the final state (Completed/Failed/Cancelled); Result is the final text output (truncated to 2KB to keep SSE payload sizes sane); Error is the flattened error message (via error.Error()) — the underlying error type is not serialized so JSON stays stable across versions.
func (*SubAgentEndEvent) EventType ¶
func (e *SubAgentEndEvent) EventType() string
EventType 实现 Event 接口. 返回 "subagent_end". EventType implements the Event interface. Returns "subagent_end".
type SubAgentEvent ¶
type SubAgentEvent struct {
// SubAgentID 是产生此事件的子 Agent 的 ID
SubAgentID string
// Inner 是原始事件
Inner Event
}
SubAgentEvent 是子 Agent 产生的事件的包装. 在子 Agent 的原始事件基础上增加了子 Agent 的 ID 信息, 使父 Agent / 消费层能够区分来自不同子 Agent 的事件.
func (*SubAgentEvent) EventType ¶
func (e *SubAgentEvent) EventType() string
type SubAgentProgress ¶
type SubAgentProgress struct {
ToolUseCount int // 工具调用计数
InputTokens int // 累计输入 token 数
OutputTokens int // 累计输出 token 数
Activities []string // 最近的工具调用描述(最多 5 个)
// contains filtered or unexported fields
}
SubAgentProgress 追踪子 Agent 的运行进度.
func (*SubAgentProgress) AddActivity ¶
func (p *SubAgentProgress) AddActivity(activity string)
AddActivity 添加一条活动记录(保留最近 5 条).
func (*SubAgentProgress) Snapshot ¶
func (p *SubAgentProgress) Snapshot() SubAgentProgress
Snapshot 返回进度的快照副本(线程安全).
type SubAgentRegistry ¶
type SubAgentRegistry struct {
// contains filtered or unexported fields
}
SubAgentRegistry 是子 Agent 注册表,管理所有活跃的子 Agent.
生命周期:创建后使用,不再需要时调用 Close() 释放资源.
func NewSubAgentRegistry ¶
func NewSubAgentRegistry() *SubAgentRegistry
NewSubAgentRegistry 创建一个新的子 Agent 注册表.
func SetupAgentExecutor ¶
func SetupAgentExecutor(engine *Engine, taskStore *builtin.TaskStore, agentRegistry *AgentRegistry) *SubAgentRegistry
SetupAgentExecutor 为 Engine 设置 AgentExecutor. 在 Engine 初始化后调用,连接 AgentTool 和 SubAgent 系统.
参数:
- engine: 父 Engine 实例
- taskStore: 任务存储(用于后台模式的任务追踪)
- agentRegistry: Agent 类型注册表(用于工具集解析,可为 nil)
返回创建的 SubAgentRegistry(供外部使用).
func (*SubAgentRegistry) CancelAll ¶
func (r *SubAgentRegistry) CancelAll()
CancelAll 取消所有正在运行的子 Agent.
func (*SubAgentRegistry) Close ¶
func (r *SubAgentRegistry) Close()
Close 停止注册表,通知所有 watchCompletion goroutine 退出. 幂等(多次调用安全).通常在 Engine.Close() 时调用.
func (*SubAgentRegistry) Get ¶
func (r *SubAgentRegistry) Get(id string) (*SubAgent, bool)
Get 按 ID 获取子 Agent.
func (*SubAgentRegistry) ListByStatus ¶
func (r *SubAgentRegistry) ListByStatus(status SubAgentStatus) []*SubAgent
ListByStatus 列出指定状态的子 Agent.
func (*SubAgentRegistry) OnComplete ¶
func (r *SubAgentRegistry) OnComplete(fn func(sa *SubAgent))
OnComplete 注册一个子 Agent 完成时的回调函数. 当任何注册的子 Agent 完成运行(成功,失败或取消)时,回调会被调用.
func (*SubAgentRegistry) Register ¶
func (r *SubAgentRegistry) Register(sa *SubAgent) error
Register 注册一个子 Agent. 如果同 ID 的子 Agent 已存在,返回错误.
func (*SubAgentRegistry) Remove ¶
func (r *SubAgentRegistry) Remove(id string) bool
Remove 从注册表中移除子 Agent. 不会取消正在运行的子 Agent--调用者应先调用 Cancel().
func (*SubAgentRegistry) Summary ¶
func (r *SubAgentRegistry) Summary() map[SubAgentStatus]int
Summary 返回注册表的摘要信息.
type SubAgentStartEvent ¶
type SubAgentStartEvent struct {
// SubAgentID 是子 agent 的唯一标识符 (nextSubAgentID 生成).
SubAgentID string
// Description 是子 agent 的任务描述 (来自 SubAgentConfig.Description).
Description string
// Cwd 是子 agent 的工作目录 (worktree 模式下为分支路径, 否则继承父 engine cwd).
Cwd string
// Model 是子 agent 使用的模型 (空串表示继承父 agent).
Model string
// StartTime 是子 agent 启动的 wall-clock 时间 (sa.StartTime 同步填值).
StartTime time.Time
}
SubAgentStartEvent 是子 agent 生命周期开始事件, 由 sa.runLoop 在首个业务 事件之前 emit 到父引擎 Run channel (通过 ctx 注入的 EventEmitter). 让父 agent / SDK 消费者 / SSE / 审计 sink 知道子 agent 何时开工以及领了什么 任务. Description/Cwd/Model 快照来自 SubAgent.Description/Cwd/Model 字段 (SpawnSubAgent 构造期 populate).
SubAgentStartEvent is the sub-agent lifecycle start event emitted by sa.runLoop before the first business event, forwarded to the parent engine's Run channel via the ctx-injected EventEmitter. Lets parent agent / SDK consumers / SSE / audit sinks know when a sub-agent began and what task it took. Description/Cwd/Model are snapshots of the SubAgent.* fields populated at SpawnSubAgent time.
func (*SubAgentStartEvent) EventType ¶
func (e *SubAgentStartEvent) EventType() string
EventType 实现 Event 接口. 返回 "subagent_start". EventType implements the Event interface. Returns "subagent_start".
type SubAgentStatus ¶
type SubAgentStatus string
SubAgentStatus 是子 Agent 的运行状态.
const ( SubAgentStatusPending SubAgentStatus = "pending" // 已创建,尚未启动 SubAgentStatusRunning SubAgentStatus = "running" // 正在运行 SubAgentStatusCompleted SubAgentStatus = "completed" // 已完成 SubAgentStatusFailed SubAgentStatus = "failed" // 运行失败 SubAgentStatusCancelled SubAgentStatus = "cancelled" // 被取消 )
type Team ¶
type Team struct {
// contains filtered or unexported fields
}
Team 管理一组并行 Worker,协调 Leader 与 Workers 的通信.
升华改进(ELEVATED): Team 是 Agent 协作的第一层抽象-- 上层可以组合多个 Team(嵌套协调),实现层级式 Agent 网络. 跨行业扩展:制造场景的"工程师 + 多个机器人臂",金融场景的"分析师 + 多个数据抓取器". 替代方案:直接在 Engine 中内置 Worker 调度(耦合高,难以测试).
func NewTeam ¶
func NewTeam(cfg TeamConfig) *Team
NewTeam 创建 Team 实例, 同时布线 Leader 的 Agent Teams 收件箱.
精妙之处(CLEVER): Leader 的 IncomingInbox 在 NewTeam 里直接赋值 -- 调用方只需在构造 Engine 时不设 IncomingInbox (默认), 由 NewTeam 接管. 已设置的 IncomingInbox 会被保留 (允许 SaaS 场景注入自定义 Inbox 实现).
替代方案: <要求调用方显式调用 Engine.SetIncomingInbox()> - 否决: 多一步骤容易漏, 默认不接线的 Team 等于退化成 P0 单向模式.
func (*Team) ContextForLeader ¶
ContextForLeader 返回一个包装了 Leader TeammateMessageSender + TaskListProvider 的 context. 调用方在 Engine.Run 之前用这个 context 替换原 ctx, Leader 即可 调用 send_message / add_shared_task / list_shared_tasks / 等工具.
使用示例:
team := engine.NewTeam(teamCfg) ctx := team.ContextForLeader(context.Background()) ch := leaderEngine.Run(ctx, "协调 Worker 完成 X")
精妙之处(CLEVER): 把"给 Leader 绑定发件身份 + 共享清单"的复杂性藏在 Team 对象里 -- 调用方不需要知道 builtin.WithTeammateMessageSender 等底层 API, 也不需要手动构造 teamMessageSender.
func (*Team) RunWorkers ¶
func (t *Team) RunWorkers(ctx context.Context, specs []WorkerSpec) ([]WorkerResult, error)
RunWorkers 并发启动所有 Worker,等待全部完成(或 ctx 取消),返回结果列表.
执行流程:
- 为每个 WorkerSpec 创建 SubAgent(从 Leader 的 AgentRegistry 解析工具集)
- 并发 RunSync 所有 Worker
- 每个 Worker 完成后构建 task-notification XML,注入 Leader 消息队列
- 等待所有 Worker 完成(或 ctx 取消)
精妙之处(CLEVER): 用 WaitGroup 管理并发,用独立 goroutine 收集结果-- 不需要复杂的 fan-out/fan-in channel 设计. 替代方案:errgroup(需要在第一个错误时取消,但 Team 希望收集所有结果).
type TeamConfig ¶
type TeamConfig struct {
// LeaderEngine 是 Leader Agent 引擎
LeaderEngine *Engine
// PermissionHandler 消费层实现的权限确认(nil = 自动批准所有请求)
PermissionHandler PermissionHandler
// 非 nil 时, Worker/Leader 的 ctx 会携带 TaskListProvider, 四个 shared_task_*
// 工具可正常使用 (add_shared_task / list_shared_tasks / claim_shared_task /
// complete_shared_task). nil 时工具返回 "not in a Team" 错误.
//
// 跨行业扩展: 消费层可传入任意 Store 实现 (tasklist.New(myStore)) --
// 医疗用 HIPAA 合规存储, 金融用 PostgreSQL, 仓储用 WMS DB 等.
// 编程客户可用 tasklist.NewMarkdownStore 和 Anthropic Claude Code 互操作.
//
// 升华改进(ELEVATED): 可选字段 -- 不配置时 Team 退化为 P0 (Leader+Workers 单向
// 汇报), 配置后升级为 peer-to-peer + shared task board 完整 Agent Teams 体验.
SharedTaskList *tasklist.TaskList
}
TeamConfig 是创建 Team 的配置.
type TextDeltaEvent ¶
type TextDeltaEvent = flyto.TextDeltaEvent
type ThinkingDeltaEvent ¶
type ThinkingDeltaEvent = flyto.ThinkingDeltaEvent
type ThinkingEvent ¶
type ThinkingEvent = flyto.ThinkingEvent
type ThinkingOptions ¶
type ThinkingOptions struct {
// Enabled 是否启用
Enabled bool
// BudgetTokens thinking token 预算(0=不限制)
BudgetTokens int
}
ThinkingOptions 是 extended thinking 的配置选项.
type TokenBudgetManager ¶
type TokenBudgetManager struct {
// contains filtered or unexported fields
}
TokenBudgetManager 管理上下文窗口的 token 预算.
职责:
- 混合估算当前上下文占用量(精确锚点 + 粗估增量)
- 计算有效上下文窗口(扣除各种预留)
- 计算压缩阈值和警告阈值
- 模型切换时检测溢出
- 三种 token 计算函数(总量/计费/预算)
func NewTokenBudgetManager ¶
func NewTokenBudgetManager(registry *config.ModelRegistry, observer EventObserver) *TokenBudgetManager
NewTokenBudgetManager 创建 token 预算管理器.
func (*TokenBudgetManager) AutoCompactThreshold ¶
func (m *TokenBudgetManager) AutoCompactThreshold(model string) int
AutoCompactThreshold 计算自动压缩触发阈值. 阈值 = 有效窗口 - AutoCompactBufferTokens
func (*TokenBudgetManager) AutoCompactThresholdWithThinking ¶
func (m *TokenBudgetManager) AutoCompactThresholdWithThinking(model string, thinkingBudget int) int
AutoCompactThresholdWithThinking 计算扣除 thinking 后的自动压缩阈值.
func (*TokenBudgetManager) CalculateWarningState ¶
func (m *TokenBudgetManager) CalculateWarningState(tokenUsage int, model string) *TokenWarningState
CalculateWarningState 计算各种警告状态.
阈值层级(从松到紧):
- 自动压缩阈值:有效窗口 - 13K
- 黄色警告阈值:有效窗口 - 20K(还有 20K 空间)
- 红色警告阈值:有效窗口 - 20K + 20K = 有效窗口的 ~90%(紧急状态)
- 阻塞限制:100% 有效窗口
func (*TokenBudgetManager) EffectiveContextWindow ¶
func (m *TokenBudgetManager) EffectiveContextWindow(model string) int
EffectiveContextWindow 计算有效上下文窗口(扣除摘要输出预留).
有效窗口 = 模型窗口 - min(模型 maxOutput, MaxOutputTokensForSummary) 为什么要扣除?压缩时需要为摘要输出预留空间,否则压缩 API 调用本身会溢出.
func (*TokenBudgetManager) EffectiveContextWindowWithThinking ¶
func (m *TokenBudgetManager) EffectiveContextWindowWithThinking(model string, thinkingBudget int) int
EffectiveContextWindowWithThinking 计算扣除 thinking 预算后的有效窗口.
升华改进(ELEVATED): thinking token 从有效窗口中扣除. 如果 thinking budget 是 10K,实际可用上下文 = 200K - 10K = 190K. 不扣减的话压缩阈值会偏高,导致 thinking 和内容争抢窗口空间. 替代方案:不扣减,让 thinking 和内容自然竞争(原始实现,可能导致意外截断).
func (*TokenBudgetManager) EstimateCurrentUsage ¶
func (m *TokenBudgetManager) EstimateCurrentUsage(messages []query.Message) int
EstimateCurrentUsage 估算当前上下文窗口占用的 token 数.
精妙之处(CLEVER): 混合估算--最后一个 API 响应的精确 usage + 之后新增消息的粗估. 实现逻辑:
- 从后往前找最后一个有 Usage 的 assistant 消息
- 应用 sibling 回溯(并行工具调用的消息共享同一 api_response_id)
- 用 Usage 的 input_tokens + cache_creation + cache_read + output_tokens 作为锚点
- 锚点之后的消息用 tokenizer.EstimateTokens 粗估
- 总量 = 锚点 + 粗估
替代方案:纯粗估(整个消息列表用 tokenizer 估算,误差可达 30%).
func (*TokenBudgetManager) ManualCompactThreshold ¶
func (m *TokenBudgetManager) ManualCompactThreshold(model string) int
ManualCompactThreshold 计算手动压缩触发阈值(更激进). 阈值 = 有效窗口 - ManualCompactBuffer
func (*TokenBudgetManager) OnModelSwitch ¶
func (m *TokenBudgetManager) OnModelSwitch(oldModel, newModel string, currentUsage int) *TokenWarningState
OnModelSwitch 模型切换时检查当前用量是否超过新模型的阈值.
精妙之处(CLEVER): 切换模型时上下文窗口可能变化. 从 Opus (200K) 切到 Haiku (200K) 没事, 但如果切到某个只有 128K 窗口的模型,压缩阈值要跟着变. 立刻检查并通知消费层,避免下一轮 API 调用才发现溢出.
type TokenWarningState ¶
type TokenWarningState struct {
// PercentUsed 已使用百分比 (0-100)
PercentUsed int
// PercentLeft 剩余百分比 (0-100)
PercentLeft int
// IsAboveWarningThreshold 是否超过黄色警告阈值
IsAboveWarningThreshold bool
// IsAboveErrorThreshold 是否超过红色警告阈值
IsAboveErrorThreshold bool
// IsAboveAutoCompactThreshold 是否超过自动压缩阈值
IsAboveAutoCompactThreshold bool
// IsAtBlockingLimit reports whether usage has reached 100% of the effective
// window (remaining <= 0). Observability signal only — the engine does not
// refuse the next turn here; the pre-turn maybeCompact path (engine.go
// ShouldCompact + forceCompact) is the real enforcement. Consumers use
// this to render a blocking-level indicator; it strictly implies
// IsAboveErrorThreshold and IsAboveWarningThreshold.
//
// IsAtBlockingLimit 表示 token 用量已达有效窗口 100% (remaining <= 0).
// 仅为可观测信号 — 引擎并不在此处拒绝下轮, 真正的兜底是下轮 pre-turn
// maybeCompact (engine.go ShouldCompact + forceCompact). 消费层据此渲染
// 阻塞级指示符; 此字段严格蕴含 IsAboveErrorThreshold 和
// IsAboveWarningThreshold 为 true.
IsAtBlockingLimit bool
}
TokenWarningState 描述当前 token 用量的警告状态. 消费层根据此状态展示不同的 UI 指示(绿/黄/红/阻塞).
type ToolProgressEvent ¶
type ToolProgressEvent = flyto.ToolProgressEvent
type ToolResultEvent ¶
type ToolResultEvent = flyto.ToolResultEvent
type ToolResultPairingNormalizer ¶
type ToolResultPairingNormalizer struct {
Observer EventObserver // 可观测性接口(nil 安全)
StrictMode *StrictMode // 严格模式(nil 安全)
}
ToolResultPairingNormalizer 完整的 tool_use / tool_result 配对修复器.
Priority: 8(在 OrphanToolResultRemover(10) 之前执行). 精妙之处(CLEVER): 必须在 OrphanToolResultRemover 之前-- 本步骤先注入合成 tool_result(case 1),然后 OrphanToolResultRemover 不会误删它们(因为此时已有配对的 tool_use). 如果顺序反过来,OrphanToolResultRemover 会先跑, 本步骤注入的合成 tool_result 不会被二次检查.
func (*ToolResultPairingNormalizer) Name ¶
func (n *ToolResultPairingNormalizer) Name() string
func (*ToolResultPairingNormalizer) Normalize ¶
func (n *ToolResultPairingNormalizer) Normalize(messages []query.Message) []query.Message
Normalize 执行 4 种配对修复.
func (*ToolResultPairingNormalizer) Priority ¶
func (n *ToolResultPairingNormalizer) Priority() int
type ToolSummaryEvent ¶
type ToolSummaryEvent = flyto.ToolSummaryEvent
type ToolSummaryGenerator ¶
type ToolSummaryGenerator struct {
// contains filtered or unexported fields
}
ToolSummaryGenerator 生成工具执行结果的简短摘要.
不硬编码模型 ID -- 通过构造参数传入 fast 模型 ID(如从 ModelRegistry 的 RoleFast 获取).
func NewToolSummaryGenerator ¶
func NewToolSummaryGenerator(provider flyto.ModelProvider, modelID string) *ToolSummaryGenerator
NewToolSummaryGenerator 创建一个工具摘要生成器.
provider 是模型提供商(flyto.ModelProvider 接口). modelID 是用于生成摘要的 fast 模型 ID(如 "claude-haiku-..."). 由调用方从 ModelRegistry 的 RoleFast 获取,不在此处硬编码.
func (*ToolSummaryGenerator) GenerateSummary ¶
func (g *ToolSummaryGenerator) GenerateSummary(ctx context.Context, toolName string, input string, output string) string
GenerateSummary 生成工具执行结果的摘要.
如果输出很短(<100 字符),直接截取作为摘要. 否则调用 fast 模型生成摘要. 失败时返回 toolName 作为 fallback.
func (*ToolSummaryGenerator) GenerateSummaryAsync ¶
func (g *ToolSummaryGenerator) GenerateSummaryAsync(ctx context.Context, toolName, toolID, input, output string, ch chan<- Event)
GenerateSummaryAsync 异步生成摘要,通过 channel 返回结果. 不阻塞主流程.
升华改进(ELEVATED): 使用独立 context 而非直接复用外部 ctx-- 外部 ctx 通常绑定到"一次工具调用"的生命周期,工具返回后 ctx 立即取消; 如果 AI provider 不支持流式取消,goroutine 会在 provider.Stream 上永久阻塞. 独立 ctx 配合 30s 超时确保 goroutine 有明确的退出路径. 替代方案:<直接复用外部 ctx> - 否决:ctx 取消 → provider.Stream 阻塞不退出 → goroutine 泄漏.
type ToolUseEvent ¶
type ToolUseEvent = flyto.ToolUseEvent
type ToolUseInfo ¶
type ToolUseInfo struct {
Name string // 工具名(如 "Edit", "Write", "Bash")
FilePath string // file_path 参数(仅 Edit/Write 有值,其他工具为空)
}
ToolUseInfo 是工具调用的简化描述,供 RunSyncWithCallback 回调使用. 只暴露 Dream/MemoryExtraction 等消费方关心的字段,不暴露完整 Input map.
type ToolUseInputNormalizer ¶
type ToolUseInputNormalizer struct {
// InternalFields 需要从 tool_use input 中剥离的字段名.
// 如果为 nil,使用 DefaultInternalInputFields.
InternalFields []string
// Aliases 工具名别名映射(别名 -> 规范名).
// 如果为 nil,不做工具名替换.
Aliases map[string]string
}
ToolUseInputNormalizer 规范化 tool_use 块的输入.
func (*ToolUseInputNormalizer) Name ¶
func (n *ToolUseInputNormalizer) Name() string
func (*ToolUseInputNormalizer) Normalize ¶
func (n *ToolUseInputNormalizer) Normalize(messages []query.Message) []query.Message
func (*ToolUseInputNormalizer) Priority ¶
func (n *ToolUseInputNormalizer) Priority() int
type TraceObserver ¶
type TraceObserver = flyto.TraceObserver
type Transcript ¶
type Transcript struct {
// FormatVersion 是文件格式的 schema 版本号(int,从 1 开始).
// 对应迁移表 transcriptMigrations 的 key.
// 旧文件(缺少此字段)反序列化为 0,LoadTranscript 会将其规范化为 1.
FormatVersion int `json:"format_version"`
// EngineVersion 是写入此文件的引擎应用版本,仅用于 audit/排查.
// 不参与迁移逻辑.空字符串表示未知(旧文件).
EngineVersion string `json:"engine_version,omitempty"`
// SessionID 会话 ID
SessionID string `json:"session_id"`
// Model 使用的模型 ID
Model string `json:"model"`
// CreatedAt 会话创建时间
CreatedAt time.Time `json:"created_at"`
// UpdatedAt 最后更新时间
UpdatedAt time.Time `json:"updated_at"`
// Messages 完整的消息历史
Messages []query.Message `json:"messages"`
// Stats 会话统计信息
Stats TranscriptStats `json:"stats"`
}
Transcript 是会话记录,包含完整的对话历史和元数据. 序列化为 JSON 文件用于持久化.
版本兼容设计(INF-6):
FormatVersion - 格式 schema 版本(int,迁移用). 单调递增,每次 breaking change 时 bump. LoadTranscript 用此字段决定是否需要迁移. EngineVersion - 写入此文件的引擎应用版本(string,仅 audit 用). 格式如 "1.2.3",只用于问题排查,不参与迁移逻辑.
升华改进(ELEVATED): 早期实现 SerializedMessage.version 只存应用版本号(如 "1.2.3"), 无法区分"格式变了"和"应用版本升了".我们将两层分开:
FormatVersion int → schema 迁移用(int 比较一行,无需 semver) EngineVersion string → audit 用(人类可读)
替代方案:<只保留 Version string 存应用版本> - 否决原因:无法在不引入 semver 解析的情况下做格式迁移判断.
func LoadTranscript ¶
func LoadTranscript(path string) (*Transcript, error)
LoadTranscript 从 JSON 文件加载会话记录. 返回 Transcript 或错误(文件不存在,格式错误等).
用于 --resume 场景:从上次保存的位置恢复对话.
type TranscriptStats ¶
type TranscriptStats struct {
// TurnCount 总轮次数
TurnCount int `json:"turn_count"`
// TotalInputTokens 总输入 token 数
TotalInputTokens int `json:"total_input_tokens"`
// TotalOutputTokens 总输出 token 数
TotalOutputTokens int `json:"total_output_tokens"`
// TotalCostUSD 总花费(美元)
TotalCostUSD float64 `json:"total_cost_usd"`
}
TranscriptStats 是会话统计信息.
type TurnEndEvent ¶
type TurnEndEvent = flyto.TurnEndEvent
type TurnStartEvent ¶
type TurnStartEvent = flyto.TurnStartEvent
type UndoExecutor ¶
UndoExecutor 撤销执行器(Engine 提供). 升华改进(ELEVATED): 用接口而非函数--方便测试时 mock,也方便跨进程执行. 替代方案:直接传 func(简单但不可测试,不可序列化).
type WarningEvent ¶
type WarningEvent = flyto.WarningEvent
type WhitespaceAssistantFilter ¶
type WhitespaceAssistantFilter struct{}
WhitespaceAssistantFilter 过滤只包含空白文本的 assistant 消息.
精妙之处(CLEVER): 只过滤 assistant 消息.user 消息即使只有空白也可能 是有效输入(用户真的发了一个回车),而 assistant 的纯空白一定是 bug.
func (*WhitespaceAssistantFilter) Name ¶
func (f *WhitespaceAssistantFilter) Name() string
func (*WhitespaceAssistantFilter) Normalize ¶
func (f *WhitespaceAssistantFilter) Normalize(messages []query.Message) []query.Message
func (*WhitespaceAssistantFilter) Priority ¶
func (f *WhitespaceAssistantFilter) Priority() int
type WorkerResult ¶
type WorkerResult struct {
WorkerID string
AgentType string
Description string
Result string
Error error
Duration time.Duration
}
WorkerResult 是单个 Worker 的执行结果.
type WorkerSpec ¶
type WorkerSpec struct {
// AgentType 使用的 Agent 类型(从 LeaderEngine.agentRegistry 查找)
// 空字符串默认为 "general-purpose"
AgentType string
// Prompt 分配给该 Worker 的任务提示
Prompt string
// Description 任务描述(用于追踪)
Description string
// Model 覆盖默认模型("" = 使用 AgentDefinition 的 Model 或继承 Leader)
Model string
}
WorkerSpec 定义一个 Worker 的规格.
type WorktreeCleanup ¶
type WorktreeCleanup func() error
WorktreeCleanup 是 worktree 的清理函数类型. 调用此函数将删除 worktree 目录并从 git 中移除引用.
type WorktreeInfo ¶
type WorktreeInfo struct {
// Path 是 worktree 的绝对路径
Path string
// Branch 是 worktree 所在的分支名
Branch string
// RepoRoot 是原始仓库的根目录
RepoRoot string
}
WorktreeInfo 包含创建的 worktree 的信息.
Source Files
¶
- activity.go
- agent_def.go
- agent_executor.go
- agent_loader.go
- audit_local.go
- audit_observer.go
- backend.go
- context_calibrator.go
- doc.go
- dream.go
- dream_lock.go
- dream_lock_unix.go
- dream_prompt.go
- dream_task.go
- elicitation.go
- elicitation_adapter.go
- engine.go
- engine_ref.go
- errors.go
- event_emitter.go
- events.go
- fallback.go
- file_history.go
- file_scratchpad.go
- filecache.go
- flush_gate.go
- input.go
- mcp_proxy_tool.go
- migrate.go
- norm_attachment_reorder.go
- norm_consecutive_role.go
- norm_empty_message.go
- norm_error_content_strip.go
- norm_image_validator.go
- norm_orphan_thinking.go
- norm_orphan_tool_result.go
- norm_tool_input.go
- norm_tool_result_pairing.go
- norm_whitespace_assistant.go
- normalize.go
- normalizer.go
- observer.go
- operation_log.go
- plan.go
- plan_command_server.go
- plan_progress.go
- plan_queue.go
- plan_store.go
- query_chain.go
- query_source.go
- reminders.go
- result_store.go
- scratchpad.go
- secret_store.go
- session.go
- session_manager.go
- session_persist.go
- session_snapshot.go
- skill_def.go
- skill_loader.go
- strict.go
- subagent.go
- subagent_registry.go
- team.go
- token_budget.go
- tool_summary.go
- worktree.go