import os import io import time import zipfile import requests from pathlib import Path API_BASE = "https://mineru.net/api/v4" DEFAULT_MINERU_FIELDS = { "return_middle_json": "false", "return_model_output": "false", "return_md": "true", "return_images": "false", "end_page_id": "99999", "parse_method": "auto", "start_page_id": "0", "lang_list": "ch", "output_dir": "./output", "server_url": "string", "return_content_list": "false", "backend": "hybrid-auto-engine", "table_enable": "true", "response_format_zip": "false", "formula_enable": "true", } class MinerUClient: def __init__(self, token: str | None = None): self.token = token or "" self.headers = { "Content-Type": "application/json" } if self.token: self.headers["Authorization"] = f"Bearer {self.token}" self.result = None def parse_file(self, file_path: str, model_version="vlm"): """上传单个文件到 MinerU 并启动解析""" if os.getenv("MinerU_URL"): return self.parse_file_api(file_path) resp = requests.post( f"{API_BASE}/file-urls/batch", headers=self.headers, json={"files": [{"name": os.path.basename(file_path)}], "model_version": model_version} ) resp.raise_for_status() data = resp.json()["data"] batch_id = data["batch_id"] upload_url = data["file_urls"][0] print(f"[INFO] 上传文件: {file_path}") with open(file_path, "rb") as f: put_resp = requests.put(upload_url, data=f) put_resp.raise_for_status() print(f"[INFO] 等待 MinerU 解析文件 {os.path.basename(file_path)} ...") while True: res = requests.get( f"{API_BASE}/extract-results/batch/{batch_id}", headers=self.headers ).json() states = [r["state"] for r in res["data"]["extract_result"]] if all(s == "done" for s in states): print(f"[INFO] 文件解析完成: {os.path.basename(file_path)}") self.result = res return res elif any(s == "failed" for s in states): raise RuntimeError(f"[INFO] MinerU 解析失败: {file_path}") time.sleep(10) def parse_file_api(self, file_path: str, fields: dict | None = None): """通过本地/私有 MinerU API 解析文件(/file_parse)。""" api_url = os.getenv("MinerU_URL") if not api_url: raise RuntimeError("MinerU_URL 未配置") data = {**DEFAULT_MINERU_FIELDS, **(fields or {})} file_name = os.path.basename(file_path) with open(file_path, "rb") as f: files = { "files": (file_name, f, "application/octet-stream") } resp = requests.post(api_url, data=data, files=files, headers={"accept": "application/json"}) resp.raise_for_status() self.result = resp.json() return self.result def download_result(self, save_dir="mineru_result"): """下载并解压 MinerU 结果""" if os.getenv("MinerU_URL"): raise RuntimeError("MinerU_URL 模式不支持下载 zip 结果") zip_url = self.result["data"]["extract_result"][0]["full_zip_url"] print(f"[INFO] 下载解析结果: {zip_url}") res = requests.get(zip_url) res.raise_for_status() zf = zipfile.ZipFile(io.BytesIO(res.content)) os.makedirs(save_dir, exist_ok=True) zf.extractall(save_dir) print(f"[INFO] 已解压到 {save_dir}/") return save_dir # ✅ 新增函数:统一批量解析接口 def extract_texts_with_mineru(input_dir: str, save_dir: str = "./mineru_result") -> str: """ 批量调用 MinerU 解析 input_dir 下的所有文件,并合并结果为纯文本。 返回合并后的文本文件路径。 """ mineru_token = os.getenv("MinerU_API_KEY") # 可从环境变量读取 client = MinerUClient(token=mineru_token) input_path = Path(input_dir) os.makedirs(save_dir, exist_ok=True) output_file = Path(save_dir) / "all_knowledge.txt" print(f"[INFO] 开始批量解析目录: {input_path}") all_texts = [] for file in input_path.iterdir(): if not file.is_file(): continue if file.suffix.lower() not in [".pdf", ".docx", ".xlsx", ".pptx", ".txt"]: print(f"[INFO] 跳过不支持的文件类型: {file.name}") continue try: # 调用 MinerU 解析 client.parse_file(str(file)) result_dir = client.download_result(save_dir) # 查找 full.md 文件 for md_path in Path(result_dir).rglob("full.md"): with open(md_path, "r", encoding="utf-8") as f: content = f.read().strip() all_texts.append(f"\n# {file.name}\n{content}") except Exception as e: print(f"[INFO] 解析失败: {file.name} ({type(e).__name__}: {e})") # 合并所有内容 if all_texts: with open(output_file, "w", encoding="utf-8") as f: f.write("\n\n".join(all_texts)) print(f"[INFO] 已合并所有 MinerU 输出 -> {output_file}") else: print(f"[INFO] 未成功解析任何文件。") return str(output_file)