package builtin // 后台 Bash 执行 -- 支持 run_in_background 模式. // // 当 bashInput.RunInBackground 为 true 时,命令在后台执行, // 立即返回 task_id,输出实时累积到 BashOutput(线程安全). // // 后台任务集成到 TaskStore,可通过 TaskList/TaskUpdate 查询和管理. // 超时后自动终止,支持手动取消. import ( "context" "fmt" "strconv" "strings" "sync" "time" "git.flytoex.net/yuanwei/flyto-agent/pkg/execenv" "git.flytoex.net/yuanwei/flyto-agent/pkg/tools" ) // BackgroundBashTask 表示一个后台执行的 Bash 任务. // // 精妙之处(CLEVER): mu 保护 Status/ExitCode/EndTime 三个可变字段-- // ID/Command/Pid/StartTime 在创建后只写一次,不需要锁保护. // Output 有自己的 BashOutput.mu.cancel 通过 context 语义保护(多次调用安全). // 只给"竞态字段"加锁,避免过度同步带来的死锁风险. type BackgroundBashTask struct { ID string `json:"id"` Command string `json:"command"` Pid int `json:"pid"` StartTime time.Time `json:"start_time"` Output *BashOutput `json:"-"` cancel context.CancelFunc // mu 保护以下可变字段(多 goroutine 并发读写) mu sync.RWMutex Status string `json:"status"` // "running" / "completed" / "failed" / "killed" EndTime time.Time `json:"end_time,omitempty"` ExitCode int `json:"exit_code"` } // GetStatus 线程安全地返回任务当前状态. func (t *BackgroundBashTask) GetStatus() string { t.mu.RLock() defer t.mu.RUnlock() return t.Status } // setFinal 线程安全地设置任务终态(status + exitCode + endTime). // 只应在任务完成/取消/失败时调用一次. func (t *BackgroundBashTask) setFinal(status string, exitCode int) { t.mu.Lock() defer t.mu.Unlock() t.Status = status t.ExitCode = exitCode t.EndTime = time.Now() } // maxBackgroundTasks 是后台任务的最大并发数. // // 升华改进(ELEVATED): 早期方案 BackgroundTaskStore 无容量上限--恶意 LLM 或循环脚本 // 可不断调用 BashTool(run_in_background=true) 创建新 goroutine,导致内存耗尽. // 100 是经验值:覆盖正常 Agent 的并发需求(通常 < 10),同时阻止资源耗尽攻击. // 替代方案:<不限制,由 OS 自然 OOM> - 否决:OOM 会 kill 整个进程,不可控. const maxBackgroundTasks = 100 // defaultBackgroundTaskRetention 是已完成任务在 store 中保留的时间. // // 升华改进(ELEVATED): 早期方案 Remove() 已实现但**没有调用方**--长跑 Agent // 跑完一个 bg 任务后任务永远留在 map 里,导致内存稳步累积(每个 task 含输出 // 缓存 + 命令字符串 + 状态字段),在自动化场景(CI,监控脚本)下会成为隐性泄漏. // 10 分钟保留期让消费者有充足窗口拉取输出和退出码,超期后由 sweepStaleLocked // 自动回收. // 替代方案 1: - 否决:引入 goroutine 生命周期, // // 需要 Close 信号量级联到 Engine.Close(),跨 3 个文件改动. // // 替代方案 2:<显式让 consumer 调 Remove()> - 否决:前提就是没人调,强制 // // 调用方记得释放违反 Go 的"内存自动管理"用户预期. // // 替代方案 3: - 否决:lazy 清理对"任务跑完就再没人查" // // 的孤儿任务无效;sweep-on-Add 配合 hot path 才能持续回收. const defaultBackgroundTaskRetention = 10 * time.Minute // maxOutputBytes 是后台任务单次累积输出的最大字节数(10MB). // // 升华改进(ELEVATED): 早期方案 BashOutput 用 strings.Builder 无限积累-- // `cat /dev/urandom | head -c 100G` 类命令可耗尽内存(strings.Builder 底层是 []byte 扩容). // 10MB 覆盖绝大多数合法场景(构建日志,测试输出),超限后截断并继续运行(不 kill 进程). // 替代方案:<达到上限后 kill 进程> - 否决:kill 策略对用户侵入性太强,截断更友好. const maxOutputBytes = 10 * 1024 * 1024 // 10MB // BashOutput 是线程安全的输出累积器. type BashOutput struct { mu sync.Mutex stdout strings.Builder stderr strings.Builder dropped int // 超出上限后丢弃的字节数(用于告知用户输出被截断) } // WriteStdout 追加 stdout 内容(线程安全). // 超出 maxOutputBytes 的内容被丢弃,dropped 计数器记录丢弃量. func (o *BashOutput) WriteStdout(s string) { o.mu.Lock() defer o.mu.Unlock() current := o.stdout.Len() + o.stderr.Len() if current >= maxOutputBytes { o.dropped += len(s) + 1 return } o.stdout.WriteString(s) o.stdout.WriteByte('\n') } // WriteStderr 追加 stderr 内容(线程安全). // 超出 maxOutputBytes 的内容被丢弃,dropped 计数器记录丢弃量. func (o *BashOutput) WriteStderr(s string) { o.mu.Lock() defer o.mu.Unlock() current := o.stdout.Len() + o.stderr.Len() if current >= maxOutputBytes { o.dropped += len(s) + 1 return } o.stderr.WriteString(s) o.stderr.WriteByte('\n') } // Stdout 返回累积的 stdout 内容(线程安全). func (o *BashOutput) Stdout() string { o.mu.Lock() defer o.mu.Unlock() return o.stdout.String() } // Stderr 返回累积的 stderr 内容(线程安全). func (o *BashOutput) Stderr() string { o.mu.Lock() defer o.mu.Unlock() return o.stderr.String() } // CombinedOutput 返回合并的输出(stdout + stderr with prefix),线程安全. // 如果输出曾被截断,末尾追加截断提示. func (o *BashOutput) CombinedOutput() string { o.mu.Lock() defer o.mu.Unlock() var b strings.Builder stdout := o.stdout.String() stderr := o.stderr.String() if stdout != "" { b.WriteString(stdout) } if stderr != "" { lines := strings.Split(strings.TrimRight(stderr, "\n"), "\n") for _, line := range lines { b.WriteString("[stderr] ") b.WriteString(line) b.WriteByte('\n') } } if o.dropped > 0 { fmt.Fprintf(&b, "\n[output truncated: %d bytes dropped, exceeded %d MB limit]\n", o.dropped, maxOutputBytes/1024/1024) } return b.String() } // BackgroundTaskStore 存储后台 Bash 任务,线程安全. type BackgroundTaskStore struct { mu sync.RWMutex tasks map[string]*BackgroundBashTask nextID int retention time.Duration // 已完成任务保留时长(默认 defaultBackgroundTaskRetention) } // BackgroundStoreOption 是 NewBackgroundTaskStore 的可选配置. // // 升华改进(ELEVATED): functional options 而非 NewBackgroundTaskStoreWithRetention(d) // 这种命名变体--后续若加更多配置(如 maxTasks,自定义时钟),option 模式可平滑扩展, // 不会污染 New() 签名. type BackgroundStoreOption func(*BackgroundTaskStore) // WithRetention 覆盖默认的已完成任务保留时长. // // 主要用途:单元测试设置很短的保留期(如 50ms)以快速验证 GC 行为, // 避免在测试中等待 10 分钟.生产代码通常使用默认值. func WithRetention(d time.Duration) BackgroundStoreOption { return func(s *BackgroundTaskStore) { s.retention = d } } // NewBackgroundTaskStore 创建一个新的后台任务存储. func NewBackgroundTaskStore(opts ...BackgroundStoreOption) *BackgroundTaskStore { s := &BackgroundTaskStore{ tasks: make(map[string]*BackgroundBashTask), nextID: 1, retention: defaultBackgroundTaskRetention, } for _, opt := range opts { opt(s) } return s } // Add 添加一个后台任务. // 返回 error 当任务数已达 maxBackgroundTasks 上限. // // 精妙之处(CLEVER): Add 顺手清理一次 stale 任务-- // 新任务到来时正好是一个机会窗口去回收老任务,无需独立 goroutine. // 即使 Add 极少被调用,最终态任务也会在下次 Add 时被清理; // 完全没有 Add 的场景下,进程也很快会退出(不是真正的 leak). func (s *BackgroundTaskStore) Add(task *BackgroundBashTask) error { s.mu.Lock() defer s.mu.Unlock() s.sweepStaleLocked() if len(s.tasks) >= maxBackgroundTasks { return fmt.Errorf("bash_background: max concurrent tasks reached (%d)", maxBackgroundTasks) } s.tasks[task.ID] = task return nil } // sweepStaleLocked 清理已完成且超过 retention 时长的任务. // // 调用方必须持有 s.mu (写锁). // // 精妙之处(CLEVER): 锁顺序设计--先持 s.mu (W),循环内对每个 task 持 task.mu (R). // 单向锁顺序确保无死锁:runBackground/setFinal 只持 task.mu,从不反向获取 s.mu, // 因此 "store.mu → task.mu" 是唯一的获取顺序. // // 反向思维:是否应该 Phase1 (snapshot under RLock) + Phase2 (delete under WLock) 拆分? // 否决--本路径已经持 s.mu (W) 进入,没必要降级再升级; // 一次遍历 + delete-during-range(Go 规范允许)足够. func (s *BackgroundTaskStore) sweepStaleLocked() int { if s.retention <= 0 { return 0 // 0 或负值表示不清理(用于禁用 GC 的特殊场景) } cutoff := time.Now().Add(-s.retention) var removed int for id, task := range s.tasks { task.mu.RLock() status := task.Status end := task.EndTime task.mu.RUnlock() if status == "running" { continue // 运行中的任务永不回收 } if !end.IsZero() && end.Before(cutoff) { delete(s.tasks, id) removed++ } } return removed } // Remove 删除一个已完成的后台任务,释放内存. func (s *BackgroundTaskStore) Remove(id string) { s.mu.Lock() defer s.mu.Unlock() delete(s.tasks, id) } // Get 获取一个后台任务. func (s *BackgroundTaskStore) Get(id string) (*BackgroundBashTask, bool) { s.mu.RLock() defer s.mu.RUnlock() task, ok := s.tasks[id] return task, ok } // List 列出所有后台任务. func (s *BackgroundTaskStore) List() []*BackgroundBashTask { s.mu.RLock() defer s.mu.RUnlock() result := make([]*BackgroundBashTask, 0, len(s.tasks)) for _, task := range s.tasks { result = append(result, task) } return result } // NextID 生成下一个任务 ID. func (s *BackgroundTaskStore) NextID() string { s.mu.Lock() defer s.mu.Unlock() id := fmt.Sprintf("bg_task_%d", s.nextID) s.nextID++ return id } // executeBackground 在后台执行命令,立即返回 task_id. func (t *BashTool) executeBackground(ctx context.Context, input bashInput) (*BashResult, error) { if t.bgStore == nil { return &BashResult{ Output: "error: background task store not initialized", ExitCode: 1, }, nil } // cwd resolution: capture from request ctx at dispatch time (SubAgent // worktree override). bgCtx below is context.Background() derived, so // the override cannot be re-read there; we close over `cwd` instead. // // cwd 解析: 在分派时从请求 ctx 捕获 (SubAgent worktree 覆盖). 下面 // bgCtx 从 context.Background() 派生, 无法再读 override, 所以闭包捕获 // cwd 值传入 runBackground. cwd := tools.WorkdirFromContext(ctx) if cwd == "" { cwd = t.cwd } // 计算超时 timeout := t.defaultTimeout if input.Timeout > 0 { timeout = time.Duration(input.Timeout) * time.Millisecond if timeout > 600*time.Second { timeout = 600 * time.Second } } taskID := t.bgStore.NextID() bgOutput := &BashOutput{} // 精妙之处(CLEVER): 使用 context.Background() 而非请求 ctx--后台任务需要独立于 // 创建它的 HTTP 请求生存.如果绑定到请求 ctx,客户端断开连接就会杀死后台任务. // 独立 context + 自带超时保证了后台任务既能独立运行,又不会无限期存活. bgCtx, cancel := context.WithTimeout(context.Background(), timeout) bgTask := &BackgroundBashTask{ ID: taskID, Command: input.Command, Status: "running", StartTime: time.Now(), Output: bgOutput, cancel: cancel, } if err := t.bgStore.Add(bgTask); err != nil { cancel() return nil, fmt.Errorf("bash_background: %w", err) } // 同时在 TaskStore 中注册(如果可用) if t.taskStore != nil { t.taskStore.Create( fmt.Sprintf("Background: %s", truncateStr(input.Command, 60)), fmt.Sprintf("task_id: %s", taskID), ) } // 启动后台 goroutine 执行命令 go t.runBackground(bgCtx, cwd, bgTask, bgOutput) return &BashResult{ Output: fmt.Sprintf("Background task started.\ntask_id: %s\ncommand: %s\nUse task tools to check status.", taskID, input.Command), ExitCode: 0, CommandClass: ClassifyShellCommand(input.Command), }, nil } // runBackground 在 goroutine 中执行后台命令. // // M1 commit 7c+7d: 走 executor.Command(ctx, Spec{ClassBash, // IsolateProcessGroup=true}) 统一抽象, 删手工 SysProcAttr. Env 对齐前台 // 全继承 (类 B 决策, 详见 bash.go ExecuteBash L513 设计注释). task.Pid // 从 cmd.Process.Pid 改为 strconv.Atoi(proc.ID()) fallback 0 — Pid 字段 // 仅用于 JSON 展示, 不参与 kill 路径 (kill 走 task.cancel() context cancel). func (t *BashTool) runBackground(ctx context.Context, cwd string, task *BackgroundBashTask, output *BashOutput) { defer task.cancel() proc := t.executor.Command(ctx, execenv.Spec{ Class: execenv.ClassBash, Path: "bash", Args: []string{"-c", task.Command}, Env: execenv.FullInheritMap(nil), WorkDir: cwd, IsolateProcessGroup: true, }) stdoutPipe, err := proc.StdoutPipe() if err != nil { task.setFinal("failed", -1) return } stderrPipe, err := proc.StderrPipe() if err != nil { task.setFinal("failed", -1) return } if err := proc.Start(); err != nil { task.setFinal("failed", -1) return } // proc.ID() 是 opaque 字符串, 本地 DefaultExecutor 填 pid 数字串, // 云端 backend 可能返回 microVM_id. Atoi 失败回落到 0, Pid 字段 // 仅用于 JSON 展示, 不参与 kill 路径. task.Pid, _ = strconv.Atoi(proc.ID()) // 使用 goroutine 读取 stdout 和 stderr var wg sync.WaitGroup wg.Add(2) go func() { defer wg.Done() buf := make([]byte, 4096) for { n, err := stdoutPipe.Read(buf) if n > 0 { lines := strings.Split(string(buf[:n]), "\n") for _, line := range lines { if line != "" { output.WriteStdout(line) } } } if err != nil { return } } }() go func() { defer wg.Done() buf := make([]byte, 4096) for { n, err := stderrPipe.Read(buf) if n > 0 { lines := strings.Split(string(buf[:n]), "\n") for _, line := range lines { if line != "" { output.WriteStderr(line) } } } if err != nil { return } } }() // 等待管道读取完成或上下文取消 done := make(chan struct{}) go func() { wg.Wait() close(done) }() select { case <-done: // 正常完成 case <-ctx.Done(): // 超时或取消 - 优雅终止 gracefulKill(proc) <-done } cmdErr := proc.Wait() if ctx.Err() != nil { task.setFinal("killed", -1) } else if cmdErr != nil { // 尝试提取退出码 (duck-type, 不假设底层 *exec.ExitError) exitCode := -1 if ec, ok := cmdErr.(interface{ ExitCode() int }); ok { exitCode = ec.ExitCode() } task.setFinal("failed", exitCode) } else { task.setFinal("completed", 0) } } // CancelBackgroundTask 取消一个后台任务. func (t *BashTool) CancelBackgroundTask(taskID string) (bool, error) { if t.bgStore == nil { return false, fmt.Errorf("background task store not initialized") } task, ok := t.bgStore.Get(taskID) if !ok { return false, fmt.Errorf("task not found: %s", taskID) } if task.GetStatus() != "running" { return false, fmt.Errorf("task is not running (status: %s)", task.GetStatus()) } // 取消上下文,触发 gracefulKill task.cancel() return true, nil } // GetBackgroundTask 获取后台任务信息. func (t *BashTool) GetBackgroundTask(taskID string) (*BackgroundBashTask, bool) { if t.bgStore == nil { return nil, false } return t.bgStore.Get(taskID) } // truncateStr 截断字符串到指定长度. func truncateStr(s string, maxLen int) string { if len(s) <= maxLen { return s } return s[:maxLen-3] + "..." }