// audit_observer_l1223_test.go - L1223 工具 Input 审计 opt-in 测试. // // 覆盖范围: // 1. 默认 (Config.AuditIncludeToolInput=false) - Extra 不含 tool_input // 2. 开启 + tool_input 在 maxBytes 内 - Extra["tool_input"] 写入完整内容 // 3. 开启 + tool_input 超 maxBytes - 截断 + Extra["tool_input_truncated"]="true" // 4. 开启 + maxBytes<=0 - 不截断, 原样写入 // 5. 开启但 data 无 tool_input 字段 - 不写入 (防御空值) // // 这些测试不跨 OperationLog, 直接通过 obs.Event 灌入 operation_recorded 事件模拟 // OperationLog.Record 触发路径 (同 audit_local_test.go 既有惯例). // 真端到端 (SecretStore.Redact → OperationLog → AuditObserver) 见 integration 层. package engine import ( "strings" "testing" ) func TestAuditObserver_L1223_DefaultOff(t *testing.T) { sink := &captureSink{} obs := NewAuditObserver(sink, "sess-l1223") // 不调 SetInputAudit, 等同 Config.AuditIncludeToolInput=false 默认路径. obs.Event("operation_recorded", map[string]any{ "tool": "Bash", "status": "success", "tool_input": `{"command":"echo hello"}`, }) if len(sink.entries) != 1 { t.Fatalf("expected 1 entry, got %d", len(sink.entries)) } if _, ok := sink.entries[0].Extra["tool_input"]; ok { t.Errorf("default off: Extra should NOT contain tool_input, got %q", sink.entries[0].Extra["tool_input"]) } } func TestAuditObserver_L1223_EnabledNoTruncation(t *testing.T) { sink := &captureSink{} obs := NewAuditObserver(sink, "sess-l1223") obs.SetInputAudit(true, 0) // maxBytes<=0 = 不截断 input := `{"command":"echo hello","env":{"FOO":"bar"}}` obs.Event("operation_recorded", map[string]any{ "tool": "Bash", "status": "success", "tool_input": input, }) if len(sink.entries) != 1 { t.Fatalf("expected 1 entry, got %d", len(sink.entries)) } got := sink.entries[0].Extra["tool_input"] if got != input { t.Errorf("tool_input: got %q, want %q", got, input) } if _, truncated := sink.entries[0].Extra["tool_input_truncated"]; truncated { t.Error("should not be truncated when maxBytes<=0") } } func TestAuditObserver_L1223_Truncated(t *testing.T) { sink := &captureSink{} obs := NewAuditObserver(sink, "sess-l1223") obs.SetInputAudit(true, 10) // 极小上限触发截断 input := strings.Repeat("X", 50) obs.Event("operation_recorded", map[string]any{ "tool": "FileWrite", "status": "success", "tool_input": input, }) if len(sink.entries) != 1 { t.Fatalf("expected 1 entry, got %d", len(sink.entries)) } got := sink.entries[0].Extra["tool_input"] if len(got) != 10 { t.Errorf("truncated length: got %d, want 10", len(got)) } if got != strings.Repeat("X", 10) { t.Errorf("truncated content unexpected: %q", got) } if sink.entries[0].Extra["tool_input_truncated"] != "true" { t.Errorf("tool_input_truncated: got %q, want \"true\"", sink.entries[0].Extra["tool_input_truncated"]) } } func TestAuditObserver_L1223_EnabledAtBoundary(t *testing.T) { // 边界测试: 长度恰好等于 maxBytes 不应触发截断. // 说明不变式: `len(input) > max` 才截断, `len(input) == max` 不截断. sink := &captureSink{} obs := NewAuditObserver(sink, "") obs.SetInputAudit(true, 10) input := strings.Repeat("Y", 10) obs.Event("operation_recorded", map[string]any{ "tool": "FileEdit", "status": "success", "tool_input": input, }) if len(sink.entries) != 1 { t.Fatalf("expected 1 entry, got %d", len(sink.entries)) } if sink.entries[0].Extra["tool_input"] != input { t.Errorf("boundary tool_input: got %q, want %q", sink.entries[0].Extra["tool_input"], input) } if _, truncated := sink.entries[0].Extra["tool_input_truncated"]; truncated { t.Error("boundary case: should NOT be truncated when len == maxBytes") } } func TestAuditObserver_L1223_EnabledButNoInputField(t *testing.T) { // 防御: 开启后若 event data 缺 tool_input 字段 (旧路径/兼容), 不写入 Extra // (避免 Extra["tool_input"]="" 造成"我明明开了怎么是空的"困惑). sink := &captureSink{} obs := NewAuditObserver(sink, "") obs.SetInputAudit(true, 100) obs.Event("operation_recorded", map[string]any{ "tool": "SomeTool", "status": "success", // 无 tool_input }) if len(sink.entries) != 1 { t.Fatalf("expected 1 entry, got %d", len(sink.entries)) } if _, ok := sink.entries[0].Extra["tool_input"]; ok { t.Error("missing tool_input field: Extra should not contain empty tool_input") } }