// fileread_test.go -- FileRead 工具的单元测试. // // 覆盖场景: // - 正常读取文件(带行号 cat -n 格式) // - offset 和 limit 参数 // - 空文件处理 // - 文件不存在错误 // - 二进制文件检测 // - 图片文件检测(含 base64 内联,尺寸检测) // - PDF 文件检测(含 magic bytes,pages 参数验证) // - Jupyter Notebook 解析 // - 目录检测 // - 设备文件阻止 // - 编码检测(UTF-8 BOM,UTF-16) // - 符号链接处理 // - 路径验证 // - 文件状态缓存集成 // - formatFileSize 辅助函数 // - PDF 页码范围解析 package builtin import ( "context" "encoding/json" "os" "path/filepath" "strings" "testing" ) // ───────────────────────────────────────────────────────────────────── // 测试 Mock // ───────────────────────────────────────────────────────────────────── // mockFileCache 是 FileCacheRecorder 的测试 mock. type mockFileCacheForRead struct { recorded map[string][]byte } func newMockFileCacheForRead() *mockFileCacheForRead { return &mockFileCacheForRead{recorded: make(map[string][]byte)} } func (m *mockFileCacheForRead) Record(path string, content []byte) { m.recorded[path] = content } // mockStateCache 是 FileStateCacheRecorder 的测试 mock. type mockStateCache struct { entries map[string]FileStateCacheEntry } func newMockStateCache() *mockStateCache { return &mockStateCache{entries: make(map[string]FileStateCacheEntry)} } func (m *mockStateCache) RecordState(path string, entry FileStateCacheEntry) { m.entries[path] = entry } // ───────────────────────────────────────────────────────────────────── // 基础读取测试 // ───────────────────────────────────────────────────────────────────── // TestFileReadTool_BasicRead 测试基本文件读取 func TestFileReadTool_BasicRead(t *testing.T) { tool := NewFileReadTool() dir := t.TempDir() filePath := filepath.Join(dir, "test.txt") os.WriteFile(filePath, []byte("line one\nline two\nline three\n"), 0644) input, _ := json.Marshal(fileReadInput{FilePath: filePath}) result, err := tool.Execute(context.Background(), input, nil) if err != nil { t.Fatalf("执行失败: %v", err) } if result.IsError { t.Fatalf("不应标记为错误: %s", result.Output) } // 验证 cat -n 格式的行号 if !strings.Contains(result.Output, "1\t") { t.Errorf("输出应包含行号, 实际: %q", result.Output) } if !strings.Contains(result.Output, "line one") { t.Errorf("输出应包含文件内容, 实际: %q", result.Output) } } // TestFileReadTool_OffsetAndLimit 测试 offset 和 limit 参数 func TestFileReadTool_OffsetAndLimit(t *testing.T) { tool := NewFileReadTool() dir := t.TempDir() filePath := filepath.Join(dir, "test.txt") content := "line1\nline2\nline3\nline4\nline5\n" os.WriteFile(filePath, []byte(content), 0644) // offset=2 表示跳过前 2 行,limit=2 表示读 2 行 input, _ := json.Marshal(fileReadInput{FilePath: filePath, Offset: 2, Limit: 2}) result, err := tool.Execute(context.Background(), input, nil) if err != nil { t.Fatalf("执行失败: %v", err) } if result.IsError { t.Fatalf("不应标记为错误: %s", result.Output) } // 应该包含 line3 和 line4(第 3,4 行) if !strings.Contains(result.Output, "line3") { t.Errorf("应包含 line3: %q", result.Output) } if !strings.Contains(result.Output, "line4") { t.Errorf("应包含 line4: %q", result.Output) } // 不应包含 line1, line2, line5 if strings.Contains(result.Output, "line1") { t.Errorf("不应包含 line1: %q", result.Output) } if strings.Contains(result.Output, "line5") { t.Errorf("不应包含 line5: %q", result.Output) } } // TestFileReadTool_EmptyFile 测试空文件 func TestFileReadTool_EmptyFile(t *testing.T) { tool := NewFileReadTool() dir := t.TempDir() filePath := filepath.Join(dir, "empty.txt") os.WriteFile(filePath, []byte{}, 0644) input, _ := json.Marshal(fileReadInput{FilePath: filePath}) result, err := tool.Execute(context.Background(), input, nil) if err != nil { t.Fatalf("执行失败: %v", err) } if result.IsError { t.Error("空文件不应标记为错误") } if !strings.Contains(result.Output, "empty") { t.Errorf("应提示文件为空: %q", result.Output) } } // TestFileReadTool_FileNotFound 测试文件不存在 func TestFileReadTool_FileNotFound(t *testing.T) { tool := NewFileReadTool() input, _ := json.Marshal(fileReadInput{FilePath: "/nonexistent/file.txt"}) result, err := tool.Execute(context.Background(), input, nil) if err != nil { t.Fatalf("不应返回 Go error: %v", err) } if !result.IsError { t.Error("文件不存在应标记为错误") } if !strings.Contains(result.Output, "not found") { t.Errorf("错误信息不匹配: %s", result.Output) } } // TestFileReadTool_BinaryFile 测试二进制文件检测 func TestFileReadTool_BinaryFile(t *testing.T) { tool := NewFileReadTool() dir := t.TempDir() filePath := filepath.Join(dir, "binary.dat") // 写入含有 null 字节的二进制数据(不以图片 magic bytes 开头) os.WriteFile(filePath, []byte{0x01, 0x02, 0x00, 0x03, 0x04, 0x05, 0x06}, 0644) input, _ := json.Marshal(fileReadInput{FilePath: filePath}) result, err := tool.Execute(context.Background(), input, nil) if err != nil { t.Fatalf("执行失败: %v", err) } if result.IsError { t.Error("二进制文件检测不应标记为错误") } if !strings.Contains(result.Output, "Binary file") { t.Errorf("应提示为二进制文件: %q", result.Output) } } // TestFileReadTool_Directory 测试读取目录报错 func TestFileReadTool_Directory(t *testing.T) { tool := NewFileReadTool() dir := t.TempDir() input, _ := json.Marshal(fileReadInput{FilePath: dir}) result, err := tool.Execute(context.Background(), input, nil) if err != nil { t.Fatalf("不应返回 Go error: %v", err) } if !result.IsError { t.Error("读取目录应报错") } if !strings.Contains(result.Output, "directory") { t.Errorf("应提示为目录: %s", result.Output) } } // TestFileReadTool_EmptyFilePath 测试空文件路径 func TestFileReadTool_EmptyFilePath(t *testing.T) { tool := NewFileReadTool() input, _ := json.Marshal(fileReadInput{FilePath: ""}) result, err := tool.Execute(context.Background(), input, nil) if err != nil { t.Fatalf("不应返回 Go error: %v", err) } if !result.IsError { t.Error("空路径应报错") } } // ───────────────────────────────────────────────────────────────────── // 图片文件测试 // ───────────────────────────────────────────────────────────────────── // TestFileReadTool_ImageFile 测试图片文件返回 base64 内联 func TestFileReadTool_ImageFile(t *testing.T) { tool := NewFileReadTool() dir := t.TempDir() filePath := filepath.Join(dir, "photo.png") // 写入一个最小的 1x1 PNG // PNG magic + IHDR chunk (1x1, 8-bit RGBA) pngData := []byte{ 0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A, // PNG magic 0x00, 0x00, 0x00, 0x0D, // IHDR length 0x49, 0x48, 0x44, 0x52, // "IHDR" 0x00, 0x00, 0x00, 0x01, // width: 1 0x00, 0x00, 0x00, 0x01, // height: 1 0x08, 0x02, // 8-bit RGB 0x00, 0x00, 0x00, 0x90, 0x77, 0x53, 0xDE, // CRC 0x00, 0x00, 0x00, 0x0C, // IDAT length 0x49, 0x44, 0x41, 0x54, // "IDAT" 0x08, 0xD7, 0x63, 0xF8, 0xCF, 0xC0, 0x00, 0x00, 0x00, 0x02, 0x00, 0x01, 0xE2, 0x21, 0xBC, 0x33, // CRC 0x00, 0x00, 0x00, 0x00, // IEND length 0x49, 0x45, 0x4E, 0x44, // "IEND" 0xAE, 0x42, 0x60, 0x82, // CRC } os.WriteFile(filePath, pngData, 0644) input, _ := json.Marshal(fileReadInput{FilePath: filePath}) result, err := tool.Execute(context.Background(), input, nil) if err != nil { t.Fatalf("执行失败: %v", err) } if result.IsError { t.Errorf("图片文件不应标记为错误: %s", result.Output) } if !strings.Contains(result.Output, "Image file") { t.Errorf("应识别为图片文件: %q", result.Output) } if !strings.Contains(result.Output, "image/png") { t.Errorf("应检测到 PNG 类型: %q", result.Output) } if !strings.Contains(result.Output, "base64") { t.Errorf("应包含 base64 数据: %q", result.Output) } // 验证 Data 中的 ImageResult if result.Data == nil { t.Fatal("Data 不应为 nil") } imgResult, ok := result.Data.(*ImageResult) if !ok { t.Fatalf("Data 类型应为 *ImageResult, 实际: %T", result.Data) } if imgResult.MediaType != "image/png" { t.Errorf("MediaType 应为 image/png, 实际: %s", imgResult.MediaType) } if imgResult.Base64 == "" { t.Error("Base64 不应为空") } } // TestFileReadTool_JPEGImage 测试 JPEG 图片文件 func TestFileReadTool_JPEGImage(t *testing.T) { tool := NewFileReadTool() dir := t.TempDir() filePath := filepath.Join(dir, "photo.jpg") // JPEG magic bytes jpegData := []byte{0xFF, 0xD8, 0xFF, 0xE0, 0x00, 0x10, 0x4A, 0x46, 0x49, 0x46} os.WriteFile(filePath, jpegData, 0644) input, _ := json.Marshal(fileReadInput{FilePath: filePath}) result, err := tool.Execute(context.Background(), input, nil) if err != nil { t.Fatalf("执行失败: %v", err) } if !strings.Contains(result.Output, "image/jpeg") { t.Errorf("应检测到 JPEG 类型: %q", result.Output) } } // TestFileReadTool_SVGAsText 测试 SVG 文件作为文本读取 func TestFileReadTool_SVGAsText(t *testing.T) { tool := NewFileReadTool() dir := t.TempDir() filePath := filepath.Join(dir, "icon.svg") svgContent := ` ` os.WriteFile(filePath, []byte(svgContent), 0644) input, _ := json.Marshal(fileReadInput{FilePath: filePath}) result, err := tool.Execute(context.Background(), input, nil) if err != nil { t.Fatalf("执行失败: %v", err) } if result.IsError { t.Errorf("SVG 读取不应标记为错误: %s", result.Output) } // SVG 应该作为文本返回,包含 SVG 源码 if !strings.Contains(result.Output, "/fd/N 阻止 func TestIsBlockedDevicePath_ProcFd(t *testing.T) { if !isBlockedDevicePath("/proc/self/fd/0") { t.Error("/proc/self/fd/0 应被阻止") } if !isBlockedDevicePath("/proc/12345/fd/1") { t.Error("/proc/12345/fd/1 应被阻止") } if isBlockedDevicePath("/proc/self/fd/3") { t.Error("/proc/self/fd/3 不应被阻止") } if isBlockedDevicePath("/proc/self/status") { t.Error("/proc/self/status 不应被阻止") } } // ───────────────────────────────────────────────────────────────────── // 编码检测测试 // ───────────────────────────────────────────────────────────────────── // TestDetectEncoding 测试各种编码检测 func TestDetectEncoding(t *testing.T) { tests := []struct { name string header []byte encoding string bomSize int }{ { name: "空数据", header: []byte{}, encoding: "utf-8", bomSize: 0, }, { name: "UTF-8 BOM", header: []byte{0xEF, 0xBB, 0xBF, 0x68, 0x65, 0x6C, 0x6C, 0x6F}, encoding: "utf-8", bomSize: 3, }, { name: "UTF-16 LE BOM", header: []byte{0xFF, 0xFE, 0x68, 0x00, 0x65, 0x00}, encoding: "utf-16le", bomSize: 2, }, { name: "UTF-16 BE BOM", header: []byte{0xFE, 0xFF, 0x00, 0x68, 0x00, 0x65}, encoding: "utf-16be", bomSize: 2, }, { name: "二进制 (null bytes)", header: []byte{0x01, 0x02, 0x00, 0x03, 0x04}, encoding: "binary", bomSize: 0, }, { name: "普通 UTF-8", header: []byte("hello world"), encoding: "utf-8", bomSize: 0, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { result := detectEncoding(tt.header) if result.encoding != tt.encoding { t.Errorf("编码: 期望 %s, 实际 %s", tt.encoding, result.encoding) } if result.bomSize != tt.bomSize { t.Errorf("BOM 大小: 期望 %d, 实际 %d", tt.bomSize, result.bomSize) } }) } } // TestFileReadTool_UTF8BOM 测试 UTF-8 BOM 跳过 func TestFileReadTool_UTF8BOM(t *testing.T) { tool := NewFileReadTool() dir := t.TempDir() filePath := filepath.Join(dir, "bom.txt") // UTF-8 BOM + content data := append([]byte{0xEF, 0xBB, 0xBF}, []byte("hello world")...) os.WriteFile(filePath, data, 0644) input, _ := json.Marshal(fileReadInput{FilePath: filePath}) result, err := tool.Execute(context.Background(), input, nil) if err != nil { t.Fatalf("执行失败: %v", err) } if result.IsError { t.Fatalf("不应标记为错误: %s", result.Output) } if !strings.Contains(result.Output, "hello world") { t.Errorf("应包含文件内容: %q", result.Output) } // BOM 不应出现在输出中(BOM 字符是 U+FEFF) if strings.Contains(result.Output, "\uFEFF") { t.Error("BOM 字符不应出现在输出中") } } // TestFileReadTool_UTF16Detection 测试 UTF-16 文件检测 func TestFileReadTool_UTF16Detection(t *testing.T) { tool := NewFileReadTool() dir := t.TempDir() filePath := filepath.Join(dir, "utf16.txt") // UTF-16 LE BOM + "hi" data := []byte{0xFF, 0xFE, 0x68, 0x00, 0x69, 0x00} os.WriteFile(filePath, data, 0644) input, _ := json.Marshal(fileReadInput{FilePath: filePath}) result, err := tool.Execute(context.Background(), input, nil) if err != nil { t.Fatalf("执行失败: %v", err) } if result.IsError { t.Error("UTF-16 检测不应标记为错误") } if !strings.Contains(result.Output, "UTF-16") { t.Errorf("应提示 UTF-16 编码: %q", result.Output) } } // ───────────────────────────────────────────────────────────────────── // 路径验证测试 // ───────────────────────────────────────────────────────────────────── // TestValidatePath 测试路径验证 func TestValidatePath(t *testing.T) { tests := []struct { name string input string wantErr bool }{ {"空路径", "", true}, {"相对路径", "relative/path.txt", true}, {"绝对路径", "/absolute/path.txt", false}, {"带 .. 的绝对路径", "/home/user/../admin/file.txt", false}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { _, err := validatePath(tt.input) if (err != nil) != tt.wantErr { t.Errorf("validatePath(%q): err=%v, wantErr=%v", tt.input, err, tt.wantErr) } }) } } // TestFileReadTool_RelativePath 测试相对路径报错 func TestFileReadTool_RelativePath(t *testing.T) { tool := NewFileReadTool() input, _ := json.Marshal(fileReadInput{FilePath: "relative/path.txt"}) result, err := tool.Execute(context.Background(), input, nil) if err != nil { t.Fatalf("不应返回 Go error: %v", err) } if !result.IsError { t.Error("相对路径应报错") } if !strings.Contains(result.Output, "absolute path") { t.Errorf("应提示需要绝对路径: %q", result.Output) } } // TestFileReadTool_Symlink 测试符号链接读取 func TestFileReadTool_Symlink(t *testing.T) { tool := NewFileReadTool() dir := t.TempDir() realFile := filepath.Join(dir, "real.txt") os.WriteFile(realFile, []byte("real content"), 0644) linkFile := filepath.Join(dir, "link.txt") os.Symlink(realFile, linkFile) input, _ := json.Marshal(fileReadInput{FilePath: linkFile}) result, err := tool.Execute(context.Background(), input, nil) if err != nil { t.Fatalf("执行失败: %v", err) } if result.IsError { t.Fatalf("符号链接读取不应报错: %s", result.Output) } if !strings.Contains(result.Output, "real content") { t.Errorf("应读取到真实文件内容: %q", result.Output) } } // ───────────────────────────────────────────────────────────────────── // 缓存集成测试 // ───────────────────────────────────────────────────────────────────── // TestFileReadTool_FileCacheRecording 测试文件缓存记录 func TestFileReadTool_FileCacheRecording(t *testing.T) { cache := newMockFileCacheForRead() tool := NewFileReadToolWithCache(cache) dir := t.TempDir() filePath := filepath.Join(dir, "cached.txt") os.WriteFile(filePath, []byte("cached content\nline 2"), 0644) input, _ := json.Marshal(fileReadInput{FilePath: filePath}) _, err := tool.Execute(context.Background(), input, nil) if err != nil { t.Fatalf("执行失败: %v", err) } // 验证缓存被记录 if _, exists := cache.recorded[filePath]; !exists { t.Error("文件内容应被缓存记录") } } // TestFileReadTool_StateCacheRecording 测试文件状态缓存记录 func TestFileReadTool_StateCacheRecording(t *testing.T) { stateCache := newMockStateCache() tool := NewFileReadToolFull(nil, stateCache) dir := t.TempDir() filePath := filepath.Join(dir, "state.txt") os.WriteFile(filePath, []byte("line 1\nline 2\nline 3"), 0644) input, _ := json.Marshal(fileReadInput{FilePath: filePath}) _, err := tool.Execute(context.Background(), input, nil) if err != nil { t.Fatalf("执行失败: %v", err) } // 验证状态缓存被记录 entry, exists := stateCache.entries[filePath] if !exists { t.Fatal("文件状态应被缓存记录") } if entry.ContentHash == "" { t.Error("ContentHash 不应为空") } if entry.LineCount != 3 { t.Errorf("LineCount 应为 3, 实际: %d", entry.LineCount) } if entry.IsPartialView { t.Error("完整读取不应标记为 IsPartialView") } } // TestFileReadTool_StateCachePartialView 测试 partial read 的状态标记 func TestFileReadTool_StateCachePartialView(t *testing.T) { stateCache := newMockStateCache() tool := NewFileReadToolFull(nil, stateCache) dir := t.TempDir() filePath := filepath.Join(dir, "partial.txt") os.WriteFile(filePath, []byte("line 1\nline 2\nline 3\nline 4\nline 5"), 0644) // offset > 0 表示 partial read input, _ := json.Marshal(fileReadInput{FilePath: filePath, Offset: 2}) _, err := tool.Execute(context.Background(), input, nil) if err != nil { t.Fatalf("执行失败: %v", err) } entry, exists := stateCache.entries[filePath] if !exists { t.Fatal("文件状态应被缓存记录") } if !entry.IsPartialView { t.Error("offset > 0 的读取应标记为 IsPartialView") } } // ───────────────────────────────────────────────────────────────────── // PDF 页码范围解析测试 // ───────────────────────────────────────────────────────────────────── // TestParsePDFPageRange 测试 PDF 页码范围解析 func TestParsePDFPageRange(t *testing.T) { tests := []struct { input string first int last int valid bool }{ {"5", 5, 5, true}, {"1-10", 1, 10, true}, {"3-", 3, -1, true}, {"", 0, 0, false}, {"abc", 0, 0, false}, {"0", 0, 0, false}, // 0 is invalid (1-indexed) {"-5", 0, 0, false}, // negative start {"10-5", 0, 0, false}, // inverted range {" 5 ", 5, 5, true}, // whitespace trimmed {" 1-10 ", 1, 10, true}, // whitespace trimmed } for _, tt := range tests { t.Run(tt.input, func(t *testing.T) { result := parsePDFPageRange(tt.input) if tt.valid { if result == nil { t.Fatalf("应返回有效结果, 输入: %q", tt.input) } if result.FirstPage != tt.first { t.Errorf("FirstPage: 期望 %d, 实际 %d", tt.first, result.FirstPage) } if result.LastPage != tt.last { t.Errorf("LastPage: 期望 %d, 实际 %d", tt.last, result.LastPage) } } else { if result != nil { t.Errorf("应返回 nil, 输入: %q, 实际: %+v", tt.input, result) } } }) } } // TestValidatePagesParam 测试 pages 参数验证 func TestValidatePagesParam(t *testing.T) { // 有效参数 if errResult := validatePagesParam("1-5"); errResult != nil { t.Errorf("1-5 应有效: %s", errResult.Output) } if errResult := validatePagesParam("3"); errResult != nil { t.Errorf("3 应有效: %s", errResult.Output) } if errResult := validatePagesParam(""); errResult != nil { t.Errorf("空字符串应有效: %s", errResult.Output) } // 无效参数 if errResult := validatePagesParam("abc"); errResult == nil { t.Error("abc 应无效") } // 超过最大页数 if errResult := validatePagesParam("1-25"); errResult == nil { t.Error("1-25 应超出限制") } // open-ended range if errResult := validatePagesParam("5-"); errResult == nil { t.Error("5- 应被拒绝(open-ended)") } } // ───────────────────────────────────────────────────────────────────── // 辅助函数测试 // ───────────────────────────────────────────────────────────────────── // TestIsImageFile 测试图片文件扩展名判断 func TestIsImageFile(t *testing.T) { tests := []struct { path string expected bool }{ {"/path/to/image.png", true}, {"/path/to/image.jpg", true}, {"/path/to/image.JPEG", true}, {"/path/to/image.gif", true}, {"/path/to/image.webp", true}, {"/path/to/image.svg", false}, // SVG 走文本路径 {"/path/to/file.go", false}, {"/path/to/file.txt", false}, } for _, tt := range tests { if isImageFile(tt.path) != tt.expected { t.Errorf("isImageFile(%q) = %v, 期望 %v", tt.path, !tt.expected, tt.expected) } } } // TestIsSVGFile 测试 SVG 文件判断 func TestIsSVGFile(t *testing.T) { if !isSVGFile("/path/to/image.svg") { t.Error("应识别为 SVG 文件") } if !isSVGFile("/path/to/image.SVG") { t.Error("应识别大写 SVG") } if isSVGFile("/path/to/image.png") { t.Error("不应将 PNG 识别为 SVG") } } // TestIsPDFFile 测试 PDF 文件判断 func TestIsPDFFile(t *testing.T) { if !isPDFFile("/path/to/file.pdf") { t.Error("应识别为 PDF 文件") } if !isPDFFile("/path/to/file.PDF") { t.Error("应识别大写 PDF 后缀") } if isPDFFile("/path/to/file.txt") { t.Error("不应将 .txt 识别为 PDF") } } // TestIsNotebookFile 测试 Notebook 文件判断 func TestIsNotebookFile(t *testing.T) { if !isNotebookFile("/path/to/nb.ipynb") { t.Error("应识别为 Notebook 文件") } if !isNotebookFile("/path/to/nb.IPYNB") { t.Error("应识别大写扩展名") } if isNotebookFile("/path/to/file.py") { t.Error("不应将 .py 识别为 Notebook") } } // TestFormatFileSize 测试文件大小格式化 func TestFormatFileSize(t *testing.T) { tests := []struct { size int64 expected string }{ {100, "100 bytes"}, {1024, "1.0 KB"}, {2048, "2.0 KB"}, {1048576, "1.0 MB"}, {1073741824, "1.0 GB"}, } for _, tt := range tests { result := formatFileSize(tt.size) if result != tt.expected { t.Errorf("formatFileSize(%d) = %q, 期望 %q", tt.size, result, tt.expected) } } } // ───────────────────────────────────────────────────────────────────── // 图片格式检测测试 // ───────────────────────────────────────────────────────────────────── // TestDetectImageMediaType 测试图片 magic bytes 检测 func TestDetectImageMediaType(t *testing.T) { tests := []struct { name string data []byte expected string }{ { "PNG", []byte{0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A}, "image/png", }, { "JPEG", []byte{0xFF, 0xD8, 0xFF, 0xE0}, "image/jpeg", }, { "GIF87a", []byte("GIF87a"), "image/gif", }, { "GIF89a", []byte("GIF89a"), "image/gif", }, { "WEBP", []byte{'R', 'I', 'F', 'F', 0x00, 0x00, 0x00, 0x00, 'W', 'E', 'B', 'P'}, "image/webp", }, { "未知格式默认 PNG", []byte{0x00, 0x01, 0x02, 0x03}, "image/png", }, { "太短默认 PNG", []byte{0x89}, "image/png", }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { result := detectImageMediaType(tt.data) if result != tt.expected { t.Errorf("detectImageMediaType: 期望 %s, 实际 %s", tt.expected, result) } }) } } // ───────────────────────────────────────────────────────────────────── // PDF magic bytes 测试 // ───────────────────────────────────────────────────────────────────── // TestIsPDFMagicBytes 测试 PDF magic bytes 检测 func TestIsPDFMagicBytes(t *testing.T) { if !isPDFMagicBytes([]byte("%PDF-1.4")) { t.Error("应识别标准 PDF magic bytes") } if !isPDFMagicBytes([]byte("%PDF-2.0 some data")) { t.Error("应识别 PDF 2.0") } if isPDFMagicBytes([]byte("not a pdf")) { t.Error("不应误识别非 PDF") } if isPDFMagicBytes([]byte{}) { t.Error("空数据不应识别为 PDF") } } // ───────────────────────────────────────────────────────────────────── // Notebook 格式化测试 // ───────────────────────────────────────────────────────────────────── // TestFormatNotebook_ErrorOutput 测试 Notebook error 输出格式化 func TestFormatNotebook_ErrorOutput(t *testing.T) { notebook := `{ "cells": [ { "cell_type": "code", "source": ["1/0"], "outputs": [ { "output_type": "error", "ename": "ZeroDivisionError", "evalue": "division by zero", "traceback": ["Traceback...", " File ..."] } ], "execution_count": 1 } ], "metadata": {"language_info": {"name": "python"}}, "nbformat": 4 }` output, err := formatNotebook([]byte(notebook)) if err != nil { t.Fatalf("格式化失败: %v", err) } if !strings.Contains(output, "ZeroDivisionError") { t.Errorf("应包含错误名: %q", output) } if !strings.Contains(output, "division by zero") { t.Errorf("应包含错误值: %q", output) } } // TestFormatNotebook_InvalidJSON 测试无效 JSON func TestFormatNotebook_InvalidJSON(t *testing.T) { _, err := formatNotebook([]byte("not json")) if err == nil { t.Error("无效 JSON 应返回错误") } } // ───────────────────────────────────────────────────────────────────── // Metadata 测试 // ───────────────────────────────────────────────────────────────────── // TestFileReadTool_OffsetBeyondFile 测试 offset 超出文件行数 func TestFileReadTool_OffsetBeyondFile(t *testing.T) { tool := NewFileReadTool() dir := t.TempDir() filePath := filepath.Join(dir, "short.txt") os.WriteFile(filePath, []byte("one\ntwo\n"), 0644) input, _ := json.Marshal(fileReadInput{FilePath: filePath, Offset: 100}) result, err := tool.Execute(context.Background(), input, nil) if err != nil { t.Fatalf("执行失败: %v", err) } if result.IsError { t.Error("offset 超出不应标记为错误") } if !strings.Contains(result.Output, "fewer than") { t.Errorf("应提示文件行数不足: %q", result.Output) } } // TestFileReadTool_Metadata 测试工具元数据 func TestFileReadTool_Metadata(t *testing.T) { tool := NewFileReadTool() meta := tool.Metadata() if !meta.ConcurrencySafe { t.Error("FileRead 应标记为 ConcurrencySafe") } if !meta.ReadOnly { t.Error("FileRead 应标记为 ReadOnly") } if meta.Destructive { t.Error("FileRead 不应标记为 Destructive") } } // ───────────────────────────────────────────────────────────────────── // 上下文取消测试 // ───────────────────────────────────────────────────────────────────── // TestFileReadTool_ContextCancelled 测试上下文取消 func TestFileReadTool_ContextCancelled(t *testing.T) { tool := NewFileReadTool() dir := t.TempDir() filePath := filepath.Join(dir, "large.txt") // 创建一个有很多行的文件 var content strings.Builder for i := 0; i < 10000; i++ { content.WriteString("line of content\n") } os.WriteFile(filePath, []byte(content.String()), 0644) // 创建已取消的上下文 ctx, cancel := context.WithCancel(context.Background()) cancel() // 立即取消 input, _ := json.Marshal(fileReadInput{FilePath: filePath}) result, err := tool.Execute(ctx, input, nil) if err != nil { t.Fatalf("不应返回 Go error: %v", err) } // 由于上下文已取消,应该返回取消错误或者在第一次 ctx.Done() 检查时就退出 // 注意:如果文件很小,可能在检查 ctx.Done() 之前就读完了 // 所以这里我们只验证不会 panic _ = result } // ───────────────────────────────────────────────────────────────────── // Notebook 缓存测试 // ───────────────────────────────────────────────────────────────────── // TestFileReadTool_NotebookCacheRecording 测试 Notebook 文件缓存记录 func TestFileReadTool_NotebookCacheRecording(t *testing.T) { cache := newMockFileCacheForRead() tool := NewFileReadToolWithCache(cache) dir := t.TempDir() filePath := filepath.Join(dir, "test.ipynb") notebook := `{ "cells": [{"cell_type": "code", "source": "x = 1", "outputs": []}], "metadata": {}, "nbformat": 4 }` os.WriteFile(filePath, []byte(notebook), 0644) input, _ := json.Marshal(fileReadInput{FilePath: filePath}) _, err := tool.Execute(context.Background(), input, nil) if err != nil { t.Fatalf("执行失败: %v", err) } // Notebook 内容应被缓存 if _, exists := cache.recorded[filePath]; !exists { t.Error("Notebook 内容应被缓存记录") } } // ───────────────────────────────────────────────────────────────────── // computeContentHash 测试 // ───────────────────────────────────────────────────────────────────── func TestComputeContentHash(t *testing.T) { hash1 := computeContentHash([]byte("hello")) hash2 := computeContentHash([]byte("hello")) hash3 := computeContentHash([]byte("world")) if hash1 != hash2 { t.Error("相同内容应产生相同哈希") } if hash1 == hash3 { t.Error("不同内容应产生不同哈希") } if len(hash1) != 64 { t.Errorf("SHA-256 hex 应为 64 字符, 实际: %d", len(hash1)) } }