package engine // normalizer.go -- 可组合的消息规范化管道. // // 升华改进(ELEVATED): 从硬编码 3 步升级为可组合管道. // 早期方案有 3 个 pass 串联(互相依赖,顺序脆弱). // Pipeline 模式让每个步骤独立,可测试,可替换. // 场景可以注册自己的步骤(仓储:传感器数据格式化,法律:脱敏). // 替代方案:硬编码所有步骤在一个大函数里(原始设计,随步骤增多变得不可维护). import ( "sort" "git.flytoex.net/yuanwei/flyto-agent/pkg/query" ) // MessageNormalizer 单个消息规范化步骤. // // 升华改进(ELEVATED): 从硬编码 3 步升级为可组合管道. // 早期方案有 12 个 pass 串联(互相依赖,顺序脆弱). // Pipeline 模式让每个步骤独立,可测试,可替换. // 场景可以注册自己的步骤(仓储:传感器数据格式化,法律:脱敏). // 替代方案:硬编码所有步骤在一个大函数里(原始设计,400 行,12 个 pass 互相依赖). type MessageNormalizer interface { Name() string Priority() int // 执行优先级(越小越先执行) Normalize(messages []query.Message) []query.Message } // NormalizePipeline 可组合的规范化管道. // // 精妙之处(CLEVER): 步骤通过 Priority 隐式排序,不显式声明依赖. // 早期方案的 3 步有显式的顺序依赖("先 strip orphan 再 filter empty 再 merge"), // 改一步可能破坏另一步.Priority 让每个步骤只关心自己排在第几, // 新增步骤不需要知道其他步骤的存在. // // 使用示例: // // pipeline := NewNormalizePipeline() // // 内置步骤自动注册 // // 场景可以追加: // pipeline.Add(&SensorDataNormalizer{}) // 仓储 // pipeline.Add(&RedactionNormalizer{}) // 法律 // // normalized := pipeline.Run(messages) type NormalizePipeline struct { steps []MessageNormalizer sorted bool // 步骤是否已按 Priority 排序 } // NewNormalizePipeline 创建管道,可选传入初始步骤. func NewNormalizePipeline(steps ...MessageNormalizer) *NormalizePipeline { p := &NormalizePipeline{ steps: make([]MessageNormalizer, 0, len(steps)), } for _, s := range steps { p.steps = append(p.steps, s) } if len(p.steps) > 1 { sort.Slice(p.steps, func(i, j int) bool { return p.steps[i].Priority() < p.steps[j].Priority() }) } p.sorted = true return p } // Add 向管道追加一个步骤. func (p *NormalizePipeline) Add(step MessageNormalizer) { p.steps = append(p.steps, step) p.sorted = false } // Remove 按名称移除一个步骤.返回是否成功移除. func (p *NormalizePipeline) Remove(name string) bool { for i, s := range p.steps { if s.Name() == name { p.steps = append(p.steps[:i], p.steps[i+1:]...) // sorted 不变--移除一个元素不破坏已排序的序列 return true } } return false } // Run 按 Priority 排序后依次执行每个步骤. // // 精妙之处(CLEVER): sorted 标记实现按需排序--构造时和 Add 后各排一次, // Run 热路径零排序开销.步骤数通常 < 20,排序本身 O(n log n) 很快, // 但 Run 在每轮 API 调用前都执行,消除不必要的 copy+sort 积少成多. func (p *NormalizePipeline) Run(messages []query.Message) []query.Message { if len(messages) == 0 { return messages } if len(p.steps) == 0 { return messages } if !p.sorted { sort.Slice(p.steps, func(i, j int) bool { return p.steps[i].Priority() < p.steps[j].Priority() }) p.sorted = true } result := messages for _, step := range p.steps { result = step.Normalize(result) } return result }