// Package engine - token_budget.go 管理上下文窗口的 token 预算. // // 模块 7.4: Token 预算精细度. // // 三个核心问题: // 1. 当前上下文占了多少 token?(混合估算法) // 2. 还剩多少空间?(有效窗口多层计算) // 3. 什么时候该触发压缩/警告?(阈值体系) // // 精妙之处(CLEVER): 混合估算法--不纯估也不纯用 API 返回值. // 找到最后一个有 usage 的 assistant 消息(API 返回的精确值)作为"锚点", // 锚点之后的消息(用户输入,工具结果)用粗估(tokenizer.EstimateTokens)补充. // 总量 = 锚点精确值 + 新增粗估. // 这样永远不会大幅偏离真实值--粗估最多偏差几次工具调用的量. // // 替代方案:纯粗估(当前实现,4chars/token,误差可达 30%). // // 维护说明(MAINTENANCE, 2026-04-14): 本文件 535 行曾被 TODO L1225 列为 // "6 大功能合体需拆分 token_estimator + token_window + token_warnings". // 2026-04-14 audit 后否决,理由记录在此避免下次又被提起: // 1. 6 个功能组共享两个依赖 (modelRegistry + observer),它们不是 6 个抽象, // 是同一个抽象 (TokenBudgetManager = "token 预算知识的持有者") 的 6 个入口. // 2. 扩展/修改成本和文件行数无关 (IDE 跳转 O(1)),真正成本是跨抽象边界跳转. // 拆成 3 个文件反而引入 "新方法放哪个文件" 的决策负担 + 跨文件跳转. // 3. warnings 组 (CalculateWarningState) 严重依赖 window 组 // (AutoCompactThreshold/EffectiveContextWindow),硬拆等于人为切断内部联系. // 4. "535 行太长" 是美学焦虑不是工程痛点.等到文件真长到 1000+ 行或出现 // 明显不属于本 struct 的第 7 组功能时,结构自然涌现的重构点才该动手. // // 真正的抽象债应沿 "同一处修改跨多个概念" 或 "新增场景绕开现有结构" 识别, // 而非 "文件行数" 这个表层代理指标. package engine import ( "git.flytoex.net/yuanwei/flyto-agent/internal/tokenizer" "git.flytoex.net/yuanwei/flyto-agent/pkg/config" "git.flytoex.net/yuanwei/flyto-agent/pkg/query" ) // Token 预算常量. const ( // AutoCompactBufferTokens 自动压缩前的缓冲空间(token 数). // 压缩阈值 = 有效窗口 - 此缓冲.留出空间给新的用户消息和模型回复. AutoCompactBufferTokens = 13_000 // WarningThresholdBuffer 黄色警告缓冲(token 数). // 当剩余空间 < 此值时触发黄色警告(提示用户上下文快满了). WarningThresholdBuffer = 20_000 // ErrorThresholdBuffer 红色警告缓冲(token 数). // 当剩余空间 < 此值时触发红色警告(紧急提醒,可能即将溢出). // 历史包袱(LEGACY): 与 WarningThresholdBuffer 相同值看起来没意义, // 但红色警告阈值是在有效窗口扣除 WarningThresholdBuffer 之后再减去此值, // 形成两层递进:黄色(80%)→红色(90%)→阻塞(100%). ErrorThresholdBuffer = 20_000 // ManualCompactBuffer 手动压缩的缓冲(token 数). // 用户手动触发 /compact 时,使用更激进的阈值. ManualCompactBuffer = 3_000 // MaxOutputTokensForSummary 压缩摘要的输出预留(token 数). // 压缩时需要为摘要输出预留空间,从有效窗口中扣除. MaxOutputTokensForSummary = 20_000 // MinEffectiveWindowFloor 有效窗口/阈值的最小下限(token 数). // 即使模型窗口很小或扣减项很多,也保证至少 10K 可用空间,避免阈值坍缩为 0 或负数. // 集中出现在 AutoCompactThreshold,ManualCompactThreshold, // EffectiveContextWindowWithThinking,AutoCompactThresholdWithThinking 四处. MinEffectiveWindowFloor = 10_000 // DefaultMaxOutputTokens 当模型配置缺失 MaxOutputTokens 时的默认输出上限. // 16K 是 Claude 3.5 一代的默认值,对未知模型保守兼容. DefaultMaxOutputTokens = 16_384 // tokenCostDivisor 计费单价的分母(每 1M token 定价). // ModelConfig 的价格字段都是 "Per1M",计算时需要除以这个常量归一化. tokenCostDivisor = 1_000_000 // percentFull 百分比满值(用于 PercentUsed/PercentLeft 的 100% 上限). percentFull = 100 ) // TokenBudgetManager 管理上下文窗口的 token 预算. // // 职责: // - 混合估算当前上下文占用量(精确锚点 + 粗估增量) // - 计算有效上下文窗口(扣除各种预留) // - 计算压缩阈值和警告阈值 // - 模型切换时检测溢出 // - 三种 token 计算函数(总量/计费/预算) type TokenBudgetManager struct { modelRegistry *config.ModelRegistry observer EventObserver } // NewTokenBudgetManager 创建 token 预算管理器. func NewTokenBudgetManager(registry *config.ModelRegistry, observer EventObserver) *TokenBudgetManager { if observer == nil { observer = &NoopObserver{} } return &TokenBudgetManager{ modelRegistry: registry, observer: observer, } } // ============================================================ // 1. 混合估算法 // ============================================================ // EstimateCurrentUsage 估算当前上下文窗口占用的 token 数. // // 精妙之处(CLEVER): 混合估算--最后一个 API 响应的精确 usage + 之后新增消息的粗估. // 实现逻辑: // 1. 从后往前找最后一个有 Usage 的 assistant 消息 // 2. 应用 sibling 回溯(并行工具调用的消息共享同一 api_response_id) // 3. 用 Usage 的 input_tokens + cache_creation + cache_read + output_tokens 作为锚点 // 4. 锚点之后的消息用 tokenizer.EstimateTokens 粗估 // 5. 总量 = 锚点 + 粗估 // // 替代方案:纯粗估(整个消息列表用 tokenizer 估算,误差可达 30%). func (m *TokenBudgetManager) EstimateCurrentUsage(messages []query.Message) int { if len(messages) == 0 { return 0 } // 从后往前找最后一个有 Usage 的 assistant 消息 anchorIdx := -1 var anchorUsage *query.Usage for i := len(messages) - 1; i >= 0; i-- { msg := &messages[i] if msg.Role != query.RoleAssistant { continue } usage := extractUsageFromMetadata(msg) if usage == nil { continue } anchorIdx = i anchorUsage = usage break } // 没有锚点:全部粗估 if anchorIdx < 0 { return m.estimateAllMessages(messages) } // 精妙之处(CLEVER): 并行工具调用时消息被拆成多条 assistant: // assistant(id=A), user(result), assistant(id=A), user(result) // 如果从最后一条 assistant 开始估算,会漏掉前面的 tool_result. // 往前回溯到第一个同 api_response_id 的 assistant,确保所有 interleaved // tool_result 都被包含在粗估范围内. anchorIdx = m.siblingRewind(messages, anchorIdx) // 锚点精确值 anchorTokens := GetTokenCountFromUsage(anchorUsage) // 锚点之后的消息粗估 afterAnchorTokens := 0 if anchorIdx+1 < len(messages) { afterAnchorTokens = m.estimateAllMessages(messages[anchorIdx+1:]) } total := anchorTokens + afterAnchorTokens // 埋点说明:混合估算完成时记录锚点位置和各部分贡献, // 用于验证估算策略的准确度--如果 afterAnchorTokens 占比过大说明锚点太旧. m.observer.Event("token_budget_estimated", map[string]any{ "total": total, "anchor_tokens": anchorTokens, "after_anchor_tokens": afterAnchorTokens, "anchor_index": anchorIdx, "total_messages": len(messages), }) return total } // ============================================================ // 2. Sibling 回溯(并行工具调用) // ============================================================ // siblingRewind 从给定的 assistant 消息位置往前回溯, // 找到第一个具有相同 api_response_id 的 assistant 消息. // // 精妙之处(CLEVER): 并行工具调用时 API 一次返回多个 tool_use, // 但消息列表中它们被拆成多条 assistant + user(result) 交替排列. // 这些消息共享同一个 api_response_id.锚点应该是这组消息中最早的那个, // 这样锚点的 usage 值覆盖了所有这些消息的 input/output. func (m *TokenBudgetManager) siblingRewind(messages []query.Message, startIdx int) int { if startIdx <= 0 { return startIdx } responseID := getAPIResponseID(&messages[startIdx]) if responseID == "" { return startIdx } earliest := startIdx for i := startIdx - 1; i >= 0; i-- { msg := &messages[i] if msg.Role != query.RoleAssistant { continue } rid := getAPIResponseID(msg) if rid == responseID { earliest = i } else { // 遇到不同 response ID 的 assistant 消息,停止回溯 break } } return earliest } // ============================================================ // 3. 三种 token 计算函数 // ============================================================ // GetTokenCountFromUsage 计算完整上下文窗口 token(含 cache). // 用于:自动压缩阈值检查. // = input_tokens + cache_creation + cache_read + output_tokens // // 历史包袱(LEGACY): cache_creation 和 cache_read 都计入 input 的窗口占用, // 因为它们实际上就是 input 的一部分--只是分成了"新写入缓存"和"从缓存读取"两种来源. // API 返回时 input_tokens 不含 cache 部分,需要加回来才是真正的上下文占用量. func GetTokenCountFromUsage(usage *query.Usage) int { if usage == nil { return 0 } return usage.InputTokens + usage.Cache.Written + usage.Cache.Read + usage.OutputTokens } // GetBillingTokens 计算计费 token(考虑 cache 折扣/溢价). // 用于:成本报告. // // 计费规则: // - input_tokens: 按原价计费 // - output_tokens: 按原价计费 // - cache_read: 按 10% 计费(缓存命中的折扣) // - cache_creation: 按 125% 计费(首次写入缓存的溢价) // // 升华改进(ELEVATED): 用 ModelConfig 的实际价格计算,而非硬编码比例. // 每个模型的 cache 定价不同(Opus cache_read=$1.5 vs Haiku cache_read=$0.08). // 替代方案:硬编码 10%/125% 比例(忽略模型差异,在 Opus 和 Haiku 之间误差显著). func GetBillingTokens(usage *query.Usage, pricing *config.ModelConfig) float64 { if usage == nil || pricing == nil { return 0 } // 计算实际美元成本(更准确的方式) inputCost := float64(usage.InputTokens) * pricing.InputPricePer1M / tokenCostDivisor outputCost := float64(usage.OutputTokens) * pricing.OutputPricePer1M / tokenCostDivisor cacheReadCost := float64(usage.Cache.Read) * pricing.CacheReadPricePer1M / tokenCostDivisor cacheWriteCost := float64(usage.Cache.Written) * pricing.CacheWritePricePer1M / tokenCostDivisor return inputCost + outputCost + cacheReadCost + cacheWriteCost } // GetFinalContextTokens 计算 task_budget 用的 token. // 用于:和服务端对齐的预算倒计时. // = input_tokens + output_tokens(不含 cache,和服务端公式一致) // // 精妙之处(CLEVER): 服务端按 input_tokens + output_tokens 扣预算,不算 cache. // 如果我们用含 cache 的值做倒计时,预算会比服务端快耗尽,导致提前终止. // 保持和服务端公式一致,避免 client/server 对不上. func GetFinalContextTokens(usage *query.Usage) int { if usage == nil { return 0 } return usage.InputTokens + usage.OutputTokens } // ============================================================ // 4. 有效窗口多层计算 // ============================================================ // EffectiveContextWindow 计算有效上下文窗口(扣除摘要输出预留). // // 有效窗口 = 模型窗口 - min(模型 maxOutput, MaxOutputTokensForSummary) // 为什么要扣除?压缩时需要为摘要输出预留空间,否则压缩 API 调用本身会溢出. func (m *TokenBudgetManager) EffectiveContextWindow(model string) int { window := m.modelRegistry.ContextWindow(model) maxOutput := m.maxOutputForModel(model) reserved := min(maxOutput, MaxOutputTokensForSummary) return window - reserved } // AutoCompactThreshold 计算自动压缩触发阈值. // 阈值 = 有效窗口 - AutoCompactBufferTokens func (m *TokenBudgetManager) AutoCompactThreshold(model string) int { threshold := m.EffectiveContextWindow(model) - AutoCompactBufferTokens if threshold < MinEffectiveWindowFloor { threshold = MinEffectiveWindowFloor } return threshold } // ManualCompactThreshold 计算手动压缩触发阈值(更激进). // 阈值 = 有效窗口 - ManualCompactBuffer func (m *TokenBudgetManager) ManualCompactThreshold(model string) int { threshold := m.EffectiveContextWindow(model) - ManualCompactBuffer if threshold < MinEffectiveWindowFloor { threshold = MinEffectiveWindowFloor } return threshold } // TokenWarningState 描述当前 token 用量的警告状态. // 消费层根据此状态展示不同的 UI 指示(绿/黄/红/阻塞). type TokenWarningState struct { // PercentUsed 已使用百分比 (0-100) PercentUsed int // PercentLeft 剩余百分比 (0-100) PercentLeft int // IsAboveWarningThreshold 是否超过黄色警告阈值 IsAboveWarningThreshold bool // IsAboveErrorThreshold 是否超过红色警告阈值 IsAboveErrorThreshold bool // IsAboveAutoCompactThreshold 是否超过自动压缩阈值 IsAboveAutoCompactThreshold bool // IsAtBlockingLimit reports whether usage has reached 100% of the effective // window (remaining <= 0). Observability signal only — the engine does not // refuse the next turn here; the pre-turn maybeCompact path (engine.go // ShouldCompact + forceCompact) is the real enforcement. Consumers use // this to render a blocking-level indicator; it strictly implies // IsAboveErrorThreshold and IsAboveWarningThreshold. // // IsAtBlockingLimit 表示 token 用量已达有效窗口 100% (remaining <= 0). // 仅为可观测信号 — 引擎并不在此处拒绝下轮, 真正的兜底是下轮 pre-turn // maybeCompact (engine.go ShouldCompact + forceCompact). 消费层据此渲染 // 阻塞级指示符; 此字段严格蕴含 IsAboveErrorThreshold 和 // IsAboveWarningThreshold 为 true. IsAtBlockingLimit bool } // CalculateWarningState 计算各种警告状态. // // 阈值层级(从松到紧): // - 自动压缩阈值:有效窗口 - 13K // - 黄色警告阈值:有效窗口 - 20K(还有 20K 空间) // - 红色警告阈值:有效窗口 - 20K + 20K = 有效窗口的 ~90%(紧急状态) // - 阻塞限制:100% 有效窗口 func (m *TokenBudgetManager) CalculateWarningState(tokenUsage int, model string) *TokenWarningState { effective := m.EffectiveContextWindow(model) if effective <= 0 { effective = 1 // 避免除零 } percentUsed := int(float64(tokenUsage) / float64(effective) * percentFull) if percentUsed > percentFull { percentUsed = percentFull } if percentUsed < 0 { percentUsed = 0 } percentLeft := percentFull - percentUsed remaining := effective - tokenUsage state := &TokenWarningState{ PercentUsed: percentUsed, PercentLeft: percentLeft, IsAboveAutoCompactThreshold: tokenUsage >= m.AutoCompactThreshold(model), IsAboveWarningThreshold: remaining <= WarningThresholdBuffer, IsAboveErrorThreshold: remaining <= ErrorThresholdBuffer-AutoCompactBufferTokens, IsAtBlockingLimit: remaining <= 0, } return state } // PickWarningCode returns the most-severe WarningEvent code warranted by the // state, or empty when no event should fire. Callers emit the event with this // code and a code-specific message. Severity-first order ensures a blocking // state surfaces as "blocked" rather than being swallowed by the weaker // "critical" branch — the dead-field audit on IsAtBlockingLimit surfaced the // original bug where the else-if chain tested weakest first. // // PickWarningCode 返回 state 对应最严重的 WarningEvent code, 空串表示无事件. // 调用方据此 code 发 WarningEvent 并填具体消息. 严重优先顺序 (blocking > // critical > warning) 确保 blocking 状态真正以 "blocked" code 暴露, 而不是 // 被较弱的 "critical" 分支吞掉 — 原 else-if 链从最弱判起正是 IsAtBlockingLimit // 成 dead 字段的根因. func PickWarningCode(state *TokenWarningState) string { if state == nil { return "" } switch { case state.IsAtBlockingLimit: return "context_window_blocked" case state.IsAboveErrorThreshold: return "context_window_critical" case state.IsAboveWarningThreshold: return "context_window_warning" } return "" } // ============================================================ // 5. Thinking token 扣减 // ============================================================ // EffectiveContextWindowWithThinking 计算扣除 thinking 预算后的有效窗口. // // 升华改进(ELEVATED): thinking token 从有效窗口中扣除. // 如果 thinking budget 是 10K,实际可用上下文 = 200K - 10K = 190K. // 不扣减的话压缩阈值会偏高,导致 thinking 和内容争抢窗口空间. // 替代方案:不扣减,让 thinking 和内容自然竞争(原始实现,可能导致意外截断). func (m *TokenBudgetManager) EffectiveContextWindowWithThinking(model string, thinkingBudget int) int { effective := m.EffectiveContextWindow(model) if thinkingBudget > 0 { effective -= thinkingBudget } if effective < MinEffectiveWindowFloor { effective = MinEffectiveWindowFloor } return effective } // AutoCompactThresholdWithThinking 计算扣除 thinking 后的自动压缩阈值. func (m *TokenBudgetManager) AutoCompactThresholdWithThinking(model string, thinkingBudget int) int { effective := m.EffectiveContextWindowWithThinking(model, thinkingBudget) threshold := effective - AutoCompactBufferTokens if threshold < MinEffectiveWindowFloor { threshold = MinEffectiveWindowFloor } return threshold } // ============================================================ // 6. 动态窗口查询(模型切换时) // ============================================================ // OnModelSwitch 模型切换时检查当前用量是否超过新模型的阈值. // // 精妙之处(CLEVER): 切换模型时上下文窗口可能变化. // 从 Opus (200K) 切到 Haiku (200K) 没事, // 但如果切到某个只有 128K 窗口的模型,压缩阈值要跟着变. // 立刻检查并通知消费层,避免下一轮 API 调用才发现溢出. func (m *TokenBudgetManager) OnModelSwitch(oldModel, newModel string, currentUsage int) *TokenWarningState { newState := m.CalculateWarningState(currentUsage, newModel) if newState.IsAboveAutoCompactThreshold { // 埋点说明:模型切换导致的溢出是一个重要的运维事件. // 如果频繁发生,说明用户在大上下文场景中切模型,可能需要自动压缩提示. m.observer.Event("token_budget_model_switch_overflow", map[string]any{ "old_model": oldModel, "new_model": newModel, "current_usage": currentUsage, "new_threshold": m.AutoCompactThreshold(newModel), "old_threshold": m.AutoCompactThreshold(oldModel), "percent_used": newState.PercentUsed, }) } return newState } // ============================================================ // 内部辅助函数 // ============================================================ // estimateAllMessages 用 tokenizer 粗估消息列表的 token 数. func (m *TokenBudgetManager) estimateAllMessages(messages []query.Message) int { var tokenizerMsgs []tokenizer.Message for _, msg := range messages { text := extractMessageText(msg) tokenizerMsgs = append(tokenizerMsgs, tokenizer.Message{ Role: string(msg.Role), Content: text, }) } return tokenizer.EstimateMessageTokens(tokenizerMsgs) } // extractMessageText 从 query.Message 中提取纯文本用于 token 估算. func extractMessageText(msg query.Message) string { var parts []string for _, c := range msg.Content { switch c.Type { case query.ContentText: parts = append(parts, c.Text) case query.ContentThinking: parts = append(parts, c.Text) case query.ContentToolUse: // 工具调用:名称 + 输入 JSON 的粗略估算 parts = append(parts, c.Name) if c.Input != nil { // 粗略按 key=value 对估算 for k, v := range c.Input { parts = append(parts, k) if s, ok := v.(string); ok { parts = append(parts, s) } } } case query.ContentToolResult: parts = append(parts, c.Text) } } if len(parts) == 0 { return "" } result := "" for i, p := range parts { if i > 0 { result += "\n" } result += p } return result } // extractUsageFromMetadata 从消息元数据中提取 Usage 信息. // Usage 存储在 Metadata["usage"] 中,由 engine 在收到 API 响应后注入. func extractUsageFromMetadata(msg *query.Message) *query.Usage { if msg.Metadata == nil { return nil } usageRaw, ok := msg.Metadata["usage"] if !ok { return nil } usage, ok := usageRaw.(*query.Usage) if ok { return usage } // 尝试 map[string]any 类型(JSON 反序列化后可能是这种类型) usageMap, ok := usageRaw.(map[string]any) if !ok { return nil } return &query.Usage{ InputTokens: intFromMap(usageMap, "input_tokens"), OutputTokens: intFromMap(usageMap, "output_tokens"), Cache: query.CacheTokens{ Read: intFromMap(usageMap, "cache_read_input_tokens"), Written: intFromMap(usageMap, "cache_creation_input_tokens"), }, } } // getAPIResponseID 从消息元数据中获取 API 响应 ID. func getAPIResponseID(msg *query.Message) string { if msg.Metadata == nil { return "" } id, _ := msg.Metadata["api_response_id"].(string) return id } // intFromMap 从 map 中安全提取 int 值. func intFromMap(m map[string]any, key string) int { v, ok := m[key] if !ok { return 0 } switch n := v.(type) { case int: return n case float64: return int(n) case int64: return int(n) default: return 0 } } // maxOutputForModel 获取模型的最大输出 token 数. func (m *TokenBudgetManager) maxOutputForModel(model string) int { cfg := m.modelRegistry.GetConfig(model) if cfg != nil && cfg.MaxOutputTokens > 0 { return cfg.MaxOutputTokens } // 默认值 return DefaultMaxOutputTokens }