Files
FlexibleTestPlatform/backend/planner/mineru_client.py
2026-02-05 16:25:52 +08:00

151 lines
5.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import io
import time
import zipfile
import requests
from pathlib import Path
# Base URL of the hosted MinerU REST API; endpoint paths are appended to it.
API_BASE = "https://mineru.net/api/v4"

# Baseline form fields for a self-hosted MinerU request. Callers may override
# individual entries; every value is deliberately a string because the mapping
# is sent as form data alongside the uploaded file.
# NOTE(review): "server_url": "string" and "output_dir": "./output" look like
# schema placeholders copied from API docs -- confirm the server ignores them.
DEFAULT_MINERU_FIELDS = {
    "return_middle_json": "false",
    "return_model_output": "false",
    "return_md": "true",
    "return_images": "false",
    "end_page_id": "99999",
    "parse_method": "auto",
    "start_page_id": "0",
    "lang_list": "ch",
    "output_dir": "./output",
    "server_url": "string",
    "return_content_list": "false",
    "backend": "hybrid-auto-engine",
    "table_enable": "true",
    "response_format_zip": "false",
    "formula_enable": "true",
}
class MinerUClient:
def __init__(self, token: str | None = None):
self.token = token or ""
self.headers = {
"Content-Type": "application/json"
}
if self.token:
self.headers["Authorization"] = f"Bearer {self.token}"
self.result = None
def parse_file(self, file_path: str, model_version="vlm"):
"""上传单个文件到 MinerU 并启动解析"""
if os.getenv("MinerU_URL"):
return self.parse_file_api(file_path)
resp = requests.post(
f"{API_BASE}/file-urls/batch",
headers=self.headers,
json={"files": [{"name": os.path.basename(file_path)}], "model_version": model_version}
)
resp.raise_for_status()
data = resp.json()["data"]
batch_id = data["batch_id"]
upload_url = data["file_urls"][0]
print(f"[INFO] 上传文件: {file_path}")
with open(file_path, "rb") as f:
put_resp = requests.put(upload_url, data=f)
put_resp.raise_for_status()
print(f"[INFO] 等待 MinerU 解析文件 {os.path.basename(file_path)} ...")
while True:
res = requests.get(
f"{API_BASE}/extract-results/batch/{batch_id}",
headers=self.headers
).json()
states = [r["state"] for r in res["data"]["extract_result"]]
if all(s == "done" for s in states):
print(f"[INFO] 文件解析完成: {os.path.basename(file_path)}")
self.result = res
return res
elif any(s == "failed" for s in states):
raise RuntimeError(f"[INFO] MinerU 解析失败: {file_path}")
time.sleep(10)
def parse_file_api(self, file_path: str, fields: dict | None = None):
"""通过本地/私有 MinerU API 解析文件(/file_parse"""
api_url = os.getenv("MinerU_URL")
if not api_url:
raise RuntimeError("MinerU_URL 未配置")
data = {**DEFAULT_MINERU_FIELDS, **(fields or {})}
file_name = os.path.basename(file_path)
with open(file_path, "rb") as f:
files = {
"files": (file_name, f, "application/octet-stream")
}
resp = requests.post(api_url, data=data, files=files, headers={"accept": "application/json"})
resp.raise_for_status()
self.result = resp.json()
return self.result
def download_result(self, save_dir="mineru_result"):
"""下载并解压 MinerU 结果"""
if os.getenv("MinerU_URL"):
raise RuntimeError("MinerU_URL 模式不支持下载 zip 结果")
zip_url = self.result["data"]["extract_result"][0]["full_zip_url"]
print(f"[INFO] 下载解析结果: {zip_url}")
res = requests.get(zip_url)
res.raise_for_status()
zf = zipfile.ZipFile(io.BytesIO(res.content))
os.makedirs(save_dir, exist_ok=True)
zf.extractall(save_dir)
print(f"[INFO] 已解压到 {save_dir}/")
return save_dir
# ✅ 新增函数:统一批量解析接口
def extract_texts_with_mineru(input_dir: str, save_dir: str = "./mineru_result") -> str:
    """Parse every supported file in ``input_dir`` with MinerU and merge the text.

    Files are processed in sorted name order so the merged output is
    reproducible. Each hosted-API result archive is extracted into its own
    sub-directory of ``save_dir`` so that one file's ``full.md`` can never be
    picked up for another file. When ``MinerU_URL`` is set, the Markdown is
    taken from the JSON response instead, because no archive can be
    downloaded in that mode.

    A failure on one file is printed and does not stop the batch.

    Args:
        input_dir: Directory whose direct children are parsed
            (sub-directories are skipped).
        save_dir: Directory for extracted results and the merged text file;
            created if missing.

    Returns:
        Path of ``all_knowledge.txt`` inside ``save_dir``. The file is only
        written when at least one document produced text, so the returned
        path may not exist or may be left over from an earlier run.
    """
    mineru_token = os.getenv("MinerU_API_KEY")  # may be unset -> no auth header
    client = MinerUClient(token=mineru_token)
    input_path = Path(input_dir)
    os.makedirs(save_dir, exist_ok=True)
    output_file = Path(save_dir) / "all_knowledge.txt"
    use_private_api = bool(os.getenv("MinerU_URL"))
    supported_suffixes = {".pdf", ".docx", ".xlsx", ".pptx", ".txt"}

    print(f"[INFO] 开始批量解析目录: {input_path}")
    all_texts = []
    for file in sorted(input_path.iterdir()):
        if not file.is_file():
            continue
        if file.suffix.lower() not in supported_suffixes:
            print(f"[INFO] 跳过不支持的文件类型: {file.name}")
            continue
        try:
            result = client.parse_file(str(file))
            if use_private_api:
                contents = _markdown_from_api_result(result)
            else:
                # Dedicated directory per input; the suffix keeps it from
                # clashing with the merged output file name.
                target_dir = Path(save_dir) / f"{file.name}.extracted"
                result_dir = client.download_result(str(target_dir))
                contents = [
                    md_path.read_text(encoding="utf-8").strip()
                    for md_path in sorted(Path(result_dir).rglob("full.md"))
                ]
            if not contents:
                # Report the gap instead of silently dropping the document.
                raise RuntimeError("MinerU 结果中没有 Markdown 内容")
            all_texts.extend(f"\n# {file.name}\n{content}" for content in contents)
        except Exception as e:
            # Best-effort batch: log the failure and move on to the next file.
            print(f"[INFO] 解析失败: {file.name} ({type(e).__name__}: {e})")

    if all_texts:
        with open(output_file, "w", encoding="utf-8") as f:
            f.write("\n\n".join(all_texts))
        print(f"[INFO] 已合并所有 MinerU 输出 -> {output_file}")
    else:
        print("[INFO] 未成功解析任何文件。")
    return str(output_file)


def _markdown_from_api_result(result) -> list[str]:
    """Collect Markdown strings from a ``MinerU_URL`` JSON response.

    NOTE(review): assumes the response looks like
    ``{"results": {<name>: {"md_content": "<markdown>"}}}`` -- confirm against
    the deployed MinerU version. Any other shape yields an empty list.
    """
    results = result.get("results") if isinstance(result, dict) else None
    if not isinstance(results, dict):
        return []
    texts = []
    for entry in results.values():
        md = entry.get("md_content") if isinstance(entry, dict) else None
        if isinstance(md, str):
            texts.append(md.strip())
    return texts