// Package memory 的相关性评分器接口及实现. // // 从包级函数 Score() 提升为接口 RelevanceScorer, // 支持不同场景注入不同的评分策略. package memory import ( "bufio" "context" "encoding/json" "fmt" "io" "sync" "git.flytoex.net/yuanwei/flyto-agent/pkg/execenv" ) // RelevanceScorer 记忆相关性评分器接口. // // 升华改进(ELEVATED): 从包级函数提升为接口,支持场景可插拔. // 编程场景用文本相似度,仓储场景可能按 SKU/订单号匹配, // 法律场景可能按法条编号匹配.不同场景注册不同评分器. // 替代方案:硬编码文本相似度函数(原始设计,锁死评分算法). // // 参数传 *MemoryHeader 而非 name+description: // 宽接口缩窄容易,窄接口拓宽要改所有实现. // 评分器可以利用 Type,ModTime 等额外信息做更精准的评分. // // Shape: synchronous callback. Store calls Score synchronously during // FindRelevant to rank memory candidates for the current prompt; consumer // can plug keyword-based / embedding / LLM scorers. // // 形态: 同步回调. Store 在 FindRelevant 期间同步调 Score, 对当前 prompt // 排序记忆候选; 消费者可插入关键词 / embedding / LLM 等 scorer. type RelevanceScorer interface { Name() string Score(query string, header *MemoryHeader) float64 } // TextScorer 基于文本相似度的评分器(默认实现). // 算法:Jaccard-like + token 权重 + 子串匹配. // 这是从原始包级函数 Score() 重构而来,算法完全不变. type TextScorer struct{} func (s *TextScorer) Name() string { return "text" } // Score 计算查询与记忆头信息之间的文本相似度. // 复用现有逻辑:description 权重 0.7 + name 权重 0.3. func (s *TextScorer) Score(query string, header *MemoryHeader) float64 { if header == nil { return 0 } descScore := textScore(query, header.Frontmatter.Description) nameScore := textScore(query, header.Frontmatter.Name) return 0.7*descScore + 0.3*nameScore } // defaultScorer 是全局默认评分器实例. // 历史包袱(LEGACY): 保留为包级变量,供向后兼容的包装函数使用. var defaultScorer RelevanceScorer = &TextScorer{} // WeightedScorer 将评分器和权重绑定在一起. type WeightedScorer struct { Scorer RelevanceScorer Weight float64 } // CompositeScorer 将多个评分器加权组合. // // 升华改进(ELEVATED): 支持叠加而非替换(宪法第 8 条). // 编程评分器 + 仓储评分器可以共存,加权融合. // 替代方案:单评分器(锁定单场景). // // 使用示例: // // composite := NewCompositeScorer( // WeightedScorer{Scorer: &TextScorer{}, Weight: 0.7}, // WeightedScorer{Scorer: &WarehouseScorer{}, Weight: 0.3}, // ) type CompositeScorer struct { mu sync.RWMutex scorers []WeightedScorer } // NewCompositeScorer 创建一个新的组合评分器. func NewCompositeScorer(scorers ...WeightedScorer) *CompositeScorer { return &CompositeScorer{ scorers: append([]WeightedScorer{}, scorers...), } } func (cs *CompositeScorer) Name() string { return "composite" } // Score 计算加权平均分数(权重归一化). func (cs *CompositeScorer) Score(query string, header *MemoryHeader) float64 { cs.mu.RLock() defer cs.mu.RUnlock() var totalWeight, totalScore float64 for _, ws := range cs.scorers { totalWeight += ws.Weight totalScore += ws.Weight * ws.Scorer.Score(query, header) } if totalWeight == 0 { return 0 } return totalScore / totalWeight } // Add 动态添加一个加权评分器. func (cs *CompositeScorer) Add(ws WeightedScorer) { cs.mu.Lock() defer cs.mu.Unlock() cs.scorers = append(cs.scorers, ws) } // Remove 按名称移除评分器.返回是否找到并移除了. func (cs *CompositeScorer) Remove(name string) bool { cs.mu.Lock() defer cs.mu.Unlock() for i, ws := range cs.scorers { if ws.Scorer.Name() == name { cs.scorers = append(cs.scorers[:i], cs.scorers[i+1:]...) return true } } return false } // externalScorerRequest 是发送给外部进程的 JSON 请求. type externalScorerRequest struct { Query string `json:"query"` Name string `json:"name"` Description string `json:"description"` Type string `json:"type"` } // externalScorerResponse 是外部进程返回的 JSON 响应. type externalScorerResponse struct { Score float64 `json:"score"` } // ExternalScorer 桥接外部进程实现的评分器(跨语言支持). // // 升华改进(ELEVATED): 仓储团队可以用 Python 写评分器, // 通过 stdin/stdout JSON 通信.引擎内部看到的仍然是 Go 接口. // 替代方案:要求所有评分器用 Go 实现(锁定语言生态). // // 使用示例(Python 端): // // # warehouse_scorer.py // import json, sys // for line in sys.stdin: // req = json.loads(line) // score = my_scoring_logic(req["query"], req["name"], req["type"]) // print(json.dumps({"score": score})) // // 使用示例(Go 端): // // scorer, _ := NewExternalScorer(ctx, ExternalScorerOptions{ // Name: "warehouse", // Command: "python3", // Args: []string{"warehouse_scorer.py"}, // Executor: execenv.DefaultExecutor{}, // }) // composite.Add(WeightedScorer{Scorer: scorer, Weight: 0.3}) // // 通信协议(JSON Lines): // // 请求:{"query": "数据库配置", "name": "db_config", "description": "...", "type": "project"} // 响应:{"score": 0.85} // // Score 失败时返回 0.0(不阻断主流程). type ExternalScorer struct { name string proc execenv.Process stdin io.WriteCloser stdout *bufio.Scanner mu sync.Mutex } // ExternalScorerOptions 是 NewExternalScorer 的构造参数. // // 对齐 GitSyncOptions 风格 (opts struct DI), 便于未来扩容 (如 WorkDir, // 自定义 Env) 不破 API. type ExternalScorerOptions struct { // Name 是评分器名称, 用于日志和 CompositeScorer.Remove. Name string // Command 是外部进程的可执行路径 (python3 / node / 自定义 binary). Command string // Args 是传给 Command 的参数列表, 不含 Command 本身. Args []string // Executor 是子进程启动抽象 (M1 方案 β 严格 DI). 必填, nil 即 panic. // 本地 CLI 传 execenv.DefaultExecutor{}, 云端 SaaS 由 platform 层传 // sandbox.Backend. ClassPluginTool 告诉 backend "这是第三方 plugin 代码, // 零信任隔离" — 和 plugin shell tool 同策略. Executor execenv.Executor } // NewExternalScorer 创建一个外部进程评分器. // // ctx 是评分器的生命周期上下文: 从构造到 Close 整段时间内有效, ctx cancel // 会终止子进程 (由 Executor 实现负责转换为 Kill). 长驻进程场景里 ctx 属 // 于 scorer 实例本身, 而不是单次 Score 调用. // // opts.Executor 必填, nil 会 panic. 严格 DI 契约 (M1 方案 β). func NewExternalScorer(ctx context.Context, opts ExternalScorerOptions) (*ExternalScorer, error) { if opts.Executor == nil { panic("memory.NewExternalScorer: opts.Executor is required (M1 strict DI, no fallback)") } // Env 走 MinimalEnvMap (零信任, 第三方评分器代码). 对齐 plugin shell // tool 白名单策略: 只透传 PATH / HOME / LANG / LC_ALL, 不向 python3 脚本 // 泄漏 ANTHROPIC_API_KEY 等敏感宿主 env. proc := opts.Executor.Command(ctx, execenv.Spec{ Class: execenv.ClassPluginTool, Path: opts.Command, Args: opts.Args, Env: execenv.MinimalEnvMap(nil), }) stdin, err := proc.StdinPipe() if err != nil { return nil, fmt.Errorf("memory: external scorer stdin pipe: %w", err) } stdoutReader, err := proc.StdoutPipe() if err != nil { stdin.Close() return nil, fmt.Errorf("memory: external scorer stdout pipe: %w", err) } if err := proc.Start(); err != nil { stdin.Close() return nil, fmt.Errorf("memory: external scorer start: %w", err) } return &ExternalScorer{ name: opts.Name, proc: proc, stdin: stdin, stdout: bufio.NewScanner(stdoutReader), }, nil } func (s *ExternalScorer) Name() string { return s.name } // Score 向外部进程发送请求并读取评分结果. // 精妙之处(CLEVER): 失败时返回 0.0 而非 error--评分器是可选增强, // 不应因外部进程故障阻断整个记忆检索流程. func (s *ExternalScorer) Score(query string, header *MemoryHeader) float64 { s.mu.Lock() defer s.mu.Unlock() req := externalScorerRequest{ Query: query, Name: header.Frontmatter.Name, Description: header.Frontmatter.Description, Type: string(header.Frontmatter.Type), } data, err := json.Marshal(req) if err != nil { return 0 } // 写入请求(JSON Lines: 一行一个 JSON 对象) if _, err := fmt.Fprintf(s.stdin, "%s\n", data); err != nil { return 0 } // 读取响应 if !s.stdout.Scan() { return 0 } var resp externalScorerResponse if err := json.Unmarshal(s.stdout.Bytes(), &resp); err != nil { return 0 } return resp.Score } // Close 关闭外部进程. // // 语义: 先关 stdin (触发子进程 EOF 自主退出), 再 Wait 回收进程资源. // Process interface 刻意不暴露 "进程是否已启动" 状态 (红线 1 不暴露 // *os.Process), 所以只要 s.proc 非 nil (即 NewExternalScorer 成功返回) // 就可以直接 Wait. Wait 的幂等性 / 重复调用保护由 Executor 实现负责. func (s *ExternalScorer) Close() error { s.mu.Lock() defer s.mu.Unlock() if s.stdin != nil { s.stdin.Close() } if s.proc != nil { return s.proc.Wait() } return nil }