package engine // flush_gate.go -- FlushGate 保证消息顺序. // // 当 transport 正在 flush(如压缩,重连)时,新产生的消息需要排队等待. // FlushGate 是一个泛型同步原语: // - Start() 标记 flush 开始,后续 Enqueue 的项会被暂存 // - End() flush 结束,返回暂存的项供调用者处理 // - Drop() 丢弃暂存项(如 flush 失败时) // - Deactivate() 清除激活状态但保留 pending(transport 替换时用) // // 线程安全:所有方法均通过 mutex 保护. import "sync" // FlushGate 是一个泛型的 flush 排队门. // 在 flush 期间,通过 Enqueue 提交的项会被暂存; // flush 结束后通过 End 取回所有暂存项. type FlushGate[T any] struct { mu sync.Mutex active bool // flush 是否正在进行 pending []T // flush 期间排队的项 } // NewFlushGate 创建一个新的 FlushGate. func NewFlushGate[T any]() *FlushGate[T] { return &FlushGate[T]{} } // Start 标记 flush 开始. // 调用后,Enqueue 会将项暂存到 pending 队列. // 如果已经在 flush 中,此操作为空操作. func (g *FlushGate[T]) Start() { g.mu.Lock() defer g.mu.Unlock() g.active = true } // Enqueue 在 flush 期间将项加入排队. // 返回 true 表示项已排队(flush 进行中); // 返回 false 表示当前不在 flush 中,调用者应直接处理该项. func (g *FlushGate[T]) Enqueue(items ...T) bool { g.mu.Lock() defer g.mu.Unlock() if !g.active { return false } g.pending = append(g.pending, items...) return true } // End flush 结束,返回所有排队的项并清空队列. // 调用后 active 变为 false,后续 Enqueue 将返回 false. func (g *FlushGate[T]) End() []T { g.mu.Lock() defer g.mu.Unlock() g.active = false result := g.pending g.pending = nil return result } // Drop 丢弃所有排队的项(例如 flush 失败时),返回丢弃的数量. // 不改变 active 状态. func (g *FlushGate[T]) Drop() int { g.mu.Lock() defer g.mu.Unlock() n := len(g.pending) g.pending = nil return n } // Deactivate 清除激活状态但保留 pending 项. // 用于 transport 替换场景:旧 transport 的 flush 不再等待, // 但排队的项需要由新 transport 处理. // 精妙之处(CLEVER): Deactivate 只清除 active 标记但保留 pending 队列-- // 这是为 transport 热替换设计的:旧 transport 的 flush 中断时, // 排队的消息不能丢(可能包含工具结果),需要由新 transport 接手处理. // 如果用 End() 会取走 pending 数据但可能没有消费者处理. func (g *FlushGate[T]) Deactivate() { g.mu.Lock() defer g.mu.Unlock() g.active = false } // IsActive 返回 flush 是否正在进行. func (g *FlushGate[T]) IsActive() bool { g.mu.Lock() defer g.mu.Unlock() return g.active } // PendingCount 返回当前排队的项数量. func (g *FlushGate[T]) PendingCount() int { g.mu.Lock() defer g.mu.Unlock() return len(g.pending) }