package inbox // uds_server.go 实现 Unix Domain Socket 收件服务器. // // 模块定位: // // UDSServer 是工具子进程向引擎推送消息的通道. // 工具子进程(Bash,自定义工具)通过 UDS 连接发送 JSON 消息, // 引擎通过 Messages() channel 消费,再转发给观察者. // // 架构决策: // // 为什么用 UDS 而非 HTTP/gRPC? // - UDS 是本机 IPC 最快的方式(无 TCP 栈开销) // - 文件系统权限天然隔离(只有同 UID 的进程能连接) // - socket 文件落在用户私有 cache 目录(os.UserCacheDir),避免 /tmp 元数据泄露 // - 工具子进程无需任何库依赖就能发送(`echo '{}' | nc -U $FLYTO_SESSION_SOCK`) // // 精妙之处(CLEVER): fire-and-forget 协议-- // // 工具发完消息直接断开,不等待 ACK,无阻塞. // 服务端每条消息读完后立即关闭连接(不复用), // 这样即使工具异常退出也不会留下僵尸连接. // 替代方案:长连接复用(减少 syscall 但增加状态管理复杂度). // // 升华改进(ELEVATED): 会话隔离-- // // socket 文件名含 sessionID,多个并发会话不会互相干扰. // SaaS 多租户场景:每个工作区一个 Engine 实例,各自独立的 UDS. // 替代方案:全局单一 UDS(无法区分消息来源,租户 A 的消息可能到租户 B). import ( "encoding/json" "errors" "fmt" "io" "net" "os" "path/filepath" "sync" "sync/atomic" ) // UDSInboxMessage 是工具子进程发送给引擎的消息格式. // // 精妙之处(CLEVER): 这是独立于 inbox.Message 的新类型-- // // inbox.Message 是 Agent 间的富类型消息(含 From/To/ID 路由字段), // UDSInboxMessage 是工具进程的轻量消息(工具不知道路由,只说"我的进度是 X"). // 两者在不同层次服务不同目的,不能混用. // 替代方案:复用 inbox.Message(工具进程需要理解路由,耦合过重). type UDSInboxMessage struct { Type string `json:"type"` // "progress", "log", "result" ToolUseID string `json:"tool_use_id,omitempty"` // 关联的工具调用 ID(可选) Data string `json:"data"` // 消息主体内容 Meta json.RawMessage `json:"meta,omitempty"` // 额外元数据(可选) } // UDSServer 是 Unix Domain Socket 消息接收服务端. // // 生命周期: // 1. NewUDSServer(sessionID) → 创建实例,生成 socket 路径 // 2. Start() → 开始监听(后台 goroutine 处理连接) // 3. Messages() → 消费消息 channel // 4. Close() → 停止监听,删除 socket 文件 type UDSServer struct { sockPath string msgs chan UDSInboxMessage listener net.Listener mu sync.Mutex // 保护 listener 字段 started atomic.Bool closed atomic.Bool once sync.Once // 确保 Close 幂等 done chan struct{} } // maxSockPathLen 是 UDS socket 路径长度上限,对齐 macOS 最严值(104 字节). // Linux 是 108 字节,按 macOS 为准可以一次写对两个平台. // 超过此长度 net.Listen("unix", path) 会返回 "invalid argument" 而非清晰错误, // 所以我们主动检测并 fallback 到 /tmp 而非让内核炸. const maxSockPathLen = 104 // resolveSockPath 选择 socket 文件路径. // // 精妙之处(CLEVER): 两阶段策略--优先用户私有 cache 目录,越界 fallback /tmp. // // 阶段 1: os.UserCacheDir()/flyto/flyto-.sock // · Linux: ~/.cache/flyto/flyto-.sock // · macOS: ~/Library/Caches/flyto/flyto-.sock // 动机: 多用户机器上 /tmp 对所有用户可见(`ls /tmp` 能看到 sessionID // 元数据),用户私有 cache 目录只有当前用户能访问,符合最小暴露原则. // 一致性: session_persist.go / session_snapshot.go 已于 2026-04-13 做过 // 同类迁移(TODO L1182). // // 阶段 2: /tmp/flyto-.sock // 触发条件: UserCacheDir 不可用(无 HOME)或解析后路径 > 104 字节. // macOS 上 `/Users//Library/Caches/flyto/flyto-<40>.sock` 约 85~95 // 字节,长用户名或 /var/folders 前缀容器场景可能越界内核 sockaddr_un 限制. // 降级到 /tmp 保证功能正确性 > 最小暴露原则. // // 替代方案: // // (a) 总是用 /tmp - 丢失最小暴露原则,与 session_persist 不一致 // (b) 总是用 UserCacheDir - macOS 长路径场景内核报错难诊断 // (c) 读 FLYTO_SOCK_DIR 环境变量 - 增加配置面,运维复杂度上升,暂不做 func resolveSockPath(safeSuffix string) string { if cacheDir, err := os.UserCacheDir(); err == nil { flytoDir := filepath.Join(cacheDir, "flyto") // MkdirAll 幂等,权限 0700 确保只有当前用户可访问. // 失败不致命(可能是只读文件系统或权限问题),直接 fallback. if err := os.MkdirAll(flytoDir, 0700); err == nil { candidate := filepath.Join(flytoDir, "flyto-"+safeSuffix+".sock") if len(candidate) <= maxSockPathLen { return candidate } } } // Fallback: os.TempDir() respects $TMPDIR, unlike hardcoded /tmp. // Length is always ~51 bytes given safeSuffix <= 40. return filepath.Join(os.TempDir(), fmt.Sprintf("flyto-%s.sock", safeSuffix)) } // NewUDSServer 创建 UDS 服务端. // sessionID 用于生成唯一的 socket 文件名,避免多实例冲突. // // 精妙之处(CLEVER): 使用 sessionID 而非随机数-- // // sessionID 已经是全局唯一的(包含时间戳+随机后缀),复用它避免二次随机生成. // 同时 socket 路径可预测(知道 sessionID 就能找到 socket),方便调试和测试. // 替代方案:纯随机路径(每次都不一样,调试时需要读 FLYTO_SESSION_SOCK 才能找到). func NewUDSServer(sessionID string) (*UDSServer, error) { if sessionID == "" { return nil, errors.New("uds_server: sessionID must not be empty") } // 历史包袱(LEGACY): socket 文件名长度受内核限制(Linux ~108 字节,macOS ~104 字节). // sessionID 理论上可以很长,截取前 40 字节足够唯一且安全. safeSuffix := sessionID if len(safeSuffix) > 40 { safeSuffix = safeSuffix[:40] } sockPath := resolveSockPath(safeSuffix) return &UDSServer{ sockPath: sockPath, // 升华改进(ELEVATED): buffer 64 条消息-- // 工具并发爆发时(批量文件处理报告进度),不会阻塞发送方. // 消费者(引擎 goroutine)慢于生产者时,超过 64 条的消息将被丢弃(非阻塞推送). // 64 是经验值:覆盖批量场景的瞬时峰值,同时不过度占用内存. // 替代方案:无缓冲(任何生产者超速立刻阻塞). msgs: make(chan UDSInboxMessage, 64), done: make(chan struct{}), }, nil } // Start 开始监听 UDS. // 必须在 Close() 之前最多调用一次.后台 goroutine 处理所有连接. func (s *UDSServer) Start() error { if s.closed.Load() { return errors.New("uds_server: already closed") } if !s.started.CompareAndSwap(false, true) { return errors.New("uds_server: already started") } // 清理残留 socket 文件(上次崩溃可能遗留). // 精妙之处(CLEVER): 先 Remove 再 Listen,避免 "address already in use"-- // 正常情况:Close() 会删除文件,但崩溃时来不及清理. // 无法区分"当前进程的残留"和"另一个进程的活跃 socket",所以直接删除. // 如果有另一个进程在用,下面的 Listen 会成功但另一进程会继续在旧 socket 上. // 替代方案:尝试连接看是否活跃(太复杂,且对崩溃检测效果差). _ = os.Remove(s.sockPath) ln, err := net.Listen("unix", s.sockPath) if err != nil { s.started.Store(false) // 重置,允许重试 return fmt.Errorf("uds_server: listen %s: %w", s.sockPath, err) } // 升华改进(ELEVATED): 显式设置 socket 权限为 0600(仅当前用户可读写). // net.Listen("unix", ...) 创建的 socket 权限受 umask 影响, // 默认 umask 0022 下是 0755,同机所有用户都可连接-- // 攻击者可以伪造权限响应,欺骗 Agent 执行未授权操作. // 替代方案:<依赖 /tmp 的 sticky bit 隔离> - // 否决:sticky bit 只防删除,不防读写;多用户服务器上 /tmp 对所有用户可访问. if err := os.Chmod(s.sockPath, 0600); err != nil { _ = ln.Close() s.started.Store(false) return fmt.Errorf("uds_server: chmod %s: %w", s.sockPath, err) } s.mu.Lock() s.listener = ln s.mu.Unlock() go s.acceptLoop() return nil } // SockPath 返回 socket 文件路径. // 工具子进程需要通过 FLYTO_SESSION_SOCK 环境变量获得此路径. func (s *UDSServer) SockPath() string { return s.sockPath } // Messages 返回只读消息 channel. // 消费者从此 channel 接收工具子进程发来的消息. // // 精妙之处(CLEVER): 返回只读 channel(<-chan)-- // // 编译器保证消费者不能向 channel 写入,防止误用. // 服务端内部用双向 channel(chan),对外暴露只读视图. // 替代方案:暴露双向 channel(允许外部写入,可能破坏状态机). func (s *UDSServer) Messages() <-chan UDSInboxMessage { return s.msgs } // Send 向 msgs channel 投递一条消息(非阻塞). // 供引擎内部组件(如 MonitorTool)直接注入消息,无需经过 UDS 连接. // channel 满时静默丢弃(fire-and-forget 语义). // // 精妙之处(CLEVER): 只有 UDSServer 自身和引擎内部可以 Send-- // // 外部消费者只能通过 Messages() 读取; // 通过工具子进程(UDS 连接)发送的消息由 handleConn 调用内部同逻辑. // 替代方案:暴露双向 channel(任何人都能写,包括意外的外部调用). func (s *UDSServer) Send(msg UDSInboxMessage) { if s.closed.Load() { return // 已关闭,静默忽略 } select { case s.msgs <- msg: default: // channel 满,丢弃 } } // Close 停止服务器并清理 socket 文件.幂等,多次调用安全. func (s *UDSServer) Close() error { var closeErr error s.once.Do(func() { s.closed.Store(true) // 关闭监听器--触发 acceptLoop 的 Accept() 返回错误,退出循环 s.mu.Lock() ln := s.listener s.mu.Unlock() if ln != nil { if err := ln.Close(); err != nil { closeErr = fmt.Errorf("uds_server: close listener: %w", err) } } // 等待 acceptLoop goroutine 退出后再关闭 msgs channel <-s.done // 关闭 msgs channel,通知消费者不会再有新消息 close(s.msgs) // 清理 socket 文件(best-effort,失败不影响功能) _ = os.Remove(s.sockPath) }) return closeErr } // acceptLoop 是主接受循环,在独立 goroutine 中运行. // 每个连接在新 goroutine 中处理(连接很短暂,goroutine 开销可接受). func (s *UDSServer) acceptLoop() { defer close(s.done) s.mu.Lock() ln := s.listener s.mu.Unlock() for { conn, err := ln.Accept() if err != nil { // 正常关闭:listener 被 Close() 关闭,Accept 返回错误 if s.closed.Load() { return } // 异常错误:记录后继续(net.Error.Temporary 已废弃,直接继续) // 精妙之处(CLEVER): 不在 acceptLoop 中 panic-- // 一次 Accept 失败不应让整个服务宕机,继续接受后续连接. // 替代方案:return(导致 acceptLoop 退出,所有后续连接都无法处理). continue } go s.handleConn(conn) } } // handleConn 处理单个连接. // fire-and-forget:读完一条 JSON 消息后立即关闭连接. // // 升华改进(ELEVATED): 每条消息 = 一次连接-- // // 简化状态管理:无需处理消息边界,粘包/拆包问题. // 工具进程无需维护长连接状态,崩溃不影响服务端. // 连接建立/关闭 overhead 在本机 UDS 上可以忽略不计(~1μs). // 替代方案:持久连接 + 长度前缀分帧(实现复杂,工具进程也要处理重连). func (s *UDSServer) handleConn(conn net.Conn) { defer conn.Close() // 安全边界:单条消息最大 64KB,防止恶意/失控的工具发送超大消息 OOM. // 历史包袱(LEGACY): 早期方案无大小限制,生产中有工具误发超大 JSON 导致 OOM 的案例. // 64KB 覆盖所有合理的进度消息,超出则截断(后续 JSON 解析会失败,连接被丢弃). limited := io.LimitReader(conn, 64*1024) var msg UDSInboxMessage if err := json.NewDecoder(limited).Decode(&msg); err != nil { // 解码失败:忽略此连接(不反馈错误给工具,避免工具端处理逻辑复杂化) return } // 非阻塞推送到 msgs channel // 精妙之处(CLEVER): select + default 实现非阻塞-- // 如果消费者来不及消费(msgs channel 满),直接丢弃消息而非阻塞发送方. // 进度消息是"尽力而为"的,丢几条不会破坏系统正确性. // 替代方案:阻塞推送(会导致工具的 UDS 连接被阻塞,进而阻塞工具执行). select { case s.msgs <- msg: default: // channel 满,丢弃(fire-and-forget 语义) } }