151 lines
5.3 KiB
Python
151 lines
5.3 KiB
Python
import os
|
||
import io
|
||
import time
|
||
import zipfile
|
||
import requests
|
||
from pathlib import Path
|
||
|
||
# Root URL of the hosted MinerU REST API; the hosted-mode endpoints in
# MinerUClient.parse_file are built by appending a path to it.
API_BASE = "https://mineru.net/api/v4"

# Baseline form fields for MinerUClient.parse_file_api, which merges any
# caller-supplied overrides on top of this mapping and posts the result as
# form data. Every value is therefore a string, including the booleans and
# page numbers.
# NOTE(review): "server_url": "string" and "output_dir": "./output" look like
# placeholder values copied from an API example -- confirm they are intended.
DEFAULT_MINERU_FIELDS = dict(
    return_middle_json="false",
    return_model_output="false",
    return_md="true",
    return_images="false",
    end_page_id="99999",
    parse_method="auto",
    start_page_id="0",
    lang_list="ch",
    output_dir="./output",
    server_url="string",
    return_content_list="false",
    backend="hybrid-auto-engine",
    table_enable="true",
    response_format_zip="false",
    formula_enable="true",
)
|
||
|
||
class MinerUClient:
|
||
def __init__(self, token: str | None = None):
|
||
self.token = token or ""
|
||
self.headers = {
|
||
"Content-Type": "application/json"
|
||
}
|
||
if self.token:
|
||
self.headers["Authorization"] = f"Bearer {self.token}"
|
||
self.result = None
|
||
|
||
def parse_file(self, file_path: str, model_version="vlm"):
|
||
"""上传单个文件到 MinerU 并启动解析"""
|
||
if os.getenv("MinerU_URL"):
|
||
return self.parse_file_api(file_path)
|
||
|
||
resp = requests.post(
|
||
f"{API_BASE}/file-urls/batch",
|
||
headers=self.headers,
|
||
json={"files": [{"name": os.path.basename(file_path)}], "model_version": model_version}
|
||
)
|
||
resp.raise_for_status()
|
||
data = resp.json()["data"]
|
||
batch_id = data["batch_id"]
|
||
upload_url = data["file_urls"][0]
|
||
|
||
print(f"[INFO] 上传文件: {file_path}")
|
||
with open(file_path, "rb") as f:
|
||
put_resp = requests.put(upload_url, data=f)
|
||
put_resp.raise_for_status()
|
||
|
||
print(f"[INFO] 等待 MinerU 解析文件 {os.path.basename(file_path)} ...")
|
||
while True:
|
||
res = requests.get(
|
||
f"{API_BASE}/extract-results/batch/{batch_id}",
|
||
headers=self.headers
|
||
).json()
|
||
states = [r["state"] for r in res["data"]["extract_result"]]
|
||
if all(s == "done" for s in states):
|
||
print(f"[INFO] 文件解析完成: {os.path.basename(file_path)}")
|
||
self.result = res
|
||
return res
|
||
elif any(s == "failed" for s in states):
|
||
raise RuntimeError(f"[INFO] MinerU 解析失败: {file_path}")
|
||
time.sleep(10)
|
||
|
||
def parse_file_api(self, file_path: str, fields: dict | None = None):
|
||
"""通过本地/私有 MinerU API 解析文件(/file_parse)。"""
|
||
api_url = os.getenv("MinerU_URL")
|
||
if not api_url:
|
||
raise RuntimeError("MinerU_URL 未配置")
|
||
|
||
data = {**DEFAULT_MINERU_FIELDS, **(fields or {})}
|
||
file_name = os.path.basename(file_path)
|
||
with open(file_path, "rb") as f:
|
||
files = {
|
||
"files": (file_name, f, "application/octet-stream")
|
||
}
|
||
resp = requests.post(api_url, data=data, files=files, headers={"accept": "application/json"})
|
||
resp.raise_for_status()
|
||
self.result = resp.json()
|
||
return self.result
|
||
|
||
def download_result(self, save_dir="mineru_result"):
|
||
"""下载并解压 MinerU 结果"""
|
||
if os.getenv("MinerU_URL"):
|
||
raise RuntimeError("MinerU_URL 模式不支持下载 zip 结果")
|
||
|
||
zip_url = self.result["data"]["extract_result"][0]["full_zip_url"]
|
||
print(f"[INFO] 下载解析结果: {zip_url}")
|
||
res = requests.get(zip_url)
|
||
res.raise_for_status()
|
||
zf = zipfile.ZipFile(io.BytesIO(res.content))
|
||
os.makedirs(save_dir, exist_ok=True)
|
||
zf.extractall(save_dir)
|
||
print(f"[INFO] 已解压到 {save_dir}/")
|
||
return save_dir
|
||
|
||
|
||
# ✅ 新增函数:统一批量解析接口
|
||
def extract_texts_with_mineru(input_dir: str, save_dir: str = "./mineru_result") -> str:
    """Parse every supported file in ``input_dir`` with MinerU and merge the text.

    Each regular file whose suffix is one of .pdf/.docx/.xlsx/.pptx/.txt
    (case-insensitive) is parsed, its result archive is unpacked into
    ``save_dir/<file name>/``, and every ``full.md`` found there is appended
    under a ``# <file name>`` heading. The merged text is written to
    ``save_dir/all_knowledge.txt``.

    A failure on one file is reported on stdout and does not stop the batch.
    When the ``MinerU_URL`` environment variable is set,
    ``MinerUClient.download_result`` raises, so every file is reported as
    failed in that mode.

    Args:
        input_dir: Directory whose direct children are parsed (not recursive).
        save_dir: Directory for the extracted results and the merged file;
            created if missing.

    Returns:
        Path of the merged text file as a string. The file is only written
        when at least one document produced text.

    Raises:
        OSError: If ``input_dir`` cannot be listed.
    """
    supported_suffixes = {".pdf", ".docx", ".xlsx", ".pptx", ".txt"}

    # The API token is optional and comes from the environment.
    client = MinerUClient(token=os.getenv("MinerU_API_KEY"))

    input_path = Path(input_dir)
    save_root = Path(save_dir)
    os.makedirs(save_root, exist_ok=True)
    output_file = save_root / "all_knowledge.txt"

    print(f"[INFO] 开始批量解析目录: {input_path}")
    all_texts = []

    # iterdir() yields entries in filesystem order; sorting keeps the merged
    # output stable from run to run.
    for file in sorted(input_path.iterdir()):
        if not file.is_file():
            continue
        if file.suffix.lower() not in supported_suffixes:
            print(f"[INFO] 跳过不支持的文件类型: {file.name}")
            continue

        try:
            client.parse_file(str(file))
            # Unpack into a directory of its own: with one shared directory
            # the search below would also pick up full.md files left by
            # earlier documents or earlier runs and file them under this name.
            result_dir = client.download_result(str(save_root / file.name))

            for md_path in sorted(Path(result_dir).rglob("full.md")):
                content = md_path.read_text(encoding="utf-8").strip()
                all_texts.append(f"\n# {file.name}\n{content}")

        except Exception as e:
            # Best effort by design: report and continue with the next file.
            print(f"[INFO] 解析失败: {file.name} ({type(e).__name__}: {e})")

    if all_texts:
        with open(output_file, "w", encoding="utf-8") as f:
            f.write("\n\n".join(all_texts))
        print(f"[INFO] 已合并所有 MinerU 输出 -> {output_file}")
    else:
        print("[INFO] 未成功解析任何文件。")

    return str(output_file)
|