package security

import (
	"errors"
	"strings"
	"sync"
	"testing"
)

// ─── Scan: basic functionality ───────────────────────────────────────────────

// TestScan_NoSecrets checks that harmless content produces no matches and no
// error.
func TestScan_NoSecrets(t *testing.T) {
	g := NewDefaultSecretGuard()
	matches, err := g.Scan("/some/file.go", "this is totally safe content")
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if len(matches) != 0 {
		t.Errorf("expected 0 matches, got %d: %v", len(matches), matches)
	}
}

// TestScan_GitHubPAT checks that a GitHub personal access token is reported
// exactly once, with the expected rule ID and label.
func TestScan_GitHubPAT(t *testing.T) {
	// Fake token for testing; shaped like ghp_ followed by 36 alphanumerics.
	content := "token: ghp_AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA1"
	g := NewDefaultSecretGuard()
	matches, err := g.Scan("/config.yml", content)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if len(matches) != 1 {
		t.Fatalf("expected 1 match, got %d", len(matches))
	}
	if matches[0].RuleID != "github-pat" {
		t.Errorf("expected rule github-pat, got %s", matches[0].RuleID)
	}
	if matches[0].Label != "GitHub PAT" {
		t.Errorf("expected label 'GitHub PAT', got %q", matches[0].Label)
	}
}

// TestScan_AWSAccessKey checks that an AWS access key ID is reported under the
// aws-access-token rule. Other rules may also fire, so the test only looks for
// that rule among the matches.
func TestScan_AWSAccessKey(t *testing.T) {
	// Fake AWS key for testing: the AKIA prefix followed by 16 characters
	// from A-Z2-7.
	content := "aws_access_key_id = AKIAIOSFODNN7EXAMPLE"
	g := NewDefaultSecretGuard()
	matches, err := g.Scan("/env/aws.conf", content)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	found := false
	for _, m := range matches {
		if m.RuleID == "aws-access-token" {
			found = true
			break
		}
	}
	if !found {
		t.Errorf("expected aws-access-token match, got: %v", matches)
	}
}

// TestScan_PrivateKey checks that a PEM private key block is reported under
// the private-key rule.
func TestScan_PrivateKey(t *testing.T) {
	// The rule requires at least 64 characters between BEGIN and END (real
	// private keys are far longer than that).
	content := "-----BEGIN RSA PRIVATE KEY-----\n" +
		"MIIEpAIBAAKCAQEAuH3GBmPOBhJxCVMT4xwlqRoZkBwLvbepS6sBVVwRXeZCXX\n" +
		"-----END RSA PRIVATE KEY-----"
	g := NewDefaultSecretGuard()
	matches, err := g.Scan("/id_rsa", content)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	found := false
	for _, m := range matches {
		if m.RuleID == "private-key" {
			found = true
			break
		}
	}
	if !found {
		t.Errorf("expected private-key match, got: %v", matches)
	}
}

// TestScan_DeduplicatesRules checks that a rule matching several times in the
// same content is reported only once.
func TestScan_DeduplicatesRules(t *testing.T) {
	// The same rule fires twice in this content; it must be reported once.
	content := "key1: ghp_AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA1\n" +
		"key2: ghp_BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB2"
	g := NewDefaultSecretGuard()
	matches, err := g.Scan("/double.yml", content)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	count := 0
	for _, m := range matches {
		if m.RuleID == "github-pat" {
			count++
		}
	}
	if count != 1 {
		t.Errorf("expected github-pat to appear exactly once, got %d times", count)
	}
}

// ─── ExemptPaths ─────────────────────────────────────────────────────────────

// TestScan_ExemptPath_Skipped checks that a path matching an ExemptPaths entry
// yields no matches even when the content contains a secret.
func TestScan_ExemptPath_Skipped(t *testing.T) {
	content := "token: ghp_AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA1"
	g := &DefaultSecretGuard{
		rules:       builtinRules,
		ExemptPaths: []string{"testdata/", "/fixtures/"},
	}
	// The testdata/ prefix should be exempt.
	matches, err := g.Scan("testdata/sample.yml", content)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if len(matches) != 0 {
		t.Errorf("exempt path should produce 0 matches, got %d", len(matches))
	}
}

// TestScan_ExemptPath_NonExemptScanned checks that configuring ExemptPaths
// does not disable scanning of paths outside the exempt list.
func TestScan_ExemptPath_NonExemptScanned(t *testing.T) {
	content := "token: ghp_AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA1"
	g := &DefaultSecretGuard{
		rules:       builtinRules,
		ExemptPaths: []string{"testdata/"},
	}
	// A non-exempt path should be scanned as usual.
	matches, err := g.Scan("/production/config.yml", content)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if len(matches) == 0 {
		t.Error("non-exempt path should be scanned and find matches")
	}
}

