package engine import ( "bytes" "errors" "strings" "sync" "testing" "time" ) // ============================================================ // MockObserver - 测试用 mock // ============================================================ // MockObserver 用于测试的 mock observer,记录所有接收到的事件和错误. type MockObserver struct { mu sync.Mutex Events []MockEvent Errors []MockError Metrics []MockMetric } // MockEvent 记录的事件. type MockEvent struct { Name string Data map[string]any } // MockError 记录的错误. type MockError struct { Err error Context map[string]any } // MockMetric 记录的指标. type MockMetric struct { Name string Value float64 Tags map[string]string } func (m *MockObserver) Event(name string, data map[string]any) { m.mu.Lock() defer m.mu.Unlock() m.Events = append(m.Events, MockEvent{Name: name, Data: data}) } func (m *MockObserver) Error(err error, context map[string]any) { m.mu.Lock() defer m.mu.Unlock() m.Errors = append(m.Errors, MockError{Err: err, Context: context}) } func (m *MockObserver) Metric(name string, value float64, tags map[string]string) { m.mu.Lock() defer m.mu.Unlock() m.Metrics = append(m.Metrics, MockMetric{Name: name, Value: value, Tags: tags}) } // EventCount 返回指定名称的事件数量. func (m *MockObserver) EventCount(name string) int { m.mu.Lock() defer m.mu.Unlock() count := 0 for _, e := range m.Events { if e.Name == name { count++ } } return count } // LastEvent 返回最后一个指定名称的事件. func (m *MockObserver) LastEvent(name string) *MockEvent { m.mu.Lock() defer m.mu.Unlock() for i := len(m.Events) - 1; i >= 0; i-- { if m.Events[i].Name == name { return &m.Events[i] } } return nil } // ============================================================ // NoopObserver 测试 // ============================================================ func TestNoopObserver_DoesNotPanic(t *testing.T) { obs := &NoopObserver{} // 所有方法都不应该 panic obs.Event("test", nil) obs.Event("test", map[string]any{"key": "value"}) obs.Error(errors.New("test"), nil) obs.Error(errors.New("test"), map[string]any{"key": "value"}) obs.Metric("test", 1.0, nil) obs.Metric("test", 1.0, map[string]string{"key": "value"}) } func TestNoopObserver_ImplementsAllInterfaces(t *testing.T) { obs := &NoopObserver{} // 验证接口实现 var _ EventObserver = obs var _ MetricObserver = obs } // ============================================================ // StderrObserver 测试 // ============================================================ func TestStderrObserver_Event(t *testing.T) { var buf bytes.Buffer obs := &StderrObserver{ MinLevel: "info", Output: &buf, } obs.Event("test_event", map[string]any{"count": 42}) output := buf.String() if !strings.Contains(output, "[OBSERVE]") { t.Error("expected [OBSERVE] prefix") } if !strings.Contains(output, "event=test_event") { t.Error("expected event name in output") } if !strings.Contains(output, "count=42") { t.Error("expected data in output") } } func TestStderrObserver_Error(t *testing.T) { var buf bytes.Buffer obs := &StderrObserver{ MinLevel: "error", Output: &buf, } obs.Error(errors.New("something broke"), map[string]any{"module": "compact"}) output := buf.String() if !strings.Contains(output, "[OBSERVE:ERROR]") { t.Error("expected [OBSERVE:ERROR] prefix") } if !strings.Contains(output, "something broke") { t.Error("expected error message in output") } } func TestStderrObserver_Metric(t *testing.T) { var buf bytes.Buffer obs := &StderrObserver{ MinLevel: "debug", Output: &buf, } obs.Metric("api_latency_ms", 123.4567, map[string]string{"model": "test"}) output := buf.String() if !strings.Contains(output, "[OBSERVE:METRIC]") { t.Error("expected [OBSERVE:METRIC] prefix") } if !strings.Contains(output, "api_latency_ms=123.4567") { t.Error("expected metric name and value in output") } if !strings.Contains(output, "model=test") { t.Error("expected tags in output") } } func TestStderrObserver_LevelFiltering(t *testing.T) { var buf bytes.Buffer obs := &StderrObserver{ MinLevel: "error", Output: &buf, } // info 级别的 Event 应该被过滤 obs.Event("should_be_filtered", nil) if buf.Len() > 0 { t.Error("info-level event should be filtered when min level is error") } // error 级别的 Error 应该输出 obs.Error(errors.New("visible"), nil) if buf.Len() == 0 { t.Error("error should be visible when min level is error") } } func TestStderrObserver_MetricFilteredByLevel(t *testing.T) { var buf bytes.Buffer obs := &StderrObserver{ MinLevel: "warn", Output: &buf, } // Metric 是 debug 级别,应该被 warn 过滤 obs.Metric("filtered", 1.0, nil) if buf.Len() > 0 { t.Error("debug-level metric should be filtered when min level is warn") } } func TestStderrObserver_DefaultLevel(t *testing.T) { var buf bytes.Buffer obs := &StderrObserver{ Output: &buf, } // 默认级别是 info,Event 应该输出 obs.Event("visible", nil) if buf.Len() == 0 { t.Error("event should be visible at default level (info)") } } func TestStderrObserver_NilData(t *testing.T) { var buf bytes.Buffer obs := &StderrObserver{ Output: &buf, } // nil data 不应该 panic obs.Event("test", nil) if buf.Len() == 0 { t.Error("event with nil data should still output") } } // ============================================================ // CompositeObserver 测试 // ============================================================ func TestCompositeObserver_ForwardsToAll(t *testing.T) { obs1 := &MockObserver{} obs2 := &MockObserver{} composite := NewCompositeObserver(obs1, obs2) composite.Event("test", map[string]any{"key": "value"}) if len(obs1.Events) != 1 { t.Error("obs1 should have received event") } if len(obs2.Events) != 1 { t.Error("obs2 should have received event") } } func TestCompositeObserver_ErrorForwardsToAll(t *testing.T) { obs1 := &MockObserver{} obs2 := &MockObserver{} composite := NewCompositeObserver(obs1, obs2) composite.Error(errors.New("test"), nil) if len(obs1.Errors) != 1 { t.Error("obs1 should have received error") } if len(obs2.Errors) != 1 { t.Error("obs2 should have received error") } } func TestCompositeObserver_MetricOnlyToMetricObservers(t *testing.T) { mock := &MockObserver{} // 实现了 MetricObserver noop := &NoopObserver{} // 也实现了 MetricObserver plainObs := &plainEventObserver{} // 不实现 MetricObserver composite := NewCompositeObserver(mock, noop, plainObs) composite.Metric("test", 1.0, nil) if len(mock.Metrics) != 1 { t.Error("MockObserver should have received metric") } // plainObs 不会收到 metric(编译时无法验证,运行时通过 type assertion 跳过) } func TestCompositeObserver_Empty(t *testing.T) { composite := NewCompositeObserver() // 空 composite 不应该 panic composite.Event("test", nil) composite.Error(errors.New("test"), nil) composite.Metric("test", 1.0, nil) } // plainEventObserver 只实现 EventObserver,不实现 MetricObserver. type plainEventObserver struct { events []MockEvent } func (p *plainEventObserver) Event(name string, data map[string]any) { p.events = append(p.events, MockEvent{Name: name, Data: data}) } func (p *plainEventObserver) Error(err error, context map[string]any) {} // ============================================================ // BufferedObserver 测试 // ============================================================ func TestBufferedObserver_EventsAreForwarded(t *testing.T) { inner := &MockObserver{} buf := NewBufferedObserver(inner, 10, 50*time.Millisecond, 100) defer buf.Close() buf.Event("test1", map[string]any{"k": "v"}) buf.Event("test2", nil) // 等待刷新 time.Sleep(150 * time.Millisecond) inner.mu.Lock() count := len(inner.Events) inner.mu.Unlock() if count != 2 { t.Errorf("expected 2 events forwarded, got %d", count) } } func TestBufferedObserver_ErrorsAreForwarded(t *testing.T) { inner := &MockObserver{} buf := NewBufferedObserver(inner, 10, 50*time.Millisecond, 100) defer buf.Close() buf.Error(errors.New("test"), map[string]any{"ctx": "test"}) // 等待刷新 time.Sleep(150 * time.Millisecond) inner.mu.Lock() count := len(inner.Errors) inner.mu.Unlock() if count != 1 { t.Errorf("expected 1 error forwarded, got %d", count) } } func TestBufferedObserver_BatchFlush(t *testing.T) { inner := &MockObserver{} // batchSize=3,interval 很长(不触发时间刷新) buf := NewBufferedObserver(inner, 3, 10*time.Second, 100) defer buf.Close() // 发送 3 条事件(达到 batch size) buf.Event("e1", nil) buf.Event("e2", nil) buf.Event("e3", nil) // batch size 触发刷新,稍等让 goroutine 处理 time.Sleep(50 * time.Millisecond) inner.mu.Lock() count := len(inner.Events) inner.mu.Unlock() if count != 3 { t.Errorf("expected 3 events after batch flush, got %d", count) } } func TestBufferedObserver_CloseFlushesRemaining(t *testing.T) { inner := &MockObserver{} buf := NewBufferedObserver(inner, 100, 10*time.Second, 100) buf.Event("e1", nil) buf.Event("e2", nil) // Close 应该刷新剩余事件 buf.Close() if len(inner.Events) != 2 { t.Errorf("expected 2 events after close, got %d", len(inner.Events)) } } func TestBufferedObserver_DoubleCloseNoPanic(t *testing.T) { inner := &MockObserver{} buf := NewBufferedObserver(inner, 10, time.Second, 100) buf.Close() buf.Close() // 不应该 panic } func TestBufferedObserver_NonBlockingOnFullBuffer(t *testing.T) { inner := &MockObserver{} // 极小的缓冲区 buf := NewBufferedObserver(inner, 100, 10*time.Second, 2) // 快速填满缓冲区 for i := 0; i < 10; i++ { buf.Event("flood", nil) // 不应该阻塞 } buf.Close() // 不 panic 就说明非阻塞生效了 } // ============================================================ // formatData / formatTags 测试 // ============================================================ func TestFormatData_Empty(t *testing.T) { if formatData(nil) != "" { t.Error("nil data should return empty string") } if formatData(map[string]any{}) != "" { t.Error("empty data should return empty string") } } func TestFormatData_WithValues(t *testing.T) { data := map[string]any{ "count": 42, } result := formatData(data) if !strings.Contains(result, "count=42") { t.Errorf("expected count=42 in '%s'", result) } } func TestFormatTags_Empty(t *testing.T) { if formatTags(nil) != "" { t.Error("nil tags should return empty string") } } func TestFormatTags_WithValues(t *testing.T) { tags := map[string]string{"model": "test"} result := formatTags(tags) if !strings.Contains(result, "model=test") { t.Errorf("expected model=test in '%s'", result) } }