package builtin // fileread_special.go -- FileRead 工具的特殊文件处理逻辑. // // 职责: // - PDF 文件:magic bytes 检测,页数估算 // - Jupyter Notebook:JSON 解析,cell 格式化 // // 这些逻辑从 fileread.go 分离出来,因为: // 1. 各有独立的数据结构和复杂度 // 2. 独立文件便于单独测试 // 3. 避免 fileread.go 膨胀到不可维护的程度 import ( "bytes" "encoding/json" "fmt" "io" "os" "strings" ) // ───────────────────────────────────────────────────────────────────── // PDF 处理 // ───────────────────────────────────────────────────────────────────── // PDF magic bytes: %PDF- var pdfMagicPrefix = []byte("%PDF-") // isPDFMagicBytes 检查文件头部是否包含 PDF magic bytes. // // 精妙之处(CLEVER): PDF 规范要求文件以 %PDF- 开头,但某些 PDF 生成器 // 会在前面加几个空白字节.我们检查前 1024 字节中是否包含此标记, // 而不是严格要求在第 0 字节. func isPDFMagicBytes(header []byte) bool { // 大多数 PDF 在前几字节就有 %PDF- if len(header) >= 5 && bytes.HasPrefix(header, pdfMagicPrefix) { return true } // 宽松检查:在前 1024 字节中搜索 return bytes.Contains(header, pdfMagicPrefix) } // estimatePDFPageCount 估算 PDF 的页数. // // 历史包袱(LEGACY): 没有 Go 标准库的 PDF 解析器,我们用启发式方法: // 搜索 "/Type /Page" 或 "/Type/Page" 标记的出现次数. // 这不是精确的--压缩的 PDF,加密的 PDF 都可能导致估算不准. // 但对于"文件太大需要用 pages 参数"的判断来说,粗略估算足够了. // // 为了避免读取整个大文件,只读取前 1MB 和后 1MB 进行估算. // 大多数 PDF 的页面对象定义集中在文件的后半部分(交叉引用表之前). func estimatePDFPageCount(f *os.File, fileSize int64) int { const sampleSize = 1024 * 1024 // 1MB // 历史包袱(LEGACY): 这个估算方法很粗糙,但比不提供信息好. // 已知失败场景: // - 压缩的对象流(/Type /Page 被压缩在流里,搜不到) // - 线性化 PDF(页面对象可能分散在文件各处) // - 加密 PDF(所有内容加密,搜不到任何标记) // 在这些情况下返回 0(未知页数),调用方会跳过页数显示. var data []byte if fileSize <= int64(sampleSize*2) { // 小文件:全部读取 if _, err := f.Seek(0, io.SeekStart); err != nil { return 0 } var err error data, err = io.ReadAll(f) if err != nil { return 0 } } else { // 大文件:读取前 1MB + 后 1MB frontBuf := make([]byte, sampleSize) if _, err := f.Seek(0, io.SeekStart); err != nil { return 0 } n1, err := f.Read(frontBuf) if err != nil && err != io.EOF { return 0 } backBuf := make([]byte, sampleSize) if _, err := f.Seek(-int64(sampleSize), io.SeekEnd); err != nil { return 0 } n2, err := f.Read(backBuf) if err != nil && err != io.EOF { return 0 } data = make([]byte, 0, n1+n2) data = append(data, frontBuf[:n1]...) data = append(data, backBuf[:n2]...) } // 搜索 /Type /Page 和 /Type/Page(不含 /Pages 和 /PageLabel 等) // 精妙之处(CLEVER): 使用两种变体匹配--有空格和无空格版本. // PDF 规范允许 /Type 和 /Page 之间有可变的空白(空格,换行,回车). // 我们用简单的字符串搜索而非正则,性能更好. count := 0 markers := [][]byte{ []byte("/Type /Page\n"), []byte("/Type /Page\r"), []byte("/Type /Page "), []byte("/Type /Page/"), []byte("/Type/Page\n"), []byte("/Type/Page\r"), []byte("/Type/Page "), []byte("/Type/Page/"), // 也匹配行尾的情况 []byte("/Type /Page>"), []byte("/Type/Page>"), } for _, marker := range markers { count += bytes.Count(data, marker) } return count } // ───────────────────────────────────────────────────────────────────── // Jupyter Notebook 处理 // ───────────────────────────────────────────────────────────────────── // notebookContent 是 .ipynb 文件的顶层 JSON 结构. type notebookContent struct { Cells []notebookCell `json:"cells"` Metadata notebookMetadata `json:"metadata"` Nbformat int `json:"nbformat"` } // notebookMetadata 是 notebook 的元数据. type notebookMetadata struct { LanguageInfo *languageInfo `json:"language_info,omitempty"` KernelSpec *kernelSpec `json:"kernelspec,omitempty"` } type languageInfo struct { Name string `json:"name"` } type kernelSpec struct { DisplayName string `json:"display_name"` Language string `json:"language"` Name string `json:"name"` } // notebookCell 是 notebook 中的一个 cell. type notebookCell struct { CellType string `json:"cell_type"` // "code", "markdown", "raw" Source json.RawMessage `json:"source"` // string 或 string[] Outputs []notebookOutput `json:"outputs,omitempty"` ExecutionCount *int `json:"execution_count,omitempty"` ID string `json:"id,omitempty"` } // notebookOutput 是 cell 的一个输出. type notebookOutput struct { OutputType string `json:"output_type"` // "stream", "execute_result", "display_data", "error" Text json.RawMessage `json:"text,omitempty"` Data map[string]any `json:"data,omitempty"` Name string `json:"name,omitempty"` // stream 类型: "stdout", "stderr" ExecutionCount *int `json:"execution_count,omitempty"` Ename string `json:"ename,omitempty"` // error 类型 Evalue string `json:"evalue,omitempty"` // error 类型 Traceback []string `json:"traceback,omitempty"` // error 类型 } // ───────────────────────────────────────────────────────────────────── // Notebook 解析和格式化 // ───────────────────────────────────────────────────────────────────── // formatNotebook 解析 .ipynb JSON 并格式化为可读的文本输出. // // 精妙之处(CLEVER): 输出格式模仿终端中的 Jupyter 显示: // // In [1]: // Out[1]: // // 这对 LLM 很友好,因为它与训练数据中常见的 Jupyter 格式一致. // Markdown cell 不带 In/Out 标记,直接输出内容. func formatNotebook(data []byte) (string, error) { var nb notebookContent if err := json.Unmarshal(data, &nb); err != nil { return "", fmt.Errorf("invalid notebook JSON: %w", err) } // 确定语言 language := "python" // 默认 if nb.Metadata.LanguageInfo != nil && nb.Metadata.LanguageInfo.Name != "" { language = nb.Metadata.LanguageInfo.Name } else if nb.Metadata.KernelSpec != nil && nb.Metadata.KernelSpec.Language != "" { language = nb.Metadata.KernelSpec.Language } var sb strings.Builder fmt.Fprintf(&sb, "Jupyter Notebook (%s, %d cells)\n", language, len(nb.Cells)) sb.WriteString(strings.Repeat("─", 60)) sb.WriteString("\n") for i, cell := range nb.Cells { source := extractCellSource(cell.Source) switch cell.CellType { case "code": // In [N]: 标记 execNum := i + 1 if cell.ExecutionCount != nil { execNum = *cell.ExecutionCount } fmt.Fprintf(&sb, "\nIn [%d]:\n", execNum) fmt.Fprintf(&sb, "```%s\n%s\n```\n", language, source) // 输出 for _, output := range cell.Outputs { outText := formatCellOutput(output) if outText != "" { fmt.Fprintf(&sb, "Out[%d]:\n%s\n", execNum, outText) } } case "markdown": sb.WriteString("\n[Markdown]\n") sb.WriteString(source) sb.WriteString("\n") case "raw": sb.WriteString("\n[Raw]\n") sb.WriteString(source) sb.WriteString("\n") default: fmt.Fprintf(&sb, "\n[%s]\n%s\n", cell.CellType, source) } } return sb.String(), nil } // extractCellSource 从 cell 的 source 字段提取文本. // source 可以是 string 或 string[](JSON 格式). // // 历史包袱(LEGACY): nbformat v3 用 string[],v4 也允许 string[]. // 大多数现代 notebook 用 string[](每行一个元素), // 但也有工具生成单个 string 的情况.两种都要处理. func extractCellSource(raw json.RawMessage) string { // 先尝试 string var s string if err := json.Unmarshal(raw, &s); err == nil { return s } // 再尝试 string[] var arr []string if err := json.Unmarshal(raw, &arr); err == nil { return strings.Join(arr, "") } return string(raw) } // extractOutputText 从 output 的 text 字段提取文本. // text 同样可以是 string 或 string[]. func extractOutputText(raw json.RawMessage) string { if raw == nil { return "" } var s string if err := json.Unmarshal(raw, &s); err == nil { return s } var arr []string if err := json.Unmarshal(raw, &arr); err == nil { return strings.Join(arr, "") } return "" } // formatCellOutput 格式化单个 cell output. func formatCellOutput(output notebookOutput) string { switch output.OutputType { case "stream": text := extractOutputText(output.Text) if text == "" { return "" } return text case "execute_result", "display_data": // 优先取 text/plain if output.Data != nil { if textPlain, ok := output.Data["text/plain"]; ok { switch v := textPlain.(type) { case string: return v case []any: parts := make([]string, 0, len(v)) for _, item := range v { if s, ok := item.(string); ok { parts = append(parts, s) } } return strings.Join(parts, "") } } // 检查是否有图片数据 if _, hasImg := output.Data["image/png"]; hasImg { return "[image/png output]" } if _, hasImg := output.Data["image/jpeg"]; hasImg { return "[image/jpeg output]" } } return extractOutputText(output.Text) case "error": var sb strings.Builder fmt.Fprintf(&sb, "%s: %s", output.Ename, output.Evalue) if len(output.Traceback) > 0 { sb.WriteString("\n") sb.WriteString(strings.Join(output.Traceback, "\n")) } return sb.String() } return "" }