// Package engine 是 Flyto Agent Engine 的顶层入口包. // // 这是整个系统的核心:可嵌入的 Agent 引擎. // 任何消费层(CLI,SDK,HTTP Server,IDE 插件)都通过这个包与 Agent 交互. // // 设计原则: // - 零 UI 依赖:引擎不知道自己跑在终端还是 Web 服务里 // - 流式优先:所有输出通过 channel 推送,消费者自己决定怎么展示 // - 可组合:每个子系统(工具,权限,内存等)都是接口,可替换 package engine import ( "context" "encoding/json" "errors" "fmt" "math" "os" "path/filepath" "sort" "strings" "sync" "sync/atomic" "time" enginecache "git.flytoex.net/yuanwei/flyto-agent/internal/cache" "git.flytoex.net/yuanwei/flyto-agent/internal/mcp" "git.flytoex.net/yuanwei/flyto-agent/internal/tokenizer" "git.flytoex.net/yuanwei/flyto-agent/internal/transport" "git.flytoex.net/yuanwei/flyto-agent/internal/transport/retry" "git.flytoex.net/yuanwei/flyto-agent/pkg/config" agentctx "git.flytoex.net/yuanwei/flyto-agent/pkg/context" "git.flytoex.net/yuanwei/flyto-agent/pkg/execenv" "git.flytoex.net/yuanwei/flyto-agent/pkg/flyto" "git.flytoex.net/yuanwei/flyto-agent/pkg/hooks" "git.flytoex.net/yuanwei/flyto-agent/pkg/inbox" "git.flytoex.net/yuanwei/flyto-agent/pkg/memory" "git.flytoex.net/yuanwei/flyto-agent/pkg/permission" "git.flytoex.net/yuanwei/flyto-agent/pkg/plugin" "git.flytoex.net/yuanwei/flyto-agent/pkg/query" "git.flytoex.net/yuanwei/flyto-agent/pkg/security" "git.flytoex.net/yuanwei/flyto-agent/pkg/tools" "git.flytoex.net/yuanwei/flyto-agent/pkg/tools/builtin" ) // maxInputChars 是用户输入的最大字符数(防御点 d: 超大输入截断). // // 精妙之处(CLEVER): 100K 字符 ≈ 25K tokens(中文约 1 char/token,代码约 4 chars/token)-- // 这个阈值能覆盖绝大多数合理输入,同时防止误操作(粘贴超大文件内容)导致的 API 超额计费. // 注意截断的是字节/字符边界而非 token 边界--字符截断更简单,无误差,且 100K 已有足够缓冲. // 替代方案:直接传给 API(超限时 API 返回 400,用户收到不友好的错误). const maxInputChars = 100_000 // Engine 是 Agent 引擎的主入口. // 核心查询引擎 + 会话管理 + 工具编排, // 剥离了所有 UI 和终端依赖. type Engine struct { cfg *Config // executor 是子进程启动器, 从 Config.Executor 持有. 传给 mcpMgr 等 // exec 消费者. 本地模式 DefaultExecutor, 云端模式 sandbox.Backend. // 方案 β 严格 DI: 引擎对执行环境零假设. 2026-04-15 M1 commit 4b-1 引入. executor execenv.Executor tools *tools.Registry perms permission.Checker mem memory.Store plugins *plugin.Host hooksMgr *hooks.Manager // mcpMgr manages the lifecycle of MCP server subprocesses declared by // plugins (plugin.json "mcpServers" field). One Manager per engine, // server keys namespaced by pluginMCPServerKeyPrefix to coexist with // user-configured MCP servers from settings.json. // // Why engine-owned and not plugin.Host-owned: plugin.Host is a pure // configuration container; spawning subprocesses and handling // reconnect loops is an engine concern. This keeps plugin.Host // dependency-free (no internal/mcp import, no subprocess lifecycle). // Replaced approach: - // rejected, would force plugin package to import internal/mcp and // conflate config with runtime state. mcpMgr *mcp.Manager fileCache *FileStateCache // 文件状态缓存(跟踪读过的文件,检测外部修改) resultStore *ResultStore // 大结果磁盘持久化(防止长输出撑爆上下文) dreamEngine *DreamEngine // AutoDream 记忆巩固引擎(可选,查询结束时触发) extractor memory.MemoryExtractor // 记忆提取器(可选,查询结束时异步提取) // extractState 记忆提取单飞状态,extractState.mu 保护所有字段. // 精妙之处(CLEVER): mutex 与它保护的字段物理共存-- // 违规访问(不加锁就读写字段)在 code review 时一眼可见. // 注:Go 不提供编译期强制,这是视觉强制,不是类型强制. // // 原始设计(已重构):extractMu/extractInProgress/extractPending 作为 Engine 顶层字段散列, // mutex 与被保护字段在 struct 中相距数行,所有权关系不可见. // // 精妙之处(CLEVER): 用 Mutex + bool 实现单飞(single-flight)+ 最新优先(stash)-- // 不用 golang.org/x/sync/singleflight 是因为我们需要"最新上下文"而非"第一个上下文": // 提取时若有新对话到来,应在当前提取完成后再用最新消息跑一次, // 而不是丢弃新消息.stash 只保留最后一次,中间的上下文被覆盖-- // 这和早期实现 pendingContext(overwrite)策略一致. extractState struct { mu sync.Mutex inProgress bool pending *extractionSnap // 等待中的后置补跑上下文 lastExtractTurn int // 上次提取时的轮数(用于触发判断) lastExtractMsgIdx int // 对话消息列表游标(hasMemoryWritesSince 用) } observer EventObserver // 可观测性接口(永不为 nil,未配置时为 NoopObserver) strictMode *StrictMode // 严格模式(nil 表示关闭) fileHistory *FileHistory // 文件操作历史(编辑/写入前自动备份) operationLog *OperationLog // 统一操作日志(支持按消息回滚) tokenBudget *TokenBudgetManager // Token 预算管理器(混合估算 + 多层阈值) // sessionState 会话管理,sessionState.mu 保护 sessions map. // 精妙之处(CLEVER): mutex 与它保护的字段物理共存--违规访问一眼可见. // 原始设计(已重构):mu(RWMutex) 和 sessions 作为顶层字段,所有权关系靠注释约定. sessionState struct { mu sync.RWMutex sessions map[string]*Session } // 活动追踪 activity *ActivityTracker // 会话活动追踪(心跳/空闲检测/refcount) // UDS Inbox 服务端(可选,EnableUDSInbox=true 时启用) // 升华改进(ELEVATED): 工具子进程通过 Unix Domain Socket 向引擎推送进度消息, // 引擎转发为 InboxMessageEvent 给观察者. // CLI 模式下默认启用;SDK 嵌入模式下可选. // 替代方案:轮询(浪费 CPU)或信号量(受限于进程边界). inboxSrv *inbox.UDSServer // 异步计划队列(可选,EnablePlanQueue=true 时启用,模块 20.3). // 升华改进(ELEVATED): fire-and-forget 执行模式-- // 提交计划后立即返回 planID,daemon 后台串行执行,任何时候都能通过 // PlanCommandServer 或直接读文件查询进度. // 区别于 SubAgent(同步 fork):适合"超大计划要干很久而发起方要继续干别的"的场景. // 替代方案:SubAgent 阻塞等待(Orchestrator goroutine 无法并行处理其他任务). planQueue *FilePlanQueue planCmdSrv *PlanCommandServer // 优雅关闭基础设施 // 精妙之处(CLEVER): root context 贯穿引擎生命周期-- // 所有 go func() 通过参数/闭包继承这个 context, // Close() 只需 cancel() 就能通知全部 goroutine 停止. // 不需要 WaitGroup 追踪每个 goroutine(侵入性太强). rootCtx context.Context // 引擎生命周期 context rootCancel context.CancelFunc // Close() 时调用 closed atomic.Bool // 防重入标志 // backend 封装双路径请求构建,替代主循环中的 Provider==nil 判断. // 升华改进(ELEVATED): 通过 EngineBackend 接口,engine.go 主循环不再感知 // "直连 Anthropic API" 和 "通过 flyto.Provider 接口" 两条路径的差异. backend EngineBackend // agentRegistry Agent 类型注册表(内置 4 种类型 + SDK 可扩展) // 升华改进(ELEVATED): Engine 直接持有注册表,子系统通过 SetupAgentExecutor 注入-- // 消费层可在 New() 之后,Run() 之前调用 engine.AgentRegistry() 注册自定义类型. // 替代方案:注册表作为全局变量(线程安全代价高,多 Engine 实例互相污染). agentRegistry *AgentRegistry // skillRegistry Skill 注册表(文件加载 + 代码内置) // 升华改进(ELEVATED): 与 agentRegistry 同等地位--绑定到 Engine 实例, // SaaS 多租户每个 workspace 独立的 Skill 集合,互不干扰. // 替代方案:全局 sync.Map(无法多租户隔离). skillRegistry *SkillRegistry // bundleRegistry PromptBundle 注册表(模块 15) // 升华改进(ELEVATED): 引擎级别持有,SDK 用户通过 RegisterPromptBundle() 注册行业 Bundle-- // 支持 SaaS 多租户下不同工作区使用不同 Bundle,互不影响. // 替代方案:<全局 Bundle 注册表> - 否决原因:多租户场景下租户 A 的 Bundle 改动会影响租户 B. bundleRegistry *agentctx.BundleRegistry // sectionRegistry Section 计算缓存(模块 15) // 精妙之处(CLEVER): 绑定到 Engine 实例,Reset() 在 /clear 或 /compact 时调用-- // 确保动态 sections(FLYTO.md,工具列表等)在新会话中重新计算, // 而不是跨会话复用旧内容. sectionRegistry *agentctx.SectionRegistry // toolSchemaTracker 逐轮追踪工具 Schema 变化,统计稳定性,优化工具缓存排序. // 升华改进(ELEVATED): 与 promptHashTracker 互补-- // promptHashTracker 追踪系统提示词(system 参数)的整体变化; // toolSchemaTracker 追踪工具列表(tools 参数)中每个工具的独立变化, // 并提供 StableFirstWithBoundary 排序让稳定工具缓存不受不稳定工具破坏. toolSchemaTracker *enginecache.ToolSchemaTracker // cachedToolDefs 是最近一次 buildToolDefs 输出的快照. // // 精妙之处(CLEVER): SubAgent fork 读此字段而非重调 buildToolDefs-- // buildToolDefs 内部会调用 toolSchemaTracker.Track(),推进 turnCount. // 若 SpawnSubAgent 重调,turnCount 额外+1,稳定性计算结果可能偏移, // 导致 cache_control 打到不同工具 → cache key 与父 engine 不一致 → cache miss. // 缓存方案:buildToolDefs 末尾写入(由 e.mu 保护),SpawnSubAgent 只读. // 替代方案:<拆出 buildToolDefsNoTrack 纯排序函数> // - 否决:调用方需手动维护"先 Track 后排序"的顺序,易遗漏;缓存方案更内聚. cachedToolDefs []api.ToolDef // promptHashTracker 跟踪系统提示词内容哈希,检测变化后重置 Prompt Cache. // // 升华改进(ELEVATED): 模块 22.2 - Prompt Cache 内容哈希刷新. // Anthropic Prompt Cache 按内容哈希命中,提示词内容变化时旧缓存自动失效. // 我们在引擎侧主动检测变化,变化时调用 ResetSectionCache(), // 让下一次 buildSystemPromptWithContext 触发完整重渲,生成新的缓存 slot. // 静态内容不变时 Check() 返回 false,跳过重置(SectionCache 继续命中). // 替代方案:<每次请求前强制 ResetSectionCache> // - 否决:SectionCache 含有 FLYTO.md I/O 等,无谓重置浪费 latency(通常 >1ms). promptHashTracker *enginecache.PromptHashTracker // notifState Team Worker 完成通知队列,notifState.mu 保护 pendingNotifications. // 精妙之处(CLEVER): mutex 与它保护的字段物理共存--违规访问一眼可见. // 通知以 XML 字符串队列暂存,runLoop 每轮开始时 drain-- // Worker goroutine 和 runLoop 主 goroutine 通过队列解耦,无需直接通信. // 替代方案:channel(容量有限,Worker 多时可能阻塞;select 逻辑复杂). // 原始设计(已重构):pendingTeamNotifications 和 notifMu 作为顶层字段,所有权靠注释约定. notifState struct { mu sync.Mutex pendingNotifications []string } // agentName Agent Teams 通讯中的身份 (对应 Config.AgentName, 消息 from 字段). // 空字符串 = 不参与 Teams. agentName string // incomingInbox Agent Teams peer-to-peer 收件入口 (对应 Config.IncomingInbox). // // 精妙之处(CLEVER): 复用 notifState 作为消息注入通道 -- runLoop 每轮开始先 drain // notifState (原有 team-notification XML) 再 poll incomingInbox (teammate-message XML), // 两类消息通过同一注入路径进入对话流, 模型以 XML tag 区分 ( 对 // Worker 完成, 对同伴发件). // // 替代方案: <独立 goroutine + 独立 channel> - 否决: 额外协程增加死锁面, Poll // 同步在 runLoop 里已足够, MVP 延迟上限可接受. incomingInbox inbox.Inbox // teamRouter is the Agent Teams message router, set by NewTeam on the Leader // Engine. Used by runLoop to send MsgPermissionResponse back to a Worker after // resolving a MsgPermissionRequest via cfg.TeamCfg.PermissionHandler. nil for // non-Team Engines -- runLoop's permission-bubble branch is then skipped. // // teamRouter 是 Agent Teams 消息路由器, NewTeam 设置到 Leader Engine 上. // runLoop 用它在通过 cfg.TeamCfg.PermissionHandler 解决 MsgPermissionRequest // 后, 发 MsgPermissionResponse 回 Worker. 非 Team Engine 为 nil -- // runLoop 的权限冒泡分支直接跳过. teamRouter *inbox.Router // teamPermissionHandler is the Leader-side consumer permission handler set by // NewTeam from TeamConfig.PermissionHandler. nil = auto-approve all // MsgPermissionRequest from Workers (matches TeamConfig.PermissionHandler nil // semantics: "auto-approve all requests"). // // teamPermissionHandler 是 NewTeam 从 TeamConfig.PermissionHandler 同步过来的 // Leader 侧消费层权限 handler. nil = 自动批准所有来自 Worker 的 // MsgPermissionRequest (对齐 TeamConfig.PermissionHandler nil 语义: "自动批准 // 所有请求"). teamPermissionHandler PermissionHandler // scratchpad 是引擎级别的键值暂存区(模块 18.3). // // 升华改进(ELEVATED): 接口化,支持两种后端-- // Config.ScratchpadDir="" : in-memory Scratchpad,生命周期绑定 Engine,GC 自动回收. // Config.ScratchpadDir 非空 : FileScratchpad,持久化到目录,多 Worker 跨进程共享. // Agent 通过 scratchpad_write/scratchpad_read/scratchpad_list 工具访问. // 替代方案:<全局 sync.Map> - 否决:多 Engine 实例(多租户 SaaS)下互相污染. scratchpad builtin.ScratchpadStore // secrets 是引擎级别的凭据注册表(模块 P0). // // 升华改进(ELEVATED): 两层 secret 架构-- // 引擎级(SetSecret):对所有 Run 生效,适合 SDK 嵌入模式(长期凭据). // 请求级(WithSecret RunOption):仅对单次 Run 生效,适合 HTTP API 模式(租户隔离). // runLoop 合并两层后,通过 context 注入 Bash 子进程 env,并对所有工具输出脱敏. // 替代方案:<让调用方在 prompt 里写凭据> - 否决:明文凭据进入 AI 上下文,不可接受. secrets *SecretStore // calibrator 上下文窗口自适应校准器. // // 升华改进(ELEVATED): ModelInfo.ContextWindow 是静态文档值,但 provider 的实际限制 // 可能因版本/区域/账号等级等而不同.calibrator 在运行时观测 context_too_long 错误, // 持久化校准记录,下次启动直接使用经过验证的有效窗口值. // 替代方案:<完全信任静态文档值> - 否决:不同区域 / 计费等级可能有不同的实际上限, // 静态值无法覆盖所有场景. calibrator *ContextWindowCalibrator } // Config 是引擎的配置. // 统一收敛所有可配置项到一个结构体. type Config struct { // Model 指定默认使用的模型 ID(向后兼容,设置 RoleMain) Model string // Models 模型注册表(如果为 nil,使用默认) // 通过角色系统管理不同用途的模型选择 Models *config.ModelRegistry // Cwd 是工作目录,工具执行的根目录 Cwd string // Executor 是子进程启动器, 必填. engine.New 校验 nil 返回 error. // // 本地模式: 消费方传 execenv.DefaultExecutor{} (零开销包装 os/exec). // 云端模式: platform 层传 sandbox.Backend{} 实现 microVM 隔离. // // 方案 β 严格 DI: 引擎对执行环境零假设, 所有 exec 子进程走此抽象. // 详见 platform/common/internal/sandbox/DESIGN.md section 7 + 7.2. // // 2026-04-15 M1 commit 4b-1 引入. Executor execenv.Executor // Tools 指定启用的工具列表,为空则启用所有内置工具 Tools []string // MaxTurns 限制单次 Run 的最大对话轮次,0 表示无限制 MaxTurns int // MaxBudgetUSD 限制单次 Run 的最大花费(美元),0 表示无限制 MaxBudgetUSD float64 // PermissionMode 权限模式:default / accept_edits / bypass / plan PermissionMode permission.Mode // PermissionHandler 自定义权限处理器,消费层实现 // 当工具需要用户批准时调用此函数 // 如果为 nil,则根据 PermissionMode 自动决策 PermissionHandler permission.Handler // PermissionTimeout 权限 Handler 调用超时(默认 5 分钟). // 0 表示使用 permission.DefaultPermissionTimeout. // // 升华改进(ELEVATED): 暴露到 Config 而非硬编码在引擎内部-- // 自动化测试场景可以设为 100ms(快速失败), // 企业审批流程可以设为 30min(等人工审批). // 替代方案:硬编码 DefaultPermissionTimeout(无法按场景覆盖). PermissionTimeout time.Duration // 升华改进(ELEVATED): 策略和处理器改为数组,支持多场景叠加. // 替代方案:单个 CompactionPolicy 和 PermissionHandler(锁定单场景). // CompactionPolicies 压缩策略列表(叠加为 CompositePolicy). // 为空时使用 DefaultCodePolicy. // 如果只有一个策略,不会包装 CompositePolicy,直接使用. CompactionPolicies []agentctx.CompactionPolicy // PermissionHandlers 权限处理器列表(叠加为 CompositeHandler). // 为空时使用 Config.PermissionHandler(向后兼容). PermissionHandlers []permission.NamedHandler // SystemPrompt 自定义系统提示词(覆盖默认) SystemPrompt string // AppendSystemPrompt 追加到默认系统提示词之后 AppendSystemPrompt string // Scenario 场景标识符,用于 PromptBundle 选择("programming"/"warehouse"/"medical" 等). // 空字符串 = "programming"(默认). // 引擎内置 claude+programming Bundle;其他场景需通过 RegisterPromptBundle 注册. Scenario string // ModelFamily 模型族标识符("claude"/"gpt"/"gemini"/"local" 等). // 空字符串时从 Model 字段推断(前缀匹配 "claude-" → "claude"). // 用于 PromptBundle 选择. ModelFamily string // Plugins declares plugin definitions that the engine registers at // construction time (engine.New) via plugin.Host.Load. Each definition // is a metadata triple (Name / Description / Source); it does NOT carry // a filesystem path to a plugin.json, so the registration is a // metadata-only placeholder -- skills, hooks, tools, and MCP servers // are NOT parsed from this path. For those, the sibling paths are: // - engine.LoadPlugin(dir): reads a plugin.json on disk and registers // all declared skills/hooks/tools/MCPServers. This is the primary // path for "I have a packaged plugin on the filesystem". // - plugin.Host.RegisterBuiltin: SDK consumers that want to ship a // plugin in-binary (builtin) call this before engine.New and the // host-wide LoadAll picks it up. // Config.Plugins is the narrow niche of "declaratively announce this // plugin's existence to the host registry without packaging it" -- // useful for SDK tests, bookkeeping, or consumers that want the host to // know about a plugin before a later LoadFromDir call materializes it. // // Lifecycle: definitions are consumed once in engine.New // (loadConfigPlugins), before syncPluginHooks / syncPluginTools / // syncPluginMCPServers so any subsequent plugin operation observes a // stable registry. The slice is read once and not watched for changes // after construction. // // Failure strategy: an empty Name produces a config_plugin_skipped // observer event and is discarded. A Host.Load failure (rare -- the // current implementation only writes to a map under lock) produces a // config_plugin_load_failed event and is not fatal -- a broken entry // must not prevent the engine from starting or other entries from // loading. // // Plugins 声明 engine.New 时要往 plugin.Host 注册的插件定义. 每个 // Definition 只是 metadata 三元组 (Name / Description / Source), 不 // 带 plugin.json 的文件系统路径, 因此这条路径只做 metadata 占位登记 // -- skills / hooks / tools / MCPServers 不会从这里解析. 要解析走 // 两条姊妹路径: // - engine.LoadPlugin(dir): 读磁盘上的 plugin.json 并注册里面声明 // 的 skills / hooks / tools / MCPServers. 文件系统里有打包 plugin // 时用这条. // - plugin.Host.RegisterBuiltin: SDK 消费方想把 plugin 内嵌进二进制 // (builtin), 在 engine.New 之前调这个, 随后 host 的 LoadAll 合并 // 进 registry. // Config.Plugins 是个窄场景: "声明式地让 host registry 知道有这个 // plugin, 但不打包". 用于 SDK 测试、bookkeeping, 或消费方希望在后续 // LoadFromDir 真正物化之前 host 就知道 plugin 存在的场合. // // 生命周期: engine.New 里 loadConfigPlugins 消费一次, 位置在 // syncPluginHooks / syncPluginTools / syncPluginMCPServers 之前, // 让随后任何 plugin 操作观察到一份稳定的 registry. slice 读一次, // 构造后不再 watch 变化. // // 失败策略: 空 Name 触发 config_plugin_skipped 事件并丢弃. Host.Load // 失败 (罕见, 当前实现仅是带锁 map 写入) 触发 config_plugin_load_failed // 事件且不 fatal -- 坏 entry 不能挡引擎启动或其他 entry 加载. Plugins []plugin.Definition // MCPServers declares MCP servers that the engine launches at construction // time (engine.New). Each entry becomes an mcp.Manager client under the // "config." namespace (see configMCPServerKey); its tools are wrapped as // mcpProxyTool and registered in the engine's tool registry with names of // the form "config:/". // // This is the SDK-main-process path for "I want to plug in an external // MCP server subprocess without packaging it as a plugin". The two // sibling paths with different scopes: // - plugin.Manifest.MCPServers: same subprocess style but declared // via plugin.json, lifecycle tied to plugin enable/disable. // - ExtraTools: same-process Go tools (tools.Tool interface) with no // IPC round-trip, no subprocess, no MCP client -- preferable when // the tool can be expressed in Go. // // Lifecycle: servers start once in engine.New (startConfigMCPServers) // and stop in engine.Close via mcpMgr.CloseAll. The slice is read once // and not watched for changes after construction. // // Failure strategy: a single server's connect/list-tools failure is an // observer event, not fatal -- a broken server must not prevent the // engine from starting or other servers from loading. // // MCPServers 声明 engine.New 时要拉起的 MCP server. 每个 entry 在 // mcp.Manager 里以 "config." 前缀建 client (见 configMCPServerKey), // 发现的 tool 包成 mcpProxyTool 注册到引擎 tool registry, 名字形如 // "config:/". // // 这是 SDK 主进程侧 "我想接一个 external MCP server subprocess, 但不 // 愿意打成 plugin 包" 的路径. 两条姊妹路径: // - plugin.Manifest.MCPServers: 同样是 subprocess, 但经 plugin.json // 声明, 生命周期随 plugin enable/disable. // - ExtraTools: same-process Go 工具 (tools.Tool 接口), 无 IPC 往返, // 无 subprocess, 无 MCP client -- 能用 Go 表达的工具首选这条路. // // 生命周期: engine.New 里 startConfigMCPServers 拉起一次, // engine.Close 里 mcpMgr.CloseAll 一并关闭. slice 读一次, 构造后不再 // watch 变化. // // 失败策略: 单个 server connect / list-tools 失败是 observer 事件, 不 // fatal -- 坏 server 不能挡引擎启动或其他 server 加载. MCPServers []config.MCPServerConfig // HooksConfig Hook 配置 HooksConfig *hooks.Config // EnableCaching 启用 prompt caching(静态系统提示会被标记为可缓存) // EnableCaching 启用 prompt caching(静态系统提示会被标记为可缓存) // 当 Provider==nil 时用于创建默认 AnthropicProvider; // 当 Provider!=nil 时由 Provider 自己实现(如 anthropic.New(Config{EnableCaching: true})). EnableCaching bool // Thinking extended thinking 配置 // 当 Provider==nil 时用于创建默认 AnthropicProvider; // 当 Provider!=nil 时通过 flyto.Request.NeedsThinking 控制. Thinking *ThinkingOptions // Effort 努力级别 ("low"/"medium"/"high"),空字符串表示不设置 // 通过 flyto.Request.Effort 传递给 Provider. Effort string // FastMode 启用快速模式(调整 max_tokens 默认值) // 通过 flyto.Request.FastMode 传递给 Provider. FastMode bool // JSONSchema 结构化输出的 JSON Schema(如果设置,约束模型输出) // 通过 flyto.Request.ResponseFormat 传递给 Provider. JSONSchema json.RawMessage // Fallback 模型降级配置(可选). // 当主模型 API 调用失败时,自动降级到备用模型. Fallback *FallbackConfig // MemoryExtractor 记忆提取器(可选). // 设置后,查询循环结束时会异步 fork SubAgent 执行记忆提取. MemoryExtractor memory.MemoryExtractor // Verbose 详细日志模式 Verbose bool // Observer 可观测性接口(可选). // 接收引擎运行时的结构化事件,指标和调用链. // 如果为 nil,使用 NoopObserver(零开销空实现). // // 升华改进(ELEVATED): Observer 是结构化事件流,与 Verbose 日志互补. // Verbose 是给开发者看的文本日志,Observer 是给监控系统消费的数据. // 替代方案:只用 Verbose + fmt.Fprintf(黑盒,无法被外部系统消费). Observer EventObserver // StrictMode 严格模式(可选). // 安全评估/测试环境开启,让引擎在检测到异常时 panic 而不是静默修复. // 如果为 nil,所有异常静默修复+记录. StrictMode *StrictMode // ActivityTracker 活动追踪器配置(可选). // 设置后启用心跳/空闲检测.nil 时不追踪. ActivityConfig *ActivityTrackerConfig // CloseTimeout 优雅关闭超时(默认 10s). // Close() 在 cancel context 后等待此时间让 goroutine 退出, // 然后强制清理剩余资源. // 仓储 daemon 场景可能需要更长(如 30s). CloseTimeout time.Duration // SecretGuard 秘密扫描器(可选). // FileWrite / FileEdit 工具写入前调用此扫描器,检测 API key,密码等敏感信息. // // 升华改进(ELEVATED): 默认行为"默认开,全路径保护"-- // 若调用方不设置此字段,New() 自动注入 DefaultSecretGuard(45条内置规则). // 若要关闭扫描(测试场景,已知安全的场景),显式传入 security.NoopSecretGuard{}. // 若要自定义规则(行业特有 key 格式),传入 NewSecretGuardWithRules(...). // // 精妙之处(CLEVER): nil 与 NoopSecretGuard 的区别-- // nil 触发默认注入(保护性行为),NoopSecretGuard 是显式关闭(必须有意为之). // 不可能因为"忘记设置"而静默绕过安全检查. // 替代方案: - 否决原因:遗漏配置等价于静默关闭安全,风险太高. SecretGuard security.SecretGuard // AuditSink 审计落地后端(可选). // 设置后,engine.New() 自动创建 AuditObserver 并与 Config.Observer 叠加, // 将 operation_recorded / secret_scan_blocked 事件翻译为审计条目写入此 sink. // // 升华改进(ELEVATED): 审计通过 Observer 模式接入,工具层无感知-- // FileWrite/FileEdit 只需触发事件,AuditObserver 在 Observer 链中接收并写入. // 零侵入性:不需要修改任何工具代码就能开启/关闭审计. // // nil 表示不开启审计.想开启本地审计(JSONL 文件),使用 NewLocalAuditSink(). // 替代方案:<在 OperationLog.Record 内部直接写 AuditSink> // - 否决原因:将审计落地耦合进操作日志,违反单一职责,且跨包导致循环导入. AuditSink security.AuditSink // AuditSessionID 审计会话 ID(可选). // 注入到每条 AuditEntry.SessionID 字段,用于多用户/多会话场景区分来源. // CLI 单用户场景可以为空(audit.jsonl 里会显示为空字符串). AuditSessionID string // AuditIncludeToolInput 决定是否把工具调用的 Input 参数写入 AuditEntry.Extra["tool_input"]. // // 默认 false (早期实现的 beta 也默认关, 要求双 env 才开). // 开启前提: Input 已由 engine 层经 SecretStore.Redact 脱敏, 所以已注册的 secret 值 // 会替换为 [SECRET:NAME]. 未注册的明文密码/PII 仍会进入审计日志, 调用方自担风险. // // 场景: 合规审计 (HIPAA/SOC2) 要求留存完整请求载荷做 forensics. // 单用户 CLI 场景保持默认关闭更安全. // // 替代方案: <引入可插拔 InputRedactor 接口 + SecretGuard regex 兜底> - // 否决原因: SecretStore.Redact 已覆盖用户注册的所有凭据, 再叠一层 regex 脱敏边际收益 // 很低, 且违反 CLAUDE.md 原则 10 "叠加而非替换" (不应另造 redact 层, 应复用). AuditIncludeToolInput bool // AuditInputMaxBytes 限制 Extra["tool_input"] 单条字节数上限, 超出会截断并加 // Extra["tool_input_truncated"]="true". <=0 表示不截断 (不推荐, 单条审计条目可能爆表). // 仅当 AuditIncludeToolInput=true 时生效. 建议值 4096 (早期方案 truncateContent 默认). AuditInputMaxBytes int // FreshnessConfig 记忆新鲜度警告配置(可选). // nil = 不启用新鲜度功能(UpdateIndex 回落到早期方案 30 天硬编码,CheckMemoryRelevance 不注入警告). // 启用:传 &memory.FreshnessConfig{GlobalThreshold: 24 * time.Hour} // 或使用便捷函数:memory.DefaultFreshnessConfig() 然后取地址. // // 升华改进(ELEVATED): 早期实现 无任何新鲜度提示,只有 MEMORY.md 里的静态年龄注记. // 模型无法根据静态注记判断"这条记忆当前对话是否仍然有效". // 我们在 CheckMemoryRelevance 注入时加 FreshnessNote,让模型在看到记忆内容的 // 同时收到"此记忆已 N 天未更新,请核实"的提醒. // 替代方案:<在 Entry.Content 里追加警告文本> - // 否决原因:污染记忆内容本身,影响提取/检索评分. FreshnessConfig *memory.FreshnessConfig // MemoryScorer 自定义记忆相关性评分器(可选). // nil 时走默认 TextScorer (Jaccard 文本相似度). // 替代算法场景(嵌入向量/BM25)传入自定义实现. MemoryScorer memory.RelevanceScorer // MemoryTypeRegistry 自定义记忆类型注册表(可选). // nil 时用 memory.DefaultTypeRegistry (user/feedback/project/reference 四类型). // 多租户或行业特化场景可注册自定义类型(如 medical/legal/warehouse). MemoryTypeRegistry *memory.MemoryTypeRegistry // MemoryStrictSymlink 启用记忆文件严格符号链接保护(可选). // false(默认): 检测到符号链接记日志但放行(兼容 NFS/Docker/WSL). // true: 检测到符号链接拒绝操作(服务端部署/CI 环境推荐). MemoryStrictSymlink bool // MemorySyncAdapter 记忆远程同步适配器(可选). // nil 时不同步(纯本地). 团队协作场景传 memory.NewGitSyncAdapter(...). MemorySyncAdapter memory.SyncAdapter // MemorySyncConfig 同步策略配置(仅 MemorySyncAdapter 非 nil 时生效). MemorySyncConfig memory.SyncConfig // ElicitationHandler 处理 MCP 服务器发出的用户输入请求(可选). // 对应 MCP 2025-03-26 规范的 elicitation/create server-to-client 请求. // // 升华改进(ELEVATED): 早期实现 无此接口--遇到服务器主动请求时直接挂起. // 设置后,MCP Manager 会在 Initialize 时将此 handler 注入到每个 Client, // dispatchLoop 识别到 server-to-client 请求时路由到此 handler 处理. // // nil 时自动使用 NoopElicitationHandler(返回 cancel,不阻塞,不 panic). // 替代方案:<全局变量注册> - 否决:多 Engine 实例下每个引擎可以有不同的 UI 处理器. ElicitationHandler ElicitationHandler // EnableUDSInbox 启用 Unix Domain Socket Inbox(默认 false). // 启用后,引擎创建 UDS 服务端(路径由 resolveSockPath 决定,优先用户私有目录), // 通过 FLYTO_SESSION_SOCK 环境变量注入到所有工具子进程. // 工具子进程可向此 socket 发送 JSON 消息(进度,日志等), // 引擎接收后转为 InboxMessageEvent 推送给观察者. // // CLI 模式下建议设为 true;SDK 嵌入/无 shell 环境下可保持 false. // // 精妙之处(CLEVER): 默认 false 确保向后兼容-- // 现有 SDK 用户不受影响,不会多出意外的 socket 文件. // 只有明确需要工具进度报告的场景才开启. // 替代方案:默认 true(更多功能但有副作用,/tmp 多文件,老用户可能困惑). EnableUDSInbox bool // UDSInboxSessionID 是 UDS socket 文件名的 sessionID 部分(可选). // 空字符串时自动生成(基于时间戳+随机数). // 显式设置:测试中可固定 socket 路径;多进程场景可通过外部协调分配路径. // // 升华改进(ELEVATED): sessionID 与 Engine 生命周期绑定(不是 Session.ID)-- // 一个 Engine 实例有且仅有一个 UDS 路径,与会话数量无关. // 替代方案:每个 Session 独立 UDS(socket 文件爆炸,管理复杂). UDSInboxSessionID string // EnablePlanQueue 启用异步计划队列(默认 false). // 启用后: // 1. 引擎在 ~/.flyto/plans/ 创建 FilePlanQueue,daemon 重启时自动 RecoverPending. // 2. 引擎启动 PlanCommandServer(UDS 路径由 resolvePlanSockPath 决定), // 通过 FLYTO_PLAN_SOCK 环境变量注入,供外部进程提交/查询/取消计划. // // 适用场景:daemon 模式(长期驻留),需要 fire-and-forget 执行超大计划. // CLI 单次调用模式无需启用(每次调用都是独立进程). // // 精妙之处(CLEVER): 与 EnableUDSInbox 分开控制-- // UDSInbox 是工具进度上报(从工具子进程 → 引擎), // PlanQueue 是计划异步执行(从外部客户端 → daemon 引擎). // 两者虽然都用 UDS,但方向和语义完全不同,应独立开关. EnablePlanQueue bool // PlanQueueDir 是计划状态文件的存储目录(可选). // 空字符串使用默认路径 ~/.flyto/plans/. // 显式设置主要用于测试隔离或自定义数据目录. PlanQueueDir string // PlanQueueSessionID 是 PlanCommandServer socket 文件名的 sessionID 部分(可选). // 空字符串时自动生成(基于时间戳).与 UDSInboxSessionID 分开是因为两个服务独立. PlanQueueSessionID string // SharedSystemPromptBytes 是父 Engine fork SubAgent 时传入的已渲染系统提示词字节(可选). // // 精妙之处(CLEVER): Anthropic Prompt Cache 按内容字节哈希命中-- // 父子 Agent 如果分别渲染系统提示词,哪怕逻辑相同, // 任何微小差异(时间戳注入,随机 ID,空格)都会导致哈希不同,无法命中同一缓存 slot. // 父 Engine 将已发送给 API 的字节直接传给 SubAgent, // SubAgent 用完全相同的字节发送请求,100% 命中缓存. // // 设置规则: // nil = 不共享(SubAgent 独立渲染,向后兼容旧行为). // 非nil = 优先使用此字节,跳过 buildSystemPromptWithContext 渲染. // // 注意:此字段由引擎内部(SpawnSubAgent)设置,SDK 用户通常不需要手动填写. // 替代方案:<每次都独立渲染> // - 否决:父子 Agent 分别渲染导致缓存 miss,每个 SubAgent 首轮都要付完整系统提示费用. SharedSystemPromptBytes []byte // IsOrchestrator 标记此 Engine 是否运行在协调器(Orchestrator)模式. // // 升华改进(ELEVATED): 协调器模式是"多 Agent 拓扑"的中间层角色-- // 它不直接完成叶子任务,而是分解任务,分派 SubAgent,合并结果. // 开启此标志后,系统提示词会注入协调器专用指导段落, // 告知模型如何并行分派任务,如何检查一致性,如何处理子代理失败. // // 精妙之处(CLEVER): bool 标志而非字符串角色-- // IsOrchestrator=true 精确表达"我是协调器",消除歧义. // 如果将来需要更多角色("peer"/"leaf"),可扩展为 AgentRole string, // 当前阶段 bool 足够且最简单. // 替代方案:<在 SystemPrompt 里手动追加协调器指导文字> // - 否决:手动追加无法被 SectionRegistry 缓存,每轮都重算; // 且散落在调用方代码里难以统一维护. IsOrchestrator bool // ScratchpadDir 指定文件持久化 Scratchpad 的目录路径(模块 18.3). // // 升华改进(ELEVATED): 早期实现 无持久化 Scratchpad--所有暂存数据绑定单个进程. // 我们通过此字段启用跨进程共享(FileScratchpad 替换 in-memory Scratchpad): // - ScratchpadDir=""(默认):使用 in-memory Scratchpad,生命周期绑定 Engine // - ScratchpadDir 非空:使用 FileScratchpad,数据持久化到目录,支持跨进程共享 // 跨行业应用: // 仓储调度:多个 picking-robot Worker 进程共享"已分配库位"暂存数据 // 金融对账:并发 reconcile Worker 共享"已处理批次 ID 集合" // 替代方案: - 否决:破坏零外部依赖;文件系统已满足最终一致性需求. ScratchpadDir string // ExtraTools 是额外注册的工具(通过 --tools-schema 或 SDK 注入). // // 升华改进(ELEVATED): 叠加而非替换--ExtraTools 在内置工具注册完成后追加, // 支持"内置工具 + 外部工具"并存.SDK 用户可通过此字段注入任何实现了 tools.Tool 接口的工具, // 无需 fork 引擎源码. // // 跨行业扩展: // ExecTool(外部进程工具)是最常见的注入方式--将任意 CLI 程序包装为 Agent 工具. // 也可以注入纯 Go 实现的定制工具(如特定行业 SDK 封装). // // 与 cfg.Tools 的交互: // cfg.Tools 非空时会过滤内置工具,但 ExtraTools 不受此过滤-- // 调用方既然显式注入了,就一定是想启用的. // // 替代方案:<通过 Plugin 系统注入> - Plugin 机制更重(需要实现 plugin.Definition 接口 + 注册), // ExtraTools 对简单场景(注入少量工具)代价更低. ExtraTools []tools.Tool // Provider 指定模型提供者(可选). // // 升华改进(ELEVATED): 当 Provider 非 nil 时,引擎通过 flyto.ModelProvider 接口 // 而非 internal/api/client.go 直接调用 API-- // - 支持任何实现 flyto.ModelProvider 的 provider:Gemini/OpenAI/Ollama/本地模型等 // - provider 自己处理鉴权,SSE 解析,思考模式等细节,引擎与 API 格式解耦 // - nil 时回退到 internal/api 路径(保持对 Anthropic 原生 API 的完整向后兼容) // // 使用示例: // // cfg := &engine.Config{ // Provider: gemini.New(gemini.Config{APIKey: os.Getenv("GEMINI_API_KEY")}), // } // // 注意:Provider 路径目前不支持 Anthropic 专有功能(cache_control 分块,ExtendedThinking 配置). // 需要这些功能时,使用 nil(默认 api.Client 路径)+ Anthropic provider 的对应配置. // 替代方案:<强制所有 provider 都走 flyto.ModelProvider 路径> - // 否决:Anthropic 路径有独特的 beta headers,cache_control 分块等, // 需要 api.Client 特定功能,无法通过 flyto.Request 表达. Provider flyto.ModelProvider // AgentName 是此 Engine 在 Agent Teams 通讯中的名称 (peer-to-peer 消息的 from 字段). // 空字符串时 = 不参与 Teams 通讯 (独立运行 / Leader 单机模式). // // 升华改进(ELEVATED): AgentName 解耦"Engine 实例"与"Team 成员身份"-- // 同一个 Engine 实例在不同 Team 中可用不同名字 (通过 Config 新建 Engine), // Team 内成员名字由消费层决定 (code-reviewer / analyst / diagnoser 等跨行业语义). // // 跨行业扩展: 金融场景 "risk-analyst-1", 医疗场景 "triage-doctor", 仓储场景 "wave-planner" // 都是合法值, 引擎不解释此字段的含义, 只作为消息路由 key. // // 替代方案: <把 name 硬绑到 session_id> - 否决: 一个 session 可能涉及多个 agent 身份. AgentName string // IncomingInbox 是 Agent Teams peer-to-peer 通讯的收件入口. // nil = 不参与 Teams 通讯 (Engine 仍可正常运行, 只是无同伴消息). // // 升华改进(ELEVATED): 接口而非具体实现-- // MemoryInbox 用于单进程 dogfood / 测试, 跨行业客户可注入自己的 Inbox 实现 // (医疗合规加密存储 / 金融审计表 / SaaS 中间件桥接等). // // 消息由 Engine.runLoop 每轮开始时 Poll (非阻塞), 包装成 XML // 注入到对话流, 模型下一轮可自然读到同伴消息. // // 设计权衡: Poll 而非 goroutine + Recv 阻塞-- // Recv 需要独立 goroutine 和 channel 桥接到 runLoop, 复杂度高且死锁面大; // Poll 和 runLoop 同步, 无额外 goroutine, 延迟上限 = 一轮对话时间 (可接受). // // 替代方案: <每条消息起独立 goroutine enqueueTeamNotification> - // 否决: 额外协程和锁, 对 MVP 场景不必要. IncomingInbox inbox.Inbox } // ThinkingOptions 是 extended thinking 的配置选项. type ThinkingOptions struct { // Enabled 是否启用 Enabled bool // BudgetTokens thinking token 预算(0=不限制) BudgetTokens int } // ModelForRole 获取指定角色对应的模型 ID. // 如果 Models 未初始化,使用默认注册表. // 所有角色 fallback 到 c.Model(主模型),不再区分 RoleMain 特殊处理. // 历史包袱(LEGACY): 早期方案对 RoleMain 有特殊分支--现在统一 fallback 到 c.Model, // 这意味着未配置 fast/thinking 角色时,它们默认使用主模型而非空字符串. func (c *Config) ModelForRole(role config.ModelRole) string { reg := c.ModelRegistry() modelID := reg.GetRole(role) if modelID == "" { return c.Model } return modelID } // ModelRegistry 获取模型注册表. // 如果 Models 未初始化,创建默认注册表并应用 Model 字段. func (c *Config) ModelRegistry() *config.ModelRegistry { if c.Models == nil { c.Models = config.NewModelRegistry() } // 如果 Model 字段被设置,同步到 RoleMain if c.Model != "" { c.Models.SetRole(config.RoleMain, c.Model) } return c.Models } // resolvePolicy 解析压缩策略配置. // 向后兼容: // - CompactionPolicies 有多个 → 叠加为 CompositePolicy // - CompactionPolicies 有一个 → 直接使用 // - CompactionPolicies 为空 → 使用 DefaultCodePolicy // // 使用示例: // // cfg := &Config{ // CompactionPolicies: []agentctx.CompactionPolicy{ // &agentctx.DefaultCodePolicy{}, // &myWarehousePolicy{}, // }, // } // policy := cfg.resolvePolicy() // → CompositePolicy // // resolvePolicy 解析压缩策略配置. // 升华改进(ELEVATED): 总是返回 CompositePolicy,即使只有一个策略. // 这样引擎拿到的策略永远支持运行时 Add,不会因为初始化时只传了一个就失去叠加能力. // 替代方案:单策略时直接返回(省一层包装,但丧失动态 Add 能力). func (cfg *Config) resolvePolicy() agentctx.CompactionPolicy { if len(cfg.CompactionPolicies) > 0 { return agentctx.NewCompositePolicy(cfg.CompactionPolicies...) } // 零配置 → CompositePolicy 内部 fallback 到 DefaultCodePolicy return agentctx.NewCompositePolicy() } // resolvePermissionHandler 解析权限处理器配置. // 向后兼容: // - PermissionHandlers 非空 → 叠加为 CompositeHandler.Handle // - PermissionHandlers 为空 + PermissionHandler 非 nil → 直接使用(向后兼容) // - 都为 nil → 返回 nil // // 使用示例: // // cfg := &Config{ // PermissionHandlers: []permission.NamedHandler{ // {Name: "cli", Handler: cliHandler, IsDecisionMaker: true}, // {Name: "audit", Handler: auditHandler, IsDecisionMaker: false}, // }, // } // handler := cfg.resolvePermissionHandler() // → CompositeHandler.Handle func (cfg *Config) resolvePermissionHandler() permission.Handler { if len(cfg.PermissionHandlers) > 0 { return permission.NewCompositeHandler(cfg.PermissionHandlers...).Handle } if cfg.PermissionHandler != nil { return cfg.PermissionHandler } return nil } // New 创建一个新的 Flyto Agent Engine 实例. // 这是消费层唯一需要调用的构造函数. func New(cfg *Config) (*Engine, error) { // ── 1. 验证 ────────────────────────────────────────────────────────────── // 初始化模型注册表(确保后续代码可以使用角色系统) _ = cfg.ModelRegistry() if cfg.Model == "" { cfg.Model = cfg.ModelForRole(config.RoleMain) } if cfg.Cwd == "" { return nil, fmt.Errorf("flyto-agent: Cwd is required") } // Executor 是方案 β 严格 DI 的注入点, 本地模式传 execenv.DefaultExecutor{}, // 云端模式传 sandbox.Backend{}. 不做 nil fallback - 强制消费方显式决策. if cfg.Executor == nil { return nil, fmt.Errorf("flyto-agent: Config.Executor is required (use execenv.DefaultExecutor{} for local mode)") } // 升华改进(ELEVATED): Provider 和 Model 都是必填--引擎完全领域无关, // 不预设任何供应商或模型.便利函数(如 QuickStart)在 SDK 层提供. // 替代方案: - 否决:引擎核心耦合 Anthropic. if cfg.Provider == nil { return nil, fmt.Errorf("flyto-agent: Provider is required; use anthropic.New(...), openai.New(...), or other providers") } if cfg.Model == "" { return nil, fmt.Errorf("flyto-agent: Model is required; set Config.Model (e.g., \"claude-sonnet-4-6\", \"gpt-4o\")") } // ── 2. 全局注入 ──────────────────────────────────────────────────────────── reg := cfg.ModelRegistry() // 精妙之处(CLEVER): 通过函数回调注入依赖,打破 engine → context → config 的循环引用. // context 包需要查询模型窗口大小,但不能直接 import config 包(否则和 engine 循环依赖). // 用全局回调函数作为依赖注入点,虽然不如接口优雅但避免了大范围重构. agentctx.SetContextWindowProvider(func(model string) int { return reg.ContextWindow(model) }) agentctx.SetCompactModelProvider(func() string { return cfg.ModelForRole(config.RoleFast) }) // ── 3. 构建各子系统 ──────────────────────────────────────────────────────── guard := buildSecretGuard(cfg) fileCache := buildFileCache() perms := buildPerms(cfg) hooksMgr := buildHooksMgr(cfg) pluginHost := plugin.NewHost(cfg.Executor) resultStore := buildResultStore() // L1224: buildDreamEngine 调用下移到 observer/activity/rootCtx 就绪之后(见下方), // 不再在此处构造--DreamEngine 现在直接持有这三项依赖而非通过 parentEngine 间接访问. observer, auditObs := buildObserver(cfg) injectGuardCallback(guard, observer) fileHistory := buildFileHistory(cfg.Cwd, observer) // L1282 (2026-04-14): AI 记忆选择器构造提前 - 原位于第 5 步后置回填 (memory.InjectMemorySelector). // aiSel 仅依赖 cfg.Provider + model role (两者在 L659 cfg.ModelRegistry() 已就绪), // 零 eng 字段依赖, 可安全提前到 mem 构造之前, 用 WithMemorySelector 构造时注入. // 历史包袱(LEGACY): Provider==nil (旧路径) 不支持 AI 记忆选择, 保持原判断逻辑. var aiSel memory.MemorySelector if cfg.Provider != nil { aiModel := cfg.ModelForRole(config.RoleFast) // Fast 模型做 side query, 节省成本 if aiModel == "" { aiModel = cfg.ModelForRole(config.RoleMain) } if aiModel != "" { aiSel = memory.NewAIMemorySelector(makeMemoryQueryFn(cfg.Provider, aiModel)) } } // L1282 (2026-04-14): mem 构造下移到 observer/fileHistory/aiSel 就绪之后. // buildMemory 直接持有 observer + fileHistory + aiSelector 三项依赖, // 通过 WithObserver/WithFileHistory/WithMemorySelector 构造时注入, // 不再通过 memory.InjectBackupper / memory.InjectMemorySelector 后置注入. // // 原设计动机溯源 (前人不是吃饱了饭): // InjectBackupper 原意是 "Store 接口不暴露 setter + 支持 mem 先于 fileHistory 构造". // 但 "mem 先建" 是偶然演化 (mem/observer 都只依赖 cfg, 构造顺序自由), // 调整构造顺序后 WithFileHistory 同样不污染 Store 接口, 原动机被完全覆盖. mem := buildMemory(cfg, guard, observer, fileHistory, aiSel) scratchpad, err := buildScratchpad(cfg) if err != nil { return nil, err } registry, err := buildToolRegistry(cfg, fileCache, fileHistory, scratchpad) if err != nil { return nil, err } // L1191 修复: 注册表就绪后回填 AuditObserver, 启用工具自描述的 AuditOperation 查询. // 精妙之处(CLEVER): 顺序上 buildObserver 先于 buildToolRegistry (见 buildObserver // 注释中的依赖说明), 所以用后置 SetToolRegistry 而非构造参数.auditObs 为 nil // (未配置 cfg.AuditSink) 时跳过, 不 panic. if auditObs != nil { auditObs.SetToolRegistry(registry) } operationLog := buildOperationLog(observer) tokenBudget := buildTokenBudget(reg, observer) activity := buildActivity(cfg, observer) agentRegistry := buildAgentRegistry() bundleRegistry, sectionRegistry := buildBundleAndSectionRegistry(observer) promptHashTracker, toolSchemaTracker := buildCacheTrackers() rootCtx, rootCancel := context.WithCancel(context.Background()) // L1224: DreamEngine 构造时注入 observer/activity/rootCtx - 这三项依赖必须在此处就绪. // parentEngine 仍需 post-fill(见下方 L749),因为 eng 还没组装完,ForkSubAgent 需要完整 Engine. dreamEngine := buildDreamEngine(mem, reg, observer, activity, rootCtx) // ── 4. 组装 Engine ──────────────────────────────────────────────────────── eng := &Engine{ cfg: cfg, executor: cfg.Executor, tools: registry, perms: perms, mem: mem, plugins: pluginHost, hooksMgr: hooksMgr, mcpMgr: mcp.NewManager(cfg.Executor), fileCache: fileCache, resultStore: resultStore, dreamEngine: dreamEngine, extractor: cfg.MemoryExtractor, observer: observer, strictMode: cfg.StrictMode, fileHistory: fileHistory, operationLog: operationLog, tokenBudget: tokenBudget, sessionState: struct { mu sync.RWMutex sessions map[string]*Session }{ sessions: make(map[string]*Session), }, activity: activity, rootCtx: rootCtx, rootCancel: rootCancel, agentRegistry: agentRegistry, bundleRegistry: bundleRegistry, sectionRegistry: sectionRegistry, promptHashTracker: promptHashTracker, toolSchemaTracker: toolSchemaTracker, scratchpad: scratchpad, secrets: newSecretStore(), calibrator: newContextWindowCalibrator(cfg.Cwd), agentName: cfg.AgentName, incomingInbox: cfg.IncomingInbox, } eng.backend = eng // Engine 实现 EngineBackend,自引用 // MCP elicitation handler wiring: must happen before mcpMgr.ConnectAll so // the handler is in place when the MCP server's first elicitation/create // arrives during its initialize phase. nil cfg.ElicitationHandler still gets // the NoopElicitationHandler adapter (auto-cancel) -- previously the field // was a no-op silent dead, now nil semantics are explicit and observable. // // MCP elicitation handler 接线: 必须在 mcpMgr.ConnectAll 之前完成, 这样 // MCP server initialize 阶段第一个 elicitation/create 到达时 handler 已就位. // nil cfg.ElicitationHandler 仍会接 NoopElicitationHandler adapter // (auto-cancel) -- 之前字段是 no-op silent dead, 现在 nil 语义显式可观测. eng.mcpMgr.SetElicitationHandler(adaptElicitationHandler(cfg.ElicitationHandler, eng.observer)) // ── 5. 后置回填(鸡蛋问题)────────────────────────────────────────────────── // 精妙之处(CLEVER): 先构造子系统,再让子系统反向持有 eng 引用-- // 两处都需要 eng 就绪才能绑定,不能在 struct literal 里完成. eng.skillRegistry = newSkillRegistry(eng) // Wire Skill.AgentType resolution and skill_invoked / skill_fork_unknown_agent_type // observer events. Separate setters (not extending newSkillRegistry signature) // so the 16 existing newSkillRegistry(nil) test call sites stay untouched. // // Wire Skill.AgentType 解析和 skill_invoked / skill_fork_unknown_agent_type // 观测事件. 用独立 setter 而非扩 newSkillRegistry 签名, 保持现有 16 处 // newSkillRegistry(nil) 测试调用不动. eng.skillRegistry.SetAgentRegistry(eng.agentRegistry) eng.skillRegistry.SetObserver(eng.observer) eng.skillRegistry.SetParentToolNames(func() []string { return eng.tools.Names() }) dreamEngine.parentEngine = eng // L1282 (2026-04-14): INF-AI-MEM 块已上移到 mem 构造之前 (第 3 节). // AI selector 现在通过 buildMemory 参数 + memory.WithMemorySelector 构造时注入, // 不再通过 memory.InjectMemorySelector 后置注入 (该函数已删). // ── 6. 可选 IPC 子系统 ──────────────────────────────────────────────────── eng.initInbox(cfg, rootCtx) eng.initPlanQueue(cfg) // ── 7. 插件 hooks 同步 ──────────────────────────────────────────────────── // 精妙之处(CLEVER): 在全局 hooks(来自 HooksConfig)注册之后调用-- // hooks.Manager.Execute 按注册顺序执行,全局 hooks 先于插件 hooks, // 保证用户的安全拦截脚本不会被插件 hooks 抢先运行. // 如果没有启用插件,syncPluginHooks 是 no-op(两个 nil guard). // // loadConfigPlugins runs first so any definitions from Config.Plugins // land in the host registry before the three sync* passes observe it. // Keeping the ordering explicit guards against future refactors that // might reorder eng.New construction steps. // // loadConfigPlugins 先跑, 保证 Config.Plugins 里的定义在三次 sync* // 观察 registry 之前就落位. 顺序显式写出来防止将来 refactor 重排 // engine.New 构造步骤时把这条先后关系弄丢. eng.loadConfigPlugins() eng.syncPluginHooks() eng.syncPluginTools() eng.syncPluginMCPServers() // SDK-declared MCP servers (engine.Config.MCPServers) are wired after // plugin-owned ones. Namespace prefixes ("config." vs "plugin.") keep // the two paths isolated; order is arbitrary but plugin-first matches // the order in which the subsystems were added, making log timelines // easier to read. // // SDK 声明的 MCP server (engine.Config.MCPServers) 在 plugin-owned 之后 // wire. 命名空间前缀 ("config." / "plugin.") 隔离两条路径, 顺序本身不 // 敏感, 先 plugin 再 config 只为日志时间线跟子系统添加顺序一致, 读起来 // 方便. eng.startConfigMCPServers() // Verbose-only startup snapshot. This is the read-site that wires // Config.Verbose out of dead-field status: before 07fe345-followup the // field was defined on the SDK-facing Config but engine.New never // consulted it. Deliberately a NEW event rather than gating any // existing Event on Verbose -- observers may already depend on those // events firing unconditionally, so adding a gate would be a silent // behavior change for downstream consumers. // // Verbose-only 启动快照. 本处是把 Config.Verbose 从死字段挖出来的 // 读取点: 本次 followup 之前, 此字段定义在 SDK 面向的 Config 上但 // engine.New 从未读过它. 故意新加事件, 不给已有事件挂 Verbose gate // -- observer 侧可能已经依赖那些事件无条件 emit, 加 gate 会是 // 对下游消费方的静默行为变化. if eng.cfg.Verbose && eng.observer != nil { eng.observer.Event("verbose_startup", map[string]any{ "cwd": eng.cfg.Cwd, "model": eng.cfg.Model, "scenario": eng.cfg.Scenario, }) } return eng, nil } // ── build* 自由函数(New() 的各子系统构建逻辑)──────────────────────────────── // // 精妙之处(CLEVER): 无接收者(free functions)而非 (e *Engine) 方法-- // build* 函数在 eng 完全构造之前调用,用接收者会引入隐蔽的初始化顺序依赖. // 依赖全部通过参数显式传入,编译器保证顺序. // 替代方案:(e *Engine) 方法(init 时引擎半构造状态下访问字段,隐蔽竞态风险). // buildFileCache 创建文件状态缓存(跟踪读过的文件,检测外部修改). func buildFileCache() *FileStateCache { return NewFileStateCache(0) // 使用默认大小 1000 } // buildPerms 创建权限引擎(含超时保护). // 升华改进(ELEVATED): 通过 resolvePermissionHandler 支持多处理器叠加; // 通过 NewEngineWithTimeout 注入可配置超时,防止 Handler 永久阻塞 runLoop. // 替代方案:直接使用 permission.NewEngine(超时固定为默认值,无法按场景定制). func buildPerms(cfg *Config) permission.Checker { return permission.NewEngineWithTimeout(cfg.PermissionMode, cfg.resolvePermissionHandler(), cfg.PermissionTimeout) } // buildSecretGuard 初始化 SecretGuard(Step1:nil 自动注入 DefaultSecretGuard). // 精妙之处(CLEVER): 将初始化分为两步-- // // Step1(此处): nil 自动注入 DefaultSecretGuard,确保 memory store 获得有效 guard. // Step2(injectGuardCallback): 注入 OnBlocked 回调,让 guard 能触发 observer 事件. // // 两步分开是因为 observer 在 memory store 之后初始化, // 但 memory store 需要 guard 在构造时注入(functional options 不支持延迟绑定). func buildSecretGuard(cfg *Config) security.SecretGuard { if cfg.SecretGuard != nil { return cfg.SecretGuard } guard := security.NewDefaultSecretGuard() cfg.SecretGuard = guard return guard } // buildMemory 创建记忆存储, 构造时注入 5 项依赖: // - SecretGuard: 阻止 Agent 将 API key 写入记忆文件 // - Observer: Save/Delete/FindRelevant 事件发送到 engine observer // - FileHistory: Save 前备份 (Backupper 接口, *FileHistory 已满足) // - Freshness: 可选, 记忆新鲜度警告 (cfg.FreshnessConfig==nil 时不启用) // - MemorySelector: 可选, AI 驱动的 FindRelevant (aiSel==nil 时走默认 TextScorer) // // L1282 (2026-04-14): 签名从 (cfg, guard) 扩展为 5 参, 构造时接线 WithObserver + // WithFileHistory + WithMemorySelector, 消除旧的 memory.InjectBackupper / // memory.InjectMemorySelector 后置注入路径. // // L1326 (2026-04-16) 重构后: memory 包直接消费 flyto.EventObserver 作为 // Observer 契约, 不再定义本地 MemoryObserver 接口. engine 的 observer 字段 // 就是 flyto.EventObserver (见 observer.go:48 类型别名), 这里直接透传给 // memory.WithObserver, 契约变化编译期强制同步, 消除原先鸭子类型隐性 coupling. func buildMemory( cfg *Config, guard security.SecretGuard, observer EventObserver, fileHistory memory.Backupper, aiSel memory.MemorySelector, ) memory.Store { memOpts := []memory.FileStoreOption{ memory.WithSecretGuard(guard), memory.WithObserver(observer), memory.WithFileHistory(fileHistory), } if cfg.FreshnessConfig != nil { memOpts = append(memOpts, memory.WithFreshness(*cfg.FreshnessConfig)) } if aiSel != nil { memOpts = append(memOpts, memory.WithMemorySelector(aiSel)) } if cfg.MemoryScorer != nil { memOpts = append(memOpts, memory.WithScorer(cfg.MemoryScorer)) } if cfg.MemoryTypeRegistry != nil { memOpts = append(memOpts, memory.WithTypeRegistry(cfg.MemoryTypeRegistry)) } if cfg.MemoryStrictSymlink { memOpts = append(memOpts, memory.WithStrictSymlink()) } if cfg.MemorySyncAdapter != nil { memOpts = append(memOpts, memory.WithSyncAdapter(cfg.MemorySyncAdapter, cfg.MemorySyncConfig)) } return memory.NewFileStoreWithOptions(cfg.Cwd, memOpts...) } // buildHooksMgr 创建 Hook 管理器. // // M1 方案 β 之后, hooks.NewManager 需要显式传 execenv.Executor. cfg.Executor // 由 Config.Validate 保证 non-nil (本地 CLI 填 DefaultExecutor{}, 云端 // platform 层填 sandbox.Backend), 所以这里不再做兜底. func buildHooksMgr(cfg *Config) *hooks.Manager { return hooks.NewManager(cfg.HooksConfig, cfg.Executor) } // buildResultStore 创建大结果磁盘持久化存储. func buildResultStore() *ResultStore { homeDir, _ := os.UserHomeDir() return NewResultStore(filepath.Join(homeDir, ".flyto", "tool-results"), "") } // buildDreamEngine 创建 AutoDream 记忆巩固引擎. // // L1224 (2026-04-13): 签名从 (mem, models) 扩展为 (mem, models, observer, activity, rootCtx). // 早期方案 DreamEngine 通过 parentEngine.Observer()/Activity()/Context() 间接访问, // 现改为构造期直接注入 - parentEngine 字段仅保留用于 ForkSubAgent. // 调用方必须在 observer/activity/rootCtx 就绪后才能 buildDreamEngine(见 NewEngine 构造顺序). func buildDreamEngine( mem memory.Store, models *config.ModelRegistry, observer EventObserver, activity *ActivityTracker, rootCtx context.Context, ) *DreamEngine { return NewDreamEngine(&DreamConfig{ MemStore: mem, Models: models, Observer: observer, Activity: activity, RootCtx: rootCtx, }) } // buildObserver 创建可观测性接口(nil 安全:未配置时用 NoopObserver;叠加 AuditObserver). // // 升华改进(ELEVATED): 引擎内部所有路径都通过 observer 发送事件, // 不需要 nil 检查--NoopObserver 所有方法都是空操作. // 精妙之处(CLEVER): AuditSink 通过 AuditObserver 叠加进 Observer 链-- // 无需修改任何工具代码,所有 "operation_recorded"/"secret_scan_blocked" 事件 // 自动流入 AuditSink.叠加而非替换:原有 observer(DataDog/stderr 等)继续正常工作. // 替代方案:每个调用点 if observer != nil(散布 50+ 处,容易遗漏导致 panic). // // 第二返回值 auditObs 是叠加进 observer 链里的 AuditObserver (cfg.AuditSink != nil 时), // 或 nil (未配置 audit).调用方拿到后应在 tool registry 建好后调用 // auditObs.SetToolRegistry(registry) 启用 L1191 修复的"工具自描述 AuditOperation" // 查询路径.未调用或 auditObs==nil 时 AuditObserver 回退到硬编码启发式, 零回归. // // 精妙之处(CLEVER): 本函数在 NewEngine 初始化顺序里先于 buildToolRegistry 调用 // (observer 需要先存在给 fileHistory / injectGuardCallback 等依赖), 所以 registry // 不能作为参数传入.返回 auditObs 指针让调用方后置注入, 是最简单的解耦方式. // 替代方案: <调整顺序让 registry 先建> - 否决: fileHistory 和 injectGuardCallback // 依赖 observer 存在, 顺序倒过来会产生更深的依赖纠缠. func buildObserver(cfg *Config) (EventObserver, *AuditObserver) { observer := cfg.Observer if observer == nil { observer = &NoopObserver{} } var auditObs *AuditObserver if cfg.AuditSink != nil { auditObs = NewAuditObserver(cfg.AuditSink, cfg.AuditSessionID) // L1223 修复: Input 审计策略后置注入 (与 SetToolRegistry 同模式, 不扩构造签名). auditObs.SetInputAudit(cfg.AuditIncludeToolInput, cfg.AuditInputMaxBytes) observer = NewCompositeObserver(observer, auditObs) } return observer, auditObs } // injectGuardCallback 在 observer 就绪后注入 OnBlocked 回调(Step2). // 精妙之处(CLEVER): 此处 observer 已经叠加了 AuditObserver(如果配置了 AuditSink), // 所以 secret_scan_blocked 事件会同时到达 AuditObserver 和用户配置的 Observer. // 工具层(FileWrite/FileEdit)和 memory 层都共享同一个 guard 实例, // OnBlocked 也只注入一次,事件来源通过 path 字段区分. // 仅对 DefaultSecretGuard 注入--接口实现者自行决定是否支持回调. func injectGuardCallback(guard security.SecretGuard, observer EventObserver) { if dg, ok := guard.(*security.DefaultSecretGuard); ok { obs := observer // 捕获当前 observer(含 AuditObserver 叠加) dg.OnBlocked = func(path string, matches []security.SecretMatch) { ruleIDs := strings.Join(security.MatchRuleIDs(matches), ",") obs.Event("secret_scan_blocked", map[string]any{ "path": path, "rule_ids": ruleIDs, "count": len(matches), }) } } } // buildFileHistory 创建文件操作历史管理器(编辑/写入前自动备份). // 升华改进(ELEVATED): 文件历史在 Engine 层创建,注入到 FileEdit/FileWrite 工具. // 替代方案:每个工具自己创建 FileHistory 实例(散乱,无法跨工具协调回滚). func buildFileHistory(cwd string, observer EventObserver) *FileHistory { return NewFileHistory(cwd, observer) } // buildScratchpad 创建 Scratchpad 暂存区(模块 18.3). // 升华改进(ELEVATED): ScratchpadDir="" 使用 in-memory,否则使用 FileScratchpad-- // 无需改动工具层(builtin.ScratchpadStore 接口),只在引擎初始化时切换后端. // 替代方案:<每次 Get/Set 时检查 ScratchpadDir,动态选择后端> - 否决: // 并发安全性需要额外锁,且 Hot-path 每次都做 string 比较(纯开销). func buildScratchpad(cfg *Config) (builtin.ScratchpadStore, error) { if cfg.ScratchpadDir != "" { fs, err := NewFileScratchpad(cfg.ScratchpadDir) if err != nil { return nil, fmt.Errorf("flyto-agent: init file scratchpad: %w", err) } return fs, nil } return NewScratchpad(), nil } // buildToolRegistry 创建工具注册表并注册内置工具和 ExtraTools. // 精妙之处(CLEVER): ExtraTools 不受 cfg.Tools 过滤-- // 调用方显式注入的工具一定是要启用的,不需要再被工具名白名单过滤. // 内置工具的过滤逻辑在 registerBuiltinTools 内部处理. func buildToolRegistry(cfg *Config, fileCache *FileStateCache, fileHistory *FileHistory, scratchpad builtin.ScratchpadStore) (*tools.Registry, error) { registry := tools.NewRegistry() if err := registerBuiltinTools(registry, cfg, fileCache, fileHistory, scratchpad); err != nil { return nil, fmt.Errorf("flyto-agent: register tools: %w", err) } for _, et := range cfg.ExtraTools { if err := registry.Register(et); err != nil { return nil, fmt.Errorf("flyto-agent: register extra tool %q: %w", et.Name(), err) } } return registry, nil } // buildOperationLog 创建统一操作日志(支持按消息回滚). func buildOperationLog(observer EventObserver) *OperationLog { return NewOperationLog(observer) } // buildTokenBudget 创建 Token 预算管理器. // 升华改进(ELEVATED): 集中管理 token 预算计算,替代散落在各处的粗估逻辑. // 替代方案:每个需要 token 信息的地方各自估算(不一致,难维护). func buildTokenBudget(reg *config.ModelRegistry, observer EventObserver) *TokenBudgetManager { return NewTokenBudgetManager(reg, observer) } // buildActivity 创建 ActivityTracker(可选,cfg.ActivityConfig==nil 时返回 nil). func buildActivity(cfg *Config, observer EventObserver) *ActivityTracker { if cfg.ActivityConfig == nil { return nil } return NewActivityTracker(cfg.ActivityConfig, observer) } // buildAgentRegistry 创建 Agent 类型注册表(内置 4 种类型)并注册内置 Agent. func buildAgentRegistry() *AgentRegistry { r := NewAgentRegistry() RegisterBuiltinAgents(r) return r } // buildBundleAndSectionRegistry provisions PromptBundle + Section infra // (module 15). Preloads the claude+programming default Bundle; SDK users can // RegisterPromptBundle after New(). The observer is threaded into the // SectionRegistry so CacheBreak sections emit `section_cache_break` diagnostic // events carrying Section.NoCacheReason — operators tracing prompt-cache // misses can see which section invalidated and why. // // buildBundleAndSectionRegistry 创建 PromptBundle + Section 基础设施 (模块 15). // 预装 claude+programming 默认 Bundle, SDK 用户可在 New() 后通过 // RegisterPromptBundle 添加. observer 传入 SectionRegistry 让 CacheBreak // section 发 `section_cache_break` 诊断事件携带 Section.NoCacheReason // (运维追踪 prompt-cache miss 能看到哪个 section 打碎 cache + 原因). func buildBundleAndSectionRegistry(observer EventObserver) (*agentctx.BundleRegistry, *agentctx.SectionRegistry) { return agentctx.NewDefaultBundleRegistry(), agentctx.NewSectionRegistryWithObserver(observer) } // buildCacheTrackers 创建 Prompt Cache 哈希追踪器和工具 Schema 变化追踪器(模块 22.2). // // 精妙之处(CLEVER): 两个追踪器分离维护-- // promptHashTracker 追踪整体系统提示词;toolSchemaTracker 逐工具追踪 Schema, // 只要有一个工具变化就能精确定位,且提供 StableFirstWithBoundary 优化排序. // 首次 promptHashTracker.Check 必然返回 changed=true,确保首次请求前触发 SectionCache 预热. func buildCacheTrackers() (*enginecache.PromptHashTracker, *enginecache.ToolSchemaTracker) { return enginecache.NewPromptHashTracker(), enginecache.NewToolSchemaTracker() } // initInbox 初始化 UDS Inbox(可选,EnableUDSInbox=true 时启用,模块 UDS). // 升华改进(ELEVATED): 必须在 eng 完全初始化后才能启动后台 goroutine-- // goroutine 通过 eng.observer 发事件,observer 在此时已经就绪. // 替代方案:在 engine 字段初始化时启动(observer 可能为 nil,需要额外 nil guard). func (e *Engine) initInbox(cfg *Config, rootCtx context.Context) { if !cfg.EnableUDSInbox { return } inboxSessionID := cfg.UDSInboxSessionID if inboxSessionID == "" { inboxSessionID = fmt.Sprintf("%d", time.Now().UnixNano()) } if srv, srvErr := inbox.NewUDSServer(inboxSessionID); srvErr != nil { // 精妙之处(CLEVER): 降级而非失败-- // 某些环境(容器沙箱,只读 /tmp)可能无法创建 socket. e.observer.Event("uds_inbox_init_failed", map[string]any{ "error": srvErr.Error(), }) } else if startErr := srv.Start(); startErr != nil { e.observer.Event("uds_inbox_start_failed", map[string]any{ "error": startErr.Error(), }) } else { e.inboxSrv = srv // 升华改进(ELEVATED): 通过 BashTool 实例方法注入 socket 路径,而非 os.Setenv 全局污染-- // 多租户场景下第二个 Engine.New() 不再覆盖第一个 Engine 的 FLYTO_SESSION_SOCK, // 每个 Engine 实例的 Bash 子进程只能连接属于自己的 UDS socket. // 替代方案(原方案):os.Setenv("FLYTO_SESSION_SOCK", srv.SockPath()) // -- 修改进程全局状态,多 Engine 实例时静默跨租户数据污染. if bt, ok := e.tools.Get("Bash"); ok { if bashTool, ok := bt.(*builtin.BashTool); ok { bashTool.SetSessionSockPath(srv.SockPath()) } } // 精妙之处(CLEVER): 替换 MonitorTool-- // registerBuiltinTools 注册了 nil 版本(UDS 未启用时的降级). // 现在 UDS 已就绪,Unregister + Register 升级为真实版本. // 这样 MonitorTool 的 Send 会走 UDS channel,不再静默丢弃. // 替代方案:让 MonitorTool 持有 *atomic.Pointer[UDSServer](过度复杂). e.tools.Unregister("monitor_progress") _ = e.tools.Register(builtin.NewMonitorTool(srv)) go e.inboxForwardLoop(rootCtx) e.observer.Event("uds_inbox_started", map[string]any{ "sock_path": srv.SockPath(), "session_id": inboxSessionID, }) } } // initPlanQueue 初始化异步计划队列(可选,EnablePlanQueue=true 时启用,模块 20.3). // 精妙之处(CLEVER): 必须在 eng 完全初始化后启动-- // // PlanExecFunc 闭包引用 eng.Run(),Run() 依赖 eng.tools/eng.observer 等字段. // 若在 tools 注册前启动,提交的计划执行时找不到工具. // 与 UDS Inbox 初始化紧邻,保持"可选 IPC 服务"的代码局部性. func (e *Engine) initPlanQueue(cfg *Config) { if !cfg.EnablePlanQueue { return } planSessionID := cfg.PlanQueueSessionID if planSessionID == "" { planSessionID = fmt.Sprintf("pq%d", time.Now().UnixNano()) } // PlanExecFunc:把计划的步骤描述转化为 Agent 交互执行. // 升华改进(ELEVATED): 依赖反转--FilePlanQueue 不导入 Engine, // 只持有此函数指针,测试可以用 mock 替换. // 替代方案:FilePlanQueue 持有 *Engine 指针(循环依赖,测试困难). execFunc := func(ctx context.Context, plan *QueuedPlan, onStepDone func(string, error)) error { // 将计划步骤合并为一个 prompt 发给 Agent 执行. // 每个步骤完成后通过 onStepDone 回调更新状态文件,客户端轮询可见进度. // // 历史包袱(LEGACY): 目前只传一个合并 prompt,不做单步骤精确回调. // 理想做法:解析 Agent 输出,当步骤被明确完成时调用对应的 onStepDone. // 当前:整个 plan 完成后统一回调,后续可改进精度. var sb strings.Builder sb.WriteString("你正在执行以下计划的各个步骤,请依次完成:\n\n") for i, s := range plan.Steps { sb.WriteString(fmt.Sprintf("%d. **%s**: %s\n", i+1, s.ID, s.Description)) } sb.WriteString("\n每完成一个步骤后告知进度,全部完成后输出\"所有步骤已完成\"。") evtCh := e.Run(ctx, sb.String()) var lastErr error for evt := range evtCh { if errEvt, ok := evt.(*ErrorEvent); ok { lastErr = errEvt.Err } } if lastErr == nil { for _, s := range plan.Steps { onStepDone(s.ID, nil) } } return lastErr } if pq, pqErr := NewFilePlanQueue(cfg.PlanQueueDir, execFunc); pqErr != nil { e.observer.Event("plan_queue_init_failed", map[string]any{ "error": pqErr.Error(), }) } else { e.planQueue = pq // daemon 重启恢复:将 running→pending(at-least-once 语义) if recErr := pq.RecoverPending(); recErr != nil { e.observer.Event("plan_queue_recover_failed", map[string]any{ "error": recErr.Error(), }) } // 启动 PlanCommandServer(UDS 请求-响应端点) if pcs, pcsErr := NewPlanCommandServer(planSessionID, pq); pcsErr != nil { e.observer.Event("plan_cmd_server_init_failed", map[string]any{ "error": pcsErr.Error(), }) } else if startErr := pcs.Start(); startErr != nil { e.observer.Event("plan_cmd_server_start_failed", map[string]any{ "error": startErr.Error(), }) } else { e.planCmdSrv = pcs // 升华改进(ELEVATED): 同 FLYTO_SESSION_SOCK--通过实例方法注入,不污染全局 env. if bt, ok := e.tools.Get("Bash"); ok { if bashTool, ok := bt.(*builtin.BashTool); ok { bashTool.SetPlanSockPath(pcs.SockPath()) } } e.observer.Event("plan_queue_started", map[string]any{ "sock_path": pcs.SockPath(), "session_id": planSessionID, "queue_dir": pq.dir, }) } } } // Run 执行一次完整的 Agent 交互. // 返回一个 channel,消费层从中读取流式事件. // 用 Go channel 实现流式输出,消费层用 for range 读取即可. // // 用法: // // events := agent.Run(ctx, "修复 src/app.ts 中的类型错误") // for event := range events { // switch e := event.(type) { // case *TextEvent: // fmt.Print(e.Text) // case *ToolUseEvent: // fmt.Printf("调用工具: %s\n", e.ToolName) // } // } func (e *Engine) Run(ctx context.Context, prompt string, opts ...RunOption) <-chan Event { runCfg := &runConfig{} for _, opt := range opts { opt(runCfg) } ch := make(chan Event, 64) go func() { defer close(ch) e.runLoop(ctx, prompt, runCfg, ch) }() return ch } // Session 创建一个有状态的多轮会话. func (e *Engine) Session(id string) *Session { e.sessionState.mu.Lock() defer e.sessionState.mu.Unlock() if s, ok := e.sessionState.sessions[id]; ok { return s } s := newSession(id, e) e.sessionState.sessions[id] = s e.observer.Event("session_created", map[string]any{ "session_id": id, "total_sessions": len(e.sessionState.sessions), }) return s } // Tools 返回工具注册表,允许消费层动态注册/移除工具. func (e *Engine) Tools() *tools.Registry { return e.tools } // FileCache 返回文件状态缓存,允许消费层查询文件读取历史. func (e *Engine) FileCache() *FileStateCache { return e.fileCache } // ResultStoreRef 返回大结果存储实例. func (e *Engine) ResultStoreRef() *ResultStore { return e.resultStore } // Dream 返回 AutoDream 记忆巩固引擎,消费层可用于查询巩固任务状态. func (e *Engine) Dream() *DreamEngine { return e.dreamEngine } // Observer 返回引擎的可观测性接口. // 消费层可以用它查询当前 observer 或在子系统中复用. func (e *Engine) Observer() EventObserver { return e.observer } // observeWithChain 在 event data 中自动注入查询链字段. // // 精妙之处(CLEVER): 不修改 Observer 接口,而是在调用侧 merge 字段. // 这样所有现有 Observer 实现无需任何改动就能收到链追踪数据. // 替代方案:给 Observer 接口加 ChainEvent 方法(破坏所有已有实现). func (e *Engine) observeWithChain(name string, chain *QueryChainTracking, data map[string]any) { if chain != nil { for k, v := range chain.EventFields() { data[k] = v } } e.observer.Event(name, data) } // StrictModeRef 返回严格模式配置(可能为 nil). func (e *Engine) StrictModeRef() *StrictMode { return e.strictMode } // FileHistoryRef 返回文件历史管理器. func (e *Engine) FileHistoryRef() *FileHistory { return e.fileHistory } // FileHistoryView 返回文件历史的只读查询接口. // // 升华改进(ELEVATED): L1186 修复 - 比 FileHistoryRef() 更窄的契约. // 新代码优先用本访问器, 消费者只能调用 CanRollback / SnapshotCount 两个 // 只读查询方法, 无法直接调用 Rollback/Prune (后者应通过 Engine.Rollback // 走完整路径). // // FileHistoryRef() 保留用于向后兼容 (docs/configuration.md 示例 + // 已有消费者), 但新代码/新文档应引用 FileHistoryView(). // // 精妙之处(CLEVER): Go 结构化 typing 让 *FileHistory 自动实现 FileHistoryView, // 无需任何显式 "implements" 声明或 adapter 类型, 零运行时成本. func (e *Engine) FileHistoryView() FileHistoryView { return e.fileHistory } // OperationLogRef 返回统一操作日志. func (e *Engine) OperationLogRef() *OperationLog { return e.operationLog } // TokenBudget 返回 Token 预算管理器. // 消费层可用于查询上下文窗口状态,计算警告阈值. func (e *Engine) TokenBudget() *TokenBudgetManager { return e.tokenBudget } // Rollback 按消息 ID 回滚所有操作. // 升华改进(ELEVATED): 同时回滚文件历史和操作日志-- // 文件历史负责物理恢复,操作日志负责逻辑编排. // 替代方案:只回滚文件历史(无法回滚数据库/API 操作). func (e *Engine) Rollback(ctx context.Context, messageID string) error { // 先回滚文件历史(物理层面恢复文件内容) if e.fileHistory != nil { if err := e.fileHistory.Rollback(messageID); err != nil { e.observer.Error(err, map[string]any{ "message_id": messageID, "phase": "file_history_rollback", }) // 继续尝试操作日志回滚(best-effort) } } // 再回滚操作日志(逻辑层面执行补偿操作) if e.operationLog != nil { return e.operationLog.RollbackMessage(ctx, messageID, e) } return nil } // ExecuteUndo 执行撤销操作. // 实现 UndoExecutor 接口,供 OperationLog 回调. func (e *Engine) ExecuteUndo(ctx context.Context, undo *tools.UndoInfo) error { tool, ok := e.tools.Get(undo.ToolName) if !ok { return fmt.Errorf("undo tool not found: %s", undo.ToolName) } input, err := json.Marshal(undo.Input) if err != nil { return fmt.Errorf("failed to marshal undo input: %w", err) } _, execErr := tool.Execute(ctx, input, nil) return execErr } // Close 优雅关闭引擎,释放所有资源. // // 精妙之处(CLEVER): 分层关闭,每层有独立超时保护-- // 1. 防重入(atomic CAS) // 2. Cancel root context(通知所有 goroutine 停止) // 3. 短暂等待(让 goroutine 有机会干净退出) // 4. 按依赖顺序关闭资源(先消费者后生产者) // 5. Observer 最后刷新(等前面的清理事件都发完) // // 整个 Close 在 CloseTimeout(默认 10s)内完成, // failsafe 超时保护确保不会永远挂起. // // 升华改进(ELEVATED): 早期方案 gracefulShutdown 混合了终端清理/UI/分析刷新, // 我们只做引擎资源清理--终端和 UI 是消费层的事. // 替代方案:<原方案 530 行混合引擎+终端+UI+分析> func (e *Engine) Close() error { // 1. 防重入--多次 Close 只执行一次 if !e.closed.CompareAndSwap(false, true) { return nil } // 确定超时时间 closeTimeout := e.cfg.CloseTimeout if closeTimeout <= 0 { closeTimeout = 10 * time.Second } // 发出关闭事件 e.observer.Event("engine_closing", map[string]any{ "timeout": closeTimeout.String(), }) // 2. Cancel root context--通知所有 goroutine 停止 // 精妙之处(CLEVER): 一个 cancel 通知所有 goroutine,不需要逐个追踪. // Run() 传入的 caller ctx 和 rootCtx 是独立的--caller 可以取消自己的请求, // Close() 取消引擎级的后台 goroutine(Dream,Memory extraction 等). e.rootCancel() // 3. 短暂等待--给 goroutine 一点时间干净退出 // 精妙之处(CLEVER): 不用 WaitGroup(侵入性太强), // 100ms 对轻量 goroutine(文件 I/O,内存操作)绑绑有余. // 如果 goroutine 在 100ms 内没退出也没关系--后面的资源清理不依赖它们. time.Sleep(100 * time.Millisecond) // 4. 分层资源清理--带超时保护 ctx, cancel := context.WithTimeout(context.Background(), closeTimeout) defer cancel() var errs []string // 4a. 关闭所有会话(可能有活跃的 SubAgent) e.sessionState.mu.Lock() for id, s := range e.sessionState.sessions { if err := closeWithTimeout(ctx, "session:"+id, func() error { s.Close() return nil }); err != nil { errs = append(errs, err.Error()) } } e.sessionState.sessions = nil e.sessionState.mu.Unlock() // 4a2. Close MCP manager before closing plugins so that any in-flight // JSON-RPC requests sees a clean shutdown (manager cancels pending // requests, stops reconnect loops, kills stdio subprocesses). This // must run before plugins.Close() clears the configuration map, to // keep the (already-informational) ordering consistent with the // observer events that reference plugin names. // // Replaced approach: - // rejected, mcp.Manager's stdio transport owns exec.Cmd instances // whose subprocesses keep running until explicit Close(), ignoring // context cancellation alone. if e.mcpMgr != nil { if err := closeWithTimeout(ctx, "mcp_manager", func() error { e.mcpMgr.CloseAll() return nil }); err != nil { errs = append(errs, err.Error()) } } // 4b. 关闭插件宿主(可能有 MCP 连接) if e.plugins != nil { if err := closeWithTimeout(ctx, "plugins", func() error { e.plugins.Close() return nil }); err != nil { errs = append(errs, err.Error()) } } // 4b2. 等待 DreamEngine goroutine 退出(context 已被 cancel,goroutine 应很快结束) // 精妙之处(CLEVER): dreamEngine.Close 在插件关闭后,文件缓存清理前-- // Dream 可能正在写记忆文件,等它结束后再清理缓存更安全. // rootCancel() 已在步骤 2 调用,Dream 的 ctx.Done() 已触发, // 这里只是等它走完收尾(一般 <100ms). if e.dreamEngine != nil { // 从剩余超时中取 3s 作为 Dream 退出等待上限 dreamTimeout := 3 * time.Second if deadline, ok := ctx.Deadline(); ok { if remaining := time.Until(deadline); remaining > 0 && remaining < dreamTimeout { dreamTimeout = remaining } } e.dreamEngine.Close(dreamTimeout) } // 4c. 清理文件缓存 if e.fileCache != nil { e.fileCache.Clear() } // 4d. 清理结果磁盘存储 // ResultStore 无需显式关闭(文件级操作,无后台 goroutine) // 4e. 备份保留策略清理--删除超期/超版本的备份文件 // 精妙之处(CLEVER): Prune 在 Close 末尾执行--此时所有 session 和 goroutine 已停止, // 不会有并发备份操作干扰 Prune 的目录遍历. // 默认策略:MaxAgeDays=30 + MaxVersions=50,确保 ~/.flyto/history/ 不会无限增长. // Prune 失败只记录 observer 事件,不影响 Close 返回值(备份清理是非关键操作). if e.fileHistory != nil { if pruneErr := closeWithTimeout(ctx, "file_history_prune", func() error { return e.fileHistory.Prune(ctx, DefaultRetentionPolicy) }); pruneErr != nil { errs = append(errs, pruneErr.Error()) } } // 4f. 关闭 UDS Inbox 服务端(在 ActivityTracker 之前--inbox 是低级资源) // 精妙之处(CLEVER): rootCancel() 已触发,inboxForwardLoop goroutine 会退出. // 调用 srv.Close() 确保 socket 文件被清理 + msgs channel 被关闭. // 即使 inboxForwardLoop 已退出,Close() 是幂等的,安全多次调用. if e.inboxSrv != nil { if err := closeWithTimeout(ctx, "uds_inbox", func() error { return e.inboxSrv.Close() }); err != nil { errs = append(errs, err.Error()) } // 升华改进(ELEVATED): os.Unsetenv 已无需调用-- // 我们不再用 os.Setenv 写入全局环境(改为 BashTool.SetSessionSockPath 实例注入), // 所以关闭时也不需要清理全局环境变量. // 原方案(已替换):_ = os.Unsetenv("FLYTO_SESSION_SOCK") } // 4g. 关闭异步计划命令服务器(在 PlanQueue 之前--先停止接受新命令) if e.planCmdSrv != nil { if err := closeWithTimeout(ctx, "plan_cmd_server", func() error { return e.planCmdSrv.Close() }); err != nil { errs = append(errs, err.Error()) } // 升华改进(ELEVATED): 同上--os.Unsetenv 已无需调用. // 原方案(已替换):_ = os.Unsetenv("FLYTO_PLAN_SOCK") } // 4h. 关闭异步计划队列(等当前执行的计划完成,最多 30 秒) // 精妙之处(CLEVER): PlanQueue.Close 内部有 30 秒超时-- // 如果执行中的计划无法在 30 秒内完成(超大计划),强制 cancel context 终止. // daemon 重启后 RecoverPending 会将遗留的 running 状态恢复为 pending 重新执行. // 替代方案:立即 cancel 所有计划(更快但丢失进度,违反 at-least-once 语义). if e.planQueue != nil { if err := closeWithTimeout(ctx, "plan_queue", func() error { return e.planQueue.Close() }); err != nil { errs = append(errs, err.Error()) } } // 5. 关闭 ActivityTracker(在 Observer 之前--tracker 通过 observer 发最终事件) if e.activity != nil { e.activity.Close() } // 6. Observer 最后刷新--等前面的清理事件都发完 // 精妙之处(CLEVER): Observer 必须最后关闭, // 因为步骤 4 的清理操作可能通过 observer 发事件(如 session_closed). // 如果先关 observer,这些事件就丢了. // 使用 observerCloser 接口检测而非硬类型断言-- // 任何实现了 Close() 的 Observer(BufferedObserver,自定义实现)都会被关闭. closeObservers(ctx, e.observer, &errs) // 发出关闭完成事件(用 NoopObserver 兜底,因为 BufferedObserver 可能已关闭) e.observer.Event("engine_closed", map[string]any{ "errors": len(errs), }) if len(errs) > 0 { return fmt.Errorf("engine close errors: %s", strings.Join(errs, "; ")) } return nil } // Closed 返回引擎是否已关闭. func (e *Engine) Closed() bool { return e.closed.Load() } // InboxServer 返回 UDS Inbox 服务端(可能为 nil,未启用时). // 消费层可用于查询 socket 路径或直接监听消息 channel. func (e *Engine) InboxServer() *inbox.UDSServer { return e.inboxSrv } // inboxForwardLoop 是 UDS Inbox 消息转发循环. // 从 inboxSrv.Messages() 读取工具子进程发来的消息, // 转换为 InboxMessageEvent 通过 observer 发出. // // 精妙之处(CLEVER): 这里不向 Run() 的 events channel 发送-- // // inboxForwardLoop 是引擎后台 goroutine,不绑定到特定的 Run() 调用. // 一次 Run() 结束后下次 Run() 开始前,inbox 消息仍然可以到来(子进程延迟报告). // 通过 observer 发事件是更正确的设计:observer 的生命周期与引擎绑定,而非与单次 Run 绑定. // 替代方案:将消息推给 Run() 的 events channel(消息可能丢失,Run() 结束后 channel 已关闭). // // 升华改进(ELEVATED): 消费层若需要在 Run() 的 events channel 中看到 inbox 消息, // // 可以在自定义 Observer.Event("inbox_message", data) 中向 Run() 的上层 channel 注入, // 而无需修改引擎核心. func (e *Engine) inboxForwardLoop(ctx context.Context) { if e.inboxSrv == nil { return } msgs := e.inboxSrv.Messages() for { select { case msg, ok := <-msgs: if !ok { // channel 已关闭(inboxSrv.Close() 被调用) return } e.observer.Event("inbox_message", map[string]any{ "type": msg.Type, "tool_use_id": msg.ToolUseID, "data": msg.Data, }) case <-ctx.Done(): // 引擎生命周期结束(rootCancel() 被调用) return } } } // Activity 返回活动追踪器(可能为 nil). func (e *Engine) Activity() *ActivityTracker { return e.activity } // SkillRegistry 返回 Engine 的 Skill 注册表. // 消费层可在 New() 之后,Run() 之前调用 engine.SkillRegistry() 注册自定义 Skill. func (e *Engine) SkillRegistry() *SkillRegistry { return e.skillRegistry } // SpawnSkillAgent 实现 SkillSpawner 接口. // 创建 SubAgent 并同步等待结果,供 SkillRegistry.invokeFork 调用. // // 精妙之处(CLEVER): Engine 实现 SkillSpawner,而不是 SkillRegistry 直接持有 Engine-- // 依赖方向从双向变单向(Engine → SkillRegistry,SkillRegistry → SkillSpawner 接口). // SkillRegistry 不再"知道"自己运行在哪个 Engine 里,只知道"能 spawn"这件事. func (e *Engine) SpawnSkillAgent(ctx context.Context, cfg *SubAgentConfig, prompt string) (string, error) { sa := SpawnSubAgent(e, cfg) return sa.RunSync(ctx, prompt) } // RegisterPromptBundle 注册自定义 PromptBundle. // // 升华改进(ELEVATED): 这是平台层和 SDK 用户扩展提示词的标准入口-- // 不需要修改引擎源码,在 New() 之后,Run() 之前注册行业特化 Bundle 即可. // 引擎内置的 claude+programming Bundle 不受影响. // // 典型用法(仓储场景): // // eng, _ := engine.New(cfg) // eng.RegisterPromptBundle( // engine.BundleKey{ModelFamily: "claude", Scenario: "warehouse"}, // myWarehouseBundle, // ) func (e *Engine) RegisterPromptBundle(key agentctx.BundleKey, bundle agentctx.PromptBundle) { e.bundleRegistry.Register(key, bundle) } // ResetSectionCache 清空 Section 计算缓存. // 在 /clear 或 /compact 后调用,让动态 sections(FLYTO.md,环境信息等)下一轮重新计算. // 内部调用 maybeCompact 时自动触发,外部调用方一般不需要显式调用. // // 升华改进(ELEVATED): 同时重置 PromptHashTracker-- // /clear,/compact,Bundle 切换等场景下,SectionCache 被显式清空, // 意味着下一次渲染内容可能与上次相同(如 /compact 后会话内容不变), // 但 Anthropic 缓存 slot 已不再有效(会话 ID 变化或上下文位置偏移). // 重置 tracker 确保下次 runLoop 检测到"变化"并触发重渲,建立新缓存 slot. // 替代方案:<只重置 sectionRegistry,不重置 tracker> // - 否决:tracker 的哈希还指向旧内容,下一轮 buildSystemPromptWithContext 后 // 内容可能相同但缓存 slot 已失效,tracker 不重置会错误判定为"未变化"跳过重渲. func (e *Engine) ResetSectionCache() { e.sectionRegistry.Reset() // 同步重置哈希追踪器(原因见上方注释) if e.promptHashTracker != nil { e.promptHashTracker.Reset() } } // Context 返回引擎的生命周期 context. // 后台 goroutine(Dream,Memory extraction 等)应使用此 context 而非 background. // 精妙之处(CLEVER): 暴露 rootCtx 让内部代码能够感知引擎生命周期, // 而不需要每个方法都传递 context 参数. func (e *Engine) Context() context.Context { return e.rootCtx } // Cwd 返回工作目录路径. func (e *Engine) Cwd() string { return e.cfg.Cwd } // ForkSubAgent 实现 EngineRef 接口. // 将 SpawnSubAgent(*Engine, cfg) 封装为接口方法,消除 agentExecutor 中的类型断言. // // 精妙之处(CLEVER): 一行封装,让 EngineRef 从"有例外的接口"变为"完整接口"-- // agentExecutor 只看到 ForkSubAgent,不需要知道内部调用了 SpawnSubAgent(*Engine, cfg), // 也不需要类型断言.EngineRef mock 只需实现此方法即可覆盖 fork 路径. func (e *Engine) ForkSubAgent(cfg *SubAgentConfig) *SubAgent { return SpawnSubAgent(e, cfg) } // BuildAndStream 实现 EngineBackend 接口,封装双路径的请求构建和流式调用. // // 升华改进(ELEVATED): 替代主循环中的 Provider==nil 判断. // 主循环不再感知"直连 Anthropic API"和"通过 flyto.Provider 接口"两条路径. func (e *Engine) BuildAndStream(ctx context.Context, model string, maxTokens int, promptBlocks []agentctx.SystemPromptBlock, messages []query.Message, toolDefs any, responseFormat *flyto.ResponseFormat, attempt int) (<-chan flyto.Event, error) { // Provider 路径:构建 flyto.Request,调用 provider.Stream // 升华改进(ELEVATED): 彻底消除 Provider==nil 双路径-- // New() 保证 Provider 永远非 nil(默认使用 AnthropicProvider), // EngineBackend 接口不再需要分支判断. req := buildFlytoRequest(model, maxTokens, promptBlocks, messages, toolDefs.([]api.ToolDef), responseFormat, e.cfg.FastMode, e.cfg.Effort) // 升华改进(ELEVATED): data-driven-capabilities RFC PR1.1 - 注入模型能力快照. // engine 是唯一持有 ModelRegistry 的位置,由它在 Stream 之前从 registry 读出 // 当前模型的 ModelInfo(含 MaxTools/SupportsThinking/CachingMinTokens 等), // 塞进 req.Capabilities.Provider 优先使用此数据,缺失时降级到包内常量. // // 反向思维:为什么不在 buildFlytoRequest 里直接注入? // 否决--buildFlytoRequest 是无状态的纯函数(多个调用方复用,包括 makeMemoryQueryFn), // 让它依赖 Engine 实例方法会扩散依赖.注入逻辑只在"主请求路径"需要,放在这里更内聚. // // capabilities_missing event:registry 中没有该模型时发警告事件,不阻塞请求. // 这是 RFC §11 验收标准 - 防止 silent fallback 长期掩盖 engine 注入 bug. if e.cfg.Models != nil { if cfg := e.cfg.Models.GetConfig(model); cfg != nil { req.Capabilities = cfg } else { e.observer.Event("capabilities_missing", map[string]any{ "model": model, }) } } // Inject the main-thread source label so retry-layer diagnostics // attribute run-loop failures to "main_thread" rather than an empty // label. Sub-agent / compact / summary / etc. paths inject their // own labels at their respective provider.Stream call sites. // // 注入主线程 source 标签, 重试层诊断把 run-loop 失败归因到 // "main_thread" 而非空标签. sub-agent / compact / summary 等路径 // 在各自的 provider.Stream call site 注入自己的标签. ctx = retry.WithQuerySource(ctx, SourceMainThread.String()) return e.cfg.Provider.Stream(ctx, req) } // compile-time interface compliance check var _ EngineBackend = (*Engine)(nil) // ============================================================ // 插件管理 API(模块 9.3) // ============================================================ // LoadPlugin 从目录加载插件并立即同步其 hooks 到 hooks.Manager. // // 升华改进(ELEVATED): 早期实现 的 loadPluginHooks() 是全局函数,依赖全局 STATE, // 并发调用需要手动保证顺序 (早期实现注释 "atomic clear-then-register"). // 我们在 Engine 方法上操作,锁保护由 plugin.Host 和 hooks.Manager 各自负责, // 无需外部协调--两个子系统的锁是独立的,不会死锁. // 替代方案: // - 否决:SDK 用户需要在运行时根据会话上下文决定加载哪些插件. func (e *Engine) LoadPlugin(dir string) error { if err := e.plugins.LoadFromDir(dir); err != nil { return err } // 加载成功后立即同步插件 hooks + declarative tools + MCP servers e.syncPluginHooks() e.syncPluginTools() e.syncPluginMCPServers() return nil } // EnablePlugin 启用指定名称的插件并重新注册其 hooks. // // 如果插件之前被 Disable,调用此方法重新激活其 hooks. // 幂等:多次调用无副作用. func (e *Engine) EnablePlugin(name string) error { if err := e.plugins.Enable(name); err != nil { return err } // 重新 sync:DisablePlugin 时已经 UnregisterAllBySource + 清掉 MCP server, // Enable 后需要重新注册 hooks + declarative tools + MCP servers. e.syncPluginHooks() e.syncPluginTools() e.syncPluginMCPServers() return nil } // DisablePlugin 禁用指定名称的插件并移除其所有 hooks. // // 精妙之处(CLEVER): 先 UnregisterAllBySource 再 Disable-- // 顺序很重要:如果先 Disable 再 UnregisterAllBySource, // 其他 goroutine 可能在 Disable 和 Unregister 的空窗期内 // 以为插件已禁用但 hooks 还在触发.先移除 hooks 再标记禁用, // 保证"hooks 消失"早于"插件状态变化",fail-safe 方向. func (e *Engine) DisablePlugin(name string) error { // Step 1: shut down this plugin's MCP servers and unregister their // proxy tools. Done first because an MCP server's tools can have // side effects on shared state; killing the subprocess before // marking the plugin disabled closes the window where a stray // in-flight request could land on a plugin the agent thinks is off. // // Replaced approach: - rejected, leaves the subprocess // running until the next sync cycle. e.shutdownPluginMCPServers(name) // Step 2: 先移除 declarative tools (和 hooks 同理, fail-safe 方向: tools 消失 // 早于插件状态变化, 避免"插件已禁用但 tool 还在 registry 被 LLM 调用" // 的窗口). if e.tools != nil { if p, ok := e.plugins.Get(name); ok { for _, tool := range p.Tools { _ = e.tools.Unregister(tool.Name()) } } } // Step 3: 先移除 hooks(精确到来源) if e.hooksMgr != nil { e.hooksMgr.UnregisterAllBySource(name) } // Step 4: 再标记插件为禁用状态 return e.plugins.Disable(name) } // syncPluginHooks 将所有启用插件的 hooks 同步注册到 hooks.Manager. // // 设计: // 1. 先 UnregisterAllBySource 清除所有现有插件 hooks(非全局) // 2. 遍历启用插件,转换 plugin.HookDef → hooks.HookDef(带 Source 标签) // 3. 注册所有插件 hooks // // 精妙之处(CLEVER): "清除所有插件来源 + 重新注册"是原子操作对-- // 和早期实现 的 clearRegisteredPluginHooks + registerHookCallbacks 同样的原子交换思路. // 区别在于我们不需要清除"全局 hooks"(Source=""),全局 hooks 不受影响. // 这让 reload 插件时无需重新加载全局 hooks(Engine.New() 时已注册). // // 调用时机: // - Engine.New() 初始化完成后(全局 hooks 之后) // - LoadPlugin() 加载新插件后 // - EnablePlugin() 重新启用插件后 // // 注意:当前 Engine 中若有进行中的 hook 执行,不会被强制中断. // 清除只影响下次触发时的查找结果(hooks.Manager 执行时复制 slice). func (e *Engine) syncPluginHooks() { if e.hooksMgr == nil || e.plugins == nil { return } // Step 1: 清除所有现有插件 hooks(保留 Source="" 的全局 hooks). // 不逐个插件清除--批量清除"所有非空 Source"再重建更简洁, // 且避免"部分插件已清除,部分还在"的中间态. // // 升华改进(ELEVATED): 早期实现 clearRegisteredPluginHooks 遍历所有事件类型, // 我们利用 AllHookTypes() 统一遍历,新增 hook 类型时无需修改此处. for _, hookType := range hooks.AllHookTypes() { // 遍历当前已注册的插件来源并移除 // 精妙之处(CLEVER): 不调用 UnregisterAllBySource(非空)-- // 我们不知道有哪些来源,也不想遍历 Manager 内部 map(封装原则). // 替代方案:暴露 Manager.Sources() 方法 → 增加不必要的 API 面积. // 实际做法:每个插件用自己的名字注册,所以按启用插件名逐个清除即可. _ = hookType // 下方按插件名清除,不需要按类型循环 } // 实际清除:按当前所有插件(含禁用的)的名字清除. // 这样即使插件被禁用后 syncPluginHooks 仍能清干净其遗留 hooks. for _, p := range e.plugins.List() { e.hooksMgr.UnregisterAllBySource(p.Name) } // Step 2: 重新注册所有启用插件的 hooks. enabledPlugins := e.plugins.ListEnabled() totalRegistered := 0 for _, p := range enabledPlugins { pluginHooks := p.Hooks // map[string][]plugin.HookDef for hookTypeStr, defs := range pluginHooks { hookType := hooks.HookType(hookTypeStr) // 过滤无效 hook 类型(plugin.json 中拼写错误的类型名) if !isValidPluginHookType(hookType) { e.observer.Event("plugin_hook_invalid_type", map[string]any{ "plugin": p.Name, "hook_type": hookTypeStr, }) continue } for _, def := range defs { hookDef := hooks.HookDef{ Command: def.Command, Timeout: def.Timeout, Source: p.Name, // 来源 = 插件名,用于精准移除 PluginDir: p.Path, // 插件根目录,executor 注入 FLYTO_PLUGIN_ROOT } if err := e.hooksMgr.Register(hookType, hookDef); err != nil { // Register 失败只记录,不中断其他插件(fail-open) e.observer.Event("plugin_hook_register_failed", map[string]any{ "plugin": p.Name, "hook_type": hookTypeStr, "error": err.Error(), }) continue } totalRegistered++ } } } e.observer.Event("plugin_hooks_synced", map[string]any{ "plugin_count": len(enabledPlugins), "hook_count": totalRegistered, }) } // syncPluginTools 将所有启用插件通过 plugin.json 的 tools 字段声明式注册的 // shell tool 同步注册到 Engine 的 tools.Registry. // // 设计对称于 syncPluginHooks: // 1. 先 Unregister 所有 plugin 提供的 tool (clean slate) // 2. 再遍历启用 plugin, 逐个 Register 其 tool // 3. 注册失败走 observer event (不 panic, 不阻断其他 plugin) // // 调用时机 (和 syncPluginHooks 同步): // - Engine.New() 初始化完成后 (内置 + Extra tools 之后) // - LoadPlugin() 加载新插件后 // - EnablePlugin() 重新启用插件后 // // DisablePlugin() 不经本方法, 而是直接在 DisablePlugin 内部 Unregister // 该 plugin 的 tools (fail-safe 顺序: tools 消失先于插件状态变化), // 然后本方法在下次 sync 时自然跳过它 (ListEnabled 只返回 Enabled=true 的). // // 精妙之处 (CLEVER): 和 syncPluginHooks 一样采用"清除全部 + 重新注册" 而非 // "精确 diff", 因为插件 tool 数量通常 <20, 全量 rebuild 的开销和实现复杂度 // 差 10 倍. diff 策略在百万级 tool 才有意义. // // 替代方案: <每次只 Register 新增 tool, 不 Unregister 已存在的> - 否决, // tool 名 (e.g. pluginA:greet) 升级后可能同名但绑定新实现, 不 Unregister 会 // 让旧实现继续生效. func (e *Engine) syncPluginTools() { if e.tools == nil || e.plugins == nil { return } // Step 1: Unregister 所有现有 plugin tools (包括当前被禁用的, 兜底清扫) for _, p := range e.plugins.List() { for _, tool := range p.Tools { _ = e.tools.Unregister(tool.Name()) } } // Step 2: 重新注册所有启用插件的 tools enabledPlugins := e.plugins.ListEnabled() totalRegistered := 0 for _, p := range enabledPlugins { for _, tool := range p.Tools { if err := e.tools.Register(tool); err != nil { if e.observer != nil { e.observer.Event("plugin_tool_register_failed", map[string]any{ "plugin": p.Name, "tool": tool.Name(), "error": err.Error(), }) } continue } totalRegistered++ } } if e.observer != nil { e.observer.Event("plugin_tools_synced", map[string]any{ "plugin_count": len(enabledPlugins), "tool_count": totalRegistered, }) } } // loadConfigPlugins registers plugin definitions declared in // engine.Config.Plugins via plugin.Host.Load. Called exactly once from // engine.New, before the three syncPlugin* passes so the host registry is // stable by the time hooks / tools / MCPServers scan it. // // Scope reminder: Host.Load is the "metadata-only placeholder" API -- // it writes a Plugin entry with Name / Description / Source (no skills / // hooks / tools / MCPServers parsed). For those, see engine.LoadPlugin // (plugin.json on disk) or plugin.Host.RegisterBuiltin (in-binary). // Config.Plugins's narrow niche is documented on the field itself. // // Failure strategy mirrors startConfigMCPServers: an empty Name produces // a config_plugin_skipped event and is discarded; a Host.Load failure // (rare) produces a config_plugin_load_failed event and is non-fatal -- // a broken entry must not prevent the engine from starting or other // entries from registering. A summary config_plugins_loaded event fires // at the end so regression tests can cheaply verify the field was read. // // loadConfigPlugins 把 engine.Config.Plugins 里声明的定义通过 // plugin.Host.Load 注册到 host registry. 仅由 engine.New 调一次, // 位置在三次 syncPlugin* 之前, 保证 hooks / tools / MCPServers 扫描 // registry 时看到的是一份稳定 snapshot. // // 范围提醒: Host.Load 是 "metadata-only 占位" API -- 只写入 Name / // Description / Source 的 Plugin entry (不解析 skills / hooks / tools / // MCPServers). 要解析走 engine.LoadPlugin (磁盘 plugin.json) 或 // plugin.Host.RegisterBuiltin (内嵌二进制). Config.Plugins 的窄场景见 // 字段自身注释. // // 失败策略对齐 startConfigMCPServers: 空 Name 触发 config_plugin_skipped // 并丢弃; Host.Load 失败 (罕见) 触发 config_plugin_load_failed 且不 // fatal -- 坏 entry 不能挡引擎启动或其他 entry 注册. 末尾发一次 // config_plugins_loaded 汇总事件, 让 regression test 以低成本验证字段 // 被真正读过. func (e *Engine) loadConfigPlugins() { if e.plugins == nil || len(e.cfg.Plugins) == 0 { return } loaded := 0 for _, def := range e.cfg.Plugins { if def.Name == "" { if e.observer != nil { e.observer.Event("config_plugin_skipped", map[string]any{ "reason": "empty Name", }) } continue } if err := e.plugins.Load(def); err != nil { if e.observer != nil { e.observer.Event("config_plugin_load_failed", map[string]any{ "plugin": def.Name, "error": err.Error(), }) } continue } loaded++ } if e.observer != nil { e.observer.Event("config_plugins_loaded", map[string]any{ "plugin_count": loaded, }) } } // startConfigMCPServers launches MCP servers declared in engine.Config.MCPServers // and registers each discovered tool as an mcpProxyTool in the engine's // tool registry. Called exactly once from engine.New. // // Relationship with syncPluginMCPServers: both subsystems share the same // mcp.Manager and tools.Registry but live on disjoint key namespaces // ("config." vs "plugin."). Neither path inspects the other's keys, so // plugin reload / enable / disable never affects config-owned servers // and vice versa. // // Why not "sync" semantics like plugin: engine.Config.MCPServers is // captured by value in Config at construction and never mutated (SDK // consumers don't re-configure mid-flight). A one-shot start is the // simpler fit. Lifecycle cleanup happens implicitly via engine.Close -> // mcpMgr.CloseAll which closes all keys including "config." ones. // // Failure strategy mirrors syncPluginMCPServers: single-server failures // are observer events, not fatal -- a broken server must not prevent the // engine from starting or other servers from loading. // // startConfigMCPServers 启动 engine.Config.MCPServers 里声明的 MCP server, // 把每个发现的 tool 包成 mcpProxyTool 注册到引擎 tool registry. 仅由 // engine.New 调一次. // // 跟 syncPluginMCPServers 的关系: 两个子系统共用 mcp.Manager 和 // tools.Registry, 但 key 命名空间隔开 ("config." vs "plugin."), 互不 // inspect 对方的 key, plugin reload/enable/disable 不影响 config-owned // server, 反之亦然. // // 为啥不学 plugin 写成 "sync" 语义: engine.Config.MCPServers 在 Config // 构造时 by-value 捕获, 之后不变 (SDK 消费方不会中途 reconfigure). // 一次性 start 是更简单的匹配. 生命周期收尾由 engine.Close -> // mcpMgr.CloseAll 一并关, 包括 "config." 前缀的 key. // // 失败策略对齐 syncPluginMCPServers: 单个 server 失败是 observer 事件, // 不 fatal -- 坏 server 不能挡引擎启动或其他 server 加载. func (e *Engine) startConfigMCPServers() { if e.tools == nil || e.mcpMgr == nil || len(e.cfg.MCPServers) == 0 { return } connected := 0 registered := 0 for _, serverCfg := range e.cfg.MCPServers { if serverCfg.Name == "" { if e.observer != nil { e.observer.Event("config_mcp_skipped", map[string]any{ "reason": "empty Name", }) } continue } key := configMCPServerKey(serverCfg.Name) // Rewrite Name to match the Manager key for internal consistency // (plugin path does the same). The caller's original Name is // retained in observer events for debugging. // // 把 Name 改写成 Manager key, 保持内部一致 (plugin 路径也这样做). // 调用方原始 Name 在 observer 事件里仍保留, 用于排查. connectCfg := serverCfg connectCfg.Name = key if err := e.mcpMgr.ConnectOne(key, connectCfg); err != nil { if e.observer != nil { e.observer.Event("config_mcp_connect_failed", map[string]any{ "server": serverCfg.Name, "transport": serverCfg.Transport, "error": err.Error(), }) } continue } connected++ // Discover tools and wrap each in an mcpProxyTool. // 发现 tool 并逐个包成 mcpProxyTool. client, ok := e.mcpMgr.GetClient(key) if !ok { continue // disappeared between ConnectOne and GetClient (shouldn't happen) } toolList, err := client.ListTools() if err != nil { if e.observer != nil { e.observer.Event("config_mcp_list_tools_failed", map[string]any{ "server": serverCfg.Name, "error": err.Error(), }) } continue } for _, mt := range toolList { proxy := &mcpProxyTool{ name: mcpProxyToolName("config", serverCfg.Name, mt.Name), description: mt.Description, schema: mt.InputSchema, mgrServerKey: key, toolName: mt.Name, // pluginName is a sentinel "config" on this path -- the // field is only consumed by observer events for grouping, // and "config" distinguishes these from any real plugin. // pluginName 在这条路径是 sentinel "config" -- 此字段只被 // observer 事件用来分组, "config" 作哨兵, 跟任何真的 plugin // 名字都能区分. pluginName: "config", mgr: e.mcpMgr, } if err := e.tools.Register(proxy); err != nil { if e.observer != nil { e.observer.Event("config_mcp_tool_register_failed", map[string]any{ "server": serverCfg.Name, "tool": mt.Name, "error": err.Error(), }) } continue } registered++ } } if e.observer != nil { e.observer.Event("config_mcp_servers_started", map[string]any{ "server_count": connected, "tool_count": registered, }) } } // syncPluginMCPServers starts / reconnects the MCP servers declared by all // enabled plugins (plugin.json "mcpServers" field) and registers each // discovered tool as an mcpProxyTool in the engine's tools.Registry. // // Design mirrors syncPluginHooks / syncPluginTools: // 1. Clean-slate step: close all plugin-owned MCP servers (keys with the // pluginMCPServerKeyPrefix prefix) and unregister their proxy tools. // User-configured MCP servers from settings.json (if any were wired via // a separate path) are untouched because their keys lack the prefix. // 2. Rebuild step: walk Host.GetAllMCPServers(), which returns servers only // from enabled plugins. For each, convert MCPServerDef -> config. // MCPServerConfig, call mcpMgr.ConnectOne with the namespaced key, // then list tools and register proxies. // // Failure strategy: single-server failures are observer events, not fatal. // A broken MCP server must not prevent other plugins' servers from loading // (matches hooks/declarative-tool degrade behavior). Rationale: in a // multi-tenant SaaS deployment a third-party plugin shouldn't be able to // take the whole agent down by shipping a broken MCP server. // // Call sites (same as syncPluginHooks/syncPluginTools): // - Engine.New() after initial plugin hooks + tools sync // - LoadPlugin() after a new plugin directory is loaded // - EnablePlugin() when a previously disabled plugin is re-enabled // // DisablePlugin takes a different path: it calls shutdownPluginMCPServers // directly to kill just the offending plugin's servers, rather than // running the full rebuild. // // Why not per-plugin diffing: same reason syncPluginTools doesn't diff - // the set of servers per engine is typically <20, a clean-slate rebuild is // simpler and avoids "partially migrated" intermediate states. Diff would // matter at 1000+ servers, which is not the target scale. // // Clever (CLEVER): discovery uses mcpMgr.GetClient(key).ListTools() rather // than mcpMgr.AllTools(). AllTools prefixes tool names with "mcp__srv__" // for the entire manager, which would force us to reverse-parse the prefix // to recover (plugin, server, tool) triples. GetClient + ListTools gives // us raw MCPTool values directly - cleaner and O(N) with N = tools per // server (AllTools scans all servers including user-configured ones). func (e *Engine) syncPluginMCPServers() { if e.tools == nil || e.plugins == nil || e.mcpMgr == nil { return } // Step 1: clean slate for plugin-owned keys + their proxy tools. // We collect keys first, release any implicit locks inside // CloseOne, then iterate. for _, key := range e.mcpMgr.ClientNames() { pluginName, serverName, ok := parsePluginMCPServerKey(key) if !ok { continue // user-configured server, not ours to touch } // Unregister any proxy tools this server previously exposed. // We cannot cheaply enumerate them from the registry, so we // rely on ListTools from the live client before closing. if client, alive := e.mcpMgr.GetClient(key); alive { if toolList, err := client.ListTools(); err == nil { for _, mt := range toolList { name := mcpProxyToolName(pluginName, serverName, mt.Name) _ = e.tools.Unregister(name) } } } if err := e.mcpMgr.CloseOne(key); err != nil && e.observer != nil { e.observer.Event("plugin_mcp_close_failed", map[string]any{ "plugin": pluginName, "server": serverName, "error": err.Error(), }) } } // Step 2: rebuild from enabled plugins. defs := e.plugins.GetAllMCPServers() connected := 0 registered := 0 for _, def := range defs { key := pluginMCPServerKey(def.PluginName, def.Name) cfg := config.MCPServerConfig{ Name: key, Transport: def.Transport, Command: def.Command, Args: def.Args, URL: def.URL, Env: def.Env, } if err := e.mcpMgr.ConnectOne(key, cfg); err != nil { if e.observer != nil { e.observer.Event("plugin_mcp_connect_failed", map[string]any{ "plugin": def.PluginName, "server": def.Name, "transport": def.Transport, "error": err.Error(), }) } continue } connected++ // Discover tools and wrap each in an mcpProxyTool. client, ok := e.mcpMgr.GetClient(key) if !ok { continue // disappeared between ConnectOne and GetClient (shouldn't happen) } toolList, err := client.ListTools() if err != nil { if e.observer != nil { e.observer.Event("plugin_mcp_list_tools_failed", map[string]any{ "plugin": def.PluginName, "server": def.Name, "error": err.Error(), }) } continue } for _, mt := range toolList { proxy := &mcpProxyTool{ name: mcpProxyToolName(def.PluginName, def.Name, mt.Name), description: mt.Description, schema: mt.InputSchema, mgrServerKey: key, toolName: mt.Name, pluginName: def.PluginName, mgr: e.mcpMgr, } if err := e.tools.Register(proxy); err != nil { if e.observer != nil { e.observer.Event("plugin_mcp_tool_register_failed", map[string]any{ "plugin": def.PluginName, "server": def.Name, "tool": mt.Name, "error": err.Error(), }) } continue } registered++ } } if e.observer != nil { e.observer.Event("plugin_mcp_servers_synced", map[string]any{ "server_count": connected, "tool_count": registered, }) } } // shutdownPluginMCPServers kills the MCP server subprocesses owned by a // specific plugin and unregisters their proxy tools from tools.Registry. // Called by DisablePlugin. // // Idempotent: if the plugin never had MCP servers, or they are already // gone, this is a no-op. Errors from CloseOne are reported via observer // but do not block plugin disable - a stuck MCP subprocess must not // prevent the admin from turning a plugin off. func (e *Engine) shutdownPluginMCPServers(pluginName string) { if e.mcpMgr == nil { return } for _, key := range e.mcpMgr.ClientNames() { owner, serverName, ok := parsePluginMCPServerKey(key) if !ok || owner != pluginName { continue } // Unregister proxy tools before closing the client so an // in-flight registry lookup can't grab a proxy pointing at // a dead subprocess. if client, alive := e.mcpMgr.GetClient(key); alive && e.tools != nil { if toolList, err := client.ListTools(); err == nil { for _, mt := range toolList { _ = e.tools.Unregister(mcpProxyToolName(pluginName, serverName, mt.Name)) } } } if err := e.mcpMgr.CloseOne(key); err != nil && e.observer != nil { e.observer.Event("plugin_mcp_close_failed", map[string]any{ "plugin": pluginName, "server": serverName, "error": err.Error(), }) } } } // isValidPluginHookType 验证来自 plugin.json 的 hook 类型字符串是否合法. // 防止插件通过拼写错误的 hook 类型名绕过 Manager 的类型检查. // // 精妙之处(CLEVER): 复用 hooks.AllHookTypes() 而非硬编码字符串 set-- // 新增 hook 类型时自动覆盖,不需要同步修改此处. func isValidPluginHookType(hookType hooks.HookType) bool { for _, t := range hooks.AllHookTypes() { if t == hookType { return true } } return false } // observerCloser 是可关闭的 Observer 接口. // 精妙之处(CLEVER): 用接口检测而非硬类型断言-- // BufferedObserver,自定义 Observer,mock 都能被识别, // 不需要为每种 Observer 类型写单独的 if 分支. type observerCloser interface { Close() } // closeObservers 关闭 observer 及其内部可能包含的子 observer. func closeObservers(ctx context.Context, obs EventObserver, errs *[]string) { // 直接实现 Close() 的 observer if closer, ok := obs.(observerCloser); ok { if err := closeWithTimeout(ctx, "observer", func() error { closer.Close() return nil }); err != nil { *errs = append(*errs, err.Error()) } } // CompositeObserver 内部可能有可关闭的子 observer if comp, ok := obs.(*CompositeObserver); ok { for _, inner := range comp.Observers() { if closer, ok := inner.(observerCloser); ok { if err := closeWithTimeout(ctx, "composite_observer", func() error { closer.Close() return nil }); err != nil { *errs = append(*errs, err.Error()) } } } } } // closeWithTimeout 在超时保护下执行关闭操作. // 精妙之处(CLEVER): 每个资源的关闭都有独立超时-- // 一个挂起的资源不会阻塞后续资源的清理. // 早期方案 failsafe 是全局的(一个超时管所有),我们更精细. func closeWithTimeout(ctx context.Context, name string, fn func() error) error { done := make(chan error, 1) go func() { done <- fn() }() select { case err := <-done: return err case <-ctx.Done(): return fmt.Errorf("close %s: timeout", name) } } // registerBuiltinTools 注册所有内置工具. // 对应原项目中 tools/ 目录下的 40+ 个工具定义. // fileCache 注入到 FileReadTool 和 FileEditTool,用于在文件读取/编辑后记录缓存. // sp 注入到 Scratchpad 三件套工具(模块 18.3),可以是 *Scratchpad 或 *FileScratchpad. func registerBuiltinTools(r *tools.Registry, cfg *Config, fileCache *FileStateCache, fileHistory *FileHistory, sp builtin.ScratchpadStore) error { // cfg.SecretGuard 在 engine.New() 中已完成初始化(nil → 自动注入 DefaultSecretGuard). // 此处直接使用,不重复初始化--避免丢失 engine.New() 注入的 OnBlocked 回调. guard := cfg.SecretGuard // 创建共享的任务存储 taskStore := builtin.NewTaskStore() // 创建后台 Bash 任务存储(供 BashTool 自动后台化使用). // 精妙之处(CLEVER): bgStore 与 taskStore 分离-- // taskStore 是 Agent 任务追踪(TaskCreate/TaskList/TaskUpdate 工具的数据源), // bgStore 是 Bash 后台任务存储(run_in_background + 自动后台化). // 两者职责不同,共用一个 store 会导致任务 ID 命名空间混乱. bgStore := builtin.NewBackgroundTaskStore() // 构建所有可用的内置工具 allTools := []tools.Tool{ // 升华改进(ELEVATED): 主 agent 使用 NewBashToolMainAgent,启用 15s 自动后台化-- // 命令阻塞超过 15s 自动转后台,主 agent 保持响应性. // subagent 通过 SubAgentConfig 注入的工具列表会使用 NewBashToolWithStores(不含 isMainAgent), // 不触发自动后台化(subagent 专注单一任务,阻塞是正常的). builtin.NewBashToolMainAgent(cfg.Cwd, bgStore, taskStore, cfg.Executor), builtin.NewFileReadToolWithCache(fileCache), builtin.NewFileEditToolWithGuard(fileCache, fileHistory, cfg.Cwd, guard), builtin.NewFileWriteToolWithGuard(fileHistory, guard), builtin.NewGlobTool(cfg.Executor), builtin.NewGrepTool(cfg.Executor), builtin.NewWebFetchTool(), builtin.NewWebSearchTool(), builtin.NewAgentTool(r), builtin.NewSkillTool(), builtin.NewTaskCreateTool(taskStore), builtin.NewTaskListTool(taskStore), builtin.NewTaskUpdateTool(taskStore), // MonitorTool: nil InboxServer = 静默成功(UDS 未启用时降级). // New() 末尾若 EnableUDSInbox=true,会重新注册带有真实 UDSServer 的版本. builtin.NewMonitorTool(nil), // Scratchpad 三件套(模块 18.3):键值暂存区,跨轮次存储中间状态. // sp 由 Engine.New() 初始化并注入,三个工具共享同一实例. // 精妙之处(CLEVER): 接口注入解循环--builtin 包定义 ScratchpadStore 接口, // *Scratchpad 实现该接口,注入时无需引入 engine 包(避免循环导入). builtin.NewScratchpadWriteTool(sp), builtin.NewScratchpadReadTool(sp), builtin.NewScratchpadListTool(sp), // Gitignore:管理 .gitignore 文件(add/check 两种操作). builtin.NewGitignoreTool(cfg.Cwd), // ToolSearch:搜索并激活延迟加载的工具. // 历史包袱(LEGACY): 当前传 nil(DeferredRegistry 未启用), // 工具可用但搜索结果为空(返回"all tools active"). // 待延迟加载机制上线后改为传真实 DeferredRegistry 实例. builtin.NewToolSearchTool(nil), // send_message: Agent Teams peer-to-peer 通讯工具. // 工具本身无状态, sender 通过 ctx 注入 (由 engine.Team.runWorker 绑定). // 在非 Team 场景调用返回 "not in a Team" 错误, 对 SDK 用户零副作用. // 跨行业中立 -- 消息协议不假设任何领域, 编程/金融/医疗/仓储通用. builtin.NewSendMessageTool(), // shared_task_* 四件套: Agent Teams 共享任务清单工具. // TaskListProvider 通过 ctx 注入 (由 engine.Team.runWorker / ContextForLeader 绑定). // 非 Team 场景返回错误, 对 SDK 用户零副作用. // // 命名带 "shared_" 前缀, 和引擎内部 Task 工具 (task_create/task_list/ // task_update, 见 task.go) 区分: task_* 是单 Agent 本地待办, // shared_task_* 是 Team 级跨 Agent 共享清单. // // 跨行业中立 -- 任务模型 (subject/description/status/claimed_by/result) // 覆盖编程/金融/医疗/仓储/法律场景共性结构. builtin.NewAddSharedTaskTool(), builtin.NewListSharedTasksTool(), builtin.NewClaimSharedTaskTool(), builtin.NewCompleteSharedTaskTool(), } // 如果 cfg.Tools 非空,只注册指定的工具 var allowed map[string]bool if len(cfg.Tools) > 0 { allowed = make(map[string]bool, len(cfg.Tools)) for _, name := range cfg.Tools { // 统一转小写比较 allowed[strings.ToLower(name)] = true } } for _, t := range allTools { if allowed != nil { name := strings.ToLower(t.Name()) // 也检查别名 match := allowed[name] if !match { meta := tools.GetMetadata(t) for _, alias := range meta.Aliases { if allowed[strings.ToLower(alias)] { match = true break } } } if !match { continue } } if err := r.Register(t); err != nil { return fmt.Errorf("register %s: %w", t.Name(), err) } } return nil } // blockState 追踪 SSE 流中正在构建的 content block 的状态. type blockState struct { blockType string // "text", "thinking", "tool_use" id string // tool_use ID name string // tool_use name text string // 累积的文本内容 partialJSON string // 累积的 tool_use JSON 输入 } // runConfig 是 Run 方法的可选配置. type runConfig struct { messages []query.Message // 预置的消息历史 model string // 覆盖默认模型 maxTurns int // 覆盖默认最大轮次(0 = 使用引擎默认值) bundleKey *agentctx.BundleKey // per-request Bundle key 覆盖(nil = 使用 resolveBundleKey()) // isSkillPrompt 标志:当前 Run 的 prompt 是通过 /skill-name 展开的(14.P1-B). // // 升华改进(ELEVATED): 当用户输入 "/skill-name [args]" 且匹配已注册 Skill 时, // 引擎内部展开 Skill 提示词并设置此标志,而不是发出 SlashCommandEvent. // 消费层无感知(Run channel 照常返回),但此标志可用于: // - 审计日志区分"普通对话"vs"技能调用" // - 未来的专用 token budget 调度(Skill 轮次可分配更多 tokens) // - 跳过某些上下文压缩(Skill 提示词保持完整语义) // 替代方案:让消费层在调用 Run 前自己展开-- // 否决:消费层需要感知 SkillRegistry,破坏封装;审计日志无法统一追踪. isSkillPrompt bool // sessionHooks 会话级 Hook 注册表(可选,14.P1-C). // // 升华改进(ELEVATED): 早期实现 的 hook 系统是进程全局的, // 在 HTTP API / 多租户服务器模式下,一个请求注册的 hook 会影响所有请求. // 我们在 runConfig 引入 session-level hooks,实现按请求隔离: // - sessionHooks 仅在当前 Run() 调用期间生效 // - 与 Engine.hooksMgr(引擎级全局 hooks)组合为 effectiveHooks // - session-level 先触发,engine-level 后触发(精确控制优先级) // - nil 时行为与当前完全一样(向后兼容) // 替代方案:给 Engine.Run 加 context.WithValue(sessionHooksKey, hooks) -- // 否决:context.Value 是弱类型,容易出现 key 冲突,可读性差. sessionHooks *hooks.Manager // extraSecrets per-request 凭据列表(WithSecret RunOption 填充). // runLoop 将其与引擎级 e.secrets 合并为 effectiveSecrets, // 合并只在本次 Run 的 goroutine 内发生,不修改引擎级 store. extraSecrets []secretEntry // checkpointHandler 不可逆操作确认回调(WithCheckpointHandler RunOption 填充). // nil 时采用 deny-safe 默认行为(直接拒绝所有 RequiresCheckpoint=true 的工具). checkpointHandler CheckpointHandlerFn } // RunOption 是 Run 方法的选项函数. type RunOption func(*runConfig) // WithMessages 预置消息历史(用于会话恢复). func WithMessages(msgs []query.Message) RunOption { return func(c *runConfig) { c.messages = msgs } } // WithModel 覆盖本次 Run 使用的模型. func WithModel(model string) RunOption { return func(c *runConfig) { c.model = model } } // WithMaxTurns 覆盖本次 Run 的最大对话轮次. // 0 表示使用引擎配置的默认值(Config.MaxTurns). // 适用于 HTTP API 模式:前端可以按请求指定轮次上限,而无需重启服务器. func WithMaxTurns(n int) RunOption { return func(c *runConfig) { c.maxTurns = n } } // WithSessionHooks 为本次 Run 注册会话级 Hook(14.P1-C Session-scoped Hook Registry). // // 升华改进(ELEVATED): 多租户安全隔离--session hooks 只在当前 Run 中生效, // 与 Engine 级全局 hooks 组成 effectiveHooks,session 级优先触发. // nil 参数静默忽略(向后兼容). // // 典型用法(HTTP API / SDK 嵌入): // // perRequestHooks := hooks.NewManager(nil) // perRequestHooks.Register(hooks.HookPreToolUse, hooks.HookDef{ // Handler: hooks.NewCallbackHandler(myTenantAuditFn), // }) // engine.Run(ctx, prompt, engine.WithSessionHooks(perRequestHooks)) func WithSessionHooks(m *hooks.Manager) RunOption { return func(c *runConfig) { c.sessionHooks = m } } // WithBundleKey 为本次 Run 覆盖使用的 PromptBundle key. // // 升华改进(ELEVATED): 早期实现 没有任何运行时切换提示词 Bundle 的机制-- // 系统提示词完全由编译时确定,无法 per-request 切换. // 我们允许 SaaS 场景下同一个 Engine 实例在不同请求间使用不同 Bundle: // - 编程工作台请求 → WithBundleKey({ModelFamily: "claude", Scenario: "programming"}) // - 仓储调度请求 → WithBundleKey({ModelFamily: "claude", Scenario: "warehouse"}) // - 通用助手请求 → WithBundleKey({ModelFamily: "claude", Scenario: "general"}) // // 不传此选项时使用 Config.Scenario + Config.ModelFamily 推断(原有行为不变). // 对应 Bundle 未注册时,BundleRegistry.Resolve 自动回退到默认 Bundle(claude+programming), // 行为安全,不会 panic. // // 典型用法(HTTP API 服务): // // http.HandleFunc("/run", func(w http.ResponseWriter, r *http.Request) { // scenario := r.Header.Get("X-Flyto-Scenario") // "warehouse" // key := engine.BundleKeyFor("claude", scenario) // for ev := range eng.Run(ctx, prompt, engine.WithBundleKey(key)) { // // ... // } // }) // // 替代方案:<为每个场景创建独立的 Engine 实例> // - 否决原因:每个 Engine 持有独立的 sectionRegistry / sessionManager / tools 等, // 内存开销随场景数线性增长;per-request RunOption 开销近乎零. func WithBundleKey(key agentctx.BundleKey) RunOption { return func(c *runConfig) { c.bundleKey = &key } } // BundleKeyFor 是构建 BundleKey 的便捷函数,减少调用方 import agentctx 的需要. // // 精妙之处(CLEVER): 封装 agentctx.BundleKey 的构造,让 HTTP handler 等消费层 // 无需显式 import flyto-agent/pkg/context 包--只需 import engine 包即可. // 减少消费层的直接依赖深度,降低未来包路径重构的影响范围. func BundleKeyFor(modelFamily, scenario string) agentctx.BundleKey { return agentctx.BundleKey{ModelFamily: modelFamily, Scenario: scenario} } // --------------------------------------------------------------------------- // 凭据注入 API(模块 P0 SetSecret) // --------------------------------------------------------------------------- // SetSecret 注册引擎级别的凭据. // // 注册后,凭据在所有后续 Run() 中生效: // 1. 注入 Bash 工具子进程的 env var(工具脚本可通过 $name 引用) // 2. 对所有工具输出进行 value-level 脱敏(将明文替换为 [SECRET:name]) // // name 建议使用大写(如 "DB_PASSWORD")以符合 env var 命名惯例. // value 必须至少 8 字节(防止超短 value 引发大量误替换). // // 升华改进(ELEVATED): 早期实现 只有静态的 GHA_SUBPROCESS_SCRUB 白名单, // 无法处理用户自定义凭据.我们允许动态注册任意 secret, // 引擎自动在全部工具输出和 Bash 子进程 env 中处理,消费层无需感知. // // 典型用法(SDK 嵌入): // // eng.SetSecret("DB_PASSWORD", os.Getenv("DB_PASSWORD")) // eng.SetSecret("MY_API_KEY", os.Getenv("MY_API_KEY")) // for ev := range eng.Run(ctx, prompt) { ... } // // 若需要 per-request 隔离(HTTP API / 多租户),使用 WithSecret RunOption 代替. func (e *Engine) SetSecret(name, value string) error { return e.secrets.Add(name, value) } // Redact 将字符串中所有已注入的凭据值替换为 `[credential:name]`. // // 消费层自定义日志组件应在写出任何可能含凭据的字符串前调用此方法. // 引擎内部日志路径已自动调用,此方法供消费层在引擎外部的日志代码使用. func (e *Engine) Redact(s string) string { return e.secrets.Redact(s) } // WithSecret 为本次 Run 注入 per-request 凭据. // // 升华改进(ELEVATED): HTTP API 多租户场景下,不同租户的凭据必须请求级隔离-- // 用 WithSecret 注入的 secret 只在本次 Run 期间的工具调用中有效, // 不写入引擎级 SecretStore,不影响其他并发请求. // // 与 SetSecret 的区别: // - SetSecret:引擎级,所有 Run 共享,适合单租户 SDK 嵌入 // - WithSecret:请求级,本次 Run 独占,适合多租户 HTTP API // // 典型用法(HTTP API 服务): // // http.HandleFunc("/run", func(w http.ResponseWriter, r *http.Request) { // tenantToken := r.Header.Get("X-Tenant-Token") // 从请求头取租户凭据 // for ev := range eng.Run(ctx, prompt, engine.WithSecret("TENANT_TOKEN", tenantToken)) { // // ... // } // }) func WithSecret(name, value string) RunOption { return func(c *runConfig) { c.extraSecrets = append(c.extraSecrets, secretEntry{name: name, value: value}) } } // --------------------------------------------------------------------------- // 不可逆操作确认 API(模块 P0 CheckpointEvent) // --------------------------------------------------------------------------- // CheckpointHandlerFn 是 CheckpointEvent 的处理回调. // // 引擎在执行声明了 RequiresCheckpoint=true 的工具前同步调用此函数. // 返回 true = 允许执行;返回 false = 拒绝执行(工具调用中止,模型收到拒绝消息). // // 实现约定: // - 必须线程安全(可能被并发工具调用触发) // - 应快速返回(阻塞会影响 runLoop 响应性) // - panic 会被引擎 recover,视为返回 false(deny-safe) // // 默认行为(未注册时):拒绝(false). // deny-safe 原则:宁可误拒绝一个操作,也不可在用户不知情的情况下执行不可逆操作. type CheckpointHandlerFn func(evt CheckpointEvent) bool // WithCheckpointHandler 为本次 Run 注册不可逆操作确认回调. // // 升华改进(ELEVATED): 早期实现 的权限确认依赖 React UI 组件,无法在 SDK/API 模式使用. // 我们将确认逻辑抽象为函数回调,任何消费层都可以提供实现: // - TUI:阻塞读取用户键盘输入 // - HTTP API:通过外部信号量或 webhook 异步收集 ACK // - 测试:直接返回 true/false // - 自动化 CI:策略引擎(根据工具名和参数决策) // // fn 为 nil 时静默忽略(等同于未注册,保持 deny-safe 默认行为). // // 典型用法(SDK 嵌入): // // eng.Run(ctx, prompt, engine.WithCheckpointHandler(func(evt engine.CheckpointEvent) bool { // fmt.Printf("⚠️ 工具 %q 请求执行不可逆操作,参数:%v\n继续?[y/N] ", evt.ToolName, evt.Input) // var s string // fmt.Scan(&s) // return s == "y" || s == "Y" // })) func WithCheckpointHandler(fn CheckpointHandlerFn) RunOption { return func(c *runConfig) { if fn != nil { c.checkpointHandler = fn } } } // apiRetryConfig 是 API 调用重试配置. type apiRetryConfig struct { maxRetries int // 最大重试次数 baseDelay time.Duration // 基础退避延迟 maxDelay time.Duration // 最大退避延迟 } // defaultRetryConfig 返回默认重试配置. func defaultRetryConfig() *apiRetryConfig { return &apiRetryConfig{ maxRetries: 3, baseDelay: 1 * time.Second, maxDelay: 30 * time.Second, } } // isRetryableError 判断 API 错误是否可重试. // 429 (rate limit) 始终可以重试. // 529 (overloaded) 只对前台请求重试(防止级联雪崩). func isRetryableError(errStr string) bool { return isRetryableErrorForSource(errStr, SourceMainThread) } // isRetryableErrorForSource 根据请求来源判断 API 错误是否可重试. // 529 只对前台请求重试;429 始终重试. func isRetryableErrorForSource(errStr string, source QuerySource) bool { if strings.Contains(errStr, "HTTP 429") { return true } if strings.Contains(errStr, "HTTP 529") { // 529 只对前台请求重试,后台请求直接返回错误 return source.IsForeground() } return false } // isMaxOutputTokensError 判断是否为 max_output_tokens 错误. // 当模型输出被截断时,需要增大 max_tokens 重试. func isMaxOutputTokensError(stopReason string) bool { return stopReason == "max_tokens" } // isPartialStream 检测 proxy 截断(partial-stream)条件. // // 触发条件全部满足时才视为 partial-stream: // 1. receivedMessageStart:HTTP 层面 API 请求成功,开始发送 message_start 事件 // 2. !hasAnyContentBlock:没有收到任何 content_block_start 事件(消息体被截断) // 3. stopReason == "":没有收到 message_delta 中的 stop_reason(流未正常结束) // // 精妙之处(CLEVER): 三个条件的交集精确定位 proxy 截断-- // - 仅 !receivedMessageStart:HTTP 连接失败(StreamGuard 处理) // - 仅 !hasAnyContentBlock:可能是合法的空回复(极少见,stop_reason 会是 end_turn) // - 仅 stopReason == "":流尚在传输中(中间状态,不是错误) // 只有三者同时为真,才能确认是"连接成功但内容被截断"这一特定场景. func isPartialStream(receivedMessageStart, hasAnyContentBlock bool, stopReason string) bool { return receivedMessageStart && !hasAnyContentBlock && stopReason == "" } // retryDelay 计算指数退避延迟. // delay = baseDelay * 2^attempt,上限为 maxDelay. func retryDelay(attempt int, cfg *apiRetryConfig) time.Duration { delay := cfg.baseDelay * time.Duration(math.Pow(2, float64(attempt))) if delay > cfg.maxDelay { delay = cfg.maxDelay } return delay } // runLoop 是核心查询循环. // 核心查询循环, // 但结构更清晰:每一轮是一个独立的函数调用. // // 流程: // 1. 使用 context.Builder 构建系统提示(替代硬编码) // 2. 构建用户消息 // 3. 调用 API(流式),支持重试和 max_output_tokens 恢复 // 4. 处理流式事件 -> 推送 TextDeltaEvent / ThinkingDeltaEvent / ToolUseEvent // 5. 如果有工具调用:执行工具 -> 追加结果 -> 回到步骤 3 // 6. 如果没有工具调用,会话结束 -> 推送 DoneEvent // 7. 检查 maxTurns 和 maxBudget 限制 // 8. 每轮结束后检查 token 用量,触发自动压缩 func (e *Engine) runLoop(ctx context.Context, prompt string, cfg *runConfig, ch chan<- Event) { // 14.P1-C:解析本次 Run 有效的 Hook 管理器. // // 升华改进(ELEVATED): hooks.ResolveHooksMgr 将 session-level 和 engine-level hooks 合并-- // session hooks 优先触发,engine hooks 后触发,实现多租户安全隔离. // cfg.sessionHooks=nil 时退化为 e.hooksMgr(向后兼容,零性能损耗). // 替代方案:<每个触发点手动判断 cfg.sessionHooks != nil>-- // 否决:12 处触发点全改,维护成本高,容易遗漏. effectiveHooks := hooks.ResolveHooksMgr(cfg.sessionHooks, e.hooksMgr) // 构建有效凭据 store(引擎级 + 本次请求级合并). // // 精妙之处(CLEVER): mergeSecrets 创建新实例而非修改 e.secrets-- // e.secrets 是多请求共享的;effectiveSecrets 是本次 runLoop goroutine 私有的. // 即使 e.secrets 在 run 期间被其他 goroutine 修改(SetSecret 调用), // 也不影响本次 run 的脱敏和 env 注入行为(快照语义). effectiveSecrets := mergeSecrets(e.secrets, cfg.extraSecrets) // 将 secret env vars 注入 context,Bash 工具子进程执行时从 ctx 读取 if envs := effectiveSecrets.Environ(); len(envs) > 0 { ctx = builtin.ContextWithSecretEnvs(ctx, envs) } // session_start hook(纯通知,不影响行为) if effectiveHooks != nil && effectiveHooks.HasHooks(hooks.HookSessionStart) { env := hooks.BuildSessionEnv("", e.cfg.Cwd, hooks.HookSessionStart) effectiveHooks.ExecuteAsync(hooks.HookSessionStart, env) // 异步,不阻塞启动 } // session_end hook 在 defer 中触发(确保异常退出也能触发) defer func() { if effectiveHooks != nil && effectiveHooks.HasHooks(hooks.HookSessionEnd) { env := hooks.BuildSessionEnv("", e.cfg.Cwd, hooks.HookSessionEnd) effectiveHooks.ExecuteAsync(hooks.HookSessionEnd, env) } }() // 创建查询链追踪(每次 Run 生成新链) chain := NewQueryChain() // api.Client 的创建已移至 EngineBackend.BuildAndStream() 内部, // 主循环不再持有 api.Client 引用. // 创建工具编排器(注入大结果处理器) orchestrator := tools.NewOrchestratorWithResultProcessor(e.tools, 10, e.resultStore) // 确定使用的模型 model := e.cfg.Model if cfg.model != "" { model = cfg.model } // 初始化模型降级追踪器 fallbackTracker := NewFallbackTracker(e.cfg.Fallback, model) // 1. 使用 context.Builder 构建系统提示(返回带缓存语义的块列表) // cfg.bundleKey 非 nil 时 per-request 覆盖 Bundle key(WithBundleKey RunOption) promptBlocks := e.buildSystemPromptWithContext(ctx, model, cfg.bundleKey) // 模块 22.2:Prompt Cache 内容哈希刷新 // 计算系统提示内容哈希,变化时重置 SectionCache. // // 精妙之处(CLEVER): 在 buildSystemPromptWithContext 之后立即检测-- // 此时 promptBlocks 已包含完整内容(工具描述,FLYTO.md,记忆等), // 任何块的变化都会导致 BlocksToString 的输出不同,哈希随之变化. // 对于首次 Run(tracker 无基准哈希),Check 必然返回 true-- // 触发 ResetSectionCache 确保下一轮重新渲染,建立新的缓存 slot. // // 注意:ResetSectionCache 重置的是 sectionRegistry(Section 计算缓存), // 不是 Anthropic 侧的缓存--Anthropic 侧由内容哈希自动判断是否命中. // 我们的 Reset 确保下次 buildSystemPromptWithContext 会生成内容一致的块, // 让 Anthropic 正确建立新 cache slot. if e.cfg.EnableCaching { promptContent := agentctx.BlocksToString(promptBlocks) if e.promptHashTracker.Check([]byte(promptContent)) { // 内容已变化,重置 SectionCache 确保下次完整重渲 // 历史包袱(LEGACY): ResetSectionCache 原本只在 /clear 和 /compact 时调用. // 这里额外在内容变化时调用,扩展了触发时机,但语义一致: // "内容可能变了,下次从头渲染". e.ResetSectionCache() // 重置后立即重新渲染,确保本轮 promptBlocks 与 sectionRegistry 同步 promptBlocks = e.buildSystemPromptWithContext(ctx, model, cfg.bundleKey) } } // 初始化上下文压缩器 // 升华改进(ELEVATED): 使用 TokenBudgetManager 计算压缩阈值, // 考虑 thinking 预算扣减和有效窗口计算,替代旧的固定偏移量. // 替代方案:agentctx.AutoCompactThreshold(model)(不考虑 thinking 预算). compactReg := e.cfg.ModelRegistry() var compactThreshold int if e.cfg.Thinking != nil && e.cfg.Thinking.BudgetTokens > 0 { compactThreshold = e.tokenBudget.AutoCompactThresholdWithThinking(model, e.cfg.Thinking.BudgetTokens) } else { compactThreshold = e.tokenBudget.AutoCompactThreshold(model) } // 校准器修正:若历史记录到 context_too_long,将 compactThreshold 按比例下调-- // 确保 ShouldCompact 在下次 Run 时提前触发,不再等到撞墙才压缩. // // 精妙之处(CLEVER): 按比例缩放而非直接用 effectiveWindow - buffer-- // tokenBudget 已经做了 thinking 预算扣减和 maxOutput 预留,这些计算对不同模型各不同. // 直接重算不如按比例缩放来得简单且保留 tokenBudget 的精细计算成果. // 例如:静态窗口=200000,校准窗口=180000(ratio=0.9),静态阈值=187000 → 校准后=168300. // 替代方案:<重新调用 tokenBudget 的 window 参数化版本> - 否决:接口未暴露,需要改动更多文件. { staticWindow := compactReg.ContextWindow(model) effectiveWindow := e.calibrator.EffectiveWindow(model, staticWindow) if effectiveWindow < staticWindow && staticWindow > 0 { ratio := float64(effectiveWindow) / float64(staticWindow) calibrated := int(float64(compactThreshold) * ratio) if calibrated < 10_000 { calibrated = 10_000 } compactThreshold = calibrated } } compressor := agentctx.NewCompressor(compactThreshold, e.cfg.Provider) // P0-1 修复:注入实例级上下文窗口和模型函数,消除全局可变状态. // 同进程多 Engine 实例各自持有独立函数,互不覆盖. compressor.SetContextWindowFn(func(m string) int { static := compactReg.ContextWindow(m) return e.calibrator.EffectiveWindow(m, static) }) compressor.SetCompactModelFn(func() string { return e.cfg.ModelForRole(config.RoleFast) }) // 跨进程持久化断路器状态--daemon 重启后不从零浪费 API 调用. // fail-open:NewFilePersister 失败时跳过持久化(不影响压缩功能). if cwd := e.cfg.Cwd; cwd != "" { if fp, err := agentctx.NewFilePersister("", cwd); err == nil { compressor.SetPersister(fp) } } // 重试配置 retryCfg := defaultRetryConfig() // 初始化系统提醒管理器 reminderSys := NewReminderSystem(e.fileCache, e.mem) reminderSys.freshnessConfig = e.cfg.FreshnessConfig // 初始化输入处理器 inputProc := NewInputProcessor(e.cfg.Cwd, e.fileCache) // 预处理用户输入(展开文件引用,检测图片/URL/斜杠命令) processed, procErr := inputProc.Process(prompt) if procErr != nil { ch <- newErrorEvent(WrapError(procErr, ErrInternal, "输入预处理失败")) return } // 14.P1-B:处理斜杠命令--Skill 命令引擎内展开,非 Skill 命令委派消费层. // // 升华改进(ELEVATED): 早期方案所有斜杠命令都委派消费层处理(SlashCommandEvent + return). // 我们在引擎内部截获"匹配已注册 Skill"的斜杠命令: // 1. 展开 Skill 提示词,替换原始 prompt(inline 展开) // 2. 设置 cfg.isSkillPrompt=true(用于审计/token budget 决策) // 3. 继续正常 runLoop(不 return,不发 SlashCommandEvent) // 非 Skill 的斜杠命令(/help,/clear 等)仍然走原路委派消费层. // // 精妙之处(CLEVER): skillHandled 标志控制分支-- // 不用 goto,避免跳过变量声明的编译错误(Go 的 goto 限制). // 单层 if-else 结构,代码路径清晰可测试. // 替代方案:<消费层自己展开,调用 Run 时传入展开后的 prompt>-- // 否决:消费层需要感知 SkillRegistry,破坏封装;审计日志无法统一追踪. if processed.SlashCommand != nil { skillHandled := false cmdName := processed.SlashCommand.Name // 检查斜杠命令是否匹配已注册的 Skill if e.skillRegistry != nil { if skill, ok := e.skillRegistry.Get(cmdName); ok { // 匹配到 Skill--在引擎内部展开 sessionID, _ := ctx.Value(ctxKeySessionID{}).(string) expandedPrompt := skill.ExpandPrompt(processed.SlashCommand.Args, sessionID) if expandedPrompt != "" { skillHandled = true cfg.isSkillPrompt = true // 发出 Skill 展开通知事件(消费层可选监听,用于 UI 提示) e.observer.Event("skill_expanded", map[string]any{ "skill": cmdName, "args": processed.SlashCommand.Args, "context_mode": string(skill.Context), }) // 用展开后的提示词继续(覆盖 prompt) prompt = expandedPrompt // 重新预处理展开后的提示词(可能含文件引用) if processed2, procErr2 := inputProc.Process(prompt); procErr2 == nil && processed2.SlashCommand == nil { processed = processed2 } } } } if !skillHandled { // 非 Skill 斜杠命令--委派消费层处理(原行为) ch <- &SlashCommandEvent{ Name: processed.SlashCommand.Name, Args: processed.SlashCommand.Args, } return } } // 使用处理后的文本(文件引用已展开) userPrompt := prompt if processed.Text != "" { userPrompt = processed.Text } // 防御点 d: 输入过大(>100K 字符)时截断并告警. // maxInputChars 常量定义在包级别,便于测试覆盖. if len(userPrompt) > maxInputChars { userPrompt = userPrompt[:maxInputChars] ch <- &WarningEvent{ Code: "input_truncated", Message: "input truncated to 100K characters", } } // 2. 构建消息历史 messages := make([]query.Message, 0) // 如果有预置消息历史(会话模式),先加入 if len(cfg.messages) > 0 { messages = append(messages, cfg.messages...) } // 加入用户消息(使用预处理后的文本) messages = append(messages, query.NewTextMsg(query.RoleUser, userPrompt)) // 构建工具定义列表 toolDefs := e.buildToolDefs(ctx) // Verbose trace: emit a detailed run-loop-started snapshot only when // the SDK consumer set Config.Verbose. Captures the initial state // (model / max_tokens ceiling / message count / tool count / prompt // length) that would normally get lost in non-verbose operational // logging. Using a distinct event keeps zero behavioral impact on // default observers while giving verbose mode a real first data point. // // Verbose trace: 仅当 SDK 消费方设 Config.Verbose 时才 emit 一条详细 // 的 run-loop-started 快照. 捕获初始状态 (model / max_tokens 上限 / // 消息数 / 工具数 / prompt 长度), 这些信息在非 verbose 的 operational // 日志里会丢. 用专属事件名让默认 observer 零行为影响, verbose 模式 // 则拿到一条真实的初始数据点. if e.cfg.Verbose && e.observer != nil { e.observer.Event("verbose_run_loop_started", map[string]any{ "model": model, "message_count": len(messages), "tool_count": len(toolDefs), "prompt_len": len(userPrompt), "max_turns": cfg.maxTurns, "is_skill_prompt": cfg.isSkillPrompt, }) } // 追踪统计 var totalInputTokens, totalOutputTokens int turnCount := 0 // 精妙之处(CLEVER): max_tokens 先低后高策略--首次请求用 8192(p99 覆盖), // 被截断后才升级到 64000.这个策略来自早期方案的经验数据: // 95% 的回复 < 4K tokens,只有极少数需要更多空间. // 低值启动避免了 API 按 max_tokens 预留资源带来的延迟和计费惩罚. const ( defaultMaxTokens = 8192 // 默认低值 escalatedMaxTokens = 64000 // 截断后升级值 ) currentMaxTokens := defaultMaxTokens maxTokensEscalated := false // 是否已升级过 max_tokens // 空响应兜底重试计数器:proxy 截断(partial-stream)最多重试 2 次. // 精妙之处(CLEVER): 与 max_tokens 升级计数器同级,在 for 循环外声明-- // 跨轮次共享,防止每轮重置后无限重试. // 如果放在内层会在每轮对话都重置为 0,proxy 截断可以无限触发. const maxPartialStreamRetries = 2 partialStreamRetries := 0 // context_too_long 强制压缩重试计数器--最多重试 2 次. // 防止无限循环:压缩后若仍然超限(极端长会话 + 极小上下文模型),不再重试. const maxContextTooLongRetries = 2 ctxTooLongRetries := 0 // 核心查询循环 for { // 检查上下文是否已取消 select { case <-ctx.Done(): ch <- newErrorEvent(WrapError(ctx.Err(), ErrInternal, "操作已取消")) return default: } turnCount++ // 7. 检查 maxTurns 限制(per-request 优先,回退到引擎全局配置) effectiveMaxTurns := cfg.maxTurns if effectiveMaxTurns <= 0 { effectiveMaxTurns = e.cfg.MaxTurns } if effectiveMaxTurns > 0 && turnCount > effectiveMaxTurns { engErr := NewEngineError(ErrMaxTurns, fmt.Sprintf("已达到最大对话轮次限制: %d 轮", effectiveMaxTurns), nil) engErr.Suggestion = fmt.Sprintf("当前限制为 %d 轮。增加 --max-turns 参数或设置为 0 取消限制", effectiveMaxTurns) ch <- newErrorEvent(engErr) return } // 推送轮次开始事件(含模型规范,消费者可据此渲染 context 进度条) ch <- &TurnStartEvent{ TurnNumber: turnCount, Model: model, ContextWindowTokens: e.cfg.ModelRegistry().ContextWindow(model), } // 收集系统提醒并注入到消息列表中 reminders := reminderSys.CollectReminders(ctx, messages, turnCount) if len(reminders) > 0 { reminderText := strings.Join(reminders, "\n") messages = append(messages, query.NewTextMsg(query.RoleUser, reminderText)) } // 注入 Team Worker 完成通知(来自并发 Worker 的 task-notification XML) // 升华改进(ELEVATED): 通知作为独立 user 消息注入,而非拼接到 reminder-- // 让 Leader 清楚地知道"哪些 Worker 已完成,状态是什么",而非埋在系统提醒里. // 替代方案:拼接到 reminder 文本(难以区分来源,模型可能忽略). if notifs := e.drainTeamNotifications(); len(notifs) > 0 { for _, xml := range notifs { messages = append(messages, query.NewTextMsg(query.RoleUser, xml)) } } // 注入 Agent Teams peer-to-peer 消息 (来自同伴 Worker 的 teammate-message XML). // 精妙之处(CLEVER): 复用 drainTeamNotifications 相同的注入模式 -- // 每轮开始 Poll incomingInbox (非阻塞, 最多取完当前积压), 全部包装成 XML // 加入 messages, 模型在本轮请求中自然读到. 与 task-notification 走同一注入 // 路径, XML tag 不同 ( vs ) 模型可区分. if e.incomingInbox != nil { for { msg, err := e.incomingInbox.Poll() if err != nil || msg == nil { break } // MsgPermissionRequest is the Worker→Leader permission bubble: // resolve via teamPermissionHandler (or auto-approve when nil) and // send MsgPermissionResponse back through teamRouter. The model // never sees these control-protocol messages -- they would only // confuse it (Leader can't grant permission via chat reply, only // via the consumer-layer handler). // // MsgPermissionRequest 是 Worker→Leader 权限冒泡: 通过 // teamPermissionHandler 决策 (nil 时自动批准), 经 teamRouter 发 // MsgPermissionResponse 回去. 模型不看这种 control 协议消息 -- // 看到了反而困惑 (Leader 无法用对话回复授权, 只能通过消费层 handler). if msg.Type == inbox.MsgPermissionRequest { e.handleWorkerPermissionRequest(ctx, msg) continue } xml := formatTeammateMessageXML(msg) messages = append(messages, query.NewTextMsg(query.RoleUser, xml)) if e.observer != nil { e.observer.Event("teammate_message_received", map[string]any{ "from": msg.From, "to": msg.To, "type": string(msg.Type), "message_id": msg.ID, "payload_size": len(msg.Payload), }) } } } // 8. 每轮开始前检查是否需要自动压缩 messages = e.maybeCompact(messages, compressor, ch) // 3. 调用模型 API(流式),带重试逻辑 // 精妙之处(CLEVER): 使用 buildAPIRequest 统一处理分块缓存-- // promptBlocks 的每个块按 CacheScope 附加 cache_control, // 静态块命中跨会话缓存,动态块命中会话级缓存,volatile 块不缓存. // // 请求构建已移至 EngineBackend.BuildAndStream() 内部. // pre_sampling hook:API 调用前同步触发,可阻止(exit 2 → 终止本轮). // // 升华改进(ELEVATED): 早期实现 没有 pre-sampling hook (只有 void post-sampling). // 我们在每轮 API 调用前插入同步检查点-- // CLI 用途:配额控制脚本,合规审计(每次 AI 调用都有记录) // SDK 用途:CallbackHandler 动态切换模型,注入 system prompt 修改 // 跨行业:医疗场景每次调用前检查患者数据权限 // 替代方案:<只在 Run() 入口前触发> - 否决:多轮对话第 2 轮以后无法感知. // // 精妙之处(CLEVER): 只在第一个 attempt (attempt==0) 触发 pre-sampling-- // 重试(attempt>0)是因为网络/限速临时失败,不是新的"采样决策", // 重复触发 pre-sampling 会让审计日志出现重复记录,让配额 hook 误扣多次. if effectiveHooks != nil && effectiveHooks.HasHooks(hooks.HookPreSampling) { env := hooks.BuildPreSamplingEnv(model, turnCount, len(messages), e.cfg.Cwd) hookResults, _ := effectiveHooks.Execute(ctx, hooks.HookPreSampling, env) if hookResults != nil { // ParseStopHookResponse 复用 stop hook 语义:非零退出 = "阻止本次操作". // 精妙之处(CLEVER): 复用而非新建解析函数-- // pre_sampling 的"阻止"和 stop 的"阻止继续"语义完全一致: // 非零退出码 = hook 检测到应该停止的条件. // 新建 ParsePreSamplingResponse 会引入重复代码,且语义相同没有理由分开. shouldBlock, reason := hooks.ParseStopHookResponse(hookResults) if shouldBlock { e.observeWithChain("pre_sampling_blocked", chain, map[string]any{ "model": model, "turn": turnCount, "reason": reason, }) ch <- &WarningEvent{ Code: "pre_sampling_blocked", Message: fmt.Sprintf("pre_sampling hook 阻止了本轮 API 调用: %s", reason), } // 推送 DoneEvent 保证消费层不会永久等待,然后退出 Run ch <- &DoneEvent{Reason: "pre_sampling_blocked"} return } } } // 带重试的 API 调用 var streamCh <-chan flyto.Event var apiErr error for attempt := 0; attempt <= retryCfg.maxRetries; attempt++ { if attempt > 0 { // 指数退避等待 delay := retryDelay(attempt-1, retryCfg) // 通知消费层正在重试 ch <- &WarningEvent{ Code: "api_retry", Message: fmt.Sprintf("API 调用失败,将在 %.0f 秒后重试 (第 %d/%d 次)", delay.Seconds(), attempt, retryCfg.maxRetries), } select { case <-ctx.Done(): ch <- newErrorEvent(WrapError(ctx.Err(), ErrInternal, "操作已取消")) return case <-time.After(delay): } } apiCallStart := time.Now() if e.activity != nil { e.activity.Start(ActivityAPICall) } e.observeWithChain("api_call_start", chain, map[string]any{ "model": model, "turn": turnCount, "attempt": attempt, }) // 构建 responseFormat(用于 BuildAndStream) var responseFormat *flyto.ResponseFormat if len(e.cfg.JSONSchema) > 0 { responseFormat = &flyto.ResponseFormat{ Type: "json_schema", JSONSchema: e.cfg.JSONSchema, } } // 通过 EngineBackend 接口调用,屏蔽双路径差异 streamCh, apiErr = e.backend.BuildAndStream(ctx, model, currentMaxTokens, promptBlocks, messages, toolDefs, responseFormat, attempt) if apiErr == nil { if e.activity != nil { e.activity.Stop(ActivityAPICall) } break // 调用成功 } if e.activity != nil { e.activity.Stop(ActivityAPICall) } // API 调用失败时记录耗时 e.observeWithChain("api_call_failed", chain, map[string]any{ "model": model, "turn": turnCount, "attempt": attempt, "error": apiErr.Error(), "duration_ms": time.Since(apiCallStart).Milliseconds(), }) // 检查是否可重试 if !isRetryableError(apiErr.Error()) || attempt >= retryCfg.maxRetries { // 尝试模型降级 if fallbackModel, should := fallbackTracker.ShouldFallback(apiErr); should { fallbackTracker.RecordFallback() model = fallbackModel // model 局部变量已更新,下次 BuildAndStream(model,...) 会使用新模型. ch <- &WarningEvent{ Code: "model_fallback", Message: fmt.Sprintf("主模型不可用,已降级到备用模型: %s", fallbackModel), } break // 跳出重试循环,用新模型重新发起请求 } code := ClassifyAPIError(apiErr.Error()) // ErrContextTooLong 特殊处理:跳出重试循环,在下方统一处理 if code == ErrContextTooLong { break } engErr := WrapError(apiErr, code, fmt.Sprintf("API 调用在 %d 次尝试后失败", attempt+1)) ch <- newErrorEvent(engErr) return } // 可重试的错误,继续循环 } if apiErr != nil { // 降级成功时 apiErr 仍然非 nil(因为 break 跳出了重试循环) // 如果已降级,继续循环用新模型重试 if fallbackTracker.WasFallback() && model == fallbackTracker.CurrentModel() { continue } code := ClassifyAPIError(apiErr.Error()) // ErrContextTooLong:记录校准数据,强制压缩后重试. // // 升华改进(ELEVATED): 早期方案遇到 context-too-long 直接报错给消费层-- // 消费层(CLI/SDK/Web)需要手动缩短会话才能继续. // 我们在引擎层自动: // 1. 记录失败点 → 校准器更新有效窗口(下次提前压缩) // 2. 强制压缩当前消息列表(跳过 ShouldCompact 阈值检查) // 3. 重试本轮(对消费层透明) // 保护:最多重试 maxContextTooLongRetries 次,防止极端场景无限循环. if code == ErrContextTooLong && ctxTooLongRetries < maxContextTooLongRetries { ctxTooLongRetries++ actual, max := ParseContextError(apiErr.Error()) e.calibrator.RecordFailure(model, actual, max) e.observeWithChain("context_too_long_calibrating", chain, map[string]any{ "model": model, "actual_tokens": actual, "max_tokens": max, "retry_attempt": ctxTooLongRetries, }) ch <- &WarningEvent{ Code: "context_too_long_retrying", Message: fmt.Sprintf( "上下文超出模型限制(约 %d tokens),正在压缩后重试(%d/%d)", actual, ctxTooLongRetries, maxContextTooLongRetries), } // If the API returned a *api.APIError with a concrete // TokenGap (e.g. "prompt_too_long: 137500 tokens > // 135000 maximum" parsed to 2500), hand it to the // compressor so it can take a precise stride instead // of shaving one group at a time. Zero falls back to // the incremental loop. See api.APIError.TokenGap and // agentctx.WithTokenGap godoc. // // 若 API 返回 *api.APIError 且带精确 TokenGap, 透传给 // compressor 做精确跳步; 0 回退逐组 loop. 见 // api.APIError.TokenGap 和 agentctx.WithTokenGap godoc. var tokenGap int var wrapped *api.APIError if errors.As(apiErr, &wrapped) { tokenGap = wrapped.TokenGap } messages = e.forceCompact(messages, compressor, ch, tokenGap) turnCount-- // 强制重试不计入有效轮次 continue } ch <- newErrorEvent(WrapError(apiErr, code, "API 调用失败")) return } // 4. 处理流式事件 // 升华改进(ELEVATED): 早期方案需要维护 blocks 状态机(Start/Delta/Stop 三段式)-- // wire.ParseAnthropicStream 已在内部完成 block 聚合, // 这里直接处理语义完整的 flyto.Event 事件. // 删除了 ~100 行的 block 状态机代码,逻辑清晰度大幅提升. // 替代方案:<保留 blocks map + blockState 类型> - 否决:职责重叠, // wire 层和 engine 层都维护 block 状态会导致维护双份逻辑. var stopReason string var turnInputTokens, turnOutputTokens int // stop_reason 不可靠兜底:不依赖 API 返回的 stop_reason === "tool_use", // 自己追踪流式到达的 tool_use block. hasToolUseBlocks := false completedToolUseCount := 0 // partial-stream 检测变量:等效于旧版 receivedMessageStart/hasAnyContentBlock. // StreamGuard 的 stream_empty/stream_truncated 错误码覆盖大多数 proxy 截断场景; // hasAnyContentBlock 作为额外保护(有 UsageEvent 但无内容的边界情况). hasAnyContentBlock := false needPartialStreamRetry := false // 收集 assistant 回复块(用于追加到消息历史 + 构建工具调用列表). // 精妙之处(CLEVER): 流式收集替代旧版 map[int]*blockState-- // flyto.TextEvent / ThinkingEvent / ToolUseEvent 到达时已是完整块, // 按到达顺序追加即可,无需额外排序(wire 层保证顺序). var assistantContent []query.Content var pendingToolCalls []tools.ToolCall for evt := range streamCh { switch ev := evt.(type) { case *flyto.TextDeltaEvent: hasAnyContentBlock = true if ev.Text != "" { ch <- &TextDeltaEvent{Text: ev.Text} } case *flyto.ThinkingDeltaEvent: if ev.Text != "" { ch <- &ThinkingDeltaEvent{Text: ev.Text} } case *flyto.TextEvent: if ev.Text != "" { ch <- &TextEvent{Text: ev.Text} assistantContent = append(assistantContent, query.Content{ Type: query.ContentText, Text: ev.Text, }) } case *flyto.ThinkingEvent: if ev.Text != "" { ch <- &ThinkingEvent{Text: ev.Text} assistantContent = append(assistantContent, query.Content{ Type: query.ContentThinking, Text: ev.Text, }) } case *flyto.ToolUseEvent: hasAnyContentBlock = true hasToolUseBlocks = true completedToolUseCount++ // 精妙之处(CLEVER): wire.ParseAnthropicStream 已做 json.Valid() 预检-- // 残缺 JSON 时产出空 Input map(而非 nil). // 引擎层检测 len(Input)==0 等价于旧版 !json.Valid(partialJSON), // 但无需重复 Valid 检查(wire 层已做,避免双重 JSON 解析). if len(ev.Input) == 0 { hint := e.getToolSchemaHint(ev.ToolName) ch <- &ToolResultEvent{ ID: ev.ID, ToolName: ev.ToolName, Output: fmt.Sprintf( "tool_use input JSON 格式无效,无法执行工具 %q。%s", ev.ToolName, hint), IsError: true, } } ch <- &ToolUseEvent{ ID: ev.ID, ToolName: ev.ToolName, Input: ev.Input, } // 收集到 assistant 块历史 + 工具调用列表 assistantContent = append(assistantContent, query.Content{ Type: query.ContentToolUse, ID: ev.ID, Name: ev.ToolName, Input: ev.Input, }) rawInput, _ := json.Marshal(ev.Input) if rawInput == nil { rawInput = json.RawMessage("{}") } pendingToolCalls = append(pendingToolCalls, tools.ToolCall{ ID: ev.ID, Name: ev.ToolName, Input: rawInput, }) case *flyto.UsageEvent: // UsageEvent 是流正常结束的标志(等效于旧版 receivedMessageStart + EventMessageStop) stopReason = ev.StopReason turnInputTokens += ev.InputTokens turnOutputTokens += ev.OutputTokens case *flyto.ErrorEvent: // StreamGuard 产出的可重试流错误(空响应/截断/空闲超时) // 精妙之处(CLEVER): stream_empty / stream_truncated 等价于旧版 partial-stream 检测-- // 早期方案需要手动对比 receivedMessageStart && !hasAnyContentBlock, // StreamGuard 自动检测并产出结构化错误码,引擎只需根据 Code 分支. if (ev.Code == "stream_empty" || ev.Code == "stream_truncated" || ev.Code == "stream_idle_timeout") && partialStreamRetries < maxPartialStreamRetries { needPartialStreamRetry = true continue } code := ClassifyAPIError(ev.Err.Error()) ch <- newErrorEvent(WrapError(ev.Err, code, "API 流式响应出错")) return } } // 空响应兜底:StreamGuard 的 stream_empty/stream_truncated 已覆盖大多数情况. // 这里处理 StreamGuard 未覆盖的边界:有 UsageEvent 但无任何内容 block. // 区别于 StreamGuard 的"完全没有事件"检测--这里是"有 UsageEvent 但无内容"的应用层截断. // // 精妙之处(CLEVER): continue 而不是 return--partial-stream 是瞬态代理故障, // 下一轮用相同的 messages 重试,对消费层完全透明(不推送错误事件). if needPartialStreamRetry || (!hasAnyContentBlock && stopReason == "") { if partialStreamRetries < maxPartialStreamRetries { partialStreamRetries++ ch <- &WarningEvent{ Code: "stream_truncated_partial", Message: fmt.Sprintf( "流式响应被代理截断(无内容),将重试 (%d/%d)", partialStreamRetries, maxPartialStreamRetries), } e.observeWithChain("stream_truncated_partial", chain, map[string]any{ "turn": turnCount, "attempt": partialStreamRetries, }) turnCount-- // 不计入有效轮次,撤销本轮 turnCount++ continue } // 超过重试上限:推送错误,终止 ch <- newErrorEvent(NewEngineError(ErrStreamTruncated, fmt.Sprintf("流式响应持续被代理截断,已重试 %d 次", maxPartialStreamRetries), nil)) return } // 更新 token 统计 totalInputTokens += turnInputTokens totalOutputTokens += turnOutputTokens // 埋点说明:API 调用完成是引擎最核心的可观测性事件. // 每次 API 调用的 token 用量,耗时,停止原因都是容量规划和成本归因的基础数据. // 没有这个埋点,无法回答"钱花在哪了"和"为什么这么慢". e.observeWithChain("api_call_complete", chain, map[string]any{ "model": model, "input_tokens": turnInputTokens, "output_tokens": turnOutputTokens, "stop_reason": stopReason, "turn": turnCount, }) if metricObs, ok := e.observer.(MetricObserver); ok { metricObs.Metric("api_input_tokens", float64(turnInputTokens), map[string]string{"model": model}) metricObs.Metric("api_output_tokens", float64(turnOutputTokens), map[string]string{"model": model}) } // 推送轮次结束事件(包含实际使用的 max_tokens) costUSD := e.estimateCost(model, turnInputTokens, turnOutputTokens) ch <- &TurnEndEvent{ TurnNumber: turnCount, InputTokens: turnInputTokens, OutputTokens: turnOutputTokens, CostUSD: costUSD, MaxTokens: currentMaxTokens, } // 推送会话信息事件(累计统计,消费层用于更新会话列表). // // 升华改进(ELEVATED): 每轮结束后发送累计 token 用量 + 花费,而非只在 DoneEvent 发送-- // 消费层(CLI 状态栏,Web UI,SDK 计费回调)可以实时更新,不必等会话结束. // DoneEvent 仍然携带最终统计,两者语义不同:TurnEnd=本轮摘要,SessionInfo=累计快照. // 替代方案:<只在 DoneEvent 发送一次> - 否决:多轮对话中间中断时消费层无统计数据. { sessionID, _ := ctx.Value(ctxKeySessionID{}).(string) totalCostSoFar := e.estimateCost(model, totalInputTokens, totalOutputTokens) ch <- &SessionInfoEvent{ SessionID: sessionID, Title: sessionTitle(prompt), TurnCount: turnCount, InputTokens: totalInputTokens, OutputTokens: totalOutputTokens, CostUSD: totalCostSoFar, } } // 7. 检查 maxBudget 限制 totalCost := e.estimateCost(model, totalInputTokens, totalOutputTokens) if e.cfg.MaxBudgetUSD > 0 && totalCost > e.cfg.MaxBudgetUSD { engErr := NewEngineError(ErrBudgetExceeded, fmt.Sprintf("已达到预算上限: $%.4f (上限 $%.4f)", totalCost, e.cfg.MaxBudgetUSD), nil) engErr.Suggestion = fmt.Sprintf("已达到本次运行的预算上限 ($%.2f)。增加 --max-budget 或不设限制", e.cfg.MaxBudgetUSD) ch <- newErrorEvent(engErr) return } // 预算接近上限警告(80% 时提醒) if e.cfg.MaxBudgetUSD > 0 && totalCost > e.cfg.MaxBudgetUSD*0.8 { ch <- &WarningEvent{ Code: WarnBudgetNearLimit, Message: fmt.Sprintf("花费已达预算的 %.0f%% ($%.4f / $%.4f)", totalCost/e.cfg.MaxBudgetUSD*100, totalCost, e.cfg.MaxBudgetUSD), } } // Token 预算警告检查. // // CLEVER: Uses API-returned precise usage for immediate warnings — // faster than waiting until next turn's estimation. // Severity-first dispatch (via PickWarningCode) was a bug-fix: the // original else-if chain tested weakest threshold first so a blocking // (100%) state was swallowed by the critical branch, which left // TokenWarningState.IsAtBlockingLimit as a dead field. // // 精妙之处 (CLEVER): 用 API 返回的精确 usage 做即时警告, 比等下一轮 // 估算更及时. 通过 PickWarningCode 的严重优先分发是 bug 修复 — // 原 else-if 链从最弱阈值判起, blocking (100%) 状态被 critical 分支 // 吞掉, 让 TokenWarningState.IsAtBlockingLimit 成 dead 字段. if e.tokenBudget != nil { currentContextTokens := totalInputTokens + totalOutputTokens warningState := e.tokenBudget.CalculateWarningState(currentContextTokens, model) if code := PickWarningCode(warningState); code != "" { var msg string switch code { case "context_window_blocked": msg = fmt.Sprintf("上下文窗口已占满 (使用率 %d%%), 下轮 pre-compact 将被强制触发", warningState.PercentUsed) case "context_window_critical": msg = fmt.Sprintf("上下文窗口即将耗尽,使用率 %d%%,剩余 %d%%", warningState.PercentUsed, warningState.PercentLeft) case "context_window_warning": msg = fmt.Sprintf("上下文窗口使用率 %d%%,剩余 %d%%", warningState.PercentUsed, warningState.PercentLeft) } ch <- &WarningEvent{Code: code, Message: msg} } } // 将 assistant 回复追加到消息历史(assistantContent 在流循环中已收集完毕) messages = append(messages, query.Message{Role: query.RoleAssistant, Content: assistantContent}) // post_sampling hook:API 响应后,工具执行前异步触发(fire-and-forget). // // 升华改进(ELEVATED): 早期实现 的 `void executePostSamplingHooks(...)` 使用 background context, // 引擎关闭时正在运行的 hook goroutine 会成为孤儿(无法被 Close() 终止). // 我们使用引擎的 rootCtx--Close() 调用 rootCancel() 即可终止所有在途 hook. // 替代方案: - 否决:goroutine 泄漏, // 在高并发场景(多 Session 并发)可能积累数百个孤儿 goroutine. // // 精妙之处(CLEVER): 在 assistant blocks 追加到 messages 之后触发-- // hook 脚本通过 MESSAGE_COUNT 可以感知到本轮响应已经被写入历史, // 如果之后要用 CallbackHandler 读消息历史,拿到的是完整的包含本轮的快照. // 在追加之前触发则看到的是上一轮结束的状态,语义不符合"post-sampling". if effectiveHooks != nil && effectiveHooks.HasHooks(hooks.HookPostSampling) { // 提取 assistant 回复文本预览(前 500 字节) responsePreview := extractResponsePreviewFromContent(assistantContent) env := hooks.BuildPostSamplingEnv(model, turnCount, turnInputTokens, turnOutputTokens, stopReason, responsePreview, e.cfg.Cwd) // 异步执行,用引擎 rootCtx(引擎关闭时自动终止,无孤儿 goroutine) capturedHooks := effectiveHooks // 精妙之处(CLEVER): 闭包捕获副本,防止外层变量被覆盖 go func() { _, _ = capturedHooks.Execute(e.rootCtx, hooks.HookPostSampling, env) }() } // max_tokens 先低后高恢复逻辑: // 如果 stop_reason 是 max_tokens,说明模型输出被截断. // 第一次截断:升级到 escalatedMaxTokens,追加继续提示,重试. // 第二次截断(已升级过):正常结束,不无限重试. if isMaxOutputTokensError(stopReason) { if !maxTokensEscalated { // 首次截断:升级 max_tokens maxTokensEscalated = true currentMaxTokens = escalatedMaxTokens // 追加一条用户消息请求模型继续 messages = append(messages, query.NewTextMsg(query.RoleUser, "Your response was truncated due to length limits. Please continue from where you left off.")) ch <- &WarningEvent{ Code: "max_tokens_escalated", Message: fmt.Sprintf("输出被截断,已将 max_tokens 升级到 %d 并重试", escalatedMaxTokens), } continue } // 已升级过仍被截断,不再重试,正常结束 ch <- &WarningEvent{ Code: "max_tokens_still_truncated", Message: "输出在升级 max_tokens 后仍被截断,正常结束", } } // 5. stop_sequence 处理 // 升华改进(ELEVATED): stop_sequence 是 API 基础能力,跨场景通用-- // 编程场景:分类器用 stop_sequences=[""] 省 token // 仓储场景:模型生成到 "---订单结束---" 就停 // 金融场景:模型生成到 "CONFIRM" 触发后续流程 // stop_reason == "stop_sequence" 表示模型输出在自定义停止词处被截断. // 这是正常的预期行为(调用方有意为之),不是错误. // 直接结束当前轮,不重试,不执行工具. if stopReason == "stop_sequence" { ch <- &WarningEvent{ Code: "stop_sequence_hit", Message: "模型输出在 stop_sequence 处停止", } break } // 6. 如果有工具调用,执行工具 // stop_reason 不可靠兜底: // 不仅检查 stop_reason,还检查实际收集到的 tool_use blocks. // 即使 stop_reason 不是 "tool_use",只要检测到完成的 tool_use block, // 就执行工具并继续循环. needsToolExecution := (stopReason == "tool_use") || (hasToolUseBlocks && completedToolUseCount > 0) if needsToolExecution { toolCalls := pendingToolCalls if len(toolCalls) == 0 { // 有 tool_use 信号但没解析到工具调用,异常情况 // 降级为正常结束而不是报错 ch <- &WarningEvent{ Code: "tool_use_parse_failure", Message: "检测到工具调用信号但未能解析工具调用内容,视为对话结束", } } else { // 埋点:工具执行开始(批次级) toolNames := make([]string, len(toolCalls)) for i, tc := range toolCalls { toolNames[i] = tc.Name } if e.activity != nil { e.activity.Start(ActivityToolExec) } e.observeWithChain("tool_batch_start", chain, map[string]any{ "tools": strings.Join(toolNames, ","), "count": len(toolCalls), "turn": turnCount, }) // 收集工具结果(声明在 pre_tool_use hook 之前,hook 可能需要写入 blocked 结果) var toolResultContent []query.Content // 升华改进(ELEVATED): pre_tool_use hook 逐个工具触发,block 即短路. // Hook 可以检查工具名和输入参数,决定是否阻止执行. // exit 2 或 {"decision":"block"} → 跳过该工具,返回错误给模型. // 第一个被 block 的工具不影响后续工具(每个独立判断). // 替代方案:<原方案 hook 从未在 engine 中触发,输出被忽略> if effectiveHooks != nil && effectiveHooks.HasHooks(hooks.HookPreToolUse) { var blockedCalls []tools.ToolCall var passedCalls []tools.ToolCall for _, tc := range toolCalls { env := hooks.BuildPreToolEnv(tc.Name, tc.Input, e.cfg.Cwd) hookResults, _ := effectiveHooks.Execute(ctx, hooks.HookPreToolUse, env) blocked, reason := hooks.ParseToolHookResponse(hookResults) if blocked { // 被 hook 阻止--记录埋点 e.observeWithChain("permission_decision", chain, map[string]any{ "tool": tc.Name, "decision": "hook_blocked", "reason": reason, }) ch <- &WarningEvent{ Code: "hook_blocked_tool", Message: fmt.Sprintf("pre_tool_use hook 阻止了工具 %s: %s", tc.Name, reason), } blockedCalls = append(blockedCalls, tc) } else { passedCalls = append(passedCalls, tc) } } // 被阻止的工具返回错误结果给模型 if len(blockedCalls) > 0 { for _, tc := range blockedCalls { toolResultContent = append(toolResultContent, query.Content{ Type: query.ContentToolResult, ToolUseID: tc.ID, Text: "error: tool execution blocked by pre_tool_use hook", IsError: true, }) } } // 替换为通过 hook 的工具列表 toolCalls = passedCalls if len(toolCalls) == 0 { // 精妙之处(CLEVER): 所有工具都被 block 时,必须在 continue 前停止计数-- // Start(ActivityToolExec) 在第 1410 行已调用,引用计数已增加. // 如果不在这里 Stop,ActivityTracker 的引用计数永远不会归零, // 导致 Engine.Close() 的空闲等待超时,也会让心跳持续误报"有活动". // 正常执行路径的 Stop 在第 1525 行,但 continue 会跳过它. if e.activity != nil { e.activity.Stop(ActivityToolExec) } // 所有工具都被 block,跳过执行,直接把结果发回模型 messages = append(messages, query.Message{Role: query.RoleUser, Content: toolResultContent}) continue } } // Checkpoint:对声明了 RequiresCheckpoint=true 的工具, // 在执行前暂停并调用 CheckpointHandlerFn. // // 升华改进(ELEVATED): 早期实现 "Human checkpoint" 是提示词建议, // 模型可能忽略.这里是引擎层强制拦截--无论模型如何决策, // RequiresCheckpoint=true 的工具必须经过 handler 放行才能执行. // deny-safe 原则:handler 未注册或 panic 时视为拒绝. // 替代方案:<权限系统拦截> - 否决:权限系统面向"是否允许执行该类工具", // Checkpoint 面向"本次具体调用是否已被人类确认",语义不同. if len(toolCalls) > 0 { var checkpointBlocked []tools.ToolCall var checkpointPassed []tools.ToolCall for _, tc := range toolCalls { tool, toolFound := e.tools.Get(tc.Name) needsCheckpoint := false if toolFound { meta := tools.GetMetadata(tool) needsCheckpoint = meta.RequiresCheckpoint } if !needsCheckpoint { checkpointPassed = append(checkpointPassed, tc) continue } // 构造 CheckpointEvent 并推送给消费层(通知 TUI / HTTP SSE 客户端) // tc.Input 是 json.RawMessage,解析为 map 便于消费层展示 var parsedCheckpointInput map[string]any _ = json.Unmarshal(tc.Input, &parsedCheckpointInput) if parsedCheckpointInput == nil { parsedCheckpointInput = make(map[string]any) } evt := CheckpointEvent{ ToolCallID: tc.ID, ToolName: tc.Name, Input: parsedCheckpointInput, Message: fmt.Sprintf( "工具 %q 声明为不可逆操作,执行前需要确认", tc.Name), } ch <- &evt // 同步调用 handler(deny-safe:无 handler 或 panic → 拒绝) allowed := e.invokeCheckpointHandler(cfg.checkpointHandler, evt) e.observeWithChain("checkpoint_decision", chain, map[string]any{ "tool": tc.Name, "allowed": allowed, "turn": turnCount, }) if allowed { checkpointPassed = append(checkpointPassed, tc) } else { checkpointBlocked = append(checkpointBlocked, tc) } } // 被 checkpoint 拒绝的工具:返回拒绝消息给模型 for _, tc := range checkpointBlocked { toolResultContent = append(toolResultContent, query.Content{ Type: query.ContentToolResult, ToolUseID: tc.ID, Text: "error: tool execution denied at checkpoint (user did not confirm)", IsError: true, }) } toolCalls = checkpointPassed if len(toolCalls) == 0 { // 精妙之处(CLEVER): 全部被 checkpoint 拒绝时, // 必须 Stop(ActivityToolExec)(对应 Start 在批次开始处). // 若跳过 Stop,ActivityTracker 引用计数不归零,Engine.Close() 超时. if e.activity != nil { e.activity.Stop(ActivityToolExec) } messages = append(messages, query.Message{Role: query.RoleUser, Content: toolResultContent}) continue } } // checkpoint_suggested:高风险模式静态分析(INF-7 P1) // // 升华改进(ELEVATED): 早期方案依赖提示词引导模型主动提示用户, // 模型可能忽略,且换模型后行为不一致. // 引擎层静态分析不依赖任何特定模型-- // 只要检测到高风险命令,就主动推送建议事件. // 消费层(TUI / SDK webhook)可按需展示警告或弹出确认框. // // 精妙之处(CLEVER): 此处是"建议"而非"阻塞"-- // 与 CheckpointEvent(工具声明的强制拦截)不同, // CheckpointSuggestedEvent 不影响执行流,只是通知消费层. // 这样不会为未注册 checkpointHandler 的场景增加额外负担. // 替代方案:<直接升级为 Checkpoint 阻塞> - 否决:会破坏现有的静默执行场景 // (例如 SDK 嵌入,Coordinator 子 Agent,CI 无头模式). // L1223 修复: 建立 tool call ID → Input 映射供 OperationLog 填充. // 原本 OperationEntry.Input 从未被赋值 (day-1 dead code bug), // 导致事件 input_len 永远为 0, audit Extra tool_input_bytes 永远不写入. // 顺 checkpoint 循环一起建表, 零额外扫描开销. inputByID := make(map[string]json.RawMessage, len(toolCalls)) for _, tc := range toolCalls { e.emitCheckpointSuggested(ch, tc) inputByID[tc.ID] = tc.Input } // 执行工具并收集结果 resultCh := make(chan tools.ToolCallResult, len(toolCalls)) // 注入 EventEmitter 到工具派发 ctx, 让 Agent / Team / Dream / // SkillFork 工具 spawn 的子 agent 能把 SubAgentStart / 包装 // SubAgentEvent / SubAgentEnd 转发到父 Run channel. 非阻塞 // select + default 丢弃: 若父消费者慢到把 ch 填满, 丢弃转发 // 事件而非让子 agent 阻塞 — 子 agent 本地 channel 仍收到业务 // 事件, RunSync 结果不丢; 丢的只是父侧可见性. 这个取舍让 // 转发路径永不死锁. // // Inject EventEmitter into tool-dispatch ctx so sub-agents // spawned by Agent / Team / Dream / SkillFork tools can // forward SubAgentStart / wrapped SubAgentEvent / SubAgentEnd // up to the parent Run channel. Non-blocking select+default // drop: if the parent consumer is slow enough to fill ch, we // drop the forwarded event rather than blocking the // sub-agent — the sub-agent's own channel still receives // business events so RunSync results are intact; only parent // visibility is lost. This tradeoff keeps the forwarding // path deadlock-free. toolCtx := WithEventEmitter(ctx, func(evt Event) { select { case ch <- evt: default: } }) go func() { defer close(resultCh) defer func() { // 精妙之处(CLEVER): recover 防止工具执行器内部 panic 崩溃整个引擎进程. // defer 顺序(LIFO):recover 先于 close(resultCh) 注册,所以先执行-- // panic 被恢复后,外层 defer 关闭 resultCh,for-range 正常退出. // 不向 ch 发送 ErrorEvent 是因为 ch 可能已满或关闭,避免二次 panic. if r := recover(); r != nil { fmt.Fprintf(os.Stderr, "engine: tool executor panic: %v\n", r) } }() orchestrator.ExecuteBatch(toolCtx, toolCalls, resultCh) }() // 收集工具结果并构建 tool_result 消息 for result := range resultCh { // 对工具输出进行 secret 脱敏. // 精妙之处(CLEVER): 脱敏发生在两个地方-- // 1. 推送给消费层的 ToolResultEvent(防止 secret 进入日志/监控) // 2. 写入 messages 的 Content.Text(防止 secret 送回模型上下文) // 两处都用同一个 effectiveSecrets.Redact,保证一致性. // 若 effectiveSecrets 为空(无 secret 注册),Redact 是 O(1) 直接返回原字符串. redactedOutput := effectiveSecrets.Redact(result.Output) // 推送工具结果事件(已脱敏) // Truncated / StoredPath 透传 orchestrator 的大结果截断信号, // 让消费层 UI 能渲染 "点击查看完整" 入口. 路径透传不做脱敏 // (StoredPath 不是工具输出, 是 ResultProcessor 落盘位置, 自身 // 可信度由 ResultProcessor 实现决定). // // Truncated / StoredPath forward the orchestrator's large- // result signal so consumer UIs can render a "click to open // full" affordance. Path is not redacted (it is the // ResultProcessor's persistence location, not tool output -- // trust scope is the ResultProcessor implementation). ch <- &ToolResultEvent{ ID: result.ID, ToolName: result.Name, Output: redactedOutput, IsError: result.IsError, Truncated: result.Truncated, StoredPath: result.StoredPath, } // 埋点说明:工具执行是引擎最频繁的副作用操作. // 记录每次工具调用的名称和是否出错,用于分析工具成功率和使用分布. // 生产中发现某工具错误率飙升时,这个埋点是第一个告警来源. e.observeWithChain("tool_executed", chain, map[string]any{ "tool": result.Name, "tool_id": result.ID, "is_error": result.IsError, "turn": turnCount, }) // 记录到统一操作日志 // 升华改进(ELEVATED): 每次工具调用自动记录到操作日志, // 回滚时可以按消息 ID 找到所有操作并倒序撤销. // // 精妙之处(CLEVER): Output 使用 redactedOutput(而非 result.Output)-- // OperationLog 是持久化记录,若保存明文 secret, // 回滚操作读取日志时 secret 暴露给调用方(即使主流程已脱敏). // 凭据注入 Bash 子进程的途径是 env var,不影响工具本身的输出含义; // 脱敏后的 [SECRET:NAME] 占位符在调试时同样直观. // 替代方案: // - 否决:OperationLog 是通用结构体,不应依赖 SecretStore 实现细节; // 在调用点脱敏更符合"入口统一"原则,也让测试更容易验证. if e.operationLog != nil { status := "success" if result.IsError { status = "failed" } // L1223 修复: Input 与 Output 走同一 effectiveSecrets.Redact 脱敏入口, // 保持 "OperationLog 是通用结构体不跨包依赖 SecretStore" 原则 (见上方注释). // Redact 是 string→string, json.RawMessage 底层就是 []byte, 转换零拷贝安全. redactedInput := json.RawMessage(effectiveSecrets.Redact(string(inputByID[result.ID]))) e.operationLog.Record(&OperationEntry{ ID: result.ID, MessageID: fmt.Sprintf("turn-%d", turnCount), TurnNumber: turnCount, ToolName: result.Name, Input: redactedInput, // [SECURITY] L1223 脱敏后的 Input Output: redactedOutput, // [SECURITY] 脱敏后的输出(INF-7 P1) UndoInfo: result.UndoInfo, Status: status, Truncated: result.Truncated, StoredPath: result.StoredPath, }) } // 路径 B: 工具返图. 若 result.Data 是 *ImageResult, 构造 // array-form tool_result (text + image 混排), 让 vision // 模型真读到图. 纯文本结果走原有 string path. 非 image // Data (未来的 document / 其他结构化载荷) 当前也走 // string path, 等各自 wire 接入再扩展. SECURITY: 图片 // base64 不进 Redact (非文本路径, 不会含 secret 子串), // 但 text 仍走 redactedOutput 确保脱敏链不断. // // Path B: tool returns image. When result.Data is // *ImageResult, construct array-form tool_result (text + // image mixed) so vision models see the real pixels. // Plain text results fall through to the string path. // Non-image Data (future document / other structured // payloads) also takes the string path until each gets // its own wire support. trc := query.Content{ Type: query.ContentToolResult, ToolUseID: result.ID, Text: redactedOutput, IsError: result.IsError, } if img, ok := result.Data.(*builtin.ImageResult); ok && img != nil && img.Base64 != "" { trc.ResultBlocks = []query.Content{ {Type: query.ContentText, Text: redactedOutput}, { Type: query.ContentImage, Source: &query.ContentSource{ Type: "base64", MediaType: img.MediaType, Data: img.Base64, }, }, } } toolResultContent = append(toolResultContent, trc) } if e.activity != nil { e.activity.Stop(ActivityToolExec) } // 升华改进(ELEVATED): 并发工具结果按到达顺序追加,顺序不确定. // 排序确保 tool_result 消息顺序与 tool_use 请求顺序一致-- // Anthropic API 按 tool_use_id 匹配,但确定性顺序有助于调试和跨 provider 兼容. // 替代方案:<依赖 API 的 ID 匹配,不排序> - 若未来某 provider 依赖顺序则引入 bug. { idxMap := make(map[string]int, len(toolCalls)) for i, tc := range toolCalls { idxMap[tc.ID] = i } sort.Slice(toolResultContent, func(i, j int) bool { return idxMap[toolResultContent[i].ToolUseID] < idxMap[toolResultContent[j].ToolUseID] }) } // 将工具结果追加到消息历史 messages = append(messages, query.Message{Role: query.RoleUser, Content: toolResultContent}) // post_tool_use hook:工具执行后触发 // 精妙之处(CLEVER): 在工具结果已经写入消息历史后触发-- // hook 可以看到完整的工具输出,决定是否阻止后续迭代. // decision=block → 标记最后一个结果为错误(但不撤销已执行的操作). if effectiveHooks != nil && effectiveHooks.HasHooks(hooks.HookPostToolUse) { for _, tc := range toolCalls { hookType := hooks.HookPostToolUse env := hooks.BuildPostToolEnv(tc.Name, tc.Input, "", false, e.cfg.Cwd) hookResults, _ := effectiveHooks.Execute(ctx, hookType, env) blocked, reason := hooks.ParseToolHookResponse(hookResults) if blocked { ch <- &WarningEvent{ Code: "hook_blocked_post_tool", Message: fmt.Sprintf("post_tool_use hook 阻止了继续: %s", reason), } // post hook block 不阻止当前工具(已执行完), // 但阻止下一轮 API 调用--直接 break 到会话结束. goto sessionEnd } } } // 回到循环顶部,继续调用 API continue } } // 7. stop hook:每轮结束前检查是否应该停止 // 精妙之处(CLEVER): stop hook 让外部系统控制 Agent 循环-- // 编程场景:CI 系统检测到超时,通过 hook 停止 Agent // 仓储场景:库存系统检测到异常,紧急停止所有自动化操作 // 金融场景:合规系统发现违规,立即中止交易流程 if effectiveHooks != nil && effectiveHooks.HasHooks(hooks.HookStop) { env := hooks.BuildSessionEnv("", e.cfg.Cwd, hooks.HookStop) hookResults, _ := effectiveHooks.Execute(ctx, hooks.HookStop, env) shouldStop, reason := hooks.ParseStopHookResponse(hookResults) if shouldStop { ch <- &WarningEvent{ Code: "hook_stop", Message: fmt.Sprintf("stop hook 请求停止 Agent: %s", reason), } break } } sessionEnd: // 8. 没有工具调用(或被 hook 中止),会话结束 // 查询循环正常结束时,检查 Dream 门槛 if e.dreamEngine != nil { e.dreamEngine.RecordSession() go e.dreamEngine.CheckAndRun(e.rootCtx) } // 升华改进(ELEVATED): 查询结束时异步执行记忆提取-- // 用 SubAgent fork 模式执行,共享 prompt cache,不阻塞主流程. // 传入当前 messages 快照,让 SubAgent 能分析完整对话历史(早期方案 Go 缺此步骤). // 替代方案:同步提取(阻塞用户等待)或独立 API 调用(不共享缓存). if e.extractor != nil && e.extractor.ShouldExtract(turnCount, e.extractState.lastExtractTurn) { e.extractState.lastExtractTurn = turnCount // 精妙之处(CLEVER): 快照 messages 切片(复制引用数组,不深拷贝内容)-- // messages 是本次 Run 的局部变量,goroutine 结束后会被 GC; // 快照保证 scheduleMemoryExtraction 拿到稳定的引用. snapMsgs := make([]query.Message, len(messages)) copy(snapMsgs, messages) go e.scheduleMemoryExtraction(e.rootCtx, snapMsgs, turnCount) } // 埋点说明:会话结束是引擎生命周期的终点事件. // 汇总整个会话的 token 用量,花费,轮次数,用于成本报表和容量规划. // 这是计算"每次对话平均成本"的唯一数据来源. sessionCost := e.estimateCost(model, totalInputTokens, totalOutputTokens) e.observeWithChain("session_complete", chain, map[string]any{ "turns": turnCount, "total_input_tokens": totalInputTokens, "total_output_tokens": totalOutputTokens, "total_cost_usd": sessionCost, "model": model, }) if metricObs, ok := e.observer.(MetricObserver); ok { metricObs.Metric("session_cost_usd", sessionCost, map[string]string{"model": model}) metricObs.Metric("session_turns", float64(turnCount), map[string]string{"model": model}) } // 推送 DoneEvent ch <- &DoneEvent{ TotalInputTokens: totalInputTokens, TotalOutputTokens: totalOutputTokens, TotalCostUSD: sessionCost, TurnCount: turnCount, } return } } // buildSystemPromptWithContext 使用 context.Builder 构建完整的系统提示词. // 替代原来的硬编码 buildSystemPrompt,支持 Bundle + SectionRegistry 分块缓存. // // 升华改进(ELEVATED): 早期方案返回 string,现在返回 []SystemPromptBlock-- // 每个块携带 CacheScope 语义("ephemeral" 或 ""), // 让 buildAPIRequest 可以按块设置 cache_control,实现静态/动态分离缓存. // 早期实现 的 SYSTEM_PROMPT_DYNAMIC_BOUNDARY 字符串标记被结构化的 Block 列表替代, // 更清晰且编译时可验证(字符串标记只能运行时检查). // 替代方案:<继续返回 string,一整块 ephemeral 缓存> // - 否决原因:丧失了静态内容全局缓存的机会(每次新会话都要重新 build 缓存). // // overrideBundleKey 为 nil 时使用 resolveBundleKey()(从 Config 推断), // 非 nil 时直接使用指定 key(支持 per-request WithBundleKey RunOption). // // 精妙之处(CLEVER): 用指针区分"未设置"和"设置为零值"-- // BundleKey{} 是有效的零值(等价于 DefaultBundleKey), // 如果用 BundleKey 值类型传参,调用方无法表达"我没有设置 override". // 用 *BundleKey 后:nil = 未设置(用 Config 推断),非 nil = 明确 override(含零值). func (e *Engine) buildSystemPromptWithContext(ctx context.Context, model string, overrideBundleKey *agentctx.BundleKey) []agentctx.SystemPromptBlock { // 修复(2026-04-14): 原调用 agentctx.AutoCompactThreshold(model)+compactReserveTokens // 抵消后等价于 contextWindowProvider(model), 但该 provider 从未被注入 // (SetContextWindowProvider 全代码零调用), 所以永远返回 DefaultContextWindow=200000, // 使 model 参数成为被忽略的幌子 - 换非 200K 窗口的模型时 Builder 会用错窗口大小. // 现直接走 ModelRegistry 这个唯一真相源 (与 engine.go:2556 保持一致). // Builder.maxTokens 语义 (context.go:44) 是 "模型上下文窗口大小" = 物理窗口, 非阈值. builder := agentctx.NewBuilder(e.cfg.Cwd, e.cfg.ModelRegistry().ContextWindow(model)) // 注入 BundleRegistry 和 SectionRegistry(引擎级实例复用) builder.SetBundleRegistry(e.bundleRegistry) builder.SetSectionRegistry(e.sectionRegistry) // 注入模型 ID(用于 env_info section 中的模型描述) builder.SetModelID(model) // 设置 Bundle key:per-request override 优先,没有则从 Config 推断 var bundleKey agentctx.BundleKey if overrideBundleKey != nil { bundleKey = *overrideBundleKey } else { bundleKey = e.resolveBundleKey() } builder.SetBundleKey(bundleKey) // 启用分块缓存语义(只有在 EnableCaching=true 时才有实质效果) builder.SetEnableCaching(e.cfg.EnableCaching) // 设置自定义系统提示(覆盖 Bundle,不走 Section 系统) if e.cfg.SystemPrompt != "" { builder.SetSystemPrompt(e.cfg.SystemPrompt) } // 模块 18.1:协调器模式系统提示注入 + 用户追加提示. // // 升华改进(ELEVATED): 协调器指导段落作为"特权追加内容"注入-- // 不走 Bundle Section 体系(避免改动稳定的 defaultClaudeProgrammingBundle), // 而是在 AppendPrompt 层前置注入(用户的 AppendSystemPrompt 在后,优先级更低). // // 精妙之处(CLEVER): 把协调器指导放在用户追加内容之前-- // 系统指导是元约束,应在用户追加任务描述之前出现,避免被淹没. // isOrchestrator=false 时 buildAppendPromptWithOrchestrator 直接返回 userAppend, // 与原来行为完全一致,零额外开销. if appendText := buildAppendPromptWithOrchestrator(e.cfg.IsOrchestrator, e.cfg.AppendSystemPrompt); appendText != "" { builder.SetAppendPrompt(appendText) } // 注入工具描述 allTools := e.tools.All() if len(allTools) > 0 { descs := make([]agentctx.ToolDescription, 0, len(allTools)) for _, t := range allTools { descs = append(descs, agentctx.ToolDescription{ Name: t.Name(), Description: t.Description(ctx), }) } builder.SetToolDescriptions(descs) } // Inject MCP server connection snapshot. The mcp_servers volatile // section renders this each turn (CacheBreak=true), so the model sees // reconnects / hot-swaps without a prompt-cache hit hiding them. The // volatile section also drives the `section_cache_break` observer // event (pkg/context.SectionRegistry.Compute) carrying the section // NoCacheReason for operator diagnostics. // // 注入 MCP server 连接快照. mcp_servers volatile section 每轮渲染 // (CacheBreak=true), 让模型看到重连 / 热插拔而不被 prompt cache 命中 // 掩盖. volatile section 同时驱动 `section_cache_break` observer 事件 // (pkg/context.SectionRegistry.Compute) 携带 section NoCacheReason // 供运维诊断. if e.mcpMgr != nil { names := e.mcpMgr.ClientNames() if len(names) > 0 { statuses := make([]agentctx.MCPServerStatus, 0, len(names)) for _, name := range names { connected := false if client, ok := e.mcpMgr.GetClient(name); ok { connected = client.IsAlive() } statuses = append(statuses, agentctx.MCPServerStatus{ Name: name, Connected: connected, }) } builder.SetMCPServerStatuses(statuses) } } return builder.BuildSystemPromptBlocks(ctx) } // resolveBundleKey 从 Config 推断 BundleKey. // 精妙之处(CLEVER): ModelFamily 为空时用前缀匹配推断-- // "claude-sonnet-4-6" → "claude","gpt-4o" → "gpt",未知前缀 → "claude"(默认). // 这样 Config.ModelFamily 是可选字段,方便快速上手(不需要显式设置). func (e *Engine) resolveBundleKey() agentctx.BundleKey { scenario := e.cfg.Scenario if scenario == "" { scenario = "programming" } modelFamily := e.cfg.ModelFamily if modelFamily == "" { modelFamily = inferModelFamily(e.cfg.Model) } return agentctx.BundleKey{ ModelFamily: modelFamily, Scenario: scenario, } } // inferModelFamily 从模型 ID 推断模型族. // 使用前缀匹配,未知前缀回退到 "claude"(内置 Bundle 的默认族). func inferModelFamily(modelID string) string { switch { case strings.HasPrefix(modelID, "claude"): return "claude" case strings.HasPrefix(modelID, "gpt"): return "gpt" case strings.HasPrefix(modelID, "gemini"): return "gemini" default: return "claude" // 默认使用 claude Bundle(内置) } } // extractionSnap 捕获一次记忆提取的触发上下文. // 用于后置补跑(trailing run):当前提取进行中时,最新的触发上下文存入此结构, // 等当前提取结束后立即用最新上下文再跑一次. type extractionSnap struct { ctx context.Context messages []query.Message // 对话消息快照(SubAgent 分析用) turnCount int // 对话轮数(供 ShouldExtract 参考,当前未用于决策) } // scheduleMemoryExtraction 调度一次记忆提取. // 实现单飞(single-flight)+ 最新优先(stash)策略: // - 无提取进行中:立即运行 // - 有提取进行中:把本次上下文存入 pendingSnap(覆盖旧的);当前提取结束后补跑一次 // // 精妙之处(CLEVER): stash 只保留最后一次上下文(而非队列)-- // 中间的上下文被覆盖是合理的:最新消息包含所有值得提取的内容, // 中间状态的提取价值很低(和早期实现 pendingContext 覆盖策略一致). func (e *Engine) scheduleMemoryExtraction(ctx context.Context, messages []query.Message, turnCount int) { e.extractState.mu.Lock() if e.extractState.inProgress { // 后置补跑:覆盖旧 pending(只保留最新) e.extractState.pending = &extractionSnap{ctx: ctx, messages: messages, turnCount: turnCount} e.extractState.mu.Unlock() e.observer.Event("memory_extraction_coalesced", map[string]any{ "turn": turnCount, }) return } e.extractState.inProgress = true e.extractState.mu.Unlock() e.runMemoryExtraction(ctx, messages, turnCount) } // runMemoryExtraction 执行一次记忆提取,结束后自动检查并运行 pending 补跑. // 使用 SubAgent fork 模式,共享 prompt cache,不阻塞主流程(由 scheduleMemoryExtraction 在 goroutine 中调用). // // 升华改进(ELEVATED): 相比早期方案 Go 的三项改进: // 1. 传入 messages--SubAgent 有历史可分析(早期方案缺此步骤,SubAgent 看不到任何对话); // 2. hasMemoryWritesSince--主 agent 自己写过记忆则跳过 SubAgent(避免重复); // 3. 单飞+后置补跑--并发触发时不堆积 goroutine(早期方案 go e.runMemoryExtraction() 可能并发). // // 替代方案:直接在 Engine 内部调用 API 执行提取(不共享缓存,职责混乱). func (e *Engine) runMemoryExtraction(ctx context.Context, messages []query.Message, turnCount int) { // 无论成功或失败,结束后都要释放 inProgress 并尝试后置补跑 defer func() { e.extractState.mu.Lock() e.extractState.inProgress = false pending := e.extractState.pending e.extractState.pending = nil e.extractState.mu.Unlock() if pending != nil { // 精妙之处(CLEVER): 后置补跑用最新 pending 上下文(最新消息)递归调用自身-- // 不用 go + scheduleMemoryExtraction,直接调用避免额外 goroutine, // 同时 inProgress 已被置为 false,scheduleMemoryExtraction 会立即进入运行路径. e.scheduleMemoryExtraction(pending.ctx, pending.messages, pending.turnCount) } }() e.observer.Event("memory_extraction_start", map[string]any{ "turn": turnCount, "message_count": len(messages), }) if e.activity != nil { e.activity.Start(ActivityMemoryExtraction) } defer func() { if e.activity != nil { e.activity.Stop(ActivityMemoryExtraction) } e.observer.Event("memory_extraction_complete", nil) }() // 精妙之处(CLEVER): 如果主 agent 在 sinceIdx 之后已经自己写过记忆文件, // 跳过 SubAgent 提取--主 agent 的写操作和 SubAgent 的提取是互斥的, // 避免重复写同一批内容(和早期实现 hasMemoryWritesSince 逻辑一致). sinceIdx := e.extractState.lastExtractMsgIdx if e.hasMemoryWritesSince(messages, sinceIdx) { e.extractState.lastExtractMsgIdx = len(messages) e.observer.Event("memory_extraction_skipped_direct_write", map[string]any{ "turn": turnCount, }) return } // 计算新消息数(SubAgent 用于精准定位分析范围) newMessageCount := len(messages) - sinceIdx if newMessageCount < 0 { newMessageCount = len(messages) // 游标异常时退回全量 } // 获取现有记忆 existingMemories, err := e.mem.List(ctx) if err != nil { return // 静默失败,记忆提取是后台任务 } // 构建提示(含 newMessageCount,让 SubAgent 精准定位最近 N 条消息) prompt := e.extractor.BuildPrompt(existingMemories, newMessageCount) // 构建允许工具的 map allowedToolNames := e.extractor.AllowedTools() allowedMap := make(map[string]bool, len(allowedToolNames)) for _, name := range allowedToolNames { allowedMap[name] = true } // Fork 子 agent 执行,传入历史消息和记忆目录限制. // SilentEvents=true 兜底: 记忆提取是"后台静默任务", 不污染用户可见事件 // 流 (父 Run channel + SSE + TUI 都不该看到). 当前 ctx 走 rootCtx 本就 // 没 EventEmitter, 但显式 flag 防御未来不小心传入带 emitter 的 ctx. // // SilentEvents=true defensive opt-out: memory extraction is a silent // background task that must not pollute user-visible event streams // (parent Run channel / SSE / TUI). The current ctx is rootCtx which // has no EventEmitter anyway; the explicit flag guards against future // accidental propagation of an emitter-carrying ctx. sa := SpawnSubAgent(e, &SubAgentConfig{ Description: "memory-extraction", AllowedTools: allowedMap, MaxTurns: e.extractor.MaxTurns(), HistoryMessages: messages, // 传入完整对话历史,SubAgent 才有内容可分析 MemoryDirRestrict: e.mem.Dir(), // Edit/Write 限制在记忆目录内 SilentEvents: true, }) // 消费事件(不转发给用户;记忆提取是后台静默任务) events := sa.Run(ctx, prompt) for range events { } // 提取完成后推进游标 e.extractState.lastExtractMsgIdx = len(messages) } // hasMemoryWritesSince 检查 messages[sinceIdx:] 中是否有 assistant 消息 // 向记忆目录写文件(通过 Edit/Write tool_use block). // // 精妙之处(CLEVER): 扫描 assistant 消息的 tool_use blocks-- // 主 agent 写记忆时不经过 canUseTool,只要 file_path 在 mem.Dir() 下就算. // 返回 true 时 runMemoryExtraction 会跳过 SubAgent,避免重复写同一批内容. // 和早期实现 hasMemoryWritesSince 策略一致(messages 游标 + 路径检查). func (e *Engine) hasMemoryWritesSince(messages []query.Message, sinceIdx int) bool { memDir := e.mem.Dir() if memDir == "" { return false } if sinceIdx < 0 { sinceIdx = 0 } for i := sinceIdx; i < len(messages); i++ { msg := messages[i] if string(msg.Role) != "assistant" { continue } for _, c := range msg.Content { if c.Type != query.ContentToolUse { continue } if c.Name != "Edit" && c.Name != "Write" { continue } if fp, ok := c.Input["file_path"].(string); ok { if isUnderDir(fp, memDir) { return true } } } } return false } // buildSystemPrompt 是保留的简化版系统提示构建(向后兼容). // 新代码应使用 buildSystemPromptWithContext. func (e *Engine) buildSystemPrompt() string { blocks := e.buildSystemPromptWithContext(e.rootCtx, e.cfg.Model, nil) return agentctx.BlocksToString(blocks) } // orchestratorGuidance 是协调器模式的专用系统提示段落(模块 18.1). // // 精妙之处(CLEVER): 使用 const 而非 var--内容在程序生命周期内不变, // const 由编译器内联,零运行时分配. // 文本使用中文,与其他系统提示段落风格一致(模型已对中英双语提示深度优化). const orchestratorGuidance = "[协调器模式]\n\n" + "你正在作为协调器(Orchestrator)运行,负责管理多个子代理(SubAgent)完成复杂任务。\n\n" + "协调原则:\n" + "- 将大任务分解为可并行的子任务\n" + "- 为每个子代理提供明确、独立、可验证的目标\n" + "- 监控子代理进度(通过 inbox 消息)\n" + "- 合并子代理结果时检查一致性\n" + "- 子代理失败时启动替代方案" // buildAppendPromptWithOrchestrator 根据 isOrchestrator 标志组合追加提示词(模块 18.1). // // 历史包袱(LEGACY): 利用现有的 AppendPrompt 通道注入协调器指导-- // // 不需要扩展 Builder API(AddExtraSection),用最小改动实现最大效果. // 如果将来需要更多"系统级追加段落",再考虑升级为 AdditionalSections []Section 机制. // // 返回组合后的追加文本: // - isOrchestrator=false:直接返回 userAppend(与之前行为完全一致,零开销) // - isOrchestrator=true:orchestratorGuidance + userAppend(先系统指导,后用户追加) func buildAppendPromptWithOrchestrator(isOrchestrator bool, userAppend string) string { if !isOrchestrator { return userAppend } if userAppend == "" { return orchestratorGuidance } return orchestratorGuidance + "\n\n" + userAppend } // buildAPIRequest 构建完整的 API 请求. // 统一处理系统提示缓存,thinking 配置,beta headers,结构化输出等. // // 升华改进(ELEVATED): systemPrompt 参数从 string 升级为 []SystemPromptBlock-- // 每个 block 携带 CacheScope,允许静态块用 ephemeral 缓存,volatile 块不缓存. // 相比早期方案一整块 ephemeral,这里可以精确控制哪些内容需要缓存. // 替代方案:<继续传 string,整块 ephemeral> // - 否决原因:动态内容(FLYTO.md,env_info)每次变化都会使整个缓存失效, // 浪费原本不变的静态内容的缓存命中机会. func (e *Engine) buildAPIRequest(model string, maxTokens int, promptBlocks []agentctx.SystemPromptBlock, messages []query.Message, toolDefs []api.ToolDef) *api.MessageRequest { // 将 query.Message 转换为 api.RequestMessage(Anthropic legacy 路径专属) apiMessages := make([]api.RequestMessage, 0, len(messages)) for _, msg := range messages { apiMessages = append(apiMessages, queryMessageToAPI(msg)) } req := &api.MessageRequest{ Model: model, MaxTokens: maxTokens, Messages: apiMessages, Stream: true, } // FastMode 影响 max_tokens 默认值 if e.cfg.FastMode && maxTokens <= 16384 { req.MaxTokens = 8192 } // 设置系统提示(支持多块分段缓存) if len(promptBlocks) > 0 { if e.cfg.EnableCaching { // 将每个 Block 转换为 api.SystemContentBlock,按 CacheScope 附加 cache_control apiBlocks := make([]api.SystemContentBlock, 0, len(promptBlocks)) for _, b := range promptBlocks { if b.Text == "" { continue } block := api.SystemContentBlock{Type: "text", Text: b.Text} if b.CacheScope != "" { // 精妙之处(CLEVER): 所有非空 CacheScope 都映射到 "ephemeral"-- // "global" scope 是 Anthropic beta 功能,暂时用 "ephemeral" 近似. // 静态内容走 "ephemeral" 已经比一整块强: // 动态内容变化时只有动态块的缓存失效,静态块继续命中. block.CacheControl = &api.CacheControl{Type: "ephemeral"} } apiBlocks = append(apiBlocks, block) } if len(apiBlocks) > 0 { req.SetSystemBlocks(apiBlocks) } } else { // 不缓存:合并所有块为字符串 req.SetSystemString(agentctx.BlocksToString(promptBlocks)) } } // 设置工具 if len(toolDefs) > 0 { req.Tools = toolDefs } // 设置 extended thinking if e.cfg.Thinking != nil && e.cfg.Thinking.Enabled { thinkingCfg := &api.ThinkingConfig{ Type: "enabled", } if e.cfg.Thinking.BudgetTokens > 0 { thinkingCfg.BudgetTokens = e.cfg.Thinking.BudgetTokens } req.Thinking = thinkingCfg } // 设置结构化输出 if len(e.cfg.JSONSchema) > 0 { req.ResponseFormat = &api.ResponseFormat{ Type: "json_schema", JSONSchema: e.cfg.JSONSchema, } } // 构建 Beta features beta := &api.BetaFeatures{} hasBeta := false if e.cfg.EnableCaching { beta.PromptCaching = true hasBeta = true } if e.cfg.Thinking != nil && e.cfg.Thinking.Enabled { beta.ExtendedThinking = true hasBeta = true } if e.cfg.FastMode { beta.FastMode = true hasBeta = true } if e.cfg.Effort != "" { beta.Effort = e.cfg.Effort hasBeta = true } if len(e.cfg.JSONSchema) > 0 { beta.StructuredOutput = true hasBeta = true } if hasBeta { req.Beta = beta } return req } // maybeCompact 检查消息列表是否需要自动压缩,如果需要则执行. // 压缩发生时推送 CompactEvent 到事件 channel. // // 对应原项目中 autoCompact 逻辑. // // 策略: // 1. 先尝试 MicroCompact(轻量修剪,不调用 API) // 2. 如果仍然超过阈值,执行完整 Compact(调用 Haiku 生成摘要) func (e *Engine) maybeCompact( messages []query.Message, compressor *agentctx.Compressor, ch chan<- Event, ) []query.Message { // 将 query.Message 转换为 CompactMessage 用于估算 compactMsgs := queryMessagesToCompactMessages(messages) // 检查是否需要压缩 if !compressor.ShouldCompact(compactMsgs) { return messages } // 第一步:尝试微压缩 microCompacted := compressor.DoMicroCompact(compactMsgs) if !compressor.ShouldCompact(microCompacted) { // 微压缩后不再超过阈值,转换回去 return compactMessagesToQueryMessages(microCompacted) } // 第二步:执行完整压缩 result, err := compressor.DoCompact(microCompacted) if err != nil { // 埋点说明:压缩失败是最危险的降级场景之一. // 压缩失败意味着上下文会继续膨胀,最终触发 API 的 context_length 错误. // 记录失败原因用于分析是 API 问题(429/529)还是内容问题(无法摘要). e.observer.Error(err, map[string]any{ "phase": "compact", "strategy": "full_compact_failed_fallback_to_micro", }) // 压缩失败,使用微压缩结果(降级策略) return compactMessagesToQueryMessages(microCompacted) } // 埋点说明:压缩成功是关键的上下文管理事件. // 记录压缩前后的 token 数变化,用于评估压缩策略的效果. // 如果压缩比一直很低(压缩后仍然很大),说明策略需要调整. e.observer.Event("compact_triggered", map[string]any{ "tokens_before": result.TokensBefore, "tokens_after": result.TokensAfter, "summary_len": len(result.Summary), }) if metricObs, ok := e.observer.(MetricObserver); ok { metricObs.Metric("compact_tokens_before", float64(result.TokensBefore), nil) metricObs.Metric("compact_tokens_after", float64(result.TokensAfter), nil) } // 推送压缩事件 ch <- &CompactEvent{ Summary: result.Summary, TokensBefore: result.TokensBefore, TokensAfter: result.TokensAfter, } // 压缩后重置 Section 缓存--新会话应重新读取 FLYTO.md,环境信息等动态内容. // 精妙之处(CLEVER): Reset 在这里而非 BuildPromptBlocks 内部-- // 让调用时机明确(compact = 新会话边界),而不是隐藏在 build 路径里. e.sectionRegistry.Reset() return compactMessagesToQueryMessages(result.Messages) } // forceCompact 强制执行完整压缩,跳过 ShouldCompact 阈值检查. // // 在 context_too_long 错误后调用--此时消息已超出模型上限, // 无论当前 token 数是多少都必须压缩,不能等 ShouldCompact 说"需要". // // 精妙之处(CLEVER): maybeCompact 依赖"接近阈值"触发压缩,forceCompact 依赖"已经超限"触发-- // 两条路径互不干扰:maybeCompact 是预防性的,forceCompact 是响应性的. // 若 DoCompact 失败,降级到微压缩结果(宁可不完美也不崩溃). func (e *Engine) forceCompact( messages []query.Message, compressor *agentctx.Compressor, ch chan<- Event, tokenGap int, ) []query.Message { compactMsgs := queryMessagesToCompactMessages(messages) // 先微压缩(轻量,无 API 调用) microCompacted := compressor.DoMicroCompact(compactMsgs) // 再完整压缩(调用 Fast 模型生成摘要). 当 API 错误给了 TokenGap // 精确值时走 DoCompactWithHint(WithTokenGap(...)), 让 compressor 一步 // 跳到正确的砍组步长, 而不是一组一组试. 见 api.APIError.TokenGap 和 // agentctx.WithTokenGap godoc. // // 再完整压缩 -- 有 TokenGap 精确值时走 DoCompactWithHint(WithTokenGap) // 让压缩器一步跳到位. var result *agentctx.CompactResult var err error if tokenGap > 0 { result, err = compressor.DoCompactWithHint(microCompacted, agentctx.WithTokenGap(tokenGap)) } else { result, err = compressor.DoCompact(microCompacted) } if err != nil { e.observer.Error(err, map[string]any{ "phase": "compact", "strategy": "force_compact_failed_fallback_to_micro", }) return compactMessagesToQueryMessages(microCompacted) } e.observer.Event("force_compact_triggered", map[string]any{ "tokens_before": result.TokensBefore, "tokens_after": result.TokensAfter, }) ch <- &CompactEvent{ Summary: result.Summary, TokensBefore: result.TokensBefore, TokensAfter: result.TokensAfter, } e.sectionRegistry.Reset() return compactMessagesToQueryMessages(result.Messages) } // queryMessagesToCompactMessages 将 query.Message 列表转换为 CompactMessage. func queryMessagesToCompactMessages(msgs []query.Message) []agentctx.CompactMessage { result := make([]agentctx.CompactMessage, len(msgs)) for i, msg := range msgs { apiMsg := queryMessageToAPI(msg) result[i] = agentctx.CompactMessage{ Role: apiMsg.Role, Content: apiMsg.Content, } } return result } // compactMessagesToQueryMessages 将 CompactMessage 列表转换回 query.Message. func compactMessagesToQueryMessages(msgs []agentctx.CompactMessage) []query.Message { result := make([]query.Message, 0, len(msgs)) for _, msg := range msgs { qm := query.Message{Role: query.Role(msg.Role)} var text string if err := json.Unmarshal(msg.Content, &text); err == nil { qm.Content = []query.Content{{Type: query.ContentText, Text: text}} } else { var blocks []query.Content if err := json.Unmarshal(msg.Content, &blocks); err == nil { qm.Content = blocks } else { qm.Content = []query.Content{{Type: query.ContentText, Text: string(msg.Content)}} } } result = append(result, qm) } return result } // buildToolDefs 构建工具定义列表(提供给 API). func (e *Engine) buildToolDefs(ctx context.Context) []api.ToolDef { allTools := e.tools.All() if len(allTools) == 0 { return nil } defs := make([]api.ToolDef, 0, len(allTools)) for _, t := range allTools { desc := t.Description(ctx) // Agent Tool Safety Protocol: 对 MinConfidence>0 工具, 在 Description // 末尾追加自评置信度契约, 让 LLM 在工具定义层就知道要附 // _flyto_confidence. Orchestrator 侧 gate 真兜底; 此处只负责告知. // // Agent Tool Safety Protocol: for tools with MinConfidence>0, append // the self-assessment contract to Description so the LLM sees the // _flyto_confidence requirement in the tool definition itself. The // orchestrator enforces the gate; this is only the advisory. if cap := tools.GetCapability(t); cap.MinConfidence > 0 { desc = desc + fmt.Sprintf( "\n\n[Safety] This tool requires you to self-report confidence via the reserved input field %q (integer 0-100). Minimum required: %d. Calls missing this field or below the threshold are rejected -- reconsider your input before each call.", tools.ConfidenceInputField, cap.MinConfidence, ) } defs = append(defs, api.ToolDef{ Name: t.Name(), Description: desc, InputSchema: t.InputSchema(), }) } // 升华改进(ELEVATED): 工具 Schema 变化追踪 + Prompt Cache 感知排序-- // 早期实现 每轮把 tools 数组原样传给 API,任何工具描述变化都会导致整个 tools // 缓存前缀失效(cache miss),以 50k token 为例每轮多花 $0.135(10× 价差). // 我们做两件事: // 1. 逐工具计算 SHA-256,只在真正变化时 emit observer 事件(精确定位); // 2. 稳定工具排前,最后一个稳定工具上打 cache_control,不稳定工具排后-- // 即使某个不稳定工具每轮变,稳定工具的缓存前缀也不受影响. // 替代方案:<对整个 tools 数组做哈希,变化就全部失效> - 否决:无法精确定位, // 且一个频繁变化的工具会拖累所有稳定工具的缓存命中率. // 精妙之处(CLEVER): nil guard--测试或 SubAgent 路径可能直接构造 &Engine{} 而非经过 New(), // toolSchemaTracker 未初始化时跳过追踪,直接返回原始顺序的 defs. if e.toolSchemaTracker == nil { return defs } entries := make([]enginecache.ToolEntry, 0, len(defs)) for _, d := range defs { // 精妙之处(CLEVER): canonical bytes = json.Marshal({name,description,input_schema})-- // 按固定字段顺序序列化,确保同一工具不同调用路径产生相同 bytes,哈希稳定. canonical, err := json.Marshal(map[string]any{ "name": d.Name, "description": d.Description, "input_schema": d.InputSchema, }) if err != nil { // 精妙之处(CLEVER): marshal 失败时用空 bytes-- // 空 bytes 的哈希固定,不会因为 err 导致每轮 hash 不同触发误报. canonical = []byte{} } entries = append(entries, enginecache.ToolEntry{ Name: d.Name, Content: canonical, }) } changes := e.toolSchemaTracker.Track(entries) if !changes.Stable { // 工具 Schema 发生变化,通知 observer(监控/日志/测试可监听此事件) e.observer.Event("tool_schema_changed", map[string]any{ "added": changes.Added, "removed": changes.Removed, "changed": changes.Changed, }) } // StableFirstWithBoundary:稳定工具排前,返回排序后名称列表 + 稳定工具数量. // 只有追踪了足够轮次(window 轮)才有稳定工具,首轮全部视为不稳定. names := make([]string, len(defs)) for i, d := range defs { names[i] = d.Name } sorted, stableCount := e.toolSchemaTracker.StableFirstWithBoundary(names) // 按排序结果重建 defs(稳定工具在前,不稳定工具在后) defsByName := make(map[string]api.ToolDef, len(defs)) for _, d := range defs { defsByName[d.Name] = d } reordered := make([]api.ToolDef, 0, len(sorted)) for _, name := range sorted { if d, ok := defsByName[name]; ok { reordered = append(reordered, d) } } // 精妙之处(CLEVER): cache_control 打在最后一个稳定工具上-- // Anthropic Prompt Cache 要求 cache_control 标记缓存边界的"末尾". // 稳定工具 [0..stableCount-1] 构成一个固定前缀,打在 [stableCount-1] 上 // 意味着"从头到这里都缓存".不稳定工具在 [stableCount..] 之后,不受保护, // 但它们的变化不会破坏前面稳定工具的缓存前缀. // 如果 stableCount==0(首轮或所有工具都不稳定),不打 cache_control, // 避免把不稳定内容误标为缓存边界. if e.cfg.EnableCaching && stableCount > 0 && stableCount <= len(reordered) { cc := &api.CacheControl{Type: "ephemeral"} d := reordered[stableCount-1] d.CacheControl = cc reordered[stableCount-1] = d } // 写入缓存快照,供 SpawnSubAgent 读取(精妙之处见 cachedToolDefs 字段注释). e.sessionState.mu.Lock() e.cachedToolDefs = reordered e.sessionState.mu.Unlock() return reordered } // toolDefsSnapshot 返回最近一次 buildToolDefs 的缓存快照,供 SpawnSubAgent 使用. // // 精妙之处(CLEVER): 只读,不调用 Track()-- // SpawnSubAgent 在工具执行阶段调用,此时 buildToolDefs 已在本轮查询开始时运行过, // cachedToolDefs 必然非 nil(正常生产路径). // // 边界情况(测试/直接构造 &Engine{}):cachedToolDefs 为 nil 时, // 退化为无追踪的原始排列--不优化 cache_control,但正确性不受影响. // 替代方案:<强制要求调用方先调一次 buildToolDefs> // - 否决:增加了初始化顺序约束,SDK 嵌入场景容易遗漏. func (e *Engine) toolDefsSnapshot() []api.ToolDef { e.sessionState.mu.RLock() cached := e.cachedToolDefs e.sessionState.mu.RUnlock() if cached != nil { return cached } // Fallback:快照为空,降级为原始列表(无 Track,无排序,无 cache_control). // 发生场景:测试中直接构造 &Engine{} 而非通过 New(),且尚未执行任何查询. if e.tools == nil { return nil } allTools := e.tools.All() defs := make([]api.ToolDef, 0, len(allTools)) for _, t := range allTools { defs = append(defs, api.ToolDef{ Name: t.Name(), Description: t.Description(context.Background()), InputSchema: t.InputSchema(), }) } return defs } // queryMessageToAPI 将单条 query.Message 转换为 api.RequestMessage(Anthropic legacy 路径专属). // // 升华改进(ELEVATED): 这是引擎历史格式迁移到 query.Message 后唯一的出口适配器-- // 只有在走 legacy api.Client 路径(Config.Provider == nil)时才调用. // 新 Provider 路径(Gemini/OpenAI)走 queryMessagesToFlyto,不经过此函数. // 替代方案: // - 否决:每次新增 provider 都要写 api.RequestMessage → flyto 的反向转换, // 且 api.RequestMessage 是 JSON RawMessage,每次读取都要反序列化(运行时开销). func queryMessageToAPI(msg query.Message) api.RequestMessage { if len(msg.Content) == 1 && msg.Content[0].Type == query.ContentText { return api.NewTextMessage(string(msg.Role), msg.Content[0].Text) } blocks := make([]api.ContentBlock, 0, len(msg.Content)) for _, c := range msg.Content { switch c.Type { case query.ContentText: blocks = append(blocks, api.ContentBlock{Type: "text", Text: c.Text}) case query.ContentThinking: blocks = append(blocks, api.ContentBlock{Type: "thinking", Text: c.Text}) case query.ContentToolUse: blocks = append(blocks, api.ContentBlock{Type: "tool_use", ID: c.ID, Name: c.Name, Input: c.Input}) case query.ContentToolResult: blocks = append(blocks, api.ContentBlock{ Type: "tool_result", ToolUseID: c.ToolUseID, Content: c.Text, IsError: c.IsError, }) } } return api.NewBlockMessage(string(msg.Role), blocks) } // buildAssistantBlocks 从解析到的 blocks 构建 assistant 消息的 content blocks. // 升华改进(ELEVATED): 用排序后的 key 遍历 map,而非假设索引从 0 开始连续递增-- // 在非编程场景(如消息队列消费),序号可能不连续(中间有过滤/丢弃), // 旧方案 for i := 0; i < len(blocks) 在非连续索引时会提前终止丢失后面的 block. // 替代方案:<原方案假设 index 从 0 到 len(blocks)-1 连续,用 for i := 0; i < len(blocks); i++> func buildAssistantBlocks(blocks map[int]*blockState) []api.ContentBlock { // 收集并排序所有索引 indices := make([]int, 0, len(blocks)) for idx := range blocks { indices = append(indices, idx) } sort.Ints(indices) result := make([]api.ContentBlock, 0, len(blocks)) for _, i := range indices { block := blocks[i] switch block.blockType { case "text": result = append(result, api.ContentBlock{ Type: "text", Text: block.text, }) case "thinking": result = append(result, api.ContentBlock{ Type: "thinking", Text: block.text, }) case "tool_use": var input map[string]any if block.partialJSON != "" { _ = json.Unmarshal([]byte(block.partialJSON), &input) } if input == nil { input = make(map[string]any) } result = append(result, api.ContentBlock{ Type: "tool_use", ID: block.id, Name: block.name, Input: input, }) } } return result } // extractResponsePreview 从 assistant content blocks 中提取回复文本预览. // 拼接所有 text 块(跳过 thinking / tool_use),截断到前 500 字节. // 用于 post_sampling hook 的 RESPONSE_PREVIEW 环境变量. // // 精妙之处(CLEVER): 截断在字节边界而非字符边界-- // 多字节 UTF-8 截断可能生成末尾乱码,但 hook 脚本只做模式匹配, // 500 字节内的内容通常都是完整的中英文句子,极少被截断在字符中间. // 若要严格正确,应用 utf8.ValidString 修剪,但那会引入更多代码,不值得. func extractResponsePreview(blocks []api.ContentBlock) string { var buf strings.Builder for _, b := range blocks { if b.Type == "text" && b.Text != "" { buf.WriteString(b.Text) } } preview := buf.String() const maxBytes = 500 if len(preview) > maxBytes { preview = preview[:maxBytes] } return preview } // extractResponsePreviewFromContent 从 query.Content 列表中提取回复文本预览. // 替代旧的 extractResponsePreview(接受 []api.ContentBlock)-- // 引擎历史迁移到 query.Message 后,assistant 内容用 []query.Content 存储. func extractResponsePreviewFromContent(content []query.Content) string { var buf strings.Builder for _, c := range content { if c.Type == query.ContentText && c.Text != "" { buf.WriteString(c.Text) } } preview := buf.String() const maxBytes = 500 if len(preview) > maxBytes { preview = preview[:maxBytes] } return preview } // extractToolCalls 从解析到的 blocks 中提取工具调用请求. // 升华改进(ELEVATED): 与 buildAssistantBlocks 一致,用排序后的 key 遍历-- // 保证工具调用的执行顺序与模型返回的顺序一致,不因 map 遍历随机性而变. // 替代方案:<原方案用 for i := 0; i < len(blocks) 遍历,非连续索引时会丢失 block> func extractToolCalls(blocks map[int]*blockState) []tools.ToolCall { indices := make([]int, 0, len(blocks)) for idx := range blocks { indices = append(indices, idx) } sort.Ints(indices) var calls []tools.ToolCall for _, i := range indices { block := blocks[i] if block.blockType != "tool_use" { continue } var rawInput json.RawMessage if block.partialJSON != "" { rawInput = json.RawMessage(block.partialJSON) } else { rawInput = json.RawMessage("{}") } calls = append(calls, tools.ToolCall{ ID: block.id, Name: block.name, Input: rawInput, }) } return calls } // getToolSchemaHint 查找工具的 InputSchema,生成诊断提示字符串. // // 升华改进(ELEVATED): 失败时不仅告知"JSON 无效",还附上工具期望的参数 Schema-- // 让模型在下一轮能对症下药地修正调用,而不是重复生成同样残缺的 JSON. // 早期实现 无此机制(失败直接 throw,无上下文). // 替代方案:<直接返回空字符串,不查 schema> - 否决:错误信息不完整,LLM 修复率低. func (e *Engine) getToolSchemaHint(toolName string) string { if e.tools == nil { return "" } t, ok := e.tools.Get(toolName) if !ok { return fmt.Sprintf("(工具 %q 未在注册表中找到,请确认工具名称正确)", toolName) } schema := t.InputSchema() if len(schema) == 0 { return "" } const maxSchemaBytes = 512 hint := string(schema) if len(hint) > maxSchemaBytes { hint = hint[:maxSchemaBytes] + "...(截断)" } return fmt.Sprintf("工具 %q 期望的 InputSchema: %s", toolName, hint) } // newErrorEvent 从 EngineError 创建结构化的 ErrorEvent. // // 如果 err 是 EngineError,提取结构化信息填充到 ErrorEvent; // 否则创建一个普通的 ErrorEvent. func newErrorEvent(err error) *ErrorEvent { var engErr *EngineError if errors.As(err, &engErr) { return &ErrorEvent{ Err: err, Code: string(engErr.Code), Suggestion: engErr.Suggestion, Retryable: engErr.Retryable, } } return &ErrorEvent{Err: err} } // emitCheckpointSuggested 对工具调用进行高风险静态分析, // 若匹配到危险模式则向 ch 推送 CheckpointSuggestedEvent(非阻塞建议). // // 检测范围: // - Bash 工具:IsDangerousCommand 分析命令文本(rm -rf / SQL DROP / sudo 等) // - FileEdit/FileWrite:检测目标路径是否为受保护文件 // - 其他工具:暂不检测(可通过扩展 ToolRiskAnalyzer 接口扩充) // // 精妙之处(CLEVER): ch 是 engine.Run() 的事件通道,send 永远是非阻塞的-- // 消费层保证 chan 足够大(或无限制),此处无需 select + default. // 若消费层不关心此事件,只需忽略 EventType()=="checkpoint_suggested" 的事件即可. func (e *Engine) emitCheckpointSuggested(ch chan<- flyto.Event, tc tools.ToolCall) { var parsedInput map[string]any _ = json.Unmarshal(tc.Input, &parsedInput) if parsedInput == nil { parsedInput = make(map[string]any) } var isDangerous bool var info permission.DangerInfo switch tc.Name { case "Bash": // Bash 工具:分析 command 字段 if cmd, ok := parsedInput["command"].(string); ok && cmd != "" { isDangerous, info = permission.AnalyzeDanger(cmd) } case "FileEdit": // FileEdit:检测目标路径 if path, ok := parsedInput["file_path"].(string); ok && path != "" { isDangerous, info = permission.AnalyzeDanger("edit " + path) } case "FileWrite": // FileWrite:写入操作对受保护文件尤其危险 if path, ok := parsedInput["file_path"].(string); ok && path != "" { isDangerous, info = permission.AnalyzeDanger("write " + path) } } if !isDangerous { return } ch <- &CheckpointSuggestedEvent{ ToolCallID: tc.ID, ToolName: tc.Name, Input: parsedInput, RiskReason: info.Reason, RiskPattern: info.Pattern, } // 把 DangerInfo 里 heredoc body 在原文的字节半开区间透传给 // 观测面, 当危险字面量归因到某个 heredoc body 时, 审计 / 日志 // 消费端能直接读 "source bytes N-M" 精确定位用户输入的触发段 // (零值表示非 heredoc 归因, 消费端忽略). // // Forward DangerInfo's heredoc body byte interval (in ORIGINAL // source) to the observer payload; when the dangerous literal is // attributed to a heredoc body, audit / logging consumers can // pinpoint the user-input segment via "source bytes N-M" (zero // values mean no heredoc attribution -- consumers should ignore). payload := map[string]any{ "tool": tc.Name, "tool_id": tc.ID, "reason": info.Reason, "pattern": info.Pattern, } if info.HeredocBodyEnd > info.HeredocBodyStart { payload["heredoc_body_start"] = info.HeredocBodyStart payload["heredoc_body_end"] = info.HeredocBodyEnd } e.observer.Event("checkpoint_suggested", payload) } // invokeCheckpointHandler 安全地调用 CheckpointHandlerFn,实现 deny-safe 语义. // // 精妙之处(CLEVER): recover() 将 handler panic 转换为"拒绝"-- // 调用方代码 panic 不应传播到引擎 runLoop 导致整个 session 崩溃. // deny-safe 原则:handler 不可用时宁可拒绝操作,不可默许不可逆行为. // // fn 为 nil 时直接返回 false(未注册 = 拒绝). func (e *Engine) invokeCheckpointHandler(fn CheckpointHandlerFn, evt CheckpointEvent) (allowed bool) { if fn == nil { return false } defer func() { if r := recover(); r != nil { e.observer.Event("checkpoint_handler_panic", map[string]any{ "tool": evt.ToolName, "panic": fmt.Sprintf("%v", r), }) allowed = false } }() return fn(evt) } // estimateCost 估算 API 调用成本(美元). // 优先使用 ModelRegistry 的定价,回退到 tokenizer 内部定价表. func (e *Engine) estimateCost(model string, inputTokens, outputTokens int) float64 { reg := e.cfg.ModelRegistry() return reg.EstimateSimpleCost(model, inputTokens, outputTokens) } // estimateCostStatic 静态版本的成本估算(不需要 Engine 实例). // 使用 tokenizer 包的内部定价表,向后兼容. func estimateCostStatic(model string, inputTokens, outputTokens int) float64 { return tokenizer.EstimateCost(inputTokens, outputTokens, model) } // buildFlytoRequest 将引擎内部请求格式转换为 flyto.Request(供 Provider 接口使用). // // 升华改进(ELEVATED): 替代旧的 buildFlytoRequest(model, req, promptBlocks)-- // 早期方案从 api.MessageRequest 提取 Messages,需要 json.Unmarshal(运行时开销). // 现在直接从 query.Message(结构化类型)转换,零反序列化开销. // // 历史包袱(LEGACY): Anthropic 专有功能(beta headers,cache_control 分块,ExtendedThinking) // 无法通过 flyto.Request 传递--Provider 路径只支持通用功能(text/tool use/tool result). // 如需 Anthropic 扩展功能,应使用 nil Provider(走 api.Client 路径). func buildFlytoRequest(model string, maxTokens int, promptBlocks []agentctx.SystemPromptBlock, messages []query.Message, toolDefs []api.ToolDef, responseFormat *flyto.ResponseFormat, fastMode bool, effort string) *flyto.Request { // 将 promptBlocks 转为 flyto.SystemBlock 切片(用于 per-block 缓存) var systemBlocks []flyto.SystemBlock if len(promptBlocks) > 0 { systemBlocks = make([]flyto.SystemBlock, len(promptBlocks)) for i, b := range promptBlocks { systemBlocks[i] = flyto.SystemBlock{ Text: b.Text, CacheScope: b.CacheScope, } } } return &flyto.Request{ Model: model, Messages: queryMessagesToFlyto(messages), System: agentctx.BlocksToString(promptBlocks), // fallback:不支持 SystemBlocks 的 Provider 仍能读到系统提示 MaxTokens: maxTokens, Tools: apiToolDefsToFlyto(toolDefs), ResponseFormat: responseFormat, FastMode: fastMode, Effort: effort, SystemBlocks: systemBlocks, } } // queryMessagesToFlyto 将 query.Message 列表转换为 flyto.Message 列表(Provider 路径专属). // // 升华改进(ELEVATED): 替代旧的 apiMessagesToFlyto-- // 早期方案从 api.RequestMessage(JSON raw bytes)解析,每次都需要 json.Unmarshal. // 现在 query.Message 是结构化类型,直接字段映射,零反序列化开销. // 替代方案:<以 flyto.Message 作为引擎内部历史格式> // - 否决:需要同时扩展 flyto.Block(加 thinking/image/document 类型) // 并迁移 10 个 norm_*.go 规范化器(约 800 行),工作量过大. // query.Message 已有全部所需类型,normalizer 零改动. func queryMessagesToFlyto(msgs []query.Message) []flyto.Message { result := make([]flyto.Message, 0, len(msgs)) for _, msg := range msgs { fm := flyto.Message{Role: flyto.Role(msg.Role)} for _, c := range msg.Content { switch c.Type { case query.ContentText: fm.Blocks = append(fm.Blocks, flyto.TextBlock(c.Text)) case query.ContentThinking: fm.Blocks = append(fm.Blocks, flyto.ThinkingBlock(c.Text, nil)) case query.ContentToolUse: fm.Blocks = append(fm.Blocks, flyto.ToolUseBlock(c.ID, c.Name, c.Input)) case query.ContentToolResult: // 路径 B: 若 c.ResultBlocks 非空 (工具返图等非文本载荷), // 递归转成 flyto.Block 数组走 array-form tool_result 路径; // 纯文本结果走原 ToolResultBlock(string) 路径. // // Path B: when c.ResultBlocks is non-empty (tool returns // image or other non-text payload), recursively translate // into flyto.Block array for array-form tool_result wire; // plain text results fall through to the original // ToolResultBlock(string) path. if len(c.ResultBlocks) > 0 { nested := make([]flyto.Block, 0, len(c.ResultBlocks)) for _, nc := range c.ResultBlocks { switch nc.Type { case query.ContentText: nested = append(nested, flyto.TextBlock(nc.Text)) case query.ContentImage: if nc.Source != nil { switch nc.Source.Type { case "base64": nested = append(nested, flyto.ImageBlockBase64(nc.Source.MediaType, nc.Source.Data)) case "url": nested = append(nested, flyto.ImageBlockURL(nc.Source.URL)) } } } } fm.Blocks = append(fm.Blocks, flyto.ToolResultBlocks(c.ToolUseID, nested, c.IsError)) } else { fm.Blocks = append(fm.Blocks, flyto.ToolResultBlock(c.ToolUseID, c.Text, c.IsError)) } case query.ContentImage: // 路径 A: 用户粘图. query.ContentSource 已携带分离的 // base64/url 字段, 直接映射到 flyto.BlockImage. 非 vision // provider 会在消息转换时 return error (见各 provider // switch 的 BlockImage case). // // Path A: user-inline image. query.ContentSource carries // separate base64/url fields; map straight to flyto // BlockImage. Non-vision providers will return error at // message-convert time (see BlockImage cases in each // provider switch). if c.Source != nil { switch c.Source.Type { case "base64": fm.Blocks = append(fm.Blocks, flyto.ImageBlockBase64(c.Source.MediaType, c.Source.Data)) case "url": fm.Blocks = append(fm.Blocks, flyto.ImageBlockURL(c.Source.URL)) } } // ContentDocument 暂不接入 (独立主题). } } if len(fm.Blocks) > 0 { result = append(result, fm) } } return result } // sessionTitle 从用户提示词生成简短会话标题. // // 取前 60 个 Unicode 字符,超出时截断并追加省略号. // 精妙之处(CLEVER): 用 []rune 而非 []byte 截断,防止截断在多字节 UTF-8 字符中间(中文每字 3 字节). // 替代方案: - 否决:截断位置在汉字中间会产生乱码. func sessionTitle(prompt string) string { const maxRunes = 60 prompt = strings.TrimSpace(prompt) r := []rune(prompt) if len(r) <= maxRunes { return prompt } return string(r[:maxRunes]) + "…" } // apiToolDefsToFlyto 将 api.ToolDef 列表转换为 flyto.Tool 列表. // 丢弃 CacheControl 字段(cache_control 由 provider 自己管理). func apiToolDefsToFlyto(defs []api.ToolDef) []flyto.Tool { if len(defs) == 0 { return nil } tools := make([]flyto.Tool, len(defs)) for i, d := range defs { tools[i] = flyto.Tool{ Name: d.Name, Description: d.Description, InputSchema: d.InputSchema, } } return tools } // makeMemoryQueryFn 将 flyto.ModelProvider 封装为 memory.ModelQueryFunc. // // 精妙之处(CLEVER): 闭包捕获 provider + model,让 memory 包完全不依赖 flyto-- // memory.ModelQueryFunc 是纯函数类型(string→string), // 而 flyto.ModelProvider.Stream 返回事件流需要在此处收集成文本. // 替代方案:让 memory 包直接 import flyto(会引入 flyto 的所有传递依赖, // 违反 memory 包的轻依赖原则). func makeMemoryQueryFn(provider flyto.ModelProvider, model string) memory.ModelQueryFunc { return func(ctx context.Context, systemPrompt, userPrompt string) (string, error) { req := &flyto.Request{ Model: model, MaxTokens: 256, System: systemPrompt, Messages: []flyto.Message{ {Role: flyto.RoleUser, Blocks: []flyto.Block{flyto.TextBlock(userPrompt)}}, }, } // Memory-query is a background fire-and-forget LLM call on the // memory consolidation path; label it so retry failures are not // confused with main-thread run-loop failures. // // memory-query 是 memory 巩固路径上的后台即发即忘 LLM 调用; // 标记它让重试失败不被误认为主线程 run-loop 失败. ctx = retry.WithQuerySource(ctx, SourceBackground.String()) ch, err := provider.Stream(ctx, req) if err != nil { return "", fmt.Errorf("memory query: stream: %w", err) } var sb strings.Builder for evt := range ch { switch e := evt.(type) { case *flyto.TextDeltaEvent: sb.WriteString(e.Text) case *flyto.ErrorEvent: return "", fmt.Errorf("memory query: model error: %s", e.Err.Error()) } } return sb.String(), nil } }