# 架构设计文档 Flyto Agent Engine 的内部架构,设计原则和关键决策. ## 目录 - [设计原则](#设计原则) - [模块依赖图](#模块依赖图) - [数据流](#数据流) - [核心模块详解](#核心模块详解) - [NormalizePipeline 架构](#normalizepipeline-架构) - [三层压缩降级](#三层压缩降级) - [SubAgent Fork 模式](#subagent-fork-模式) - [MemoryExtractor 架构](#memoryextractor-架构) - [RelevanceScorer 架构](#relevancescorer-架构) - [MemoryType 注册制](#memorytype-注册制) - [Bash AST 解析器架构](#bash-ast-解析器架构) - [模型角色系统架构](#模型角色系统架构) - [EngineObserver 可观测性架构](#engineobserver-可观测性架构) - [StrictMode 严格模式](#strictmode-严格模式) - [ToolResultPairing 配对修复](#toolresultpairing-配对修复) - [防御性编程的三层防御模式](#防御性编程的三层防御模式) - [KAIROS/Dream 系统的架构位置](#kairosdream-系统的架构位置) - [跨行业扩展的架构支撑](#跨行业扩展的架构支撑) - [叠加而非替换(Composite 层)](#叠加而非替换composite-层) - [ToolCapability 安全协议](#toolcapability-安全协议) - [FileHistory 文件历史系统](#filehistory-文件历史系统) - [OperationLog 统一操作日志](#operationlog-统一操作日志) - [安全审计体系(INF-5)](#安全审计体系inf-5) - [数据安全(文件/DB/API 三维度)](#数据安全文件dbapi-三维度) - [TokenBudgetManager 预算管理](#tokenbudgetmanager-预算管理) - [QueryChainTracking 查询链追踪](#querychaintracking-查询链追踪) - [设计决策记录](#设计决策记录) ## 设计原则 ### 1. 零 UI 依赖 引擎不知道自己跑在终端还是 Web 服务里.所有输出通过 `Event` 接口和 Go channel 推送,消费层自己决定怎么渲染. ``` Engine.Run() --> <-chan Event --> 消费层(CLI / HTTP / SDK) ``` ### 2. 流式优先 不是"先缓冲再输出",而是 token 一到就推送.`TextDeltaEvent` 每次只包含几个 token 的增量文本.消费层用 `for range` 读取 channel 即可实现实时流式输出. ### 3. 可组合 每个子系统都是接口,可替换: - `permission.Engine` -- 权限检查,可换成自定义实现 - `memory.Store` -- 记忆存储,可换成数据库后端 - `permission.Handler` -- 权限 UI,CLI 弹对话框,HTTP 走 WebSocket - `tools.Tool` -- 工具,可动态注册/注销 ### 4. 状态显式传递 不用全局变量和隐式状态.核心 `runLoop` 的消息历史通过参数传递,会话状态集中在 `Session` 结构体中,统计信息通过事件推送. ### 5. 零外部依赖 `go.mod` 只有标准库.HTTP Server 用 Go 1.22+ 标准库路由,JSON 用 `encoding/json`,正则用 `regexp`.编译产物是单个静态二进制. ### 6. 防御性编程 永远不信任模型输出.每个模型交互点都有三层防御(指令 + 参数 + 兜底),每个失败路径都有降级策略.详见 [defensive-programming.md](defensive-programming.md). ### 7. 不硬编码模型 ID 业务逻辑通过 `ModelRole` 引用模型(`RoleMain`,`RoleFast`,`RoleThinking`),不直接使用模型 ID 字符串.角色到模型的映射由 `ModelRegistry` 管理,运行时可修改. ## 消费者架构(框架 vs 消费者) Flyto Agent 是一个**智能体框架**,不是一个 CLI 工具.CLI,HTTP Server,TUI 都是它的消费者. ``` ┌─────────────────────────────────────────────────────────────┐ │ 消费者(应用层) │ │ │ │ tui/agent-engine (独立 TUI,占位) │ │ │ │ platform/ 你的 Go 业务代码 │ │ (平台服务,占位) (直接 import pkg/engine) │ └──────────────────────────────┬──────────────────────────────┘ │ 都通过同一个接口消费 ▼ ┌─────────────────────────────────────────────────────────────┐ │ pkg/engine — 框架本体 │ │ │ │ engine.New(cfg) → *Engine │ │ engine.Session(id) → *Session │ │ session.Send(ctx, prompt) → <-chan Event │ │ │ │ 模型调用规范、token 计费、context window — 都是引擎的职责 │ │ TurnStartEvent 携带 ContextWindowTokens,消费者无需硬编码 │ └─────────────────────────────────────────────────────────────┘ 可选 SDK 包(按需 import,引擎不强依赖): pkg/security/AuditSink — 接口定义(引擎内) audit-pg/ — PostgreSQL 实现(将来,独立 module) audit-file/ — 文件实现(将来,独立 module) ``` ### tui/agent-engine(占位) TUI 终端入口,当前为占位目录.平台服务由 `platform/` 独立实现. ### 消费者接入的三种 API 形态 消费者接入引擎时,面对的接口按机制分**三种形态**,不按业务领域分: - **订阅 (push)** — `<-chan flyto.Event` / `flyto.EventObserver`: 引擎异步通知 - **调取 (pull)** — `Session.Stats()` / `DenialTracker.Stats()` / `Classifier.Classify()`: 消费者同步查询 - **同步回调 (callback)** — `permission.Handler` / `ApprovalPolicy` / `ElicitationHandler` / `AuditSink` / `HookHandler`: 引擎阻塞等决策 完整分类 + 选型速查 + 消费者实现清单 (CLI/SaaS/测试) 见 [`docs/api-reference.md` "API 消费形态" 章节](api-reference.md#api-消费形态--api-consumption-patterns). ## 模块依赖图 ``` 消费层(应用层): tui/agent-engine ─────────┐ ← 终端 UI(占位) platform/ ─────────┐ │ ← 平台服务(占位) examples/basic ────┘ │ ← 10 行最小 demo │ ▼ pkg/engine ◄── (你的 Go 代码) │ ┌────────┬────────┬───┴────┬─────────┬──────────┬──────────┐ ▼ ▼ ▼ ▼ ▼ ▼ ▼ pkg/query pkg/tools pkg/perm pkg/mem pkg/hooks pkg/plugin pkg/evolve │ │ ┌─────┴─────┐ │ ▼ ▼ ▼ pkg/tools/ pkg/tools/ pkg/evolve/ builtin deferred (uses engine) │ ┌─────────────┼──────────────────┐ ▼ ▼ ▼ internal/transport internal/git internal/mcp │ ▼ internal/bash internal/tokenizer internal/logger (AST parser) (token/pricing) pkg/config config.go 多级配置加载 models.go ModelRegistry + ModelConfig(= flyto.ModelInfo 类型别名)+ 角色映射 pkg/context prompts.go 静态提示词常量(9 个英文段落,对应早期方案) prompts_zh.go 静态提示词中文翻译(9 个段落,语义与英文版一字不差) bundle.go Section/SectionRegistry/PromptBundle/BundleRegistry/SystemPromptBlock(模块 15) default_bundle.go claude+programming 默认 Bundle(9 静态 + 6 动态 sections) bundle_overlay.go BundleOverlay(局部覆盖 base Bundle)/ NewBundleFromFunc(函数式工厂) chinese_bundle.go NewChineseBundle / ChineseBundleKeys / RegisterChineseBundle(中文模型适配) compact.go 三层压缩降级(CompactTiered + CircuitBreaker) 通过 flyto.ModelProvider 接口统一所有 Provider(不再接受 Anthropic 专有参数) compact_persist.go 断路器状态跨进程持久化(FilePersister / CircuitBreakerPersister 接口) composite.go CompositePolicy(多策略叠加) policy.go CompactionPolicy 接口 grouping.go 消息按 API 往返分组 restorer.go PostCompactRestorer(压缩后上下文恢复) context.go 上下文构建(Builder + BuildSystemPromptBlocks) instructions.go 指令管理 图例: pkg/ = 公开包(可被外部导入) internal/ = 内部包(仅本模块可用) ──► = 依赖方向 ``` ### 包职责划分 | 包 | 职责 | 关键文件 | |----|------|----------| | `pkg/engine` | 核心引擎:Engine, Session, Events, Errors, runLoop, SubAgent(fork), NormalizePipeline, Fallback, Reminders, FlushGate | 30+ 个文件 | | `pkg/tools` | 工具系统:Tool 接口, Registry, Orchestrator, DeferredRegistry | 4 个文件 | | `pkg/tools/builtin` | 14 个内置工具实现 | 16 个文件 | | `pkg/permission` | 权限系统:Engine, Rules, Checker, Learning, BashSecurity, Denial, Explainer, Filesystem, CompositeHandler | 12 个文件 | | `pkg/memory` | 记忆系统:Store, Frontmatter, Relevance, Scanner, RelevanceScorer, CompositeScorer, ExternalScorer, MemoryExtractor, MemoryTypeRegistry, SyncAdapter, GitSyncAdapter | 9 个文件 | | `pkg/hooks` | Hook 系统:Manager, Executor, Types, Integration(含 14 种 hook 类型) | 5 个文件 | | `pkg/evolve` | 自进化:Evolver, ToolBuilder, SkillLearner, SelfReflector, EngineIntegration | 5 个文件 | | `pkg/config` | 多级配置 + 模型角色系统(ModelRegistry) | 2 个文件 | | `pkg/context` | 系统提示构建,三层压缩降级(CompactTiered),CompositePolicy,CompactCircuitBreaker,指令管理 | 8 个文件 | | `pkg/query` | 查询引擎:Message(含 Metadata 扩展字段), Content 类型定义 | 1 个文件 | | `pkg/plugin` | 插件系统:Host, Manifest, Loader, Skill | 4 个文件 | | `internal/transport` | 通用 SSE 流式 HTTP 客户端:SSE 解析,Prompt Caching,Beta Headers,错误分类,预连接,流守卫.所有配置通过 ClientOption 注入(WithMessagePath/WithAPIVersion/WithRetryPolicy/WithClassifier),无 Anthropic 默认值 | 8 个文件 | | `internal/syslib/bash` | Bash AST 解析器:AST 节点,解析器,信息提取,Heredoc | 4 个文件 | | `internal/syslib/diff` | 文件差异计算(Myers 算法) | 2 个文件 | | `internal/syslib/git` | Git 操作封装:GetInfo + validateRef 注入防御,供 context 提示词组装消费 (memory 同步走 execenv.Executor DI, 刻意独立) | 1 个文件 | | `internal/mcp` | MCP 协议:Client, Bridge, JSON-RPC, Manager, Types | 5 个文件 | | `internal/tokenizer` | Token 估算和模型定价 | 1 个文件 | | `internal/logger` | 结构化日志 | 1 个文件 | | `pkg/websocket` | WebSocket 传输层 | 2 个文件 | ## 数据流 ### 单次运行数据流 ``` 用户输入 "修复 bug" │ ▼ Engine.Run(ctx, prompt) │ ▼ (goroutine) runLoop() │ ├──► 1. buildSystemPromptWithContext() │ 构建系统提示:基础提示 + 工具描述 + 环境信息 + 用户追加 │ 启用 Prompt Caching → 静态段落标记 cache_control │ ├──► 1.5 ReminderSystem.CollectReminders() │ 检查日期变更 / 文件外部修改 / 相关记忆 │ → 注入 到本轮消息 │ ├──► 2. 构建 messages(用户消息 + 历史) │ ├──► 2.5 NormalizePipeline.Run(messages) │ 10 步规范化管道:tool_use/tool_result 配对修复(4 case) │ → 孤立 tool_result 移除 → 错误内容剥离 │ → 孤立 thinking 过滤 → 空消息过滤 → 空白 assistant 过滤 │ → tool_use 输入规范化 → 连续同角色合并 → 图片验证 │ ├──► 3. client.CreateMessageStream() ──► 模型 API │ Beta Headers: prompt-caching, extended-thinking, effort... │ 流式 SSE 连接 │ ├──► 4. 处理流式事件 │ EventContentBlockDelta │ ├── text → TextDeltaEvent → ch │ ├── thinking → ThinkingDeltaEvent → ch │ └── tool_use → 累积 JSON │ EventContentBlockStop │ ├── text → TextEvent → ch │ ├── thinking → ThinkingEvent → ch │ └── tool_use → ToolUseEvent → ch │ ├──► 5. 如果 stop_reason == "tool_use" │ │ │ ├── 权限检查(PermissionEngine.Check) │ │ ├── BashSecurity: AST 解析 → 命令提取 → 安全分析 │ │ └── 规则匹配 / 风险评估 / 用户决策 │ │ │ ├── orchestrator.ExecuteBatch() │ │ ├── 分批:连续 ConcurrencySafe 工具并行 │ │ ├── 非 ConcurrencySafe 工具串行 │ │ ├── 大结果 → ResultStore 存磁盘 → 返回摘要 │ │ └── 每个结果 → ToolResultEvent → ch │ │ │ ├── 追加 tool_result 到消息历史 │ └── → 回到步骤 3(继续调用 API) │ ├──► 6. 如果 stop_reason == "end_turn" 或 "stop_sequence" │ → DoneEvent → ch → close(ch) │ ├──► 7. 如果 stop_reason == "max_tokens" │ → 增大 maxTokens → 追加 "please continue" → 回到步骤 3 │ ├──► 8. API 调用失败 │ → FallbackTracker.ShouldFallback(err) │ → 有备用模型 → 切换模型 → 回到步骤 3 │ → 无备用模型 → ErrorEvent │ └──► 每轮检查: ├── maxTurns 限制 ├── maxBudget 限制(80% 时 WarningEvent) └── maybeCompact()(MicroCompact → CompactTiered 三层降级) ``` ### Bash 工具执行数据流(含 AST 解析) ``` 模型返回 tool_use: Bash {command: "sudo npm install && git push"} │ ▼ 权限系统 (BashSecurity) │ ├──► 1. bash.Parse(command) │ AST 解析(处理 heredoc / 引号 / 命令替换 / 算术展开) │ ├──► 2. bash.ExtractCommands(ast) │ 递归遍历 AST → 提取 CommandInfo 列表 │ [sudo npm install, git push] │ ├──► 3. bash.ExtractCommandName(cmd) │ 跳过前缀(sudo/env/nohup/...) │ → ("npm", "install"), ("git", "push") │ ├──► 4. 命令分类 + 安全检查 │ ClassifyShellCommand → ClassGeneral │ 检查重定向目标是否静态 │ 检查危险命令(rm -rf, git push --force) │ └──► 5. 权限决策 ├── 规则匹配 → Allow/Deny └── 需要询问 → PermissionRequestEvent → 用户决策 ``` ### 权限交互流程 ``` Engine runLoop │ ▼ 工具需要权限 │ ├──► PermissionEngine.Check(req) │ │ │ ├── 匹配规则 → Allow/Deny(直接返回) │ │ │ └── 需要询问 → DecisionAsk │ │ │ ├──► ExplainPermissionRequest() → 人类可读描述 │ ├──► AssessRisk() → 风险等级 │ ├──► SuggestRules() → 建议的永久规则 │ │ │ └──► PermissionHandler(ctx, req) │ │ │ ├── CLI: 终端弹对话框 │ ├── HTTP: PermissionRequestEvent → SSE │ │ 等待 POST /v1/sessions/{id}/permissions/{rid} │ └── SDK: 回调函数 │ └──► 记录决策到 LearningTracker (达到阈值时生成 PermissionLearnEvent) ``` ### 会话生命周期 ``` agent.Session("id") │ ├── newSession(): 创建 Session 结构体 │ messages: [] │ pendingPermissions: map{} │ stats: 0 │ ▼ session.Send(ctx, prompt) │ ├── 快照当前消息历史 ├── engine.Run(ctx, prompt, WithMessages(history)) │ 返回 rawEvents channel │ ├── goroutine: trackEvents() │ ├── 转发事件到消费层 │ ├── 收集 assistantText │ ├── 收集 token 统计 │ └── 完成时: │ ├── 追加 user message 到 history │ ├── 追加 assistant message 到 history │ └── 更新统计 (inputTokens, outputTokens, costUSD, turnCount) │ ▼ session.Send(ctx, next_prompt) ← 第二轮自动包含第一轮历史 │ ... │ ▼ session.Close() │ └── SaveTranscript() → ~/.flyto/projects//transcripts/.json ``` ## 核心模块详解 ### Engine (pkg/engine) 引擎是系统的中央协调器.它的职责: 1. **初始化子系统** -- 创建 Registry,PermissionEngine,MemoryStore,HookManager,PluginHost,ModelRegistry,ReminderSystem,FileStateCache,ResultStore 2. **管理 runLoop** -- 核心查询循环(调用 API -> 处理事件 -> 执行工具 -> 循环) 3. **管理 Session** -- 有状态的多轮对话 + 会话持久化 4. **上下文管理** -- 系统提示构建,自动压缩,系统提醒注入 5. **模型降级** -- FallbackTracker 在 API 失败时自动切换备用模型 6. **子 Agent 管理** -- SubAgentRegistry 跟踪所有活跃的子 Agent 关键类型: - `Engine` -- 主入口,持有所有子系统引用 - `Config` -- 统一配置结构体(含 ModelRegistry,CompactionPolicies,PermissionHandlers,Observer,StrictMode) - `Session` -- 多轮会话状态 - `Event` -- 事件接口(16 种具体类型) - `EngineError` -- 结构化错误(15 种错误码) - `EventObserver` -- 可观测性核心接口(Event + Error) - `MetricObserver` -- 指标接口(可选实现,接口断言检测) - `TraceObserver` -- 调用链接口(可选实现) - `StrictMode` -- 严格模式配置(ToolResultPairing / CompactFailure / NormalizerError) - `NormalizePipeline` -- 10 步可组合消息规范化管道 - `MessageNormalizer` -- 规范化步骤接口 - `FallbackTracker` -- 模型降级追踪器 - `ReminderSystem` -- 系统提醒管理 - `FileStateCache` -- 文件状态缓存(LRU + hash 检测外部修改) - `ResultStore` -- 大结果磁盘持久化 - `SkillLoader` -- 技能文件加载器 - `SubAgent` / `SubAgentRegistry` -- 子 Agent 系统(Fork 模式,共享 prompt cache) - `WorktreeInfo` -- Git Worktree 管理 - `InputProcessor` -- 输入预处理(文件引用,图片,URL) - `ToolSummaryGenerator` -- 工具结果摘要生成 - `FlushGate[T]` -- 泛型消息排队门(压缩/重连期间暂存消息) ### Query (pkg/query) `query.Message` 结构新增 `Metadata map[string]interface{}` 可扩展元数据字段: - `is_attachment` -- 标记附件消息 - `attachment_type` -- 附件类型 - `is_virtual` -- 虚拟消息(不发 API) - `error_source` -- 错误来源映射 设计理念:不为每个场景加专用字段,避免结构体僵化. ### Tools (pkg/tools) 工具系统由四部分组成: 1. **Tool 接口** -- 所有工具的统一契约(Name, Description, InputSchema, Execute) 2. **Registry** -- 线程安全的工具注册表(支持别名,动态注册/注销) 3. **Orchestrator** -- 并发调度器(连续 ConcurrencySafe 工具并行,非安全工具串行) 4. **DeferredRegistry** -- 延迟加载系统(工具数超过阈值时非核心工具按需激活) 延迟加载机制: ``` 工具总数 <= 15 → 全部活跃(无延迟加载) 工具总数 > 15 → 核心工具(Bash/Read/Edit/Write/Glob/Grep/Agent/ToolSearch)始终活跃 其余通过 ToolSearch 按需发现和激活 ``` 编排算法: ``` 输入: [Glob, Grep, Edit, Glob, Grep] 分批: [[Glob, Grep](并行), [Edit](串行), [Glob, Grep](并行)] 执行: batch1 → 并行 → 等待 → batch2 → 串行 → batch3 → 并行 → 等待 ``` ### Permission (pkg/permission) 权限系统的四层检查: 1. **模式判断** -- bypass 直接放行,plan 直接拒绝 2. **规则匹配** -- 前缀匹配(Bash 命令),路径 glob(文件操作),域名匹配(WebFetch) 3. **风险评估** -- 检测危险命令(rm -rf, git push --force 等) 4. **用户决策** -- 通过 Handler 接口让消费层决策 Bash 命令安全分析(`pkg/permission/bash_security.go`)现在基于 AST 解析而非字符串分割: ``` 旧方式: strings.Split(cmd, " ") → 无法处理引号/heredoc/管道 新方式: bash.Parse(cmd) → AST → ExtractCommands() → 完整的命令信息 ``` ### Memory (pkg/memory) 记忆系统使用文件系统存储,每条记忆是一个带 YAML frontmatter 的 markdown 文件. 存储路径:`~/.flyto/projects//memory/` 记忆类型通过 `MemoryTypeRegistry` 分层注册制管理(详见 [MemoryType 注册制](#memorytype-注册制)).内置 4 种编程场景类型: - **User** -- 用户画像(偏好和习惯) - **Feedback** -- 行为指导(根据反馈调整) - **Project** -- 项目上下文(结构,技术栈,约定) - **Reference** -- 外部指针(链接和摘要) 相关性搜索通过 `RelevanceScorer` 接口实现,默认使用文本相似度评分(Jaccard + token 权重 + 子串匹配),支持 `CompositeScorer` 加权组合和 `ExternalScorer` 跨语言桥接. 记忆提取通过 `MemoryExtractor` 接口实现,策略与执行分离. ### 记忆同步(模块 10.2) `SyncAdapter` 接口 + `SyncConfig` 组合实现可插拔同步,调用方通过 `WithSyncAdapter` 选项注入. ``` fileStore.List() / FindRelevant() └── maybePull(ctx) ← 按 PullPolicy 决定是否 Pull └── SyncAdapter.Pull(ctx, baseDir) fileStore.Save() └── maybePush(ctx) ← Save 成功后同步 Push,失败只记录事件 └── SyncAdapter.Push(ctx, baseDir, ConflictPolicy) ``` **核心决策**:Pull 失败 fail-open(本地缓存继续可用),Push 失败仅发 Observer 事件(Save 仍返回成功). | 模式 | 推荐配置 | |------|---------| | CLI 单用户 | `DefaultSyncConfig()` + `GitSyncAdapter` | | API 高频无状态 | `APISyncConfig(5*time.Minute)` + `HTTPSyncAdapter`(P2) | | 离线/不需要同步 | 不设置(默认 `NoopSyncAdapter`,IsAvailable=false,零 overhead)| ### Hooks (pkg/hooks) 14 种 Hook 类型(模块 9.2 新增 pre/post-sampling): | Hook 类型 | 触发时机 | 执行方式 | 可阻止 | |-----------|----------|----------|--------| | `pre_sampling` | API 调用前(每轮) | **同步** | ✅ exit 非零终止本轮 | | `post_sampling` | API 响应后,工具执行前 | **异步** | ❌ | | `pre_tool_use` | 工具执行前 | 同步 | ✅ exit 2 阻止 | | `post_tool_use` | 工具执行成功后 | 同步 | ❌ | | `post_tool_use_failure` | 工具执行失败后 | 同步 | ❌ | | `session_start` | 会话开始 | 异步 | ❌ | | `session_end` | 会话结束 | 异步 | ❌ | | `permission_request` | 请求权限时(可通过 JSON 输出自动批准) | 同步 | ✅ | | `permission_denied` | 权限被拒绝 | 同步 | ❌ | | `stop` | Agent 停止 | 同步 | ✅ | | `notification` | 发送通知 | 异步 | ❌ | | `config_change` | 配置变更 | 同步 | ❌ | | `task_created` | 子任务创建 | 同步 | ❌ | | `task_completed` | 子任务完成 | 同步 | ❌ | 执行策略: - 同步 Hook 按注册顺序执行,fail-open(失败不阻塞后续 Hook) - 异步 Hook 在后台 goroutine 执行,使用独立 context - Hook 命令通过环境变量接收输入,通过 stdout JSON 返回控制指令 ### 插件-Hook 桥接(模块 9.3) `HookDef.Source` 字段(空字符串=全局,非空=插件名)实现单注册表多来源精准管理. ``` Engine.New() ├── 加载 HooksConfig → Register(hooks) // Source="" 全局 └── syncPluginHooks() ├── UnregisterAllBySource(pluginName) // 原子清除旧状态 └── Register(hooks, Source=pluginName) // 重新注册已启用插件 ``` 优先级由注册顺序自然保证(全局先注册 → 全局先执行).`Enable/Disable/Load` 均触发 `syncPluginHooks()` 完整重建,避免 hook 残留. ## NormalizePipeline 架构 `pkg/engine/normalize.go` + `pkg/engine/normalizer.go` + `pkg/engine/norm_*.go`(含 `norm_tool_result_pairing.go`) 从硬编码 3 步升级为 10 步可组合 Pipeline.每个步骤实现 `MessageNormalizer` 接口,通过 Priority 排序执行. ``` MessageNormalizer 接口: Name() string Priority() int // 越小越先执行 Normalize(messages []query.Message) []query.Message NormalizePipeline: Add(step) 追加步骤 Remove(name) 按名称移除 Run(msgs) 按 Priority 排序后依次执行 ``` ### 10 步管道(按 Priority 排序) | Priority | 步骤 | 职责 | |----------|------|------| | 5 | AttachmentReorderer | 附件消息上浮(暂未启用) | | 8 | ToolResultPairingNormalizer | 完整的 tool_use/tool_result 配对修复(4 种 case),通过 Observer 记录修复,严格模式下 panic | | 10 | OrphanToolResultRemover | 移除无对应 tool_use 的 tool_result | | 15 | ErrorContentStripper | 剥离错误内容中的噪音模式 | | 18 | OrphanThinkingFilter | 过滤纯 thinking 的 assistant 消息 | | 20 | EmptyMessageFilter | 过滤空消息 | | 22 | WhitespaceAssistantFilter | 过滤纯空白 assistant 消息 | | 25 | ToolUseInputNormalizer | 规范化 tool_use 输入 | | 30 | ConsecutiveRoleMerger | 合并连续同角色消息 | | 50 | ImageValidator | 验证图片大小(默认 20MB 上限) | ### 设计决策 - **Priority 隐式排序**:每个步骤只关心自己排在第几,新增步骤不需要知道其他步骤的存在 - **每次 Run 排序**:允许 Add 之后,Run 之前动态调整 Priority - **最后一道防线**:即使上层 runLoop 完美,压缩,恢复等操作都可能产生畸形消息序列 - **场景可扩展**:仓储场景追加 `SensorDataNormalizer`,法律场景追加 `RedactionNormalizer` ```go pipeline := DefaultNormalizePipeline() pipeline.Add(&SensorDataNormalizer{}) // 仓储 pipeline.Add(&RedactionNormalizer{}) // 法律 normalized := pipeline.Run(messages) ``` ## 三层压缩降级 `pkg/context/compact.go` -- `CompactTiered` 方法 当上下文接近模型窗口时,按三层降级策略压缩: ``` CompactTiered(ctx, messages, policy) │ ├── 断路器检查: circuitBreaker.ShouldAttempt() │ └── 打开 → 返回错误 │ ├── 第 1 层: singleCompact(单次压缩) │ 估算 token <= compactModelWindow × 85% │ 完整对话 → API 生成摘要 → 摘要 + 最近消息 │ └── 成功 → 返回, PTL → 进入第 2 层 │ ├── 第 2 层: truncateAndRetryCompact(砍头重试) │ 逐步砍掉最旧的消息组(保留 preamble) │ 最多重试 maxTruncateRetries 次 │ └── 成功 → 返回, PTL → 进入第 3 层 │ └── 第 3 层: chunkedCompact(分块压缩) 消息按 token 预算切分为多块 每块独立压缩 → 合并摘要 └── 成功/失败 → 返回 ``` ### CompactCircuitBreaker 断路器防止反复调用失败的 API: - 连续失败 N 次后打开(默认 3 次),拒绝后续压缩请求 - **rate limit (429/529) 不计入失败次数**(rate limit 通常 30 秒恢复,断路器关闭后要重启才能重试) - **时间重置**(默认 5 分钟后自动 Reset,不需要半开状态和后台定时器) - 成功后立即 Reset ### CircuitBreakerPersister(6.5 失败计数持久化) `pkg/context/compact_persist.go` 跨进程保持断路器状态,解决 daemon 崩溃重启后每次浪费 3 次 API 调用的问题: - **接口 `CircuitBreakerPersister`**(`Save/Load`),默认实现 `FilePersister`(JSON 文件) - **per-project 隔离**:文件名 = SHA-256(cwd)[:16],不同项目互不干扰 - **TTL = 1 小时**:超期状态加载时自动丢弃,防止历史失败永久锁死压缩 - **原子写入**(write-then-rename),防止进程崩溃产生截断的 JSON - **fail-open**:Load/Save 失败时静默记录 observer event,不影响压缩功能 - 注入方式:`compressor.SetPersister(p)`(Setter 模式,向后兼容) ### 反向思考决策记录 1. **为什么第 2 层逐步砍而不一次算好?** token 估算有 20% 误差,一次性计算可能砍多或砍少.逐步砍保证不会砍过头. 2. **为什么 rate limit 不计入?** rate limit 通常 30 秒恢复,如果计入会触发断路器,之后要重启 Engine 才能重试.不计入的话,等 rate limit 过了自然能压缩. 3. **为什么用时间重置而不是半开状态?** 简单,不需要后台 goroutine.5 分钟足以让大多数临时问题恢复. 4. **为什么 per-project 而非全局?** 同机器多个项目,A 项目的 prompt_too_long 失败不应污染 B 项目的断路器状态. ## SubAgent Fork 模式 `pkg/engine/subagent.go` 子 Agent 从独立 Engine 实例改为共享 prompt cache 的 fork 模式. ``` 父 Engine │ ├── API Client ────────────┐ ├── System Prompt ─────────┤ 共享(cache key 一致) ├── 完整工具定义列表 ───────┤ │ │ └── fork() ──► SubAgent │ │ ├── 复用 ◄──┘ │ ├── 独立消息历史 │ ├── allowedTools(运行时拦截) │ └── 独立轮数限制 ``` ### 经济账 API 层传完整工具列表(和父 agent 一样)确保 cache key 一致,运行时用 `canUseTool` 函数拦截实际执行. 以 Sonnet 定价计算: - 多传 3K 没用的工具描述:多花 $0.009(3K x $3/M) - 省下 10K 系统提示的缓存命中:省 $0.027(9K x $3/M 变成 0.9K x $0.3/M) - 每个子 agent 调用净省 ~$0.018 - 一个复杂任务 10 个子 agent = 省 $0.18 ### 安全性 `canUseTool` 在运行时拦截.子 agent 请求被禁工具会收到错误消息,模型自己换工具.安全性不降低. ## MemoryExtractor 架构(模块 10.3) `pkg/memory/extractor.go` 配方和厨房分离:Extractor 定义"提取什么",SubAgent fork 模式负责"怎么跑". ``` MemoryExtractor 接口: Name() string ShouldExtract(turnCount, lastExtractTurn) bool BuildPrompt(existingMemories, newMessageCount) string ← 新增 newMessageCount AllowedTools() []string MaxTurns() int │ ▼ Engine.scheduleMemoryExtraction(ctx, messages, turnCount) ├── 单飞:extractInProgress 互斥 ├── 后置补跑:extractPending stash(最新覆盖) └── Engine.runMemoryExtraction(ctx, messages, turnCount) ├── hasMemoryWritesSince:主 agent 已写 → 跳过 ├── newMessageCount = len(messages) - lastExtractMsgIdx ├── BuildPrompt(existingMemories, newMessageCount) └── SpawnSubAgent( HistoryMessages: messages, ← 传入完整对话历史 MemoryDirRestrict: e.mem.Dir(), ← Edit/Write 限制在记忆目录 ) ``` ### DefaultCodeExtractor 内置编程场景提取器: - 每 5 轮对话触发一次(对应一个完整的"提问→探索→实现→验证"子任务周期) - BuildPrompt 注入 newMessageCount:"Analyze the most recent ~N messages"(精准定位) - 并行策略提示词:Turn 1 所有 Read 并行,Turn 2 所有 Write/Edit 并行(节省轮次) - 关注:项目结构,代码规范,技术决策,用户偏好 - 允许工具:Read, Grep, Glob, Edit, Write(不允许 Bash/Agent) - 最大 5 轮 ### 自定义场景扩展 实现 `MemoryExtractor` 接口即可注入不同场景的提取策略: ```go type WarehouseMemoryExtractor struct{} func (e *WarehouseMemoryExtractor) BuildPrompt(existing []*Entry, count int) string { // 关注:SKU规则、货架布局、库存阈值、工作流偏好 return fmt.Sprintf("Analyze the most recent ~%d messages...", count) } ``` ### SubAgent.HistoryMessages + MemoryDirRestrict `SubAgentConfig` 新增两个字段: | 字段 | 用途 | |------|------| | `HistoryMessages []api.RequestMessage` | 预置父对话历史,SubAgent 有内容可分析 | | `MemoryDirRestrict string` | 非空时 Edit/Write 只允许写入此目录 | ## 记忆新鲜度系统(模块 10.4) `pkg/memory/freshness.go` 记忆是点时刻观测值,随时间可能过时.新鲜度系统通过两个维度告知模型: ### FreshnessConfig ```go type FreshnessConfig struct { GlobalThreshold time.Duration // 全局阈值,0 = 总是警告 TypeOverrides map[string]time.Duration // 按类型覆盖 } ``` 典型配置: | 场景 | GlobalThreshold | TypeOverrides | |------|----------------|---------------| | CLI 默认 | 24h | - | | 仓储 | 24h | project=2h | | 医疗 | 0(总是警告) | - | ### 三个输出层 ``` FreshnessText → 自然语言警告("This memory is 3 days old...") FreshnessNote → 包裹,运行时注入到 CheckMemoryRelevance IndexAnnotation → "_(last updated 3 days ago)_" 嵌入 MEMORY.md 行末 ``` ### 数据流 ``` Config.FreshnessConfig (nil = 不启用) │ ├──→ memory.WithFreshness(cfg) → fileStore.freshnessConfig │ └── UpdateIndex: IndexAnnotation() 替换硬编码 30 天 │ └──→ reminderSys.freshnessConfig └── CheckMemoryRelevance: FreshnessNote() 追加到每条相关记忆 ``` ### TruncateIndex 双重截断 MEMORY.md 写入前应用两重限制(防止超大索引压缩 context window): - 200 行上限(对应早期方案的 sliceLines 200) - 25KB 字节上限(新增防线,覆盖长描述场景) - 超出任一限制:截断 + 末尾追加 WARNING(告知模型有内容未显示) ## RelevanceScorer 架构 `pkg/memory/scorer.go` 从包级函数 `Score()` 提升为接口,支持不同场景注入不同评分策略. ``` RelevanceScorer 接口: Name() string Score(query string, header *MemoryHeader) float64 实现: ├── TextScorer 默认文本相似度(Jaccard + token 权重 + 子串匹配) ├── CompositeScorer 多评分器加权组合(支持运行时 Add/Remove) └── ExternalScorer 桥接外部进程(stdin/stdout JSON Lines 协议) CompositeScorer 使用示例: composite := NewCompositeScorer( WeightedScorer{Scorer: &TextScorer{}, Weight: 0.7}, WeightedScorer{Scorer: warehouseScorer, Weight: 0.3}, ) ExternalScorer 跨语言桥接: 请求: {"query": "...", "name": "...", "description": "...", "type": "..."} 响应: {"score": 0.85} 失败时返回 0.0(不阻断主流程) ``` ### SelectRelevant 向后兼容 `SelectRelevant(query, headers, limit, scorer...)` 使用变参实现可选参数.旧代码 `SelectRelevant(q, h, n)` 继续工作,新代码 `SelectRelevant(q, h, n, myScorer)` 注入自定义评分器. ## MemoryType 注册制 `pkg/memory/types_registry.go` 从 4 个硬编码 const 升级为分层注册制,支持跨行业扩展. ``` MemoryTypeRegistry: parent *MemoryTypeRegistry // 上级注册表(只读继承) local map[string]*MemoryTypeInfo // 本级注册 分层继承: org 级(管理层) → team 级(运营) → local 级(加盟仓) sla_rule exception_pattern warehouse_layout 查询: local 优先,沿 parent 链向上冒泡 注册: 只写入本级 local map,不污染上级 ``` ### 多格式提示词 `FormatForPrompt(format)` 根据模型自动选择格式: - Claude 模型 → XML(训练有素) - GPT/Gemini → Markdown - `AutoPromptFormat(modelID)` 自动检测 ### MemoryTypeInfo 结构 每种类型包含:Name, Scope, Description, WhenToSave, HowToUse, BodyStructure, Examples, SortOrder.这些信息注入系统提示词,指导模型何时保存,如何使用. ## Bash AST 解析器架构 `internal/syslib/bash/` 包实现了一个宽容的 Bash 命令 AST 解析器,用于安全分析. ### 架构分层 ``` ┌──────────────────────────────────────────────────────┐ │ 调用方 │ │ pkg/permission/bash_security.go │ │ pkg/tools/builtin/bash_classify.go │ └───────────────┬──────────────────────────────────────┘ │ ▼ ┌──────────────────────────────────────────────────────┐ │ extract.go(信息提取层) │ │ ExtractCommands(ast) → []*CommandInfo │ │ ExtractCommandName(cmd) → (command, subcommand) │ │ GetCommandPrefixes(cmd) → []string │ │ IsStaticRedirectTarget(target) → bool │ └───────────────┬──────────────────────────────────────┘ │ ▼ ┌──────────────────────────────────────────────────────┐ │ parser.go(语法解析层) │ │ Parse(source) → *Node (AST root) │ │ 处理:管道 | 列表 && || ; & | 简单命令 | 复合命令 │ │ 处理:if/for/while/case | 函数 | 子 shell │ │ 处理:重定向 > >> < 2>&1 | 变量赋值 VAR=val │ │ 处理:引号 "..." '...' $'...' | 命令替换 $() `` │ │ 处理:进程替换 <() >() | 算术展开 $(()) │ │ 处理:变量展开 $VAR ${VAR} | 注释 # │ └───────────────┬──────────────────────────────────────┘ │ ▼ ┌──────────────────────────────────────────────────────┐ │ heredoc.go(Heredoc 预处理层) │ │ PreprocessHeredocs(source) → (processed, []info) │ │ 在主解析之前识别并提取 heredoc body │ │ 支持 < >> < 2>&1` | | `NodeHeredoc` | `<(cmd)` | | `NodeArithmeticExpansion` | `$((expr))` | | `NodeVariableExpansion` | `$VAR` 或 `${VAR}` | | `NodeQuotedString` | `"..."` 或 `'...'` 或 `$'...'` | | `NodeComment` | `# ...` | ### 宽容解析 解析器采用宽容模式:无法识别的语法不报错,标记为 `NodeWord` 节点.这保证了即使遇到非标准或超复杂的 Bash 语法,解析也不会失败,安全分析可以在不完整的 AST 上进行. ## 模型角色系统架构 ``` ┌────────────────────────────────────────────────────────────┐ │ Engine (使用角色引用模型) │ │ │ │ runLoop() → registry.GetRole(RoleMain) → 主对话 │ │ compact() → registry.GetRole(RoleFast) → 摘要压缩 │ │ toolSummary() → registry.GetRole(RoleFast) → 结果摘要 │ │ thinking() → registry.GetRole(RoleThinking) → 深度推理 │ │ costCalc() → registry.EstimateCost(modelID) → 成本估算 │ └─────────────────────────┬──────────────────────────────────┘ │ ▼ ┌────────────────────────────────────────────────────────────┐ │ ModelRegistry(线程安全) │ │ │ │ roles: map[ModelRole]string │ │ (DefaultRoles 为空,引擎不预设角色映射) │ │ 所有角色 fallback 到 Config.Model │ │ │ │ models: map[string]*ModelConfig │ │ ModelConfig = flyto.ModelInfo(类型别名,统一类型) │ │ 定价数据为空(由框架层运行时获取,引擎不硬编码) │ │ │ │ API: SetRole / GetRole / Register / EstimateCost / ... │ └────────────────────────────────────────────────────────────┘ ``` 详见 [model-roles.md](model-roles.md). ## EngineObserver 可观测性架构 `pkg/engine/observer.go` 结构化的事件/指标/调用链流,给监控系统,数据仓库,告警平台消费.与 `internal/logger` 互补:logger 是给开发者看的文本日志,Observer 是给系统消费的结构化数据. ### 接口层次 ``` EventObserver(核心,必须实现) ├── Event(name, data) 离散事件(某件事发生了) └── Error(err, context) 错误事件(含上下文,用于告警) MetricObserver(可选,接口断言检测) └── Metric(name, value, tags) 数值指标 TraceObserver(可选,接口断言检测) ├── SpanStart(name, tags) → spanID └── SpanEnd(spanID, err) ``` 可选接口使用 `type assertion` 检测,未实现的 Observer 不会收到对应调用,零开销. ### 实现层次 ``` EventObserver 接口 │ ├── NoopObserver 空实现(未配置时用,避免 nil 检查散布各处) │ ├── StderrObserver 开发调试用(MinLevel 过滤,mu 加锁) │ MinLevel: "debug" / "info" / "warn" / "error" │ Output: io.Writer(默认 os.Stderr) │ ├── CompositeObserver 多路复合(同时发到 DataDog + 审计日志 + stderr) │ observers: []EventObserver │ Metric/Span 只转发到实现了对应接口的子 observer │ └── BufferedObserver 异步缓冲(不阻塞热路径) inner: EventObserver(实际发送方) buffer: chan observerEntry(缓冲区) batchSize: 批量大小(默认 100) interval: 刷新间隔(默认 1s) Event 非阻塞发送(缓冲区满丢弃) Error 优先发送(缓冲区满直接同步) Close() 关闭并等待刷新完毕 ``` ### 埋点位置 引擎内关键埋点: | 事件名 | 触发位置 | 用途 | |--------|---------|------| | `tool_result_pairing_repaired` | ToolResultPairingNormalizer | 配对修复发生时记录修复详情 | | `strict_mode_would_fail` | StrictMode.Check | 非严格模式下记录"本应失败"的条件 | | `api_latency_ms` | runLoop API 调用 | API 延迟指标(MetricObserver) | | `token_usage` | 轮次结束 | Token 用量指标(MetricObserver) | #### 压缩模块(pkg/context/compact.go) 通过 `flyto.EventObserver` 契约直接接入,`Compressor.SetObserver()` 注入 (2026-04-16 / L1326 衍生 a 重构: 原 `CompactObserver` 本地接口删除, context 直接消费 flyto 契约). | 事件名 | 触发位置 | 用途 | |--------|---------|------| | `compact_started` | CompactWithStrategy / CompactTiered 开始 | 记录压缩触发时的 token 量和策略 | | `compact_completed` | CompactWithStrategy / CompactTiered 完成 | 压缩效果评估(before/after token,耗时,恢复项数) | | `compact_circuit_breaker_open` | CompactTiered 断路器触发 | 压缩连续失败告警,需要人工介入 | | `compact_chunked` | chunkedCompact 分块压缩 | 记录分块数和并行度,用于 API 成本分析 | | `micro_compact_triggered` | DoMicroCompact | 轻量修剪效果追踪 | #### 记忆模块(pkg/memory/memory.go) 通过 `flyto.EventObserver` 契约直接接入,`WithObserver()` functional option 注入 (2026-04-16 / L1326 重构: 原 `MemoryObserver` 本地接口删除, memory 直接消费 flyto 契约). | 事件名 | 触发位置 | 用途 | |--------|---------|------| | `memory_saved` | Save 成功后 | 记忆增长趋势和磁盘空间监控 | | `memory_deleted` | Delete 成功后 | 安全审计(谁删了什么) | | `memory_search` | FindRelevant 完成后 | 搜索延迟和结果质量分析 | | `memory_index_updated` | UpdateIndex 完成后 | 索引更新频率和条目数追踪 | | `memory_symlink_detected` | checkSymlinkWithObserver | 安全审计(符号链接检测) | #### Hook 模块(pkg/hooks/hooks.go) 通过 `flyto.EventObserver` 契约直接接入,`Manager.SetObserver()` 注入 (2026-04-16 / L1326 衍生 a 重构: 原 `HookObserver` 本地接口删除, hooks 直接消费 flyto 契约). | 事件名 | 触发位置 | 用途 | |--------|---------|------| | `hook_started` | Execute 开始 | Hook 执行性能分析起点 | | `hook_executed` | 单个 Hook 完成 | 每个 hook 的耗时,退出码,JSON 输出 | | `hook_timeout` | 单个 Hook 超时 | 用户脚本挂起告警 | | `hook_async_started` | ExecuteAsync 开始 | 后台 hook 任务追踪 | #### 自进化模块(pkg/evolve/evolve.go) 通过 `flyto.EventObserver` 契约直接接入,`Config.Observer` 构造函数注入 (2026-04-16 / L1326 衍生 a 重构: 原 `EvolveObserver` 本地接口删除, evolve 直接消费 flyto 契约). | 事件名 | 触发位置 | 用途 | |--------|---------|------| | `evolution_proposed` | Propose 提案提交 | 自进化意图审计 | | `evolution_approved` | Propose 审批通过 | 能力变更审计 | | `evolution_rejected` | Propose 审批拒绝 | 进化策略优化分析 | | `tool_created` | apply 工具创建成功 | Agent 能力扩展里程碑 | | `skill_learned` | apply 技能学习成功 | 自进化有效性评估 | | `reflection_saved` | apply 反思记录保存 | Agent 元认知深度指标 | ### Config 集成 ```go // Engine 初始化时: observer := cfg.Observer if observer == nil { observer = &NoopObserver{} // 永不为 nil } ``` ### 子模块 Observer 适配 memory / context / hooks / evolve 四个子模块均已收敛到 `flyto.EventObserver` 契约 (memory: 2026-04-16 L1326; context / hooks / evolve: 2026-04-16 L1326 衍生 a).消费者一个 `flyto.EventObserver` 实例可喂给所有子模块 + `engine` 本身,不再需要鸭子类型隐式实现的隐性 coupling. ```go // engine.go 中的集成示例: compressor.SetObserver(observer) // observer 是 flyto.EventObserver, 契约强制匹配 hooksMgr.SetObserver(observer) // 同上 ``` ## StrictMode 严格模式 `pkg/engine/strict.go` 来自 inc-4977 事故的教训:安全评估时不能让引擎静默修改模型看到的上下文. ### 三维控制 | 字段 | 控制范围 | true 行为 | false 行为 | |------|---------|-----------|-----------| | `ToolResultPairing` | 消息配对异常 | panic | 修复 + Observer 记录 | | `CompactFailure` | 压缩失败 | panic | 降级 + Observer 记录 | | `NormalizerError` | 规范化异常 | panic | 跳过 + Observer 记录 | ### Check 方法 `StrictMode.Check(condition, enabled, observer, detail)` 一行代码同时处理严格模式和可观测性: - `enabled=true` -- panic(中断执行) - `enabled=false` -- Observer.Event("strict_mode_would_fail", ...)(记录但继续) 便捷方法:`CheckToolResultPairing` / `CheckCompactFailure` / `CheckNormalizerError`. ### 使用场景 ``` 安全评估环境 → StrictMode 全开 → 配对异常立即 panic → 评估结果可信 生产环境 → StrictMode nil → 静默修复 + Observer 记录 → 用户无感知 ``` ## ToolResultPairing 配对修复 `pkg/engine/norm_tool_result_pairing.go` 从早期方案 OrphanToolResultRemover 的单一清理升级为 4 种 case 的完整修复.Priority 8,在 OrphanToolResultRemover(10) 之前执行. ### 4 种 case ``` Case 1: tool_use 无 tool_result 原因: 会话中断、压缩截断、异常退出 修复: 注入合成 tool_result(追加到消息末尾作为 user 消息) 合成文案: "[Tool result not available - session may have been interrupted...]" Case 2: tool_result 无 tool_use 原因: 消息历史损坏、压缩丢失 tool_use 修复: 委托给后续 OrphanToolResultRemover(Priority 10)处理 Case 3: 重复 tool_use ID 原因: 压缩合并、消息回放 修复: 去重(保留第一个) Case 4: 重复 tool_result ID 原因: 网络重传、SDK 重试 修复: 去重(保留第一个) ``` ### 执行阶段 ``` 阶段 1: 收集所有 tool_use 和 tool_result 的 ID 阶段 2: 第一遍扫描——去重 tool_use (case 3) 和去重 tool_result (case 4) 阶段 3: 第二遍扫描——注入缺失的 tool_result (case 1),标记孤立 tool_result (case 2) 阶段 4: 通过 Observer 记录修复详情,检查 StrictMode ``` ### 诊断快照 `buildDiagnosticDetail` 只提取消息结构信息(角色,content 类型,ID),不序列化消息内容,可安全发到外部监控. ## 防御性编程的三层防御模式 ``` 模型交互点 │ ┌───────────┼───────────┐ ▼ ▼ ▼ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ 指令层 │ │ 参数层 │ │ 兜底层 │ │ │ │ │ │ │ │ 提示词中 │ │ API 参数 │ │ 代码中 │ │ 约束行为 │ │ 阻止行为 │ │ 处理异常 │ └─────────┘ └─────────┘ └─────────┘ ``` 应用位置: | 交互点 | 指令层 | 参数层 | 兜底层 | |--------|--------|--------|--------| | 压缩 API 调用 | "不要调用工具" | 不传 tools 参数 | tool_use 当文本处理 | | 子代理递归 | "不要使用 Agent" | 工具列表移除 Agent | 递归深度检测 | | 工具输入 | description 说明格式 | InputSchema 约束 | json.Unmarshal 错误返回提示 | | 权限超时 | 无 | context.WithTimeout | 默认拒绝 | 详见 [defensive-programming.md](defensive-programming.md). ## KAIROS/Dream 系统的架构位置 AutoDream(记忆巩固)是 KAIROS 系统的核心组件,与自进化系统(Evolve)直接关联: ``` Engine │ ┌────────────┼────────────┐ ▼ ▼ ▼ Memory Evolve Dream (AutoDream) Store System Engine │ │ │ │ │ ├── 触发条件:时间(24h) + 会话数(5次) + 文件锁 │ │ │ │ │ ├── Phase 1: Orient(扫描现状) │ │ ├── Phase 2: Gather(收集新信号) │ │ ├── Phase 3: Consolidate(整合) │ │ └── Phase 4: Prune(清理索引) │ │ │ │ │ ▼ │◄─────────────┼──────── 整理后的记忆 │ │ │ ▼ │ 自进化基于整理后的记忆: │ - Reflect 读取记忆做反思 │ - LearnSkill 基于项目记忆学习技能 │ - CreateTool 基于使用模式创建工具 │ ▼ 记忆文件 (markdown + frontmatter) ~/.flyto/projects//memory/ ``` Dream 与 Evolve 的关系:Dream 负责"整理记忆",Evolve 负责"基于记忆自我改进".Dream 的输出是 Evolve 的输入. 当前状态:Dream 系统已完整实现(模块 16).核心文件: - `pkg/engine/dream.go` - DreamEngine(门槛,锁,fork,PeriodicInterval,SessionProvider) - `pkg/engine/dream_lock.go` - FileLock(flock + mtime-as-state,ReadLockMtime) - `pkg/engine/dream_prompt.go` - BuildConsolidationPrompt(叙事式 4 阶段) - `pkg/engine/dream_task.go` - DreamTaskStore(任务状态 + Turns 滚动窗口) - `pkg/engine/subagent.go` - RunSyncWithCallback(onTurn 回调,ToolUseInfo) **mtime-as-state 设计**:lock 文件的 mtime = lastConsolidatedAt,消除 crash-between-dream-and-state-write 的重复触发 bug.flock(2) 负责互斥,mtime 负责持久状态,两者互补. **SessionProvider 接口**:抽象会话数据源,FileSessionProvider 是 CLI 的 JSONL 实现,SDK/API 可注入自定义实现(数据库,消息队列等). ## Plan Mode 系统(模块 17) UltraPlan 是复杂任务的规划工作流.模型在实际编码前先探索代码库并设计方案,用户审批后再执行. ### 核心组件 ``` EnterPlanModeTool ──→ PlanModeManager ──→ ExitPlanModeTool │ ┌──────┼──────┐ ▼ ▼ ▼ PlanStore perms ApprovalPolicy (File/Memory) (SetMode) (Func/Noop) ``` ### 关键文件 - `pkg/engine/plan.go` - PlanModeManager 状态机 + EnterPlanMode/ExitPlanMode 工具 + ApprovalPolicy 接口 - `pkg/engine/plan_store.go` - PlanStore 接口 + FilePlanStore(word slug)+ MemoryPlanStore ### 安全设计:文件读取防注入 ExitPlanMode 从 PlanStore 读计划内容,而非接受 `plan` 参数.如果允许参数传入,模型可以注入任意内容绕过用户审查.对应早期方案 `ExitPlanModeV2Tool` 的同等设计. ### ApprovalPolicy 接口 抽象审批机制,不同部署方式注入不同实现: - `NoopApprovalPolicy` - 自动批准(测试/bypass 场景) - `FuncApprovalPolicy` - 函数回调(SDK 嵌入最常用) - CLI 实现终端交互,SaaS 实现 WebSocket/webhook 审批 ### prePlanMode 恢复 进入 plan 模式前记录当前权限模式(default/accept_edits/bypass),退出时恢复. 用户拒绝计划时保持 plan 模式--让模型修改后再次提交. ### PlanStep 依赖图(P1) `PlanStep.Deps []string` 引用其他步骤 ID,消费方可做拓扑排序并行调度. 引擎只暴露结构,不做调度("叠加而非替换"原则). ## 异步计划队列(模块 20.3) UltraPlan(模块 17)是交互式规划工作流,要求用户在场审批并等待执行. 对于"超大计划要干很久而发起方要继续干别的"场景,引入异步计划队列. ### 架构 ``` 客户端(CLI/另一个 Agent) │ submit_plan / plan_status / plan_cancel / plan_list │ JSON over UDS: /tmp/flyto-plan-{sessionID}.sock ▼ PlanCommandServer(请求-响应 UDS 服务端) │ dispatch ▼ FilePlanQueue(状态机 + 持久化 + 执行循环) │ PlanExecFunc(注入的执行函数) ▼ Engine.Run()(实际 Agent 执行) │ 状态文件: ~/.flyto/plans/{plan-id}.json ▼ 文件系统(客户端可直接读取 JSON 轮询进度) ``` ### 关键设计 - **状态文件持久化**:每个计划一个 JSON 文件,daemon 崩溃重启后 `RecoverPending` 自动恢复 running→pending(at-least-once 执行语义) - **原子写入**:write-then-rename 防止截断 JSON - **串行执行**:避免并发计划操作同一批文件产生冲突(可扩展为有 file-lock 检查的并行策略) - **依赖反转**:`FilePlanQueue` 不导入 Engine,只持有 `PlanExecFunc` 函数指针,测试可用 mock 替换 - **per-step 状态**:`onStepDone` 回调实时更新步骤状态,客户端轮询可见精确进度 - **TTL 清理**:终态计划文件 24 小时后自动删除,防止 `~/.flyto/plans/` 无限膨胀 ### 与 SubAgent 的区别 | 特性 | SubAgent | PlanQueue | |------|----------|-----------| | 执行模式 | 同步(Orchestrator goroutine 阻塞等待) | 异步(fire-and-forget,立即返回 planID) | | 适用场景 | Orchestrator 需要子任务结果才能继续 | 超大计划,发起方要继续做其他事 | | 跨进程 | 否(同进程内 goroutine) | 是(UDS 跨进程通信) | | 崩溃恢复 | 否 | 是(RecoverPending) | ### 配置 ```go engine.Config{ EnablePlanQueue: true, // 启用 PlanQueueDir: "~/.flyto/plans/", // 可选,默认此路径 PlanQueueSessionID: "my-session", // 可选,默认时间戳 } ``` 启用后 `FLYTO_PLAN_SOCK` 环境变量会被设置为 socket 路径,子进程可通过 `SendPlanCommand` 提交计划. ### 相关文件 - `pkg/engine/plan_queue.go` - FilePlanQueue 状态机 + 持久化 + 执行循环 - `pkg/engine/plan_command_server.go` - PlanCommandServer UDS 请求-响应服务端 + SendPlanCommand 客户端辅助 - `pkg/engine/plan_queue_test.go` - 完整测试(含 RecoverPending,TTL 清理,端到端) ## 跨行业扩展的架构支撑 引擎的架构设计不绑定软件工程场景,以下机制支撑跨行业扩展: ``` ┌─────────────────────────────────────────────────────┐ │ 行业无关的核心引擎 │ │ Engine + runLoop + Events + Session │ └──────────────┬──────────────────────────────────────┘ │ ┌──────────┼──────────┬──────────┬──────────┐ ▼ ▼ ▼ ▼ ▼ Tool 接口 权限规则 技能系统 系统提示 MCP 协议 │ │ │ │ │ │ 任何行业 │ 通用格式 │ 不假设 │ 行业内容 │ 任何行业 │ 的工具 │ prefix/ │ 编程场景 │ 通过 │ 的服务 │ 通过接口 │ glob/ │ │ FLYTO.md │ 通过协议 │ 接入 │ domain │ │ 注入 │ 接入 ▼ ▼ ▼ ▼ ▼ 医疗诊断 金融合规 供应链 法律文档 数据分析 工具集 规则 工作流 提示词 服务 ``` 关键设计: - **工具系统**:`Tool` 接口不假设任何行业.4 个方法(Name/Description/InputSchema/Execute)足以封装任何操作 - **权限规则**:`prefix:` / `glob:` / `domain:` 语法通用,不绑定编程工具 - **技能系统**:SkillDefinition 的 Steps 是文本描述 + 工具调用,不假设编程场景 - **系统提示**:行业相关内容通过 FLYTO.md 注入(`pkg/context/instructions.go`),不硬编码进 `prompts.go` - **MCP 协议**:4 种传输协议(stdio/sse/http/ws),任何行业的服务都能接入 ## 叠加而非替换(Composite 层) 宪法第 8 条:所有可插拔接口支持多实现共存叠加.现实使用永远是交叉的. ### 核心思想 单一策略/处理器的设计假设用户处于单一场景(纯编程或纯仓储),但现实中 Agent 经常跨场景工作.Composite 层让多个实现叠加共存,而不是互相替换. ### CompositePolicy(压缩策略叠加) `pkg/context/composite.go` ```go codePolicy := &DefaultCodePolicy{} // 保留: 文件路径、函数名 warehousePolicy := &WarehousePolicy{} // 保留: 订单号、SKU combined := NewCompositePolicy(codePolicy, warehousePolicy) compressor.SetPolicy(combined) // 压缩时同时保留文件路径和订单号 ``` 叠加规则: | 方法 | 叠加策略 | 原因 | |------|----------|------| | PreserveKeywords | 合并去重 | 不丢任何场景的关键词 | | ScoreMessageImportance | 取最高分 | 宁可多保留不可多丢弃 | | MaxRecentRoundsToKeep | 取最大值 | 保留更多上下文 | | PreprocessForCompaction | 依次应用 | 前一个的裁剪影响后一个 | 支持运行时动态添加/移除策略: ```go composite.Add(&WarehousePolicy{}) // 用户开始仓储任务 composite.Remove("warehouse") // 用户回到纯编程 ``` ### CompositeHandler(权限处理器叠加) `pkg/permission/composite_handler.go` ```go ch := NewCompositeHandler( NamedHandler{Name: "cli", Handler: cliHandler, IsDecisionMaker: true}, NamedHandler{Name: "audit", Handler: auditHandler, IsDecisionMaker: false}, ) engine := permission.NewEngine(permission.ModeDefault, ch.Handle) // 权限请求同时弹框给用户 + 记录审计日志 ``` 关键设计:决策者与观察者分离.`IsDecisionMaker: true` 的处理器参与决策(第一个有效响应胜出),`IsDecisionMaker: false` 的处理器仅观察/审计,所有处理器都会被执行. ### Config 向后兼容 `pkg/engine/engine.go` - `CompactionPolicies []CompactionPolicy` -- 多策略自动叠加为 CompositePolicy - `PermissionHandlers []NamedHandler` -- 多处理器自动叠加为 CompositeHandler - 旧字段 `PermissionHandler` 仍然有效(向后兼容) - 空数组时使用默认值(DefaultCodePolicy / nil Handler) ### 接口合规表 | 接口 | 叠加实现 | Name() | 动态增删 | |------|----------|--------|----------| | CompactionPolicy | CompositePolicy | composite(code+warehouse) | Add / Remove | | permission.Handler | CompositeHandler | N/A (函数类型) | Add / Remove | ## QueryChainTracking 查询链追踪 查询链追踪(模块 7.5),用 3 个字段追踪从用户请求到所有子 agent 的完整调用链. ### 核心结构 ``` QueryChainTracking { ChainId string // 整条链的唯一 ID(所有子 agent 共享) Depth int // 调用深度(0=主查询, 1=子agent, 2=子子agent) ParentAgentId string // 父 agent 的 ID(主查询为空) } ``` ### 工作流 ``` 用户请求 → Engine.runLoop → NewQueryChain() → chain{id="chain_xxx", depth=0} ↓ SpawnSubAgent(cfg.Chain=chain) → chain.Fork(sa.ID) → child{id="chain_xxx", depth=1, parent="subagent_xxx"} ↓ 再次 Fork → grandchild{depth=2} ``` ### Observer 集成 所有 runLoop 中的关键 Observer 事件(`api_call_complete`,`tool_executed`,`session_complete`)自动注入链追踪字段: | 字段 | 说明 | 用途 | |------|------|------| | query_chain_id | 链唯一 ID | BigQuery 按链分组 | | query_depth | 调用深度 | 嵌套深度统计 | | parent_agent_id | 父 agent ID(仅子链) | 构建调用树 | ### 设计决策 - **不修改 Observer 接口**:通过 `observeWithChain` 在调用侧 merge 字段,所有现有 Observer 实现零改动. - **可选追踪**:`chain` 为 nil 时功能不受影响,追踪可渐进式接入. - **链 ID 可读性**:用 `chain__` 格式,比 UUID 更适合日志追踪. ## 设计决策记录 ### 为什么选 Go - **单二进制部署** -- 不需要 Node.js 运行时,`scp` 一个文件即可部署 - **并发原语** -- goroutine + channel 天然适合流式事件和工具并发调度 - **编译时类型安全** -- 事件类型用 `type switch` 分发,比 TypeScript 的 union type 更可靠 - **零外部依赖可行** -- Go 标准库足够强大(HTTP server,JSON,正则,文件系统) - **交叉编译** -- `GOOS=darwin GOARCH=arm64 go build` 即可编译 Mac 版 ### 为什么零外部依赖 - 减少供应链攻击面 - 避免依赖版本冲突 - 编译速度快(<3 秒) - 二进制体积小 Go 标准库覆盖了所有需求: - HTTP 路由:Go 1.22+ `net/http` 支持路径参数 - JSON 处理:`encoding/json` - 正则搜索:`regexp` - 文件操作:`os` + `path/filepath` - 并发控制:`sync` + channel - HTML 解析:手写的轻量级解析器(WebFetch 工具内) - Hash 计算:`crypto/sha256`(文件缓存) ### 可选 CGO 功能:HEIC 图片解码 HEIC/HEIF/AVIF 是 iPhone 默认相机格式,仓储,现场巡检等场景需要支持. 引擎通过 build tag 实现**按需 opt-in**,不影响默认构建: | 构建方式 | HEIC 支持 | 系统依赖 | |---------|----------|---------| | `go build ./...`(默认) | 降级(原样透传) | 无 | | `go build -tags libheif ./...` | 完整转换为 JPEG | `apt install libheif-dev libde265-dev` | | `CGO_ENABLED=0 go build ./...` | 降级(原样透传) | 无 | **SDK 消费方**(如 flysafe)按场景选择: - 运维场景:默认构建即可 - 仓储场景:`-tags libheif` + 服务器安装 libheif-dev ### 为什么自建 Bash AST 解析器 - 字符串分割无法正确处理引号内的空格,heredoc 中的命令,`$(())` 中的 `<<` - 安全分析需要理解命令的结构(哪些是命令名,哪些是参数,哪些是重定向) - 不追求完整的 Bash 兼容性,宽容解析够用即可 - 零外部依赖约束排除了使用第三方解析库 ### 为什么用 channel 而不是 callback - `for range ch` 比嵌套回调更易读 - 天然支持背压(channel 满了会自动阻塞生产者) - 消费层可以用 `select` 同时处理多个事件源 - 关闭 channel 自动通知消费层事件流结束 ### 为什么工具编排用分批而不是依赖图 - 模型 API 返回的工具调用是有序列表,没有显式的依赖关系 - 分批算法简单可靠:连续的安全工具并行,不安全的串行 - 实测中模型很少一次返回复杂的工具依赖链 ### 为什么权限系统独立于工具 - 工具只关心"怎么做",权限引擎关心"能不能做" - 同一个工具在不同模式下有不同的权限策略 - 权限规则可以跨工具统一管理(用户级 + 项目级 + 会话级) - 方便单元测试(权限逻辑不依赖工具实现) ### 为什么自进化需要人类审批 - 安全边界:Agent 不能在没有人类审批的情况下改造自己 - 可追溯性:每个进化提案都持久化,可以回溯和撤销 - 渐进信任:先从只读进化开始(技能学习),逐步放宽到工具创建 ### 为什么使用角色系统而不是直接传模型 ID - 业务逻辑与模型选择解耦:更换模型只需修改角色映射 - 成本优化:不同用途使用不同成本的模型(摘要用 Haiku,对话用 Sonnet) - 一处修改全局生效:`registry.SetRole(RoleFast, "new-haiku")` 即可全局切换 ## ToolCapability 安全协议 Agent Tool Safety Protocol -- 文件/数据库/API 三种操作统一为同一套能力声明. ### 三个安全等级 | Level | 能力 | 示例 | Agent 行为 | |-------|------|------|-----------| | 0 | 无安全能力 | Bash | 调用前警告 | | 1 | Reversible(可回滚) | API 调用 | Saga 补偿模式 | | 2 | DryRun + Reversible | FileEdit, FileWrite | 先预览再执行,最安全 | ### 可选接口设计 ``` Tool(必须实现) ├── MetadataProvider(可选) → 声明"工具是什么" ├── CapabilityProvider(可选)→ 声明"工具能做什么安全措施" ├── DryRunnable(可选) → 模拟执行 └── Reversible(可选) → 生成撤销信息 ``` 关键决策:全部使用可选接口(type assertion 检测),不修改 Tool 接口.现有的 10+ 个工具无需任何改动. ### DryRun 工作流 ``` Agent 想修改文件 → 调用 FileEdit.DryRun() 获取 diff 预览 → 分析 diff,确认修改合理 → 调用 FileEdit.Execute() 真正执行 → 自动生成 UndoInfo 存入 OperationLog ``` ## FileHistory 文件历史系统 内容寻址备份 + 按消息回滚. ### 内容寻址存储 ``` 文件内容 → SHA256 → 前 16 字符作为备份文件名 相同内容不重复存储(编辑 10 次但 3 次内容相同 = 只存 7 份) ``` 存储路径:`~/.flyto/history//` ### 备份时机 - `FileEdit.Execute()` 开头自动调用 `FileHistory.BeforeEdit()` - `FileWrite.Execute()` 开头自动调用 `FileHistory.BeforeWrite()` - 同一消息内多次编辑同一文件,只保留第一次修改前的状态 ### 回滚机制 ``` Engine.Rollback(messageID) → FileHistory.Rollback(messageID) // 物理层:恢复文件内容 → OperationLog.RollbackMessage() // 逻辑层:倒序执行补偿操作 ``` - 已有文件被修改:恢复到修改前的备份内容 - 新建的文件:回滚时删除 - maxSnapshots = 100(超过时淘汰最旧的快照) ## OperationLog 统一操作日志 记录所有工具操作,支持按消息 ID 回滚.统一文件/数据库/API 三种操作的日志格式. ### Saga 补偿模式 回滚时倒序执行所有补偿操作(和分布式事务的 Saga 模式一样): ``` 操作顺序:op-1 → op-2 → op-3 回滚顺序:undo-3 → undo-2 → undo-1 ``` - 如果某个补偿失败,继续执行后续补偿(best-effort) - 不可逆操作(Irreversible=true)跳过,记录 ManualGuide 人工处理指南 - 无 UndoInfo 的操作(只读工具)自动跳过 ### 与 FileHistory 的协同 | 组件 | 职责 | 层面 | |------|------|------| | FileHistory | 文件内容的物理备份 | 磁盘层面 | | OperationLog | 所有操作的逻辑记录 | 编排层面 | 文件回滚时两者协同:OperationLog 提供顺序,FileHistory 提供内容. ## 安全审计体系(INF-5) `pkg/security/` + `pkg/engine/audit_*.go` ### 核心设计:翻转默认值 早期方案只保护 TeamMem 同步路径("显式开启").我们翻转:**默认全路径扫描,显式豁免收窄**. ``` FileWriteTool.Execute() │ ├─ SecretGuard.Scan(path, content) │ ├─ 豁免路径?→ 放行 │ ├─ 超过 MaxScanBytes?→ ErrContentTooLarge(放行) │ └─ 命中规则?→ return error Result(拦截) │ └─ 正常写入流程 ``` ### 组件关系 ``` pkg/security/ ├── SecretGuard(接口) │ └── DefaultSecretGuard(45条规则,sync.Once惰性编译) ├── AuditSink(接口) │ ├── NoopAuditSink │ └── CompositeAuditSink(叠加原则) └── AuditEntry(跨行业可扩展,Extra map[string]string) pkg/engine/ ├── LocalAuditSink(JSONL追加,~/.flyto/audit.jsonl) └── AuditObserver(EventObserver实现,桥接事件→审计记录) 监听:operation_recorded / secret_scan_blocked ``` ### 导入方向(无循环) ``` pkg/security(零依赖) ↑ pkg/tools/builtin(注入 SecretGuard) ↑ pkg/engine(默认注入 DefaultSecretGuard、LocalAuditSink、AuditObserver) ``` ### 引擎默认行为 `New(cfg)` 时若 `cfg.SecretGuard == nil`,自动注入 `DefaultSecretGuard`. nil 表示"忘了配置",`NoopSecretGuard{}` 表示"明确关闭".两者含义不同,不可混淆. ```go // 默认保护(不设置 SecretGuard 即受保护) engine.New(&engine.Config{...}) // 明确关闭(必须显式传入) engine.New(&engine.Config{ SecretGuard: security.NoopSecretGuard{}, }) // 自定义规则 engine.New(&engine.Config{ SecretGuard: security.NewSecretGuardWithRules( append(security.BuiltinRules(), internalRules...), ), }) ``` ### 秘密规则 45 条来自 gitleaks(MIT 协议)的高置信度规则,覆盖:云服务商(AWS/GCP/Azure),AI API(Anthropic/OpenAI),版本控制(GitHub/GitLab),通信(Slack/Twilio),支付(Stripe/Shopify),私钥(PEM 格式)等. 只收录有独特前缀的规则,排除"关键词上下文"类规则(误报率过高). ### 跨行业扩展示例 ```go // 仓储场景:库存变更审计 entry := security.AuditEntry{ ToolName: "Write", Operation: "write", Resource: "/warehouse/inventory.csv", Outcome: "allowed", Extra: map[string]string{ "sku": "SKU-001", "qty_delta": "-20", }, } ``` ## 数据安全(文件/DB/API 三维度) > 详细设计见 [data-safety.md](data-safety.md),本节只列核心决策和与引擎的接入点. ### 核心原则 **AI 负责决策,业务系统 API 负责执行.AI 不直接写业务数据库.** ### 三个操作维度 | 维度 | 引擎已有支持 | 数据安全文档补充 | |------|------------|----------------| | 文件操作 | FileHistory 原子写 + OperationLog Saga 补偿 | - | | 数据库操作 | AuditSink 接口 + ToolCapability.DryRunnable | Dry-run 流程,写操作分级,影子表 | | API 调用 | Reversible 接口 + Saga 补偿 | Dry-run 验证后执行 | ### 写操作风险分级 | 级别 | 操作 | 处理 | |------|------|------| | L1 追加写 | 审计日志,状态推进 | 直接提交,异步审计 | | L2 修改已有记录 | 订单状态,数量调整 | 业务不变量检查后提交 | | L3 高风险写 | 金额,库存,关键配置 | 必须人工审批 | | L4 不可逆删除 | 物理删除,DDL 变更 | 默认禁止,需 override | ### Dry-run 设计要点 ``` 业务 API dry_run=true → BEGIN TX → 执行 SQL → 捕获 diff → ROLLBACK → diff 返回给调用方 → ML 验证(事务外,无锁,毫秒后) → 通过 → 短事务 + CAS 乐观锁正式写入 → 不通过 → Agent 重新规划 ``` ML 验证在事务外执行是关键设计:事务只持有毫秒级,验证耗时不占锁. ### 四层防御 1. 输出约束(SQL 白名单 + 业务规则预校验) 2. Dry-run + ML 验证 3. 熔断器(连续失败自动暂停 AI 写权限) 4. 人工 kill switch(`permission.Handler`) ### 引擎接口对接点 - `tools.DryRunnable` / `tools.Reversible` / `tools.CapabilityProvider`(`pkg/tools/tool.go`) - `security.AuditSink` / `security.AuditEntry`(`pkg/security/audit.go`) - `permission.Handler`(`pkg/permission/permission.go`) - `engine.Config.AuditSink`(`pkg/engine/config.go`) 消费层实现:SQL 解析器,ML 验证器,熔断器,影子表管理. --- ## TokenBudgetManager 预算管理 Token 预算精细度管理(模块 7.4),集中管理上下文窗口的 token 预算计算. ### 混合估算法 不纯估也不纯用 API 返回值: ``` 找到最后一个有 Usage 的 assistant 消息(精确锚点) | + 锚点之后的消息用 tokenizer 粗估 | = 总量(锚点精确值 + 新增粗估) ``` 误差范围被限制在"最近几次工具调用"的粗估量,而非整个上下文. ### Sibling 回溯 并行工具调用时消息被拆成多条 assistant 交替排列: ``` assistant(id=A) -> user(result) -> assistant(id=A) -> user(result) ``` 通过 `api_response_id` 元数据追踪,回溯到同一批次的第一个 assistant. ### 三种 token 计算 | 函数 | 公式 | 用途 | |------|------|------| | GetTokenCountFromUsage | input + cache_creation + cache_read + output | 压缩阈值检查 | | GetBillingTokens | 按模型实际价格计算 | 成本报告 | | GetFinalContextTokens | input + output(不含 cache) | 服务端预算对齐 | ### 有效窗口多层计算 ``` 模型窗口 (200K) - 摘要输出预留 (min(maxOutput, 20K)) = 有效窗口 (180K~184K) - Thinking 预算 (可选) = Thinking 后有效窗口 - AutoCompactBuffer (13K) = 自动压缩阈值 - WarningBuffer (20K) = 黄色警告阈值 - ErrorBuffer (7K) = 红色警告阈值 ``` ### 模型切换检测 切换模型时立刻检查当前用量是否超过新模型的阈值,通过 Observer 事件通知消费层. ## API 错误分类系统 模块 8.1,结构化 API 错误分类.核心思路:一次分类,多处消费. ### 架构 ``` HTTP 响应 → ErrorClassifier.Classify() → *APIError(结构化) ├─ Category(20种枚举) ├─ RetryInfo(重试建议) ├─ TokenGap(压缩跳步量) ├─ Hint(用户可操作提示) └─ Headers/Body(原始信息) ``` ### ErrorCategory 枚举(20种) | 分类 | 分析标签 | 触发条件 | |------|---------|---------| | ErrAborted | aborted | 请求取消 | | ErrTimeout | api_timeout | 超时/408 | | ErrRateLimit | rate_limit | 429 | | ErrOverloaded | server_overload | 529/overloaded_error | | ErrPromptTooLong | prompt_too_long | prompt 超长 | | ErrMediaTooLarge | media_too_large | 图片/PDF 超限 | | ErrRequestTooLarge | request_too_large | 413 | | ErrAuthentication | auth_error | 401/403 | | ErrModelNotFound | model_not_found | 404 | | ErrBilling | billing_error | 余额不足 | | ErrServerError | server_error | 5xx | | ErrConnection | connection_error | 网络错误 | | ErrSSL | ssl_cert_error | SSL/TLS 证书 | | ErrToolMismatch | tool_use_mismatch | tool_use 无 tool_result | | ErrInvalidModel | invalid_model | 模型名无效 | ### ErrorClassifier 接口 ```go type ErrorClassifier interface { Classify(statusCode int, headers http.Header, body []byte, cause error) *APIError } ``` 三个实现: - **DefaultClassifier** - 纯 HTTP 状态码,不解析供应商 header - **AnthropicClassifier** - 增强 Anthropic header(ratelimit/retry/reset)+ 诊断提示 - **CompositeClassifier** - 叠加多个分类器,第一个非 Unknown 胜出 ### DiagnosticHinter 接口 为 SSL/连接错误提供用户可操作的诊断提示: - 19 个 SSL 错误码 → 中文可操作提示 - DNS/超时/拒绝/重置 → 具体检查建议 - CompositeHinter 支持叠加 ## 重试策略系统 模块 8.2,可组合的重试决策框架. ### 架构 ``` APIError → RetryPolicy.ShouldRetry() → RetryDecision ├─ ForegroundOnly (后台 529 不重试) ├─ ServerDirective (服务端 x-should-retry) ├─ SubscriptionAwareRetry (Anthropic: 订阅 429 不重试) ├─ FastModeCooldown (Anthropic: 快速模式降级) ├─ ConsecutiveLimit (连续 N 次放弃) ├─ ModelFallback (Anthropic: 过载→模型降级) └─ ExponentialBackoff (兜底退避) ``` ### RetryPolicy 接口 ```go type RetryPolicy interface { ShouldRetry(err *APIError, attempt int, ctx *RetryContext) *RetryDecision } ``` 返回 nil = "不表态",非 nil = 明确决策.CompositeRetryPolicy 按顺序评估,第一个非 nil 胜出. ### Retryer 执行器 ```go retryer := &retry.Retryer{ Policy: retry.NewAnthropicRetryPolicy(opts), OverflowHandler: retry.DefaultOverflowHandler(), OnRetry: func(err, attempt, delay, reason) { observer.OnEvent(...) }, } err := retryer.Do(ctx, rctx, func(attempt int, rctx *RetryContext) error { return client.CreateMessageStream(...) }) ``` ### 通用策略 vs Anthropic 子集 | 策略 | 类型 | 说明 | |------|------|------| | ExponentialBackoff | 跨行业 | 500ms*2^n + 25%抖动,上限 32s | | ForegroundOnly | 跨行业 | 后台过载直接失败,防级联放大 | | ConsecutiveLimit | 跨行业 | 连续 N 次同类错误放弃 | | ServerDirective | 跨行业 | 尊重 x-should-retry | | ContextOverflowHandler | 跨行业 | max_tokens 溢出自动修正 | | SubscriptionAwareRetry | Anthropic | 订阅用户 429 不重试(窗口级) | | FastModeCooldown | Anthropic | 快速模式 429/529 降级 | | ModelFallback | Anthropic | 连续过载 → 模型降级信号 | ## API 预连接 模块 8.3,TCP+TLS 握手预热 + HTTP Transport 优化. ### 连接预热 ```go transport := api.NewTransport(nil) // 默认配置:MaxIdlePerHost=2, HTTP/2 client := api.NewClient(key, url, api.WithTransport(transport)) preconn := api.NewPreconnector(url, transport) // 共享同一个 Transport preconn.Warmup(ctx) // fire-and-forget HEAD,100-400ms 握手与初始化并行 ``` ### DNS 预解析 ```go cache := api.NewDNSCache(5 * time.Minute) cache.Prefetch(ctx, "api.anthropic.com") // fire-and-forget ``` ## SSE 流守卫 模块 8.4,SSE 流边界情况检测 + 空闲看门狗. ### 架构 ``` HTTP Body → parseSSE(裸解析)→ rawCh → StreamGuard.Watch() → guardedCh → 消费者 ├─ 状态追踪(message_start/stop_reason/block_count) ├─ 空闲看门狗(90s 无事件 → 中止流) ├─ 停顿检测(>30s 间隔 → OnStall 回调) ├─ 空响应检测(无 message_start → EventError) └─ 部分流检测(无 stop_reason → EventError) ``` ### 检测的边界情况 | 情况 | 检测方式 | 行为 | |------|---------|------| | 空响应(代理返回 HTML) | `!HasMessageStart` | 追加 EventError | | 部分流(网络中断) | `HasMessageStart && !HasStopReason` | 追加 EventError | | 空闲挂起(静默断开) | 看门狗计时器 90s | OnIdleTimeout + 追加 EventError | | 停顿(网络抖动) | 事件间隔 > 30s | OnStall 回调(纯诊断) | | Scanner 错误 | `scanner.Err()` | 追加 EventError | | 合法空响应 | 有 stop_reason | 不误报 | ### 配置 ```go client := api.NewClient(key, url, api.WithStreamGuard(&api.StreamGuardConfig{ IdleTimeout: 90 * time.Second, StallThreshold: 30 * time.Second, OnIdleTimeout: func() { observer.OnEvent("stream_idle_timeout", nil) }, OnStall: func(gap, count, total) { observer.OnEvent("stream_stall", ...) }, OnStreamEnd: func(stats) { log.Info("stream stats", stats) }, })) ``` ## 版本兼容基础设施(INF-6) `pkg/engine/migrate.go` + `pkg/memory/migrate.go` ### 设计原则 **当前没有历史债**(无 v1/v2 格式差异),INF-6 的价值是: 1. **防患于未然**--为未来 breaking change 建立规范路径 2. **防降级数据损坏**--旧引擎读到新格式文件时明确报错,而非静默损坏 ### 两层版本 | 字段 | 类型 | 用途 | |------|------|------| | `FormatVersion int` | int 单调递增 | 格式迁移:`LoadTranscript` 决定是否迁移 | | `EngineVersion string` | 应用版本号 | 仅 audit,不参与迁移逻辑 | 早期方案 `SerializedMessage.version` 只存应用版本(如 "1.2.3"),两层混用无法区分"格式变了"和"版本升了". ### MaxSupportedVersion 保护 ``` LoadTranscript / ScanMemoryDir: if version > maxSupportedVersion → 明确报错 / 跳过文件 if version == 0 (旧文件缺字段) → 规范化为 1 if version < current → 运行迁移函数(当前空表,no-op) ``` ### 注册式迁移骨架 ```go // 当前为空——无历史债 var transcriptMigrations = map[int]MigrateFunc{} // 未来 v1→v2 迁移示例(在 migrate.go 中注册): func init() { transcriptMigrations[1] = func(t *Transcript) error { // 就地修改字段,FormatVersion 由 migrateTranscript() 统一递增 return nil } } ``` Memory frontmatter 的 `migrate.go` 结构完全对称. ### 文件格式变化 ``` 旧 Transcript(无 format_version): {"session_id": "x", "version": ..., ...} ← "version" 已弃用(只有旧文件有) 新 Transcript(INF-6 后): {"format_version": 1, "engine_version": "1.0.0", "session_id": "x", ...} 记忆文件 frontmatter(INF-6 后): --- name: xxx description: xxx type: user version: 1 ← 新增,旧文件无此行(读取时规范化为 1) --- ``` --- ## 多模型适配系统(PromptBundle,模块 15) ### 整体架构 ``` Config.Scenario + Config.ModelFamily ↓ resolveBundleKey() ← 引擎每次 Run 时调用 ↓ BundleRegistry.Resolve(key) ← 精确匹配,回退到 DefaultBundleKey ↓ PromptBundle ├── StaticSections() → SectionRegistry 缓存 → ephemeral cache block └── DynamicSections() → SectionRegistry 缓存 → ephemeral / no-cache block ↓ Builder.BuildSystemPromptBlocks(ctx) ↓ []SystemPromptBlock → buildAPIRequest → Anthropic API ``` ### BundleRegistry 查找策略 1. **精确匹配**:`BundleRegistry.Resolve({ModelFamily, Scenario})` 2. **回退默认**:未找到时使用 `DefaultBundleKey`(`{claude, programming}`) 3. **安全保证**:永远不返回 nil(但 registry 为空时返回 nil,需检查) ### 内置 Bundle | Bundle | Key | 静态段落 | 动态段落 | |--------|-----|---------|---------| | `DefaultBundle` | `{claude, programming}` | 9 个英文段落 | 6 个(env/tools/instructions 等) | | `ChineseBundle` | `{qwen, programming}` 等 5 个 | 9 个中文段落 | 继承 DefaultBundle | ### BundleOverlay 组合关系 ``` DefaultBundle (base) ↑ 继承 BundleOverlay ├── overrides["intro"] = 自定义 Section ├── overrides["env_info"] = 自定义 Section └── 其他 section → 透传到 base.StaticSections() / base.DynamicSections() ``` 覆盖逻辑(`applyOverrides`): - 遍历 base 的 section 列表,按 Name 查找 overrides 表 - 命中 → 替换为 override Section - 未命中 → 透传 base Section(零分配路径:无覆盖时直接返回 base slice) - override 中存在但 base 中不存在的 Name → 静默忽略(防止拼写错误引入幽灵 section) ### per-request 切换(WithBundleKey RunOption) ``` Run(ctx, prompt, WithBundleKey(key)) ↓ runConfig.bundleKey = &key ↓ buildSystemPromptWithContext(ctx, model, &key) ↓ overrideBundleKey != nil builder.SetBundleKey(*key) ← 跳过 resolveBundleKey() ``` 同一个 Engine 实例可以在不同请求间使用不同 Bundle, 无需为每个场景创建独立 Engine(节省资源,适合 SaaS 多场景部署). ### Section 缓存生命周期 ``` SectionRegistry(会话级) ├── 首次 Compute → 计算 + 写入缓存 ├── 命中缓存 → 直接返回(零 IO) ├── CacheBreak=true → 每轮强制重算,绕过缓存 └── Reset() → 在 /clear 或 /compact 后清空,让下轮重新渲染 ``` 注意:BundleOverlay 的 override Section 与 base Section 共享同一个 SectionRegistry, 同名 section 的缓存条目由最后写入者决定(先英文后中文 → 中文覆盖). 多语言 Bundle 并存于同一 Engine 时,建议使用带前缀的 Name(如 `"zh.intro"`). --- ## 权限动态注册(P0修复) ### 架构 ``` tools.Metadata.PermissionClass = "file" ↓ tools.Registry.Register(tool) ↓ permission.RegisterToolClass(toolName, permClass) ↓ toolClassRegistry map[string]PermissionClass ↓ permission.CheckToolPermission(toolName, ...) ├── lookupToolClass(toolName) → 动态注册表查找(优先) └── 未注册 → 旧版硬编码 switch(向后兼容) ``` ### PermissionClass 常量 | 常量 | 值 | 安全行为 | |------|-----|---------| | `PermClassBash` | `"bash"` | AST 解析 + 风险评估 + 用户确认 | | `PermClassFile` | `"file"` | 路径安全检查 + 沙箱验证 | | `PermClassWebFetch` | `"webfetch"` | URL 安全检查 | | `PermClassReadOnly` | `"readonly"` | 直接通过,无需用户确认 | | `PermClassGeneric` | `"generic"` | 要求用户批准 | 空字符串(未声明)→ 回退到旧版按工具名的硬编码逻辑(向后兼容). --- ## 压缩器依赖注入(P0修复) ### 问题根源 旧版 `compact.go` 用两个包级全局变量持有回调: ```go var contextWindowProvider func(string) int // ❌ 全局变量 var compactModelProvider func() string // ❌ 全局变量 ``` 多个 Engine 实例(多租户)共享这两个变量,最后注册的 Engine 会覆盖前者的配置. ### 修复方案 将回调迁移到 `Compressor` 实例字段,Engine 创建后注入: ```go Compressor struct { contextWindowFn ContextWindowFunc // 实例级,互不干扰 compactModelFn CompactModelFunc // 实例级,互不干扰 httpClient CompactHTTPClient // 实例级,独立 HTTP 客户端 } ``` `effectiveXxx()` helpers 实现两级回退: 1. 实例字段非 nil → 使用实例配置 2. 实例字段为 nil → 使用全局默认(向后兼容单 Engine 场景)