// ─── MaxScanBytes ────────────────────────────────────────────────────────────

// TestScan_ContentTooLarge checks that content one byte over MaxScanBytes is
// rejected with ErrContentTooLarge.
func TestScan_ContentTooLarge(t *testing.T) {
	// Build content that exceeds the default limit.
	large := strings.Repeat("a", MaxScanBytes+1)
	g := NewDefaultSecretGuard()
	_, err := g.Scan("/huge.bin", large)
	// errors.Is keeps the assertion valid even if Scan wraps the sentinel.
	if !errors.Is(err, ErrContentTooLarge) {
		t.Errorf("expected ErrContentTooLarge, got %v", err)
	}
}

// TestScan_CustomMaxBytes checks that a guard with its own MaxBytes rejects
// content longer than that limit.
func TestScan_CustomMaxBytes(t *testing.T) {
	// Use a smaller, custom limit.
	content := strings.Repeat("x", 100)
	g := &DefaultSecretGuard{rules: builtinRules, MaxBytes: 50}
	_, err := g.Scan("/file", content)
	if !errors.Is(err, ErrContentTooLarge) {
		t.Errorf("expected ErrContentTooLarge with custom limit, got %v", err)
	}
}

// TestScan_ExactlyAtLimit checks the boundary: content whose length equals
// MaxBytes is accepted.
func TestScan_ExactlyAtLimit(t *testing.T) {
	// Content exactly at the limit should pass (it does not exceed it).
	content := strings.Repeat("x", 50)
	g := &DefaultSecretGuard{rules: builtinRules, MaxBytes: 50}
	_, err := g.Scan("/file", content)
	if err != nil {
		t.Errorf("content exactly at limit should not error, got %v", err)
	}
}

// ─── Redact ──────────────────────────────────────────────────────────────────

// TestRedact_GitHubPAT checks that Redact removes the token and leaves a
// [REDACTED] marker in the output.
func TestRedact_GitHubPAT(t *testing.T) {
	original := "token: ghp_AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA1 here"
	g := NewDefaultSecretGuard()
	redacted := g.Redact(original)
	if strings.Contains(redacted, "ghp_") {
		t.Errorf("redacted content still contains github token: %s", redacted)
	}
	if !strings.Contains(redacted, "[REDACTED]") {
		t.Errorf("redacted content missing [REDACTED] marker: %s", redacted)
	}
}

// TestRedact_PreservesSurroundingText checks that the text preceding the
// token survives redaction unchanged.
func TestRedact_PreservesSurroundingText(t *testing.T) {
	// Redaction should replace only the token and leave nearby text intact.
	original := "export GH_TOKEN=ghp_AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA1"
	g := NewDefaultSecretGuard()
	redacted := g.Redact(original)
	if !strings.HasPrefix(redacted, "export GH_TOKEN=") {
		t.Errorf("surrounding text was corrupted: %s", redacted)
	}
}

// TestRedact_TooLargePassthrough checks that Redact returns over-limit content
// unchanged.
func TestRedact_TooLargePassthrough(t *testing.T) {
	// Over-limit content is returned as-is: not redacted, and no error.
	large := strings.Repeat("a", MaxScanBytes+1)
	g := NewDefaultSecretGuard()
	result := g.Redact(large)
	if result != large {
		t.Error("too-large content should be returned unchanged")
	}
}

// ─── NoopSecretGuard ─────────────────────────────────────────────────────────

// TestNoopSecretGuard checks that the no-op guard reports nothing from Scan
// and returns its input unchanged from Redact, even for content holding a
// token.
func TestNoopSecretGuard(t *testing.T) {
	g := NoopSecretGuard{}
	content := "token: ghp_AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA1"
	matches, err := g.Scan("/file", content)
	if err != nil || len(matches) != 0 {
		t.Errorf("noop should always return no matches, got matches=%v err=%v", matches, err)
	}
	if g.Redact(content) != content {
		t.Error("noop Redact should return content unchanged")
	}
}

// ─── MatchLabels / MatchRuleIDs ──────────────────────────────────────────────

// TestMatchLabels checks that MatchLabels returns each match's Label in input
// order.
func TestMatchLabels(t *testing.T) {
	matches := []SecretMatch{
		{RuleID: "github-pat", Label: "GitHub PAT"},
		{RuleID: "aws-access-token", Label: "AWS Access Token"},
	}
	labels := MatchLabels(matches)
	if len(labels) != 2 || labels[0] != "GitHub PAT" || labels[1] != "AWS Access Token" {
		t.Errorf("unexpected labels: %v", labels)
	}
}

// TestMatchRuleIDs checks that MatchRuleIDs returns each match's RuleID.
func TestMatchRuleIDs(t *testing.T) {
	matches := []SecretMatch{
		{RuleID: "github-pat", Label: "GitHub PAT"},
	}
	ids := MatchRuleIDs(matches)
	if len(ids) != 1 || ids[0] != "github-pat" {
		t.Errorf("unexpected ids: %v", ids)
	}
}

// ─── ruleIDToLabel ───────────────────────────────────────────────────────────

// TestRuleIDToLabel checks the rule-ID-to-label conversion for a table of
// known rule IDs, including the brand and acronym capitalisations expected
// for them (GitHub, PAT, AWS, API, OpenAI, GitLab).
func TestRuleIDToLabel(t *testing.T) {
	cases := []struct {
		id   string
		want string
	}{
		{"github-pat", "GitHub PAT"},
		{"aws-access-token", "AWS Access Token"},
		{"anthropic-api-key", "Anthropic API Key"},
		{"openai-api-key", "OpenAI API Key"},
		{"gitlab-deploy-token", "GitLab Deploy Token"},
		{"private-key", "Private Key"},
		{"stripe-access-token", "Stripe Access Token"},
	}
	for _, tc := range cases {
		// One subtest per rule ID so a failure names the offending case.
		t.Run(tc.id, func(t *testing.T) {
			got := ruleIDToLabel(tc.id)
			if got != tc.want {
				t.Errorf("ruleIDToLabel(%q) = %q, want %q", tc.id, got, tc.want)
			}
		})
	}
}

// ─── OnBlocked callback ──────────────────────────────────────────────────────

// TestScan_OnBlocked_CalledWhenSecretDetected checks that the OnBlocked
// callback has received the scanned path and a non-empty match list by the
// time Scan returns.
func TestScan_OnBlocked_CalledWhenSecretDetected(t *testing.T) {
	var blockedPath string
	var blockedMatches []SecretMatch
	g := NewDefaultSecretGuard()
	g.OnBlocked = func(path string, matches []SecretMatch) {
		blockedPath = path
		blockedMatches = matches
	}
	content := "token: ghp_AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA1"
	matches, err := g.Scan("/config.yml", content)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if len(matches) == 0 {
		t.Fatal("expected matches")
	}
	// OnBlocked should already have been invoked at this point.
	if blockedPath != "/config.yml" {
		t.Errorf("OnBlocked path = %q, want %q", blockedPath, "/config.yml")
	}
	if len(blockedMatches) == 0 {
		t.Error("OnBlocked should have received matches")
	}
}

// TestScan_OnBlocked_NotCalledWhenSafe checks that OnBlocked is not invoked
// when the content contains no secrets.
func TestScan_OnBlocked_NotCalledWhenSafe(t *testing.T) {
	called := false
	g := NewDefaultSecretGuard()
	g.OnBlocked = func(_ string, _ []SecretMatch) {
		called = true
	}
	// A failing Scan would make the "not called" assertion pass vacuously,
	// so the error is checked rather than discarded.
	if _, err := g.Scan("/safe.go", "no secrets here"); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if called {
		t.Error("OnBlocked should not be called when no secrets detected")
	}
}

// TestScan_OnBlocked_NotCalledOnExemptPath checks that OnBlocked is not
// invoked for a path under an ExemptPaths entry, even when the content
// contains a secret.
func TestScan_OnBlocked_NotCalledOnExemptPath(t *testing.T) {
	called := false
	g := NewDefaultSecretGuard()
	g.ExemptPaths = []string{"testdata/"}
	g.OnBlocked = func(_ string, _ []SecretMatch) {
		called = true
	}
	content := "token: ghp_AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA1"
	// Checked for the same reason as above: an error must not masquerade as
	// "callback correctly skipped".
	if _, err := g.Scan("testdata/fixtures/fake.yml", content); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if called {
		t.Error("OnBlocked should not be called for exempt paths")
	}
}

// ─── Concurrency safety ──────────────────────────────────────────────────────

// TestScan_ConcurrentSafe calls Scan on one shared guard from several
// goroutines at once. It is mainly useful under `go test -race`; on its own it
// only asserts that none of the calls returns an error.
func TestScan_ConcurrentSafe(t *testing.T) {
	g := NewDefaultSecretGuard()
	content := "safe content without secrets"

	const workers = 10
	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			// t.Errorf (unlike t.Fatalf) may be called from a goroutine
			// other than the one running the test function.
			if _, err := g.Scan("/file", content); err != nil {
				t.Errorf("unexpected error from concurrent Scan: %v", err)
			}
		}()
	}
	wg.Wait()
}