package bridge // serial_uploader.go - 背压感知的串行批量事件上传器. // // 使用场景:平台需要将引擎事件批量上传到外部系统(审计后端,分析服务, // 远端 Bridge 服务器).SerialBatchEventUploader 保证: // 1. 串行:上传 N+1 批次在 N 批次完成后才开始(保证事件顺序) // 2. 背压自然形成:网络慢时批次累积更多事件,减少 API 调用次数 // 3. 无丢失:上传失败时调用 onError 回调,调用方决定重试策略 // // 升华改进(ELEVATED): 早期方案 SerialBatchEventUploader 用 // Promise 链实现串行,无法在 Go 直接翻译. // Go 版本用 Mutex + bool(inProgress)+ pending slice 实现等价语义, // 无需 goroutine 池或额外同步原语. // 替代方案:<无缓冲 channel 串行化> - 否决:channel 方案需要额外 goroutine 消费, // Mutex 方案更轻量,不引入 goroutine 生命周期管理的复杂性. import ( "context" "sync" "time" ) // UploadFunc 是批量上传函数的签名. // 实现层负责网络传输,失败时返回 error. type UploadFunc func(ctx context.Context, events []BridgeEvent) error // SerialBatchEventUploader 收集事件并串行批量上传. type SerialBatchEventUploader struct { mu sync.Mutex pending []BridgeEvent // 等待上传的事件(上传在途时累积) inProgress bool // 是否有上传正在进行 upload UploadFunc onError func(events []BridgeEvent, err error) ctx context.Context } // NewSerialBatchEventUploader 创建上传器. // // upload: 批量上传函数(调用方实现,如 HTTP POST) // onError: 上传失败回调(传 nil 则忽略失败) func NewSerialBatchEventUploader( ctx context.Context, upload UploadFunc, onError func(events []BridgeEvent, err error), ) *SerialBatchEventUploader { if onError == nil { onError = func(_ []BridgeEvent, _ error) {} } return &SerialBatchEventUploader{ ctx: ctx, upload: upload, onError: onError, } } // Enqueue 将事件加入上传队列. // // 精妙之处(CLEVER): 三种情况只有一条代码路径-- // 1. 无上传在途:立即启动 goroutine 上传(batch = [evt]) // 2. 上传在途:追加到 pending,当前批次完成后自动触发下一批 // 3. ctx 已取消:事件被丢弃(调用方已放弃) // // 状态检查在锁内,goroutine 在锁外启动-- // 避免在持锁状态下启动 goroutine(持锁期间 goroutine 调度可能死锁). func (u *SerialBatchEventUploader) Enqueue(evt BridgeEvent) { select { case <-u.ctx.Done(): return default: } u.mu.Lock() u.pending = append(u.pending, evt) if u.inProgress { u.mu.Unlock() return } batch := u.pending u.pending = nil u.inProgress = true u.mu.Unlock() go u.runUploadLoop(batch) } // runUploadLoop 串行执行上传,直到 pending 为空. // // 精妙之处(CLEVER): 用循环而非递归实现串行-- // Go 不优化尾递归,深度递归(大量小批次)会栈溢出. // 循环语义完全等价,不耗费额外栈帧. // 每次循环:上传当前批次 → 检查 pending → 有则继续,无则退出. func (u *SerialBatchEventUploader) runUploadLoop(batch []BridgeEvent) { for batch != nil { if err := u.upload(u.ctx, batch); err != nil { u.onError(batch, err) } u.mu.Lock() if len(u.pending) == 0 { u.inProgress = false u.mu.Unlock() return } batch = u.pending u.pending = nil u.mu.Unlock() } } // Flush 阻塞直到所有待上传事件完成或 ctx 取消. // 用于优雅关闭时确保事件不丢失. // // 历史包袱(LEGACY): 用轮询(10ms 间隔)而非 sync.Cond-- // sync.Cond.Wait() 不支持 context 取消,需要额外 goroutine 唤醒, // 代码复杂度提升但 Flush 是非热路径,轮询开销可接受. // 未来改进:用 sync.Cond + 独立的 ctx 监听 goroutine 消除轮询延迟. func (u *SerialBatchEventUploader) Flush(ctx context.Context) error { for { u.mu.Lock() done := !u.inProgress && len(u.pending) == 0 u.mu.Unlock() if done { return nil } select { case <-ctx.Done(): return ctx.Err() case <-time.After(10 * time.Millisecond): } } } // PendingCount 返回当前等待上传的事件数(调试/监控用). func (u *SerialBatchEventUploader) PendingCount() int { u.mu.Lock() defer u.mu.Unlock() return len(u.pending) }