// plan_progress.go - UltraPlan 步骤进度追踪 // // 定位:模块 17.2 补完.实现计划步骤执行状态机 + 进度报告机制. // // 核心设计决策: // // 1. PlanProgress 作为独立结构体,而非嵌入 PlanModeManager // PlanModeManager 负责"模式生命周期",PlanProgress 负责"执行进度". // 分离关注点:计划审批通过后,PlanProgress 可以独立存活到所有步骤完成. // 替代方案:<在 PlanModeManager 里直接加 steps 字段> - // 否决原因:PlanModeManager.active=false(已退出计划模式)时进度仍需追踪, // 两个生命周期不同,混在一起会导致状态机混乱. // // 2. StepStatus 状态机(pending → running → done/failed/skipped) // 状态转换单向(不可倒退),消费方可以安全地 diff 前后状态. // 精妙之处(CLEVER): skipped 是 done 的软变体--步骤因依赖失败而跳过, // 不是"计划失败",而是"这条路不走了".消费方据此决定是否触发重规划. // // 3. OnProgress 回调 + Observer 双通道 // OnProgress 供 SDK/CLI 消费层实时 UI 刷新(低延迟); // Observer 供监控系统结构化指标(高可靠). // 二者互补,不重复--OnProgress 是调用者关心的业务语义,Observer 是系统可观测性. // // 4. 可选绑定:PlanModeManager.AttachProgress // ExitPlanMode 审批通过时调用 AttachProgress,不强制要求有进度追踪器. // SDK 嵌入场景只关心审批结果可以完全忽略进度(NoopApprovalPolicy + 不调用 AttachProgress). // // 升华改进(ELEVATED): 早期实现 计划执行无任何进度追踪--模型执行时只有流式文本, // 用户看不到"当前在执行第几步".我们引入结构化进度,让消费层可以渲染进度条, // 步骤列表,并行执行图,甚至按步骤粒度汇报失败和重规划原因. // 跨行业价值:仓储调度系统可以把每个步骤映射为"货位操作任务", // 并向 WMS 实时同步任务状态. // 替代方案:<纯靠流式文本描述进度> - 否决原因:文本无法机器解析, // 多 Agent 并行时步骤归属不明,失败粒度丢失. package engine import ( "fmt" "sync" "time" ) // ───────────────────────────────────────────── // StepStatus - 步骤状态枚举 // ───────────────────────────────────────────── // StepStatus 是计划步骤的执行状态. // // 精妙之处(CLEVER): 只有 5 个状态,不引入"paused"等复杂中间态-- // 引擎不做调度(那是消费方的责任),所以不需要暂停概念. // 消费方可以在 pending 状态的步骤上"不调用 Start"来实现暂停语义. type StepStatus string const ( // StepStatusPending 步骤尚未开始(初始状态). StepStatusPending StepStatus = "pending" // StepStatusRunning 步骤正在执行(至少有一个 agent 在处理). StepStatusRunning StepStatus = "running" // StepStatusDone 步骤成功完成. StepStatusDone StepStatus = "done" // StepStatusFailed 步骤执行失败(触发重规划或上报错误). StepStatusFailed StepStatus = "failed" // StepStatusSkipped 步骤因依赖失败而跳过(不是执行失败,是路径规划放弃). StepStatusSkipped StepStatus = "skipped" ) // ───────────────────────────────────────────── // StepProgress - 单步骤进度快照 // ───────────────────────────────────────────── // StepProgress 是单个计划步骤的执行进度快照. // // 升华改进(ELEVATED): StepProgress 是不可变快照(每次状态变更产生新快照)-- // 消费方可以对比前后两个快照来决定 UI 怎么刷新,也可以把快照序列存入数据库做审计. // 替代方案:<步骤直接持有可变状态> - 否决原因:并发时消费方读到中间态,需要额外加锁. type StepProgress struct { // Step 原始步骤定义(来自 PlanApprovalEvent.Steps 或手动注册). Step PlanStep // Status 当前执行状态. Status StepStatus // StartedAt 步骤开始时间(Status 变为 Running 时记录,零值表示未开始). StartedAt time.Time // FinishedAt 步骤结束时间(Status 变为 Done/Failed/Skipped 时记录). FinishedAt time.Time // ErrorMessage 失败原因(仅 StatusFailed 时非空). ErrorMessage string // AgentID 执行此步骤的 Agent ID(可选,SubAgent 场景中有意义). AgentID string } // Duration 返回步骤已执行的时间. // 未开始时返回 0;运行中时返回已经过的时间;完成后返回总耗时. func (s StepProgress) Duration() time.Duration { if s.StartedAt.IsZero() { return 0 } if !s.FinishedAt.IsZero() { return s.FinishedAt.Sub(s.StartedAt) } return time.Since(s.StartedAt) } // IsTerminal 返回步骤是否处于终态(done/failed/skipped). func (s StepProgress) IsTerminal() bool { return s.Status == StepStatusDone || s.Status == StepStatusFailed || s.Status == StepStatusSkipped } // ───────────────────────────────────────────── // PlanProgress - 计划进度追踪器 // ───────────────────────────────────────────── // PlanProgress 追踪整个计划的步骤执行进度. // // 精妙之处(CLEVER): PlanProgress 独立于 PlanModeManager 生存-- // PlanModeManager.active 变为 false 之后(计划已批准,开始执行), // PlanProgress 继续追踪每个步骤直到全部完成. // 这符合"生命周期分离"原则:审批 ≠ 执行. // // 线程安全:所有状态通过 sync.RWMutex 保护. type PlanProgress struct { mu sync.RWMutex // sessionID 关联的会话 ID(用于 observer 标签). sessionID string // steps 步骤有序列表(按注册顺序). steps []StepProgress // index stepID → steps 切片下标,O(1) 查找. // 精妙之处(CLEVER): 用下标而不是指针--切片扩容时指针会失效. index map[string]int // startedAt 第一个步骤开始时的时间(用于计算总耗时). startedAt time.Time // finishedAt 最后一个步骤结束时的时间. finishedAt time.Time // onProgress 进度变更回调(可选). // 每次步骤状态变更时同步调用,SDK/CLI 用此驱动 UI 刷新. onProgress func(snapshot PlanProgressSnapshot) // observer 可观测性接口(永不为 nil). observer EventObserver } // NewPlanProgress 创建计划进度追踪器. // // steps 是初始步骤列表(通常来自 PlanApprovalEvent.Steps). // observer 用于结构化事件上报,传 nil 则使用 NoopObserver. func NewPlanProgress(sessionID string, steps []PlanStep, observer EventObserver) *PlanProgress { if observer == nil { observer = &NoopObserver{} } p := &PlanProgress{ sessionID: sessionID, steps: make([]StepProgress, 0, len(steps)), index: make(map[string]int, len(steps)), observer: observer, } for _, step := range steps { p.registerStep(step) } return p } // SetOnProgress 设置进度变更回调. // 每次步骤状态变更时同步调用(在锁外调用,避免死锁). // // 精妙之处(CLEVER): 允许事后设置,而不要求构造时就传入-- // SDK 嵌入场景常见模式:先创建 PlanProgress,再绑定 UI 回调. func (p *PlanProgress) SetOnProgress(fn func(snapshot PlanProgressSnapshot)) { p.mu.Lock() defer p.mu.Unlock() p.onProgress = fn } // RegisterStep 动态添加步骤(重规划时追加新步骤). // // 升华改进(ELEVATED): 早期方案没有动态添加步骤的概念(计划是静态文件). // 我们允许"步骤失败时的重规划"场景:模型生成补救步骤,消费方调用 RegisterStep 追加, // 然后 Start/Finish 继续执行. // 替代方案:<重规划时创建新 PlanProgress> - 否决原因:丢失之前步骤的历史状态, // 消费方无法展示"原计划 5 步,失败后增加了 2 个补救步骤"的完整视图. func (p *PlanProgress) RegisterStep(step PlanStep) error { p.mu.Lock() defer p.mu.Unlock() if _, exists := p.index[step.ID]; exists { return fmt.Errorf("plan_progress: step %q already registered", step.ID) } p.registerStep(step) p.observer.Event("plan_step_registered", map[string]any{ "session_id": p.sessionID, "step_id": step.ID, "total": len(p.steps), }) return nil } // registerStep 内部无锁版本(调用者持有锁). func (p *PlanProgress) registerStep(step PlanStep) { idx := len(p.steps) p.steps = append(p.steps, StepProgress{ Step: step, Status: StepStatusPending, }) p.index[step.ID] = idx } // StartStep 将步骤标记为运行中. // // agentID 是执行此步骤的 Agent ID(可选,传空字符串表示未知). // 返回 error 当步骤 ID 不存在或步骤不在 Pending 状态. func (p *PlanProgress) StartStep(stepID, agentID string) error { p.mu.Lock() idx, ok := p.index[stepID] if !ok { p.mu.Unlock() return fmt.Errorf("plan_progress: unknown step %q", stepID) } step := p.steps[idx] if step.Status != StepStatusPending { p.mu.Unlock() return fmt.Errorf("plan_progress: step %q is %s, cannot start", stepID, step.Status) } now := time.Now() p.steps[idx].Status = StepStatusRunning p.steps[idx].StartedAt = now p.steps[idx].AgentID = agentID // 记录整体开始时间(首次 start) if p.startedAt.IsZero() { p.startedAt = now } snapshot := p.snapshotLocked() onProgress := p.onProgress p.mu.Unlock() // 在锁外触发回调(防止消费方持锁回调造成死锁) if onProgress != nil { onProgress(snapshot) } p.observer.Event("plan_step_started", map[string]any{ "session_id": p.sessionID, "step_id": stepID, "agent_id": agentID, "running": snapshot.RunningCount, }) return nil } // FinishStep 将步骤标记为完成(done/failed/skipped). // // errorMsg 仅 statusFailed 时有意义. func (p *PlanProgress) FinishStep(stepID string, status StepStatus, errorMsg string) error { if status != StepStatusDone && status != StepStatusFailed && status != StepStatusSkipped { return fmt.Errorf("plan_progress: FinishStep requires terminal status (done/failed/skipped), got %q", status) } p.mu.Lock() idx, ok := p.index[stepID] if !ok { p.mu.Unlock() return fmt.Errorf("plan_progress: unknown step %q", stepID) } step := p.steps[idx] if step.IsTerminal() { p.mu.Unlock() return fmt.Errorf("plan_progress: step %q already in terminal state %s", stepID, step.Status) } now := time.Now() // 如果直接从 pending 跳到 terminal(比如 skip 依赖失败的步骤,未曾 start),不覆盖空值 if p.steps[idx].StartedAt.IsZero() && status == StepStatusSkipped { // skipped 步骤可能未曾 start } else if p.steps[idx].StartedAt.IsZero() { p.steps[idx].StartedAt = now } p.steps[idx].Status = status p.steps[idx].FinishedAt = now p.steps[idx].ErrorMessage = errorMsg // 如果全部步骤已完成,记录整体结束时间 allDone := true for _, s := range p.steps { if !s.IsTerminal() { allDone = false break } } if allDone && p.finishedAt.IsZero() { p.finishedAt = now } snapshot := p.snapshotLocked() onProgress := p.onProgress p.mu.Unlock() if onProgress != nil { onProgress(snapshot) } p.observer.Event("plan_step_finished", map[string]any{ "session_id": p.sessionID, "step_id": stepID, "status": string(status), "error": errorMsg, "done": snapshot.DoneCount, "failed": snapshot.FailedCount, "total": snapshot.TotalCount, "all_done": snapshot.IsComplete(), }) return nil } // SkipDependents 将所有依赖 failedStepID 的步骤递归标记为 skipped. // // 精妙之处(CLEVER): 拓扑传播--步骤 A 失败后,所有直接/间接依赖 A 的步骤 // 都应该 skip,而不是等待然后超时. // 消费方不需要自己遍历 Deps 图,调用一次 SkipDependents 即可. // 时间复杂度:O(n²) 最坏情况(全链依赖),但计划步骤数通常 < 20,可接受. // 替代方案:<消费方自己遍历 Deps> - 否决原因:每个消费层都要实现一遍图遍历, // 且"什么时候 skip"的业务语义理应在引擎层统一,不散落到各消费层. func (p *PlanProgress) SkipDependents(failedStepID string) []string { p.mu.Lock() // 收集所有需要 skip 的步骤 ID(BFS) toSkip := p.collectDependents(failedStepID) // 批量更新状态:只 skip pending 步骤. // // 精妙之处(CLEVER): 不 skip running 步骤--运行中的步骤已经在消耗资源, // 强制 skip 会造成"状态不一致"(实际 agent 还在跑但引擎认为已跳过). // 正确策略:让 running 步骤自然结束(done/failed),消费方收到失败后再决策. // Pending 步骤无此问题--尚未分配执行资源,安全中止. var skipped []string now := time.Now() for _, id := range toSkip { idx := p.index[id] if p.steps[idx].Status == StepStatusPending { p.steps[idx].Status = StepStatusSkipped p.steps[idx].FinishedAt = now p.steps[idx].ErrorMessage = fmt.Sprintf("skipped: dependency %q failed", failedStepID) skipped = append(skipped, id) } } snapshot := p.snapshotLocked() onProgress := p.onProgress p.mu.Unlock() if len(skipped) > 0 && onProgress != nil { onProgress(snapshot) } for _, id := range skipped { p.observer.Event("plan_step_skipped", map[string]any{ "session_id": p.sessionID, "step_id": id, "failed_dependency": failedStepID, }) } return skipped } // collectDependents 收集所有(直接+间接)依赖 rootID 的步骤 ID(BFS,调用者持有锁). func (p *PlanProgress) collectDependents(rootID string) []string { var result []string visited := map[string]bool{rootID: true} queue := []string{rootID} for len(queue) > 0 { current := queue[0] queue = queue[1:] for _, s := range p.steps { if visited[s.Step.ID] { continue } for _, dep := range s.Step.Deps { if dep == current { visited[s.Step.ID] = true result = append(result, s.Step.ID) queue = append(queue, s.Step.ID) break } } } } return result } // Snapshot 返回当前进度的完整快照(线程安全). func (p *PlanProgress) Snapshot() PlanProgressSnapshot { p.mu.RLock() defer p.mu.RUnlock() return p.snapshotLocked() } // snapshotLocked 生成快照(调用者持有任意锁). func (p *PlanProgress) snapshotLocked() PlanProgressSnapshot { steps := make([]StepProgress, len(p.steps)) copy(steps, p.steps) snap := PlanProgressSnapshot{ SessionID: p.sessionID, Steps: steps, StartedAt: p.startedAt, FinishedAt: p.finishedAt, } for _, s := range steps { snap.TotalCount++ switch s.Status { case StepStatusPending: snap.PendingCount++ case StepStatusRunning: snap.RunningCount++ case StepStatusDone: snap.DoneCount++ case StepStatusFailed: snap.FailedCount++ case StepStatusSkipped: snap.SkippedCount++ } } return snap } // ───────────────────────────────────────────── // PlanProgressSnapshot - 进度快照(不可变) // ───────────────────────────────────────────── // PlanProgressSnapshot 是某一时刻的计划进度快照. // 不可变:创建后不可修改,消费方可以安全地并发读取. // // 精妙之处(CLEVER): 快照携带所有聚合计数,消费方不需要自己遍历 Steps 统计-- // 在热路径 UI 刷新时(每个步骤状态变更都可能触发重渲染),省去 O(n) 遍历. type PlanProgressSnapshot struct { // SessionID 关联的会话 ID. SessionID string // Steps 步骤进度列表(有序,与注册顺序一致). Steps []StepProgress // 聚合计数(冗余但高频使用,快照生成时预计算) TotalCount int PendingCount int RunningCount int DoneCount int FailedCount int SkippedCount int // StartedAt 第一个步骤开始的时间(所有步骤未开始时为零值). StartedAt time.Time // FinishedAt 所有步骤进入终态的时间(尚未完成时为零值). FinishedAt time.Time } // IsComplete 返回所有步骤是否均已进入终态(done/failed/skipped). func (s PlanProgressSnapshot) IsComplete() bool { return s.TotalCount > 0 && s.PendingCount == 0 && s.RunningCount == 0 } // IsSuccess 返回计划是否成功完成(所有步骤 done 或 skipped,无失败步骤). func (s PlanProgressSnapshot) IsSuccess() bool { return s.IsComplete() && s.FailedCount == 0 } // HasFailed 返回是否有步骤失败. func (s PlanProgressSnapshot) HasFailed() bool { return s.FailedCount > 0 } // ProgressPercent 返回已完成(含 failed/skipped)的步骤百分比(0-100). func (s PlanProgressSnapshot) ProgressPercent() float64 { if s.TotalCount == 0 { return 0 } completed := s.DoneCount + s.FailedCount + s.SkippedCount return float64(completed) / float64(s.TotalCount) * 100 } // Duration 返回整体执行耗时. // 尚未开始时返回 0;执行中返回已经过时间;完成后返回总耗时. func (s PlanProgressSnapshot) Duration() time.Duration { if s.StartedAt.IsZero() { return 0 } if !s.FinishedAt.IsZero() { return s.FinishedAt.Sub(s.StartedAt) } return time.Since(s.StartedAt) } // ReadySteps 返回依赖已满足,可以立即执行的 pending 步骤列表. // // 精妙之处(CLEVER): 消费方不需要自己做拓扑排序-- // ReadySteps 每次返回当前可以并行启动的步骤集合. // 消费方只需循环:获取 ReadySteps → 并行 Start → 等待完成 → 再次调用 ReadySteps. // 这就是 Kahn 算法的消费者端封装. func (s PlanProgressSnapshot) ReadySteps() []StepProgress { // 收集所有已完成(done 或 skipped)的步骤 ID done := make(map[string]bool) for _, sp := range s.Steps { if sp.Status == StepStatusDone || sp.Status == StepStatusSkipped { done[sp.Step.ID] = true } } var ready []StepProgress for _, sp := range s.Steps { if sp.Status != StepStatusPending { continue } // 检查所有依赖是否已满足 allDepsDone := true for _, dep := range sp.Step.Deps { if !done[dep] { allDepsDone = false break } } if allDepsDone { ready = append(ready, sp) } } return ready } // ───────────────────────────────────────────── // PlanProgressEvent - 事件系统集成 // ───────────────────────────────────────────── // PlanProgressEvent 是计划进度变更事件,通过 Engine 事件流广播. // // 升华改进(ELEVATED): 早期方案无此事件--计划执行完全黑盒. // 我们把进度变更接入 Engine 事件系统,CLI 可以渲染进度列表, // SDK 可以推送 SSE 事件,监控系统可以记录执行轨迹. // 替代方案:<只用 OnProgress 回调> - 否决原因:回调是 push 模式, // 事件流是 pull 模式(消费方按需订阅),两种消费模式满足不同场景. type PlanProgressEvent struct { // Snapshot 当前进度快照. Snapshot PlanProgressSnapshot } func (e *PlanProgressEvent) EventType() string { return "plan_progress" }