// Package context - compact.go 实现上下文压缩功能. // // 对应原项目中同名模块的功能. // 当对话消息的 token 总量接近模型上下文窗口时,自动触发压缩: // - 调用轻量模型(Haiku)生成对话摘要 // - 用摘要替换旧消息,保留最近几轮 // - MicroCompact 则是轻量修剪:截断旧的工具结果内容 // // 改进: // - 使用更准确的 token 估算(基于 tokenizer 包) // - 压缩后自动注入 PostCompactContext(最近文件列表,git diff 摘要) // - 压缩提示词改进(保留文件路径,函数名,关键决策) // - MicroCompact 智能截断(保留头尾而不只是头部) // - 工具错误输出保留完整内容(错误信息通常更重要) // - 区分部分压缩和完全压缩 // - 支持可插拔的 CompactionPolicy 和 PostCompactRestorer // - 消息按 API 往返分组,压缩边界对齐到组边界 // - 多策略支持(full / partial / reactive) package context import ( "context" "encoding/json" "fmt" "math" "strings" "sync" "time" "git.flytoex.net/yuanwei/flyto-agent/internal/transport/retry" "git.flytoex.net/yuanwei/flyto-agent/pkg/flyto" ) // L1326 衍生 a (2026-04-16) 重构: context 包直接消费 flyto.EventObserver 作为 // Observer 契约, 不再定义本地 CompactObserver 接口. // // 历史包袱(LEGACY): 早期注释声称 "直接依赖 flyto.EventObserver 会循环依赖" 是误判 -- // flyto 是零外部依赖的契约层 (见 pkg/flyto/doc.go), 整个项目 29 个包 // import flyto, flyto 零反向 import. context → flyto 单向依赖完全安全, // 不构成循环. 方法集"逐字相同"+Go 结构化类型鸭子类型隐式满足的模式 // 造成了隐性 coupling 债 (一方加方法另一方不编译报错). // 现在 Compressor.observer 字段直接是 flyto.EventObserver, 契约变化编译期强制同步. // // 替代方案: 保留 CompactObserver 作为 flyto.EventObserver 的类型别名 // (type CompactObserver = flyto.EventObserver) - 否决, 两个名字指同一 // 类型增加读者认知负担 (与 memory 包 L1326 决策保持一致). // noopCompactObserver 是 context 包的内部 noop 实现, 在 Observer 未注入时兜底. // 它自然满足 flyto.EventObserver (Event + Error 两个空方法), 不引入对 // engine.NoopObserver 的依赖 (会构成 context → engine 循环). type noopCompactObserver struct{} func (n *noopCompactObserver) Event(name string, data map[string]any) {} func (n *noopCompactObserver) Error(err error, ctx map[string]any) {} // DefaultContextWindow 是默认的上下文窗口大小(token 数). // 当通过 ModelRegistry 查询不到时使用此默认值. const DefaultContextWindow = 200000 // ContextWindowFunc 是获取模型上下文窗口大小的回调函数类型. // 允许外部注入 ModelRegistry 的查询能力,避免循环依赖. type ContextWindowFunc func(model string) int // contextWindowProvider 是全局的上下文窗口查询函数. // 默认返回 DefaultContextWindow;可通过 SetContextWindowProvider 注入 ModelRegistry. // // 精妙之处(CLEVER): 全局兜底仅用于没有注入实例级 contextWindowFn 的旧代码路径. // 新代码应通过 Compressor.SetContextWindowFn 注入实例级函数-- // 这样同进程多 Engine 实例各自持有独立查询函数,互不覆盖. // 替代方案(原方案): 维护 modelContextWindows 硬编码表(需随每次新模型发布手动更新, // 且与 ModelRegistry 数据重复,二者不一致时静默返回旧窗口大小). var contextWindowProvider ContextWindowFunc = defaultContextWindowLookup // defaultContextWindowLookup 是默认的上下文窗口查询实现. // 返回 DefaultContextWindow(所有现代大模型的通用默认值). // 实际窗口大小通过 Compressor.SetContextWindowFn 注入 ModelRegistry 查询. func defaultContextWindowLookup(_ string) int { return DefaultContextWindow } // SetContextWindowProvider 设置全局的上下文窗口查询函数. // 典型用法:在引擎初始化时注入 ModelRegistry 的查询能力. // // context.SetContextWindowProvider(func(model string) int { // return registry.ContextWindow(model) // }) func SetContextWindowProvider(fn ContextWindowFunc) { if fn != nil { contextWindowProvider = fn } } // CompactModelFunc 是获取压缩用模型 ID 的回调函数类型. type CompactModelFunc func() string // compactModelProvider 是获取压缩用模型 ID 的全局回调. // 默认返回空字符串--必须通过 SetCompactModelProvider 注入实际模型 ID. // 历史包袱(LEGACY): 早期方案硬编码 "claude-haiku-3-5",已移除以消除 Anthropic 耦合. var compactModelProvider CompactModelFunc = func() string { return "" // 由 provider/config 层注入 } // SetCompactModelProvider 设置获取压缩用模型 ID 的回调函数. // 典型用法:注入 ModelRegistry 的 RoleFast 查询. // // context.SetCompactModelProvider(func() string { // return cfg.ModelForRole(config.RoleFast) // }) func SetCompactModelProvider(fn CompactModelFunc) { if fn != nil { compactModelProvider = fn } } // getCompactModel 获取当前配置的压缩用模型 ID. func getCompactModel() string { return compactModelProvider() } // 压缩相关常量 const ( // compactReserveTokens 是压缩预留空间(token 数). // 留出足够空间给新的用户消息和模型回复. compactReserveTokens = 13000 // microCompactMaxLen 是 MicroCompact 保留的工具结果最大字符数. // 超过此长度的旧工具结果会被截断. microCompactMaxLen = 8000 // microCompactTailLen 是 MicroCompact 截断时保留的尾部字符数. // 保留头尾两端比只保留头部更有用(尾部通常有结论/错误信息). microCompactTailLen = 2000 // recentTurnsToKeep 是压缩时保留的最近轮次数. // 保留最近几轮完整消息,避免丢失关键上下文. recentTurnsToKeep = 4 // maxRecentFiles 是压缩后保留的最近文件路径数量. maxRecentFiles = 5 // compactFallbackMessages 是压缩失败时兜底摘要的消息数量. // // 精妙之处(CLEVER): 压缩失败时的最后防线-- // 取最近 5 条消息拼接为 fallback 摘要,确保 Agent 不会失忆. // 这比空字符串好:至少保留了最近的上下文. // N=5 是经验值:足够保留最近意图,又不超过 compact 后的 token 预算. // 替代方案:N=3(太少,丢失中间意图);N=10(可能超预算且含大量工具噪音). compactFallbackMessages = 5 ) // CompactMessage 是压缩模块使用的通用消息格式. // 与 api.RequestMessage 解耦,避免循环依赖. type CompactMessage struct { Role string `json:"role"` Content json.RawMessage `json:"content"` } // CompactStrategy 压缩策略类型. type CompactStrategy string const ( // StrategyFull 完整压缩--生成摘要替换所有旧消息 StrategyFull CompactStrategy = "full" // StrategyPartial 部分压缩--保留前缀,只压缩中间部分 StrategyPartial CompactStrategy = "partial" // StrategyReactive 反应式压缩--API 413 后紧急裁剪 StrategyReactive CompactStrategy = "reactive" ) // CompactResult 是压缩操作的结果. type CompactResult struct { // Messages 是压缩后的消息列表 Messages []CompactMessage // Summary 是生成的对话摘要 Summary string // KeptMessages 保留的最近消息 KeptMessages []CompactMessage // RestoredItems 恢复的内容 RestoredItems []RestoreItem // Strategy 使用的策略 Strategy CompactStrategy // TokensBefore 压缩前的估算 token 数 TokensBefore int // TokensAfter 压缩后的估算 token 数 TokensAfter int // DurationMs 压缩耗时(毫秒) DurationMs int64 } // 历史包袱(LEGACY, 2026-04-14 删除): 曾在此处定义包级函数 // AutoCompactThreshold(model string) int - 公式 windowSize - compactReserveTokens. // 删除原因: // 1. 唯一非测试调用方 engine.go:3456 做 +compactReserveTokens 抵消, 真实意图是 // "拿物理窗口", 该调用已直接改走 ModelRegistry.ContextWindow(model). // 2. 其内部依赖 contextWindowProvider 的 setter SetContextWindowProvider 全代码零调用, // 永远回落到 DefaultContextWindow=200000, model 参数是被忽略的幌子. // 3. 与 engine.TokenBudgetManager 的阈值方法语义重复 (后者公式更精细, 考虑 // maxOutput 预留 + thinking 扣减 + 是生产路径的唯一真相源). // contextWindowProvider 本身保留 - Compressor.effectiveContextWindow 仍依赖它作为 // 实例级 contextWindowFn 为 nil 时的全局兜底, 属另一笔独立清理. // EstimateTokens 估算消息列表的 token 数. // 使用改进的估算策略: // - 对英文文本:按空格分词,每词约 1.3 token // - 对代码/JSON:按结构特征估算 // - 对 CJK 字符:每字符约 1.5 token // // 比旧的 4 chars/token 估算准确 2-3 倍. func EstimateTokens(messages []CompactMessage) int { totalTokens := 0 for _, msg := range messages { // 角色名开销约 3-4 token totalTokens += 4 // 解析消息内容 var text string if err := json.Unmarshal(msg.Content, &text); err == nil { totalTokens += estimateTextTokens(text) } else { // 可能是 content block 数组 totalTokens += estimateTextTokens(string(msg.Content)) } } // 消息序列格式开销 totalTokens += 3 if totalTokens < 1 { totalTokens = 1 } return totalTokens } // estimateTextTokens 估算一段文本的 token 数. // 综合考虑英文,代码,CJK 字符的不同编码效率. func estimateTextTokens(text string) int { if len(text) == 0 { return 0 } tokens := 0 cjkCount := 0 wordChars := 0 otherChars := 0 for _, r := range text { if isCJKRune(r) { cjkCount++ } else if isWordChar(r) { wordChars++ } else { otherChars++ } } // CJK 字符:每个约 1.5 token tokens += (cjkCount*3 + 1) / 2 // 英文/代码文本:按单词估算 // 平均每个英文单词约 5 个字符,约 1.3 个 token // 所以 wordChars / 5 * 1.3 ≈ wordChars * 0.26 // 简化为 wordChars / 4 tokens += wordChars / 4 if wordChars > 0 && tokens == 0 { tokens = 1 } // 标点和特殊字符:约 2-3 个字符 1 个 token tokens += (otherChars + 2) / 3 return tokens } // isCJKRune 判断是否为 CJK 字符. func isCJKRune(r rune) bool { return (r >= 0x4E00 && r <= 0x9FFF) || // CJK Unified Ideographs (r >= 0x3400 && r <= 0x4DBF) || // CJK Extension A (r >= 0x3000 && r <= 0x303F) || // CJK Symbols and Punctuation (r >= 0x3040 && r <= 0x309F) || // Hiragana (r >= 0x30A0 && r <= 0x30FF) || // Katakana (r >= 0xAC00 && r <= 0xD7AF) || // Hangul (r >= 0xFF00 && r <= 0xFFEF) // Fullwidth Forms } // isWordChar 判断是否为单词字符(字母,数字,下划线). func isWordChar(r rune) bool { return (r >= 'a' && r <= 'z') || (r >= 'A' && r <= 'Z') || (r >= '0' && r <= '9') || r == '_' } // Compact 执行上下文压缩(向后兼容的旧入口). // 历史包袱(LEGACY): 早期方案使用 apiKey/baseURL 直连 Anthropic HTTP API. // 已改为要求 provider 非 nil,调用方应迁移到 Compressor.CompactTiered(). func Compact(messages []CompactMessage, provider flyto.ModelProvider, model string) (*CompactResult, error) { if provider == nil { return nil, fmt.Errorf("compact: provider is nil; use Compressor.CompactTiered() with an injected provider") } if len(messages) <= recentTurnsToKeep*2 { // 消息太少,无需压缩 return &CompactResult{ Messages: messages, Summary: "", TokensBefore: EstimateTokens(messages), TokensAfter: EstimateTokens(messages), Strategy: StrategyFull, }, nil } tokensBefore := EstimateTokens(messages) // 将消息分为两部分:旧消息(需要压缩)和最近消息(保留) keepCount := recentTurnsToKeep * 2 if keepCount > len(messages) { keepCount = len(messages) } splitIdx := len(messages) - keepCount oldMessages := messages[:splitIdx] recentMessages := messages[splitIdx:] // 提取关键上下文信息(用于 PostCompactContext) postCtx := buildPostCompactContext(messages) // 通过 Provider 生成摘要 summary, err := generateSummaryViaProvider(context.Background(), oldMessages, provider, model, nil) if err != nil { return nil, fmt.Errorf("compact: generate summary: %w", err) } // 在摘要后追加 PostCompactContext if postCtx != "" { summary = summary + "\n\n" + postCtx } // 构建压缩后的消息列表:摘要消息 + 最近消息 summaryContent, _ := json.Marshal(summary) compactedMessages := make([]CompactMessage, 0, 1+len(recentMessages)) // 摘要作为第一条 user 消息 compactedMessages = append(compactedMessages, CompactMessage{ Role: "user", Content: summaryContent, }) // 追加最近消息 compactedMessages = append(compactedMessages, recentMessages...) tokensAfter := EstimateTokens(compactedMessages) return &CompactResult{ Messages: compactedMessages, Summary: summary, KeptMessages: recentMessages, Strategy: StrategyFull, TokensBefore: tokensBefore, TokensAfter: tokensAfter, }, nil } // CompactOption is a functional option for CompactTiered. Added 2026-04-19 // so callers can hand the compressor a precise token gap parsed from a // PromptTooLong API error (see internal/transport.APIError.TokenGap), which // lets truncateAndRetryCompact jump straight to the right stride instead // of shaving one group at a time. Older callers that don't have the gap // continue to omit the option; behavior is unchanged when no option is // passed. // // CompactOption 是 CompactTiered 的 functional option. 2026-04-19 新加, // 让调用方把从 PromptTooLong API 错误解出的精确 token gap (见 // internal/transport.APIError.TokenGap) 传给压缩器, truncateAndRetryCompact // 一步跳到正确的步长, 不用一组一组砍. 老调用方不传此 option 行为不变. type CompactOption func(*compactOptions) type compactOptions struct { // tokenGap is the "how many tokens over the limit" hint from the // last PromptTooLong API error, parsed by api.ParseTokenGap. Zero // means "no hint, fall back to incremental shaving". // // tokenGap 是最近一次 PromptTooLong API 错误报出的溢出量 (由 // api.ParseTokenGap 解析). 0 表示 "没有提示, 回退到逐组砍". tokenGap int } // WithTokenGap supplies the precise excess-token count from the last // PromptTooLong error to CompactTiered. If > 0, truncateAndRetryCompact // accumulates the oldest groups' tokens until it meets or exceeds the gap // and drops them in one pass, then retries compactFull. If that still // returns PromptTooLong, the existing incremental retry loop takes over. // // WithTokenGap 把最近一次 PromptTooLong 错误报出的精确溢出 token 数传进 // CompactTiered. > 0 时 truncateAndRetryCompact 从最旧的组开始累积 token, // 一旦累积量 >= gap 就一次性丢掉, 再试一次 compactFull. 那次还 PromptTooLong // 就 fall through 到原来的逐组重试 loop. func WithTokenGap(n int) CompactOption { return func(o *compactOptions) { if n > 0 { o.tokenGap = n } } } // CompactWithStrategy 使用指定策略和策略接口执行压缩. // 升华改进(ELEVATED): 三种策略各有场景-- // - Full: 常规压缩,生成完整摘要替换旧消息 // - Partial: 只压缩中间部分,保留首尾(适合长对话中间有大量工具输出) // - Reactive: API 返回 413 后紧急裁剪,不调用摘要 API,直接丢弃最旧的组 // // 替代方案:只有 Full 一种策略(原始设计,无法处理 413 紧急情况和部分压缩需求). func (c *Compressor) CompactWithStrategy( ctx context.Context, messages []CompactMessage, strategy CompactStrategy, policy CompactionPolicy, ) (*CompactResult, error) { start := time.Now() obs := c.getObserver() if policy == nil { policy = &DefaultCodePolicy{} } tokensBefore := EstimateTokens(messages) // 埋点说明:压缩开始是容量规划的关键事件--记录触发时的 token 量和策略, // 用于分析"什么时候触发压缩"和"哪种策略被用得最多". obs.Event("compact_started", map[string]any{ "strategy": string(strategy), "tokens_before": tokensBefore, }) // 策略预处理 preprocessed := policy.PreprocessForCompaction(messages) // 按 API 往返分组 groups := GroupByAPIRound(preprocessed) if len(groups) == 0 { return &CompactResult{ Messages: messages, Strategy: strategy, TokensBefore: tokensBefore, TokensAfter: tokensBefore, DurationMs: time.Since(start).Milliseconds(), }, nil } var result *CompactResult var err error switch strategy { case StrategyReactive: result, err = c.compactReactive(groups, tokensBefore) case StrategyPartial: result, err = c.compactPartial(ctx, groups, policy, tokensBefore) default: // StrategyFull result, err = c.compactFull(ctx, groups, policy, tokensBefore) } if err != nil { // 埋点说明:压缩失败需要告警--压缩失败意味着上下文可能溢出, // 后续 API 调用会 413,用户体验严重退化. obs.Error(err, map[string]any{ "strategy": string(strategy), "phase": "compact_with_strategy", }) return nil, err } result.Strategy = strategy result.DurationMs = time.Since(start).Milliseconds() // 压缩后执行恢复 if c.restoreManager != nil { state := c.buildCompactState(messages) restored, restoreErr := c.restoreManager.Execute(ctx, state) if restoreErr == nil && len(restored) > 0 { result.RestoredItems = restored // 将恢复内容作为附加消息追加 restoreMsg := formatRestoredItems(restored) if restoreMsg != "" { restoreContent, _ := json.Marshal(restoreMsg) result.Messages = append(result.Messages, CompactMessage{ Role: "user", Content: restoreContent, }) result.TokensAfter = EstimateTokens(result.Messages) } } } // 埋点说明:压缩完成是效果评估的核心数据--before/after 的 token 差值 // 直接反映压缩效率,duration_ms 用于性能监控,restored_items 用于评估恢复质量. obs.Event("compact_completed", map[string]any{ "strategy": string(result.Strategy), "tokens_before": result.TokensBefore, "tokens_after": result.TokensAfter, "duration_ms": result.DurationMs, "restored_items": len(result.RestoredItems), }) return result, nil } // compactFull 完整压缩策略. func (c *Compressor) compactFull( ctx context.Context, groups []MessageGroup, policy CompactionPolicy, tokensBefore int, ) (*CompactResult, error) { // 保留最近 N 轮 keepRounds := policy.MaxRecentRoundsToKeep() if keepRounds <= 0 { keepRounds = recentTurnsToKeep } // 精妙之处(CLEVER): 压缩边界对齐到组边界.不是简单地按消息数切分, // 而是按完整的 API 往返组切分,保证不会把 user 和 assistant 拆开. splitGroupIdx := len(groups) - keepRounds if splitGroupIdx < 1 { splitGroupIdx = 1 // 至少压缩第一组 } if splitGroupIdx >= len(groups) { // 没有足够的组来压缩 var allMsgs []CompactMessage for _, g := range groups { allMsgs = append(allMsgs, g.Messages...) } return &CompactResult{ Messages: allMsgs, TokensBefore: tokensBefore, TokensAfter: tokensBefore, }, nil } // 收集旧消息(需要压缩) var oldMessages []CompactMessage for _, g := range groups[:splitGroupIdx] { oldMessages = append(oldMessages, g.Messages...) } // 收集最近消息(保留) var recentMessages []CompactMessage for _, g := range groups[splitGroupIdx:] { recentMessages = append(recentMessages, g.Messages...) } // 剥离图像后调用摘要 API // 升华改进(ELEVATED): 始终走 Provider 路径(flyto.ModelProvider 接口统一调用). // provider 为 nil 时生成失败--调用方(CompactTiered/DoCompact)负责兜底降级. stripped := StripImages(oldMessages) var summary string var err error if c.provider != nil { summary, err = generateSummaryViaProvider(ctx, stripped, c.provider, c.effectiveCompactModel(), policy) } else { err = fmt.Errorf("compact: no provider configured; inject via NewCompressor(threshold, provider)") } if err != nil { return nil, fmt.Errorf("compact full: generate summary: %w", err) } // 精妙之处(CLEVER): compactFull 层的双重兜底-- // generateSummary 内部已有 buildFallbackSummary,但 summary 仍可能为空(极端情况: // 所有 messages 内容均为空字符串,fallback 拼接结果也是空). // 这里二次兜底保证 CompactResult.Summary 永远不为空, // 并且 compacted 消息列表不会出现空内容的摘要消息(等同于删除全部历史). // 替代方案:依赖 generateSummary 单层兜底(但 generateSummary 只看 stripped messages, // 若 oldMessages 为空,则 stripped 也为空,fallback 会返回固定字符串而非最近消息). if summary == "" { summary = buildFallbackSummary(recentMessages) if summary == "" { summary = "Conversation history was compacted." } } // 构建压缩后消息 summaryContent, _ := json.Marshal("[Previous conversation summary]\n" + summary) compacted := make([]CompactMessage, 0, 1+len(recentMessages)) compacted = append(compacted, CompactMessage{ Role: "user", Content: summaryContent, }) compacted = append(compacted, recentMessages...) return &CompactResult{ Messages: compacted, Summary: summary, KeptMessages: recentMessages, TokensBefore: tokensBefore, TokensAfter: EstimateTokens(compacted), }, nil } // compactPartial 部分压缩策略--保留前言组和最近组,只压缩中间部分. func (c *Compressor) compactPartial( ctx context.Context, groups []MessageGroup, policy CompactionPolicy, tokensBefore int, ) (*CompactResult, error) { keepRounds := policy.MaxRecentRoundsToKeep() if keepRounds <= 0 { keepRounds = recentTurnsToKeep } if len(groups) <= keepRounds+1 { // 组太少,降级到完整压缩 return c.compactFull(ctx, groups, policy, tokensBefore) } // 保留前言(group 0)和最近 keepRounds 组 preamble := groups[0] middleGroups := groups[1 : len(groups)-keepRounds] recentGroups := groups[len(groups)-keepRounds:] // 收集中间消息用于摘要 var middleMessages []CompactMessage for _, g := range middleGroups { middleMessages = append(middleMessages, g.Messages...) } stripped := StripImages(middleMessages) var summary string var err error if c.provider != nil { summary, err = generateSummaryViaProvider(ctx, stripped, c.provider, c.effectiveCompactModel(), policy) } else { err = fmt.Errorf("compact: no provider configured; inject via NewCompressor(threshold, provider)") } if err != nil { return nil, fmt.Errorf("compact partial: generate summary: %w", err) } // 精妙之处(CLEVER): compactPartial 层的双重兜底,与 compactFull 保持一致. // 确保 CompactResult.Summary 永远不为空,消息列表不出现空摘要消息. if summary == "" { var recentForFallback []CompactMessage for _, g := range recentGroups { recentForFallback = append(recentForFallback, g.Messages...) } summary = buildFallbackSummary(recentForFallback) if summary == "" { summary = "Conversation history (middle section) was compacted." } } // 组装:前言 + 摘要 + 最近消息 summaryContent, _ := json.Marshal("[Conversation summary (middle section)]\n" + summary) compacted := make([]CompactMessage, 0, len(preamble.Messages)+1+keepRounds*4) compacted = append(compacted, preamble.Messages...) compacted = append(compacted, CompactMessage{ Role: "user", Content: summaryContent, }) var recentMessages []CompactMessage for _, g := range recentGroups { recentMessages = append(recentMessages, g.Messages...) } compacted = append(compacted, recentMessages...) return &CompactResult{ Messages: compacted, Summary: summary, KeptMessages: recentMessages, TokensBefore: tokensBefore, TokensAfter: EstimateTokens(compacted), }, nil } // compactReactive 反应式压缩--不调用 API,直接丢弃最旧的组. // 用于 API 返回 413 后的紧急裁剪. func (c *Compressor) compactReactive( groups []MessageGroup, tokensBefore int, ) (*CompactResult, error) { if len(groups) <= 2 { var allMsgs []CompactMessage for _, g := range groups { allMsgs = append(allMsgs, g.Messages...) } return &CompactResult{ Messages: allMsgs, TokensBefore: tokensBefore, TokensAfter: EstimateTokens(allMsgs), }, nil } // 丢弃前半部分的组,保留后半部分 keepFrom := len(groups) / 2 var kept []CompactMessage for _, g := range groups[keepFrom:] { kept = append(kept, g.Messages...) } // 添加一条提示消息 notice, _ := json.Marshal("[Note: Earlier conversation was dropped due to context size limits. Some context may be missing.]") compacted := make([]CompactMessage, 0, 1+len(kept)) compacted = append(compacted, CompactMessage{ Role: "user", Content: notice, }) compacted = append(compacted, kept...) return &CompactResult{ Messages: compacted, Summary: "[reactive compaction: dropped oldest groups]", KeptMessages: kept, TokensBefore: tokensBefore, TokensAfter: EstimateTokens(compacted), }, nil } // buildCompactState 从消息历史中提取压缩状态快照. func (c *Compressor) buildCompactState(messages []CompactMessage) *CompactState { state := &CompactState{ CustomData: make(map[string]any), } // 提取文件路径 seenFiles := make(map[string]bool) for i := len(messages) - 1; i >= 0 && len(state.RecentFiles) < maxRecentFiles; i-- { paths := extractFilePaths(string(messages[i].Content)) for _, p := range paths { if !seenFiles[p] && len(state.RecentFiles) < maxRecentFiles { seenFiles[p] = true state.RecentFiles = append(state.RecentFiles, p) } } } return state } // formatRestoredItems 将恢复的条目格式化为消息文本. func formatRestoredItems(items []RestoreItem) string { if len(items) == 0 { return "" } var sb strings.Builder sb.WriteString("[Post-compact context restoration]\n") for _, item := range items { sb.WriteString("\n") sb.WriteString(item.Content) sb.WriteString("\n") } return sb.String() } // StripImages 压缩 API 调用前剥离图像块. // 精妙之处(CLEVER): 图像对摘要无帮助但可能占 >50K tokens(一张截图). // 剥离后压缩请求的 token 预算全部留给文本内容. func StripImages(messages []CompactMessage) []CompactMessage { result := make([]CompactMessage, len(messages)) for i, msg := range messages { result[i] = msg var blocks []map[string]any if err := json.Unmarshal(msg.Content, &blocks); err != nil { continue } filtered := make([]map[string]any, 0, len(blocks)) modified := false for _, block := range blocks { blockType, _ := block["type"].(string) if blockType == "image" { modified = true // 替换为占位文本 filtered = append(filtered, map[string]any{ "type": "text", "text": "[image removed for compaction]", }) continue } // 检查嵌套的 source 类型(Document 嵌套图像) if source, ok := block["source"].(map[string]any); ok { if mediaType, _ := source["media_type"].(string); strings.HasPrefix(mediaType, "image/") { modified = true filtered = append(filtered, map[string]any{ "type": "text", "text": "[embedded image removed for compaction]", }) continue } } filtered = append(filtered, block) } if modified { if newContent, err := json.Marshal(filtered); err == nil { result[i].Content = newContent } } } return result } // buildPostCompactContext 从完整消息历史中提取关键上下文信息. // 在压缩后注入,帮助模型恢复重要的上下文: // - 最近读过/编辑过的文件列表 // - 最近的 git diff 摘要(如果有) func buildPostCompactContext(messages []CompactMessage) string { var recentFiles []string seenFiles := make(map[string]bool) var lastGitDiff string // 从后往前扫描,提取文件路径和 git 信息 for i := len(messages) - 1; i >= 0 && len(recentFiles) < maxRecentFiles; i-- { msg := messages[i] content := string(msg.Content) // 提取文件路径(从 tool_use 和 tool_result 中) paths := extractFilePaths(content) for _, p := range paths { if !seenFiles[p] && len(recentFiles) < maxRecentFiles { seenFiles[p] = true recentFiles = append(recentFiles, p) } } // 提取 git diff 摘要 if lastGitDiff == "" && strings.Contains(content, "git diff") { lastGitDiff = extractGitDiffSummary(content) } } if len(recentFiles) == 0 && lastGitDiff == "" { return "" } var sb strings.Builder sb.WriteString("[Post-compact context recovery]") if len(recentFiles) > 0 { sb.WriteString("\nRecently accessed files:") for _, f := range recentFiles { sb.WriteString("\n - ") sb.WriteString(f) } } if lastGitDiff != "" { sb.WriteString("\nRecent git changes: ") sb.WriteString(lastGitDiff) } return sb.String() } // extractFilePaths 从消息内容中提取文件路径. // 查找常见的文件路径模式(绝对路径和相对路径). func extractFilePaths(content string) []string { var paths []string // 查找 "file_path" 或 "path" JSON 字段中的值 for _, key := range []string{`"file_path":"`, `"file_path": "`, `"path":"`, `"path": "`} { idx := 0 for { pos := strings.Index(content[idx:], key) if pos < 0 { break } start := idx + pos + len(key) end := strings.Index(content[start:], `"`) if end > 0 && end < 500 { // 路径不应超过 500 字符 path := content[start : start+end] if isLikelyFilePath(path) { paths = append(paths, path) } } idx = start + 1 if idx >= len(content) { break } } } return paths } // isLikelyFilePath 判断字符串是否可能是文件路径. func isLikelyFilePath(s string) bool { if len(s) < 2 || len(s) > 500 { return false } // 以 / 或 ./ 开头,或包含扩展名 if strings.HasPrefix(s, "/") || strings.HasPrefix(s, "./") { return true } // 包含文件扩展名 if strings.Contains(s, ".") && !strings.HasPrefix(s, "http") { ext := s[strings.LastIndex(s, "."):] switch ext { case ".go", ".ts", ".tsx", ".js", ".jsx", ".py", ".rs", ".java", ".yaml", ".yml", ".json", ".toml", ".md", ".txt", ".sh", ".css", ".html", ".sql", ".proto", ".graphql": return true } } return false } // extractGitDiffSummary 从消息内容中提取 git diff 摘要. func extractGitDiffSummary(content string) string { // 查找 "X files changed, Y insertions, Z deletions" 模式 idx := strings.Index(content, "file") if idx < 0 { return "" } // 向前找数字开头 start := idx for start > 0 && (content[start-1] >= '0' && content[start-1] <= '9' || content[start-1] == ' ') { start-- } // 向后找行末或删除信息 end := idx for end < len(content) && content[end] != '\n' && end-idx < 200 { end++ } summary := strings.TrimSpace(content[start:end]) if len(summary) > 200 { summary = summary[:200] + "..." } return summary } // generateSummaryViaProvider 使用注入的 flyto.ModelProvider 生成对话摘要. // 完全绕开 Anthropic HTTP 直连,让 Compressor 可与任意 Provider 协同工作. // // 升华改进(ELEVATED): 早期方案 generateSummary 硬编码 Anthropic REST 协议(x-api-key 头, // /v1/messages 路径,JSON 请求体格式)--换 Provider 时压缩功能直接失效. // 通过 flyto.ModelProvider.Stream 统一所有 Provider 调用,无需知道下层协议. // 替代方案:<为每个 Provider 维护单独的 HTTP 函数> - 否决:每新增 Provider 就要改 compact.go. func generateSummaryViaProvider(ctx context.Context, messages []CompactMessage, provider flyto.ModelProvider, model string, policy CompactionPolicy) (string, error) { // 将 CompactMessage 序列化为可读文本(与 generateSummary 逻辑一致) var conversationText strings.Builder for _, msg := range messages { var content string if err := json.Unmarshal(msg.Content, &content); err != nil { content = string(msg.Content) } if len(content) > 2000 { content = content[:2000] + "... [truncated]" } conversationText.WriteString(fmt.Sprintf("[%s]: %s\n\n", msg.Role, content)) } preserveKeywords := "file paths, function names, class names, error messages, key decisions" if policy != nil { preserveKeywords = policy.PreserveKeywords() } summaryPrompt := fmt.Sprintf(`IMPORTANT: Do NOT call any tools. Do NOT output tool_use blocks. Simply output a text summary. Summarize the following conversation between a user and an AI assistant. Your summary MUST preserve: 1. **Key identifiers**: %s 2. **Key decisions**: What decisions were made and why 3. **Current state**: What has been accomplished and what remains to be done 4. **Error context**: Any errors encountered and how they were resolved (or not) Be concise but preserve all important details that would be needed to continue the conversation. Conversation: %s`, preserveKeywords, conversationText.String()) req := &flyto.Request{ Model: model, MaxTokens: 2048, Messages: []flyto.Message{ {Role: flyto.RoleUser, Blocks: []flyto.Block{flyto.TextBlock(summaryPrompt)}}, }, } // Compact is the context-compression path. Users are indirectly // waiting (run-loop pauses to compact), but failures can be absorbed // by falling back to larger context. Label so retry diagnostics // distinguish compact failures from main-thread failures. // // compact 是上下文压缩路径. 用户间接等待 (run-loop 暂停做压缩), // 但失败可由回退到更大上下文吸收. 标记让重试诊断区分 compact 失败 // 与主线程失败. ctx = retry.WithQuerySource(ctx, "compact") ch, err := provider.Stream(ctx, req) if err != nil { return "", fmt.Errorf("provider stream: %w", err) } var sb strings.Builder for evt := range ch { switch e := evt.(type) { case *flyto.TextDeltaEvent: sb.WriteString(e.Text) case *flyto.ErrorEvent: return "", fmt.Errorf("provider stream error: %w", e.Err) } } summary := strings.TrimSpace(sb.String()) if summary == "" { summary = buildFallbackSummary(messages) } return summary, nil } // 历史包袱(LEGACY): 早期方案 generateSummary 函数在此处--直连 Anthropic HTTP API(x-api-key 头, // /v1/messages 端点),已删除.所有摘要生成现在统一走 generateSummaryViaProvider 路径, // 通过 flyto.ModelProvider 接口调用,对供应商完全无感知. // 原方案:generateSummary(messages, apiKey, baseURL, policy, bearerAuth, httpClient, model) // buildFallbackSummary 构建降级摘要. // 当压缩 API 返回空结果时,用最近 compactFallbackMessages(5) 条消息的前 500 字符拼接. // // 精妙之处(CLEVER): 压缩失败时的最后防线-- // 取最近 5 条消息拼接为 fallback 摘要,确保 Agent 不会失忆. // 这比空字符串好:至少保留了最近的上下文. // 替代方案:原来取最近 3 条(修改前代码);直接返回固定字符串(信息量为零). func buildFallbackSummary(messages []CompactMessage) string { if len(messages) == 0 { return "Conversation history was compacted." } // 取最近 compactFallbackMessages 条消息 start := len(messages) - compactFallbackMessages if start < 0 { start = 0 } recent := messages[start:] var parts []string for _, msg := range recent { var content string if err := json.Unmarshal(msg.Content, &content); err != nil { content = string(msg.Content) } if len(content) > 500 { content = content[:500] + "..." } if content != "" { parts = append(parts, fmt.Sprintf("[%s]: %s", msg.Role, content)) } } if len(parts) == 0 { return "Conversation history was compacted." } return strings.Join(parts, "\n") } // MicroCompact 执行轻量级上下文修剪. // 改进版: // - 智能截断:保留头尾而不只是头部(尾部通常有结论/错误信息) // - 工具错误输出保留完整内容(错误信息通常更重要) // - 区分 text 和 tool_result 类型的截断策略 // // 策略: // - 只修剪距离最近轮次较远的消息(保留最近 recentTurnsToKeep*2 条不动) // - 只修剪 tool_result 类型的内容块 // - 截断超过 microCompactMaxLen 字符的内容(保留头尾) // - 标记为错误的 tool_result 不截断(错误信息通常更重要) func MicroCompact(messages []CompactMessage) []CompactMessage { if len(messages) == 0 { return messages } // 保留最近的消息不动 keepCount := recentTurnsToKeep * 2 if keepCount >= len(messages) { return messages } result := make([]CompactMessage, len(messages)) copy(result, messages) // 只修剪较旧的消息 for i := 0; i < len(result)-keepCount; i++ { msg := &result[i] if msg.Role != "user" { continue } // 尝试解析为 content block 数组 var blocks []map[string]any if err := json.Unmarshal(msg.Content, &blocks); err != nil { // 不是数组格式(可能是简单字符串),跳过 continue } modified := false for j, block := range blocks { blockType, _ := block["type"].(string) if blockType != "tool_result" { continue } // 检查是否为错误输出 -- 错误信息不截断,因为更重要 isError, _ := block["is_error"].(bool) if isError { continue } content, ok := block["content"].(string) if !ok || len(content) <= microCompactMaxLen { continue } // 升华改进(ELEVATED): 尾部保留长度按内容总长度动态计算-- // 在非编程场景(如法律文书摘要,医疗报告),结论/诊断往往在末尾, // 且越长的文档结论部分越长.用 sqrt 函数让尾部占比随长度增长而缓慢增大, // 比固定 2000 字符更适应不同长度的内容. // 替代方案:<原方案固定 tailLen = microCompactTailLen (2000)> tailLen := int(math.Sqrt(float64(len(content)))) * 10 if tailLen < microCompactTailLen { tailLen = microCompactTailLen } if tailLen > microCompactMaxLen/2 { tailLen = microCompactMaxLen / 2 // 尾部不超过保留总量的一半 } headLen := microCompactMaxLen - tailLen if headLen < 1000 { headLen = 1000 tailLen = microCompactMaxLen - headLen } if headLen+tailLen > len(content) { // 内容不够长,不需要截断 continue } truncatedContent := content[:headLen] + fmt.Sprintf("\n\n... [micro-compact: %d chars truncated] ...\n\n", len(content)-headLen-tailLen) + content[len(content)-tailLen:] blocks[j]["content"] = truncatedContent modified = true } if modified { newContent, err := json.Marshal(blocks) if err == nil { result[i].Content = newContent } } } return result } // --- 电路断路器 --- // CompactCircuitBreaker 是压缩操作的电路断路器. // 连续失败 maxFailures 次后停止尝试,避免反复调用失败的 API. // 一旦成功,重置失败计数. // // 升华改进(ELEVATED): 加 mutex 保护--断路器模式源自微服务架构(Netflix Hystrix), // 在任何多线程环境(不限于编程)都可能被并发访问:多个压缩请求同时检查/更新状态. // 原方案无锁在单线程 Engine 中无问题,但断路器作为独立类型应自带线程安全保证, // 遵循"对象管理自身一致性"原则. // 替代方案:<原方案无 mutex,依赖外部调用者保证单线程访问> // // 升华改进(ELEVATED): 断路器区分错误类型 + 时间重置. // // 改进 1:rate limit (429/529) 不计入失败次数. // 反向思考:早期方案不区分看似有道理(rate limit 时重试也没用). // 但问题是:rate limit 通常 30 秒恢复,断路器关闭后要重启才能重试. // 不计入的话,rate limit 过了自然能压缩. // // 改进 2:时间重置(5 分钟后自动 Reset). // 替代方案:半开状态 + 探测请求(更精细但更复杂,需要后台定时器). // 反向思考后选择时间重置:简单,不需要后台 goroutine, // 5 分钟足以让大多数临时问题恢复. type CompactCircuitBreaker struct { mu sync.Mutex failures int maxFailures int lastFailure time.Time resetAfter time.Duration // 时间重置:超过此时间自动恢复(默认 5 分钟) } // defaultCircuitBreakerResetAfter 是断路器默认的时间重置间隔. // 精妙之处(CLEVER): 5 分钟足以让大多数临时性 API 问题恢复 // (部署切换,临时过载等),又不至于太短导致频繁重试. const defaultCircuitBreakerResetAfter = 5 * time.Minute // NewCompactCircuitBreaker 创建一个新的断路器,maxFailures 是最大连续失败次数. func NewCompactCircuitBreaker(maxFailures int) *CompactCircuitBreaker { if maxFailures <= 0 { maxFailures = 3 } return &CompactCircuitBreaker{ maxFailures: maxFailures, resetAfter: defaultCircuitBreakerResetAfter, } } // ShouldAttempt 判断是否应该尝试压缩. // 如果连续失败次数已达上限,检查时间重置:超过 resetAfter 自动恢复. // // 升华改进(ELEVATED): 时间重置机制--超过 resetAfter 后自动清零失败计数. // 反向思考:半开状态(只允许一个探测请求)更精细,但需要后台 goroutine // 和额外状态管理.时间重置在单次检查中完成,无后台开销. func (cb *CompactCircuitBreaker) ShouldAttempt() bool { cb.mu.Lock() defer cb.mu.Unlock() // 时间重置:超过 resetAfter 自动恢复 if cb.failures >= cb.maxFailures && !cb.lastFailure.IsZero() && time.Since(cb.lastFailure) > cb.resetAfter { cb.failures = 0 return true } return cb.failures < cb.maxFailures } // RecordFailure 记录一次失败. // // 升华改进(ELEVATED): isRateLimit=true 时不计入失败次数. // 反向思考:rate limit (429/529) 是临时性限流,通常 30 秒后恢复. // 如果计入失败,3 次 rate limit 就会关闭断路器, // 导致限流恢复后仍然无法压缩,必须重启才行. // 不计入的话,rate limit 过了自然恢复,无需重启. func (cb *CompactCircuitBreaker) RecordFailure(isRateLimit bool) { if isRateLimit { return // rate limit 不计入 } cb.mu.Lock() defer cb.mu.Unlock() cb.failures++ cb.lastFailure = time.Now() } // Reset 重置断路器(压缩成功时调用). func (cb *CompactCircuitBreaker) Reset() { cb.mu.Lock() defer cb.mu.Unlock() cb.failures = 0 cb.lastFailure = time.Time{} } // Failures 返回当前连续失败次数. func (cb *CompactCircuitBreaker) Failures() int { cb.mu.Lock() defer cb.mu.Unlock() return cb.failures } // lastFailureTime 返回最近一次失败的时间(供持久化使用). func (cb *CompactCircuitBreaker) lastFailureTime() time.Time { cb.mu.Lock() defer cb.mu.Unlock() return cb.lastFailure } // --- Compressor(有状态的压缩管理器) --- // Compressor 负责上下文压缩. // 对应原项目中 autoCompact / microCompact / sessionMemoryCompact. // // Compressor 包装了 Compact 和 MicroCompact 函数, // 提供有状态的压缩管理(追踪阈值,决定何时触发). // // 升华改进(ELEVATED): 内嵌断路器实例,CompactTiered 主流程自动管理断路器状态. // 替代方案:断路器外置,由调用者管理(早期方案设计,断路器和压缩器分离增加调用复杂度). type Compressor struct { // threshold 是自动压缩的 token 阈值 threshold int // policy 压缩策略(可选,nil 时使用 DefaultCodePolicy) policy CompactionPolicy // restoreManager 恢复管理器(可选) restoreManager *RestoreManager // circuitBreaker 内嵌断路器,跟踪压缩失败次数 circuitBreaker *CompactCircuitBreaker // observer 可观测性接口,nil 时用 noopCompactObserver 兜底. // 升华改进(ELEVATED): 压缩是引擎最重的内部操作之一(调用外部 API,大量 token 处理), // 没有埋点就无法回答"压缩花了多久""压缩效果如何""为什么上下文突然变小"等问题. // 替代方案:只在 engine.go 层面埋点(丢失压缩内部的策略选择和分块细节). observer flyto.EventObserver // persister 断路器状态持久化接口(可选). // nil = 纯内存(向后兼容),非 nil = 跨进程持久化. // 升华改进(ELEVATED): 持久化让 daemon 模式(systemd/K8s)崩溃重启后 // 不再从头浪费 3 次 API 调用.见 compact_persist.go. // 替代方案:构造函数参数(破坏所有现有调用点,NewCompressor 已有 3 个参数). persister CircuitBreakerPersister // contextWindowFn 实例级上下文窗口查询函数. // 优先于全局 contextWindowProvider,nil 时降级使用全局. // // 升华改进(ELEVATED): P0-1 修复--取代全局 SetContextWindowProvider. // 同进程多 Engine 实例(multi-tenant SaaS)各自持有独立函数,互不覆盖. // 旧 API SetContextWindowProvider 保留向后兼容,单 Engine 场景无需迁移. contextWindowFn ContextWindowFunc // compactModelFn 实例级压缩用模型查询函数. // 优先于全局 compactModelProvider,nil 时降级使用全局. compactModelFn CompactModelFunc // provider 是注入的模型 Provider(flyto.ModelProvider 接口). // 所有摘要生成统一走 generateSummaryViaProvider 路径,对供应商完全无感知. // 历史包袱(LEGACY): 早期方案还持有 httpClient CompactHTTPClient 字段用于 Anthropic HTTP 直连, // 已随 generateSummary 函数一同删除. provider flyto.ModelProvider } // NewCompressor 创建压缩器. // threshold 是触发自动压缩的 token 阈值;传入 0 则使用默认阈值(200000 - 13000). // provider 是模型调用接口;传入 nil 则退化为仅支持微压缩(无摘要生成). // // 升华改进(ELEVATED): 签名从 (threshold, apiKey, baseURL, bearerAuth) 简化为 (threshold, provider). // 旧签名将 Anthropic 特有认证字段泄漏给所有消费方(OpenAI/Vertex/本地模型无 x-api-key 概念). // 新签名通过 flyto.ModelProvider 接口统一所有 provider,调用方不感知底层协议. // 替代方案(原方案): 构造函数直接持有 apiKey/baseURL/bearerAuth,provider 通过 SetProvider 注入-- // 遗留了"旧路径 vs Provider 路径"的双重代码分支,在每个摘要生成点都要 if/else 分叉. func NewCompressor(threshold int, provider flyto.ModelProvider) *Compressor { if threshold <= 0 { threshold = 200000 - compactReserveTokens } return &Compressor{ threshold: threshold, provider: provider, circuitBreaker: NewCompactCircuitBreaker(3), } } // SetObserver 设置可观测性接口. // 升华改进(ELEVATED): Setter 注入而非构造函数参数-- // 因为 NewCompressor 已有三个参数且在多处调用,加参数会破坏所有调用点. // Setter 注入让现有代码零改动即可工作(observer 为 nil 时自动用 noop 兜底). // 替代方案:修改 NewCompressor 签名(破坏所有现有调用点). func (c *Compressor) SetObserver(obs flyto.EventObserver) { c.observer = obs } // getObserver 获取 observer,nil 时返回 noop 兜底. func (c *Compressor) getObserver() flyto.EventObserver { if c.observer != nil { return c.observer } return &noopCompactObserver{} } // SetPersister 设置断路器状态持久化接口,并立即加载历史状态(如果有). // // 升华改进(ELEVATED): Setter 注入与 SetObserver 模式一致,不破坏 NewCompressor 调用点. // SetPersister 被调用时立即执行一次 Load(),将历史失败计数注入内存断路器. // 这样进程重启后断路器从上次离开的状态继续,而不是从零开始. // // fail-open 设计:如果 Load() 失败(文件损坏,权限问题), // 记录 observer event 但继续运行,断路器从零开始(不影响功能). func (c *Compressor) SetPersister(p CircuitBreakerPersister) { c.persister = p if p == nil { return } state, err := p.Load() if err != nil { // fail-open:持久化加载失败不阻断压缩功能 c.getObserver().Event("compact_breaker_load_failed", map[string]any{ "error": err.Error(), }) return } if state == nil || state.Failures == 0 { return // 无历史状态,断路器从零开始 } // 将历史失败计数注入内存断路器 // 精妙之处(CLEVER): 直接设置 failures 字段而非调用 N 次 RecordFailure, // 避免触发 persister.Save() 形成写-读-写循环. // 这里绕过了 RecordFailure 的 mutex,因为 SetPersister 必须在并发访问前调用. c.circuitBreaker.mu.Lock() c.circuitBreaker.failures = state.Failures c.circuitBreaker.lastFailure = state.LastFailedAt c.circuitBreaker.mu.Unlock() c.getObserver().Event("compact_breaker_state_restored", map[string]any{ "failures": state.Failures, "last_failed_at": state.LastFailedAt, }) } // SetContextWindowFn 设置实例级上下文窗口查询函数. // 优先于全局 SetContextWindowProvider,用于同进程多 Engine 场景. // 旧的全局 API 保留向后兼容,单 Engine 场景无需迁移. func (c *Compressor) SetContextWindowFn(fn ContextWindowFunc) { if fn != nil { c.contextWindowFn = fn } } // SetCompactModelFn 设置实例级压缩用模型查询函数. // 优先于全局 SetCompactModelProvider,用于同进程多 Engine 场景. func (c *Compressor) SetCompactModelFn(fn CompactModelFunc) { if fn != nil { c.compactModelFn = fn } } // SetProvider 更新模型 Provider(支持运行时热切换). // NewCompressor 构造时已通过参数注入 provider,此方法用于构造后替换(如插件热加载场景). // 传入 nil 时无效(防止意外清除已注入的 provider). func (c *Compressor) SetProvider(p flyto.ModelProvider) { if p != nil { c.provider = p } } // effectiveContextWindow 获取给定模型的上下文窗口大小. // 优先使用实例级函数,nil 时降级到全局 contextWindowProvider. func (c *Compressor) effectiveContextWindow(model string) int { if c.contextWindowFn != nil { return c.contextWindowFn(model) } return contextWindowProvider(model) } // effectiveCompactModel 获取压缩用模型 ID. // 优先使用实例级函数,nil 时降级到全局 compactModelProvider. func (c *Compressor) effectiveCompactModel() string { if c.compactModelFn != nil { return c.compactModelFn() } return compactModelProvider() } // saveBreakerState 将当前断路器状态持久化(如果有 persister). // 在 RecordFailure / Reset 调用后触发,fail-open. func (c *Compressor) saveBreakerState() { if c.persister == nil { return } state := BreakerState{ Failures: c.circuitBreaker.Failures(), LastFailedAt: c.circuitBreaker.lastFailureTime(), } if err := c.persister.Save(state); err != nil { c.getObserver().Event("compact_breaker_save_failed", map[string]any{ "error": err.Error(), }) } } // SetPolicy 设置压缩策略. func (c *Compressor) SetPolicy(p CompactionPolicy) { c.policy = p } // SetRestoreManager 设置恢复管理器. func (c *Compressor) SetRestoreManager(rm *RestoreManager) { c.restoreManager = rm } // Threshold 返回当前压缩阈值. func (c *Compressor) Threshold() int { return c.threshold } // ShouldCompact 检查是否需要触发自动压缩. func (c *Compressor) ShouldCompact(messages []CompactMessage) bool { return EstimateTokens(messages) > c.threshold } // DoCompact 执行压缩操作. // 优先走 CompactTiered(provider 路径 + 三层降级),无 provider 时退化为 MicroCompact 降级. // // 升华改进(ELEVATED): 原实现调用 Compact(msgs, apiKey, baseURL)-- // 绕过了 provider 抽象,始终走 Anthropic HTTP 直连,无法与 OpenAI/Vertex 等 Provider 协同. // 替代方案(原方案): 保留 Compact(apiKey,baseURL) 调用,provider 路径在 CompactTiered 里另起炉灶-- // 两条路径并存,调用方需要知道"用哪个入口",复杂度增加. func (c *Compressor) DoCompact(messages []CompactMessage) (*CompactResult, error) { return c.CompactTiered(context.Background(), messages, c.policy) } // DoCompactWithHint is DoCompact plus an opts pipe for callers that have // extra information (currently: precise token gap from a PromptTooLong // API error). engine.forceCompact uses this variant after a ctx-too-long // error so the compressor's truncateAndRetryCompact can take a precise // stride instead of shaving one group at a time. The plain DoCompact // stays for reflex-path callers (maybeCompact) that have no hint. // // DoCompactWithHint 是 DoCompact 加一个 opts 管道, 给有额外信息的调用方用 // (目前: PromptTooLong API 错误解出的精确 token gap). engine.forceCompact // 在 ctx-too-long 错误后走这个 variant, 压缩器的 // truncateAndRetryCompact 就能精确跳步, 不用一组一组砍. 纯 DoCompact // 留给 reflex-path 调用方 (maybeCompact) -- 它们没有 hint. func (c *Compressor) DoCompactWithHint(messages []CompactMessage, opts ...CompactOption) (*CompactResult, error) { return c.CompactTiered(context.Background(), messages, c.policy, opts...) } // DoMicroCompact 执行微压缩操作. func (c *Compressor) DoMicroCompact(messages []CompactMessage) []CompactMessage { tokensBefore := EstimateTokens(messages) result := MicroCompact(messages) tokensAfter := EstimateTokens(result) // 埋点说明:MicroCompact 是轻量修剪,不调 API,但频率高(每轮都可能触发). // 记录 token 变化用于评估修剪效果--如果 before == after 说明没有可修剪内容. c.getObserver().Event("micro_compact_triggered", map[string]any{ "tokens_before": tokensBefore, "tokens_after": tokensAfter, "messages": len(messages), }) return result } // CircuitBreaker 返回内嵌的断路器(供外部检查状态). func (c *Compressor) CircuitBreaker() *CompactCircuitBreaker { return c.circuitBreaker } // --- 三层压缩降级 --- // 升华改进(ELEVATED): 三层压缩降级策略. // // 反向思考结论:早期方案的"砍了再试"在 95% 场景下够用且简单可靠. // 不应该替换它,而是在它失败时补一层分块压缩. // // 第 1 层:单次压缩(早期方案方案) // 条件:压缩请求能放下压缩模型的上下文窗口 // 做法:完整对话 → API 生成摘要 // 来源:早期方案设计,简单有效,95% 场景走这条路 // // 第 2 层:砍头重试(早期方案方案) // 条件:第 1 层失败(PTL prompt too long) // 做法:砍掉最旧的消息组,缩短后重试单次压缩 // 来源:早期方案 truncateHeadForPTLRetry,处理"略微超限"的场景 // 反向思考:虽然不够优雅但经过大量生产验证,代码简单 bug 少 // // 第 3 层:分块压缩(新增) // 条件:第 2 层砍了 N 次还是 PTL(会话极端长) // 做法:按轮次边界分块,每块独立压缩,合并摘要 // 来源:新设计,解决"200K 会话连压缩请求都超限"的极端场景 // 反向思考:比第 2 层复杂(并行调用,摘要拼接,跨块上下文丢失), // 所以只在第 2 层失败时才启用.95% 的场景不走这里. // // 替代方案: // A. 只用分块压缩(更统一但 95% 场景浪费 API 调用 + 摘要连贯性差) // B. 只用砍头重试(早期方案方案,极端长会话会卡死 - 用户实际遇到过) const ( // maxTruncateRetries 砍头重试的最大次数. // 精妙之处(CLEVER): 来自早期方案 truncateHeadForPTLRetry,经过大量生产验证. // 每次砍一个消息组(GroupByAPIRound 的一个 group),最多砍 5 次. // 反向思考:为什么不一次性估算该砍多少然后砍到位? // 因为 token 估算有 20% 误差,一次性计算可能砍多或砍少. // 逐步砍的方式虽然可能多试几次,但保证不会砍过头. maxTruncateRetries = 5 // maxTokensPerChunk 分块压缩时每块的最大 token 数. // 升华改进(ELEVATED): 50K 确保任何模型都能处理单块. // Haiku 上下文窗口 200K,留足够余量给系统提示和输出. // 反向思考:更大的块减少 API 调用次数但增加单块失败风险; // 更小的块增加调用次数但每块更安全.50K 是在两者之间的平衡点. maxTokensPerChunk = 50000 // maxParallelChunks 分块压缩时的最大并行数. // 精妙之处(CLEVER): 限制并发防止瞬间打满 API rate limit. // 4 个并行块已足够覆盖 200K 会话(200K/50K=4 块). maxParallelChunks = 4 ) // CompactTiered 执行三层压缩降级. // 这是压缩的主入口,依次尝试三层策略,并自动管理断路器状态. // // 升华改进(ELEVATED): 三层降级确保从普通会话到极端长会话都能处理, // 同时保持 95% 场景走最简单的路径. // 替代方案:只有单次压缩(早期方案,极端长会话卡死). func (c *Compressor) CompactTiered( ctx context.Context, messages []CompactMessage, policy CompactionPolicy, opts ...CompactOption, ) (*CompactResult, error) { var cfg compactOptions for _, opt := range opts { opt(&cfg) } obs := c.getObserver() // 检查断路器 if !c.circuitBreaker.ShouldAttempt() { // 埋点说明:断路器打开意味着压缩连续失败多次,系统处于降级状态. // 这是需要人工介入的严重事件--如果不告警,上下文会持续膨胀直到 OOM 或 413. obs.Event("compact_circuit_breaker_open", map[string]any{ "failures": c.circuitBreaker.Failures(), }) return nil, fmt.Errorf("compact circuit breaker open: too many failures") } if policy == nil { policy = &DefaultCodePolicy{} } groups := GroupByAPIRound(messages) // 第 1 层:估算是否能放下压缩模型的上下文窗口 totalTokens := EstimateTokens(messages) // 埋点说明:CompactTiered 开始是追踪三层降级路径的起点-- // tokens_before 和 total_groups 用于事后分析"走了哪层"和"为什么要降级". obs.Event("compact_started", map[string]any{ "strategy": "tiered", "tokens_before": totalTokens, "total_groups": len(groups), }) compactModelWindow := c.getContextWindow() // 精妙之处(CLEVER): 留 15% 余量防止估算误差导致 PTL. // token 估算有约 20% 误差,15% 余量覆盖大多数情况. if totalTokens <= compactModelWindow*85/100 { // 第 1 层:单次压缩 result, err := c.singleCompact(ctx, messages, groups, policy) if err == nil { c.circuitBreaker.Reset() c.saveBreakerState() return result, nil } // 不是 PTL 错误 → 记录失败并返回 if !isPromptTooLong(err) { c.circuitBreaker.RecordFailure(isRateLimit(err)) c.saveBreakerState() return nil, err } // PTL → 进入第 2 层 } // 第 2 层:砍头重试 result, err := c.truncateAndRetryCompact(ctx, messages, groups, policy, cfg.tokenGap) if err == nil { c.circuitBreaker.Reset() c.saveBreakerState() // 埋点说明:走到第 2 层说明第 1 层 PTL,需要关注是否频繁发生. obs.Event("compact_completed", map[string]any{ "strategy": "tiered_truncate", "tokens_before": totalTokens, "tokens_after": result.TokensAfter, "duration_ms": result.DurationMs, "restored_items": len(result.RestoredItems), }) return result, nil } if !isPromptTooLong(err) { c.circuitBreaker.RecordFailure(isRateLimit(err)) c.saveBreakerState() obs.Error(err, map[string]any{ "strategy": "tiered", "phase": "truncate_retry", }) return nil, err } // 第 3 层:分块压缩 result, err = c.chunkedCompact(ctx, messages, groups, policy) if err != nil { c.circuitBreaker.RecordFailure(isRateLimit(err)) c.saveBreakerState() obs.Error(err, map[string]any{ "strategy": "tiered", "phase": "chunked_compact", }) return nil, err } c.circuitBreaker.Reset() c.saveBreakerState() // 埋点说明:走到第 3 层是极端场景,需要高优告警. obs.Event("compact_completed", map[string]any{ "strategy": "tiered_chunked", "tokens_before": totalTokens, "tokens_after": result.TokensAfter, "duration_ms": result.DurationMs, "restored_items": len(result.RestoredItems), }) return result, nil } // singleCompact 单次压缩(第 1 层). // 精妙之处(CLEVER): 来自早期方案设计,简单有效,95% 场景走这条路. // 完整对话 → API 生成摘要 → 摘要 + 最近消息. func (c *Compressor) singleCompact( ctx context.Context, messages []CompactMessage, groups []MessageGroup, policy CompactionPolicy, ) (*CompactResult, error) { return c.compactFull(ctx, groups, policy, EstimateTokens(messages)) } // truncateAndRetryCompact 砍掉最旧的消息组后重试压缩. // // 精妙之处(CLEVER): 来自早期方案 truncateHeadForPTLRetry,经过大量生产验证. // 每次砍一个消息组(GroupByAPIRound 的一个 group),最多砍 maxTruncateRetries 次. // 这样逐步缩短而不是一次砍太多(保留尽可能多的上下文). // // 反向思考:为什么不一次性估算该砍多少然后砍到位? // 因为 token 估算有 20% 误差,一次性计算可能砍多或砍少. // 逐步砍的方式虽然可能多试几次,但保证不会砍过头. // // 2026-04-19 updated: when tokenGap > 0 we DO know the precise excess // count (parsed from the API's own "137500 tokens > 135000 maximum" // error message by api.ParseTokenGap; see internal/transport/errors.go). // The 20% estimate error is irrelevant because the number comes from // the server, not from our tokenizer. So we do one precise-stride pass // first: accumulate oldest groups' tokens until the accumulation meets // or exceeds tokenGap, drop them in one pass, retry compactFull once. // If that attempt still returns PromptTooLong (rare -- means the server's // own count was off, or new context grew during the round-trip), fall // through to the original incremental loop with currentGroups already // trimmed. This is exactly the "let the compactor jump" contract that // api.APIError.TokenGap's field doc promises. // // 2026-04-19 更新: tokenGap > 0 时我们有精确溢出量 (由 api.ParseTokenGap // 从 "137500 tokens > 135000 maximum" 这类服务端错误原话解出, 见 // internal/transport/errors.go). 20% 的 tokenizer 估算误差不适用 -- // 数字来自服务端不是我们本地估算. 于是先做一次精确跳步: 从最旧的组开始 // 累加 tokens, 累加量 >= tokenGap 就一次丢掉这些组, 再试一次 // compactFull. 那次还 PromptTooLong (罕见, 服务端自己计数跑偏或者 // round-trip 期间新上下文又长了) 就 fall through 到原来的逐组 loop, // currentGroups 已经是砍过的. 这正是 api.APIError.TokenGap 字段 godoc // 承诺的 "让压缩器一步跳到位". func (c *Compressor) truncateAndRetryCompact( ctx context.Context, messages []CompactMessage, groups []MessageGroup, policy CompactionPolicy, tokenGap int, ) (*CompactResult, error) { keepRounds := recentTurnsToKeep if policy != nil { if k := policy.MaxRecentRoundsToKeep(); k > 0 { keepRounds = k } } currentGroups := groups // Precise-stride pass (only when we got tokenGap from the API error). // 精确跳步 (仅在从 API 错误拿到 tokenGap 时). if tokenGap > 0 && len(currentGroups) > keepRounds+1 { acc := 0 dropEnd := 1 // keep group 0 (preamble) maxDrop := len(currentGroups) - keepRounds - 1 for i := 1; i <= maxDrop; i++ { acc += EstimateTokens(currentGroups[i].Messages) dropEnd = i + 1 if acc >= tokenGap { break } } if dropEnd > 1 { // Snapshot the Index of each dropped group BEFORE the slice // rebuild below overwrites currentGroups. Operators grepping // "compact_token_gap_stride" see exactly which API-round // groups were dropped (e.g. [1, 2, 3]) rather than only the // count. Without this, MessageGroup.Index is only ever // written, never read outside tests -- a scanner-flagged // dead field wearing real diagnostic intent. // // 在下方 slice 重建覆盖 currentGroups 之前, 先快照每个被丢 // group 的 Index. 运维 grep "compact_token_gap_stride" 能看到 // drop 了哪几个 API-round group (例如 [1, 2, 3]) 而非只有 // 数量. 否则 MessageGroup.Index 仅被写不被读 (test 外), 是 // scanner 标记的 dead field 却有真实诊断意图. droppedIndices := make([]int, 0, dropEnd-1) for i := 1; i < dropEnd; i++ { droppedIndices = append(droppedIndices, currentGroups[i].Index) } currentGroups = append(append([]MessageGroup(nil), currentGroups[:1]...), currentGroups[dropEnd:]...) c.getObserver().Event("compact_token_gap_stride", map[string]any{ "token_gap": tokenGap, "dropped_groups": dropEnd - 1, "dropped_group_indices": droppedIndices, "dropped_tokens": acc, "groups_after": len(currentGroups), }) tokensBefore := EstimateTokens(messages) result, err := c.compactFull(ctx, currentGroups, policy, tokensBefore) if err == nil { return result, nil } if !isPromptTooLong(err) { return nil, err } // Still PTL -- server's own count was off or new context // grew. Fall through to the incremental loop with currentGroups // already trimmed. // // 仍 PTL -- 服务端计数跑偏或新上下文又长了. fall through 到 // 逐组 loop, currentGroups 已砍过. } } // 逐步砍掉最旧的组 var lastErr error for attempt := 0; attempt < maxTruncateRetries; attempt++ { // 至少保留最近 keepRounds 组 + 1 组用于压缩 if len(currentGroups) <= keepRounds+1 { break } // 砍掉最旧的一组(跳过 preamble 后的第一个非前言组) // 精妙之处(CLEVER): 保留 preamble(group 0)因为它包含系统上下文. // 砍的是 group 1(最旧的对话轮次),这样不丢系统指令. if len(currentGroups) > 2 { currentGroups = append(currentGroups[:1], currentGroups[2:]...) } else { currentGroups = currentGroups[1:] } // 用砍后的组重建消息列表 var truncatedMessages []CompactMessage for _, g := range currentGroups { truncatedMessages = append(truncatedMessages, g.Messages...) } tokensBefore := EstimateTokens(messages) result, err := c.compactFull(ctx, currentGroups, policy, tokensBefore) if err == nil { return result, nil } lastErr = err if !isPromptTooLong(err) { // 非 PTL 错误,直接返回 return nil, err } // PTL → 继续砍 } // 所有重试都失败,返回最后一个错误 if lastErr == nil { lastErr = fmt.Errorf("truncate retry: insufficient groups to compact") } return nil, lastErr } // chunkedCompact 按轮次边界分块,各块独立压缩后合并摘要. // // 升华改进(ELEVATED): 解决"砍头重试也失败"的极端场景. // 按 GroupByAPIRound 的轮次边界切分,每块不超过 maxTokensPerChunk. // 各块可以并行压缩(goroutine),最后拼接摘要. // // 反向思考记录: // - 跨块上下文丢失:块 1 的 bug 和块 3 的修复可能关联不上. // 接受这个损失,因为走到第 3 层说明会话极端长, // 完美的摘要已不可能,有摘要比卡死好. // - 并行更贵:4 块 = 4 次 API 调用.但只在极端场景触发, // 正常场景走第 1/2 层不会到这里. // - 摘要连贯性:各块独立摘要后拼接,不如单次压缩连贯. // 但每块内部是连贯的(按轮次边界切分保证完整对话轮次). // // 替代方案:本地截断不调 API(更简单,但丢失所有历史信息. // 反向思考后否决:API 不可用时主对话也不行,截断没意义). func (c *Compressor) chunkedCompact( ctx context.Context, messages []CompactMessage, groups []MessageGroup, policy CompactionPolicy, ) (*CompactResult, error) { start := time.Now() tokensBefore := EstimateTokens(messages) keepRounds := recentTurnsToKeep if policy != nil { if k := policy.MaxRecentRoundsToKeep(); k > 0 { keepRounds = k } } // 分离最近消息(保留不压缩) splitGroupIdx := len(groups) - keepRounds if splitGroupIdx < 1 { splitGroupIdx = 1 } if splitGroupIdx >= len(groups) { // 组太少,无法分块 var allMsgs []CompactMessage for _, g := range groups { allMsgs = append(allMsgs, g.Messages...) } return &CompactResult{ Messages: allMsgs, TokensBefore: tokensBefore, TokensAfter: tokensBefore, Strategy: StrategyFull, DurationMs: time.Since(start).Milliseconds(), }, nil } oldGroups := groups[:splitGroupIdx] recentGroups := groups[splitGroupIdx:] // 按 token 预算分块 chunks := chunkByTokenBudget(oldGroups, maxTokensPerChunk) if len(chunks) == 0 { return nil, fmt.Errorf("chunked compact: no chunks created") } // 埋点说明:分块压缩是第 3 层降级路径的核心操作-- // chunks 数量和并行度是 API 调用成本的直接指标. c.getObserver().Event("compact_chunked", map[string]any{ "chunks": len(chunks), "parallel": maxParallelChunks, }) // 精妙之处(CLEVER): 用 goroutine + channel 控制并发数(maxParallelChunks), // 任何一块失败不影响其他块.最终合并时跳过失败的块. // 如果所有块都失败 → 返回错误(极端中的极端,断路器处理). type chunkResult struct { index int summary string err error } results := make([]chunkResult, len(chunks)) sem := make(chan struct{}, maxParallelChunks) var wg sync.WaitGroup for i, chunk := range chunks { wg.Add(1) go func(idx int, chunkGroups []MessageGroup) { defer wg.Done() sem <- struct{}{} // 获取信号量 defer func() { <-sem }() // 释放信号量 // 收集块内所有消息 var chunkMessages []CompactMessage for _, g := range chunkGroups { chunkMessages = append(chunkMessages, g.Messages...) } // 剥离图像后通过 Provider 调用摘要 API stripped := StripImages(chunkMessages) var summary string var err error if c.provider != nil { summary, err = generateSummaryViaProvider(ctx, stripped, c.provider, c.effectiveCompactModel(), policy) } else { err = fmt.Errorf("compact: no provider configured; inject via NewCompressor(threshold, provider)") } results[idx] = chunkResult{index: idx, summary: summary, err: err} }(i, chunk) } wg.Wait() // 合并摘要(跳过失败的块) var summaries []string successCount := 0 for _, r := range results { if r.err == nil && r.summary != "" { summaries = append(summaries, fmt.Sprintf("[Part %d]\n%s", r.index+1, r.summary)) successCount++ } } // 如果所有块都失败 → 返回错误 if successCount == 0 { return nil, fmt.Errorf("chunked compact: all %d chunks failed", len(chunks)) } // 拼接摘要 combinedSummary := strings.Join(summaries, "\n\n") // 收集最近消息 var recentMessages []CompactMessage for _, g := range recentGroups { recentMessages = append(recentMessages, g.Messages...) } // 构建压缩后消息 summaryContent, _ := json.Marshal("[Previous conversation summary (chunked)]\n" + combinedSummary) compacted := make([]CompactMessage, 0, 1+len(recentMessages)) compacted = append(compacted, CompactMessage{ Role: "user", Content: summaryContent, }) compacted = append(compacted, recentMessages...) return &CompactResult{ Messages: compacted, Summary: combinedSummary, KeptMessages: recentMessages, Strategy: StrategyFull, TokensBefore: tokensBefore, TokensAfter: EstimateTokens(compacted), DurationMs: time.Since(start).Milliseconds(), }, nil } // chunkByTokenBudget 按轮次边界和 token 预算将消息组分块. // // 精妙之处(CLEVER): 切分点永远在完整对话轮次边界, // 不会把一个 user+assistant+tool_result 拆到两个块里. // 这保证每块内部的语义是完整的. func chunkByTokenBudget(groups []MessageGroup, budget int) [][]MessageGroup { if len(groups) == 0 { return nil } var chunks [][]MessageGroup var current []MessageGroup currentTokens := 0 for _, g := range groups { // 如果单个组就超预算,独立成一块 // 精妙之处(CLEVER): 不跳过超大组,而是让它单独成块. // 超大组通常包含大量工具输出,摘要后可以大幅压缩. if g.Tokens > budget && len(current) > 0 { chunks = append(chunks, current) current = nil currentTokens = 0 } if currentTokens+g.Tokens > budget && len(current) > 0 { chunks = append(chunks, current) current = nil currentTokens = 0 } current = append(current, g) currentTokens += g.Tokens } // 最后一块 if len(current) > 0 { chunks = append(chunks, current) } return chunks } // getContextWindow 获取压缩模型的上下文窗口大小. // 升华改进(ELEVATED): 使用实例级函数取代全局--多 Engine 场景各自用各自的模型配置. func (c *Compressor) getContextWindow() int { model := c.effectiveCompactModel() return c.effectiveContextWindow(model) } // isPromptTooLong 检测是否是 PTL(Prompt Too Long)错误. // 精妙之处(CLEVER): 检查多种 PTL 错误模式--不同 API 实现 // 返回的错误消息格式不同(原生 Anthropic vs 网关代理), // 需要宽泛匹配. func isPromptTooLong(err error) bool { if err == nil { return false } msg := strings.ToLower(err.Error()) return strings.Contains(msg, "prompt is too long") || strings.Contains(msg, "prompt too long") || strings.Contains(msg, "too many tokens") || strings.Contains(msg, "context length exceeded") || strings.Contains(msg, "maximum context length") || strings.Contains(msg, "http 413") } // isRateLimit 检测是否是 rate limit 错误 (429/529). // 精妙之处(CLEVER): 429 是标准 HTTP rate limit,529 是 Anthropic 自定义的过载状态码. // 两者都表示临时性限流,不应计入断路器失败次数. func isRateLimit(err error) bool { if err == nil { return false } msg := err.Error() return strings.Contains(msg, "HTTP 429") || strings.Contains(msg, "HTTP 529") || strings.Contains(strings.ToLower(msg), "rate limit") || strings.Contains(strings.ToLower(msg), "overloaded") }