package engine // observer.go -- 引擎可观测性接口与默认实现. // // 升华改进(ELEVATED): 这不是日志系统--日志是给人看的文本. // 这是结构化的事件/指标流,给监控系统,数据仓库,告警平台消费. // 对于跨行业引擎平台,可观测性比功能本身更重要-- // 没有它,生产问题无法定位,SLA 无法证明,容量无法规划. // // 早期方案的教训:每个配对修复都有事故编号(CC-1212, inc-4977), // 没有可观测性就不会知道这些问题存在,直到用户投诉. // // 替代方案:直接用 fmt.Fprintf(os.Stderr)(当前状态,黑盒,无法分析). // // 使用示例: // // // 编程场景:接 DataDog // observer := &DataDogObserver{apiKey: "..."} // engine := engine.New(&Config{Observer: observer}) // // // 仓储场景:接企业监控 // observer := &EnterpriseObserver{endpoint: "..."} // // // 开发调试:接 stderr // observer := &StderrObserver{MinLevel: "debug"} import ( "fmt" "io" "os" "strings" "sync" "time" "git.flytoex.net/yuanwei/flyto-agent/pkg/flyto" ) // ============================================================ // 核心接口(类型别名,源头在 pkg/flyto/observer.go) // ============================================================ // EventObserver 是引擎可观测性核心接口(flyto.EventObserver 的类型别名). // // 升华改进(ELEVATED): 早期方案在 pkg/engine/observer.go 重复定义了与 pkg/flyto/observer.go // 完全相同的接口--消费层要实现 Observer 就必须 import engine 包,产生反向依赖. // 改为类型别名后:消费层只需 import flyto(轻量契约包),不需要 import engine. // 替代方案:<保留重复定义,靠 Go 结构化类型兼容> - 否决:维护两份接口定义, // 任何方法变更都要改两处,且 Config.Observer 字段类型不明确(engine 还是 flyto?). type EventObserver = flyto.EventObserver type MetricObserver = flyto.MetricObserver type TraceObserver = flyto.TraceObserver // ============================================================ // NoopObserver - 空实现 // ============================================================ // NoopObserver 空实现(不做任何事,生产环境未配置时用). // // 精妙之处(CLEVER): 引擎内部不需要 nil 检查 observer, // 未配置时统一用 NoopObserver--所有调用都是空操作但不会 panic. // 替代方案:每次调用前 `if observer != nil`(散布在几十个地方,容易遗漏). type NoopObserver struct{} func (n *NoopObserver) Event(name string, data map[string]any) {} func (n *NoopObserver) Error(err error, context map[string]any) {} func (n *NoopObserver) Metric(name string, value float64, tags map[string]string) {} // ============================================================ // StderrObserver - 开发调试用的 stderr 输出 // ============================================================ // StderrObserver 开发调试用的 stderr 输出(默认). // // 升华改进(ELEVATED): 与 internal/logger 互补而非替代. // logger 是给开发者看的文本日志(调试用),Observer 是给系统消费的结构化事件流. // StderrObserver 在两者之间架桥--开发时可以在 stderr 看到 Observer 事件. // 替代方案:直接用 logger.Info()(丢失结构化数据,无法被监控系统消费). type StderrObserver struct { MinLevel string // "debug" / "info" / "warn" / "error",默认 "info" Output io.Writer // 输出目标,默认 os.Stderr mu sync.Mutex } func (s *StderrObserver) output() io.Writer { if s.Output != nil { return s.Output } return os.Stderr } func (s *StderrObserver) level() string { if s.MinLevel != "" { return strings.ToLower(s.MinLevel) } return "info" } func (s *StderrObserver) shouldLog(eventLevel string) bool { levels := map[string]int{"debug": 0, "info": 1, "warn": 2, "error": 3} minLvl, ok := levels[s.level()] if !ok { minLvl = 1 } evtLvl, ok := levels[eventLevel] if !ok { evtLvl = 1 } return evtLvl >= minLvl } func (s *StderrObserver) Event(name string, data map[string]any) { if !s.shouldLog("info") { return } s.mu.Lock() defer s.mu.Unlock() fmt.Fprintf(s.output(), "%s [OBSERVE] event=%s %s\n", time.Now().UTC().Format(time.RFC3339), name, formatData(data)) } func (s *StderrObserver) Error(err error, context map[string]any) { if !s.shouldLog("error") { return } s.mu.Lock() defer s.mu.Unlock() fmt.Fprintf(s.output(), "%s [OBSERVE:ERROR] err=%q %s\n", time.Now().UTC().Format(time.RFC3339), err.Error(), formatData(context)) } func (s *StderrObserver) Metric(name string, value float64, tags map[string]string) { if !s.shouldLog("debug") { return } s.mu.Lock() defer s.mu.Unlock() fmt.Fprintf(s.output(), "%s [OBSERVE:METRIC] %s=%.4f %s\n", time.Now().UTC().Format(time.RFC3339), name, value, formatTags(tags)) } // ============================================================ // CompositeObserver - 多 observer 叠加 // ============================================================ // CompositeObserver 多 observer 叠加. // // 升华改进(ELEVATED): 同时发到 DataDog + 审计日志 + stderr. // 生产环境通常需要多路输出:实时告警走 PagerDuty,指标走 Prometheus, // 审计走合规日志,调试走 stderr.CompositeObserver 让它们各司其职. // 替代方案:让每个 Observer 内部做多路分发(每个实现都要做,重复劳动). type CompositeObserver struct { observers []EventObserver } // NewCompositeObserver 创建多路复合 Observer. func NewCompositeObserver(observers ...EventObserver) *CompositeObserver { return &CompositeObserver{observers: observers} } // Observers 返回内部 observer 列表(用于 Close 时遍历刷新 BufferedObserver). func (c *CompositeObserver) Observers() []EventObserver { return c.observers } func (c *CompositeObserver) Event(name string, data map[string]any) { for _, obs := range c.observers { obs.Event(name, data) } } func (c *CompositeObserver) Error(err error, context map[string]any) { for _, obs := range c.observers { obs.Error(err, context) } } // Metric 转发指标到所有实现了 MetricObserver 的子 observer. // // 精妙之处(CLEVER): 只有实现了 MetricObserver 的子 observer 才会收到指标, // 其他的静默跳过.这样 CompositeObserver 可以混合不同能力的 observer. func (c *CompositeObserver) Metric(name string, value float64, tags map[string]string) { for _, obs := range c.observers { if m, ok := obs.(MetricObserver); ok { m.Metric(name, value, tags) } } } // SpanStart 转发到所有实现了 TraceObserver 的子 observer,返回第一个有效的 spanID. func (c *CompositeObserver) SpanStart(name string, tags map[string]string) string { var spanID string for _, obs := range c.observers { if t, ok := obs.(TraceObserver); ok { id := t.SpanStart(name, tags) if spanID == "" { spanID = id } } } return spanID } // SpanEnd 转发到所有实现了 TraceObserver 的子 observer. func (c *CompositeObserver) SpanEnd(spanID string, err error) { for _, obs := range c.observers { if t, ok := obs.(TraceObserver); ok { t.SpanEnd(spanID, err) } } } // ============================================================ // BufferedObserver - 缓冲异步 observer // ============================================================ // observerEntry 是缓冲区中的一条条目. type observerEntry struct { isEvent bool name string data map[string]any err error context map[string]any } // BufferedObserver 缓冲 observer(异步批量发送,不阻塞热路径). // // 升华改进(ELEVATED): 热路径上的 Observer 调用必须不阻塞. // 如果 inner observer 是网络发送(DataDog API,Kafka,gRPC), // 同步调用会拖慢每次 API 调用和工具执行. // BufferedObserver 用 channel 缓冲,后台 goroutine 批量发送. // 替代方案:每次都 go func() 发送(goroutine 爆炸,无背压控制). type BufferedObserver struct { inner EventObserver buffer chan observerEntry batchSize int interval time.Duration done chan struct{} once sync.Once } // NewBufferedObserver 创建缓冲 observer. // batchSize: 批量大小(0 默认 100) // interval: 刷新间隔(0 默认 1s) // bufferSize: channel 缓冲区大小(0 默认 1000) func NewBufferedObserver(inner EventObserver, batchSize int, interval time.Duration, bufferSize int) *BufferedObserver { if batchSize <= 0 { batchSize = 100 } if interval <= 0 { interval = time.Second } if bufferSize <= 0 { bufferSize = 1000 } b := &BufferedObserver{ inner: inner, buffer: make(chan observerEntry, bufferSize), batchSize: batchSize, interval: interval, done: make(chan struct{}), } go b.flushLoop() return b } func (b *BufferedObserver) Event(name string, data map[string]any) { // 精妙之处(CLEVER): 非阻塞发送--如果缓冲区满,丢弃事件而不是阻塞调用者. // 在热路径上,阻塞比丢事件更危险(会拖慢用户交互). // 丢弃的事件可以通过 buffer_overflow 指标发现. select { case b.buffer <- observerEntry{isEvent: true, name: name, data: data}: default: // 缓冲区满,丢弃事件(不阻塞热路径) } } func (b *BufferedObserver) Error(err error, context map[string]any) { // Error 用阻塞发送--错误事件不能丢. // 如果缓冲区满说明有严重问题,阻塞等待是正确的策略. select { case b.buffer <- observerEntry{isEvent: false, err: err, context: context}: default: // 兜底:缓冲区满也不能无限阻塞,直接同步发送 b.inner.Error(err, context) } } func (b *BufferedObserver) flushLoop() { ticker := time.NewTicker(b.interval) defer ticker.Stop() batch := make([]observerEntry, 0, b.batchSize) for { select { case entry, ok := <-b.buffer: if !ok { // channel 关闭,刷新剩余 b.flushBatch(batch) close(b.done) return } batch = append(batch, entry) if len(batch) >= b.batchSize { b.flushBatch(batch) batch = batch[:0] } case <-ticker.C: if len(batch) > 0 { b.flushBatch(batch) batch = batch[:0] } } } } func (b *BufferedObserver) flushBatch(batch []observerEntry) { for _, entry := range batch { if entry.isEvent { b.inner.Event(entry.name, entry.data) } else { b.inner.Error(entry.err, entry.context) } } } // Close 关闭缓冲 observer,等待所有缓冲事件刷新完毕. func (b *BufferedObserver) Close() { b.once.Do(func() { close(b.buffer) <-b.done }) } // ============================================================ // 辅助函数 // ============================================================ // formatData 将 map[string]any 格式化为 key=value 字符串. func formatData(data map[string]any) string { if len(data) == 0 { return "" } parts := make([]string, 0, len(data)) for k, v := range data { parts = append(parts, fmt.Sprintf("%s=%v", k, v)) } return strings.Join(parts, " ") } // formatTags 将 map[string]string 格式化为 key=value 字符串. func formatTags(tags map[string]string) string { if len(tags) == 0 { return "" } parts := make([]string, 0, len(tags)) for k, v := range tags { parts = append(parts, fmt.Sprintf("%s=%s", k, v)) } return strings.Join(parts, " ") }