116 lines
3.8 KiB
Python
116 lines
3.8 KiB
Python
import os
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Dict, Any, Optional
|
|
from planner.mineru_client import MinerUClient
|
|
import dotenv
|
|
|
|
# Load environment variables from the `.env` file that sits two directory
# levels above this module's own directory. `override=False` is passed so the
# call is asked not to replace variables already present in the environment.
_MODULE_FILE = Path(__file__).resolve()
_ENV_PATH = _MODULE_FILE.parents[2].joinpath(".env")
dotenv.load_dotenv(dotenv_path=_ENV_PATH, override=False)
|
|
|
|
# File suffixes (lower-case, with the leading dot) grouped by file kind.
# TEXT_EXTS and IMAGE_EXTS are matched against `path.suffix.lower()` by
# `parse_intent_file`; DOC_EXTS is not referenced in the visible part of
# this file.
TEXT_EXTS = {
    ".md",
    ".markdown",
    ".txt",
}
DOC_EXTS = {
    ".doc",
    ".docx",
    ".pdf",
    ".pptx",
    ".xls",
    ".xlsx",
}
IMAGE_EXTS = {
    ".bmp",
    ".jpeg",
    ".jpg",
    ".png",
    ".tif",
    ".tiff",
    ".webp",
}
|
|
|
|
|
|
def _get_mineru_token() -> str:
    """Return the MinerU API token taken from the environment.

    ``MinerU_API`` is consulted first, then ``MinerU_API_KEY``; the first
    non-empty value wins. When neither is set but ``MinerU_URL`` has a
    non-empty value, an empty string is returned instead of a token.

    Raises:
        ValueError: None of the three variables holds a non-empty value.
    """
    for env_name in ("MinerU_API", "MinerU_API_KEY"):
        candidate = os.getenv(env_name)
        if candidate:
            return candidate

    if not os.getenv("MinerU_URL"):
        raise ValueError("MinerU_API 环境变量未设置")
    return ""
|
|
|
|
|
|
def _read_text_file(file_path: Path) -> str:
    """Read *file_path* as UTF-8 text and return it with outer whitespace stripped.

    The ``utf-8-sig`` codec is used so that a leading byte-order mark is
    dropped while decoding. With plain ``utf-8`` the mark would survive as an
    invisible U+FEFF first character (``str.strip`` does not remove it), and a
    check such as ``text.startswith("#")`` on the result would fail.
    Files without a byte-order mark decode exactly as before.

    Args:
        file_path: Location of the text file to read.

    Returns:
        The decoded file content, stripped of leading/trailing whitespace.
    """
    with open(file_path, "r", encoding="utf-8-sig") as handle:
        return handle.read().strip()
|
|
|
|
|
|
def _extract_title_and_content(text: str, fallback_title: str) -> Dict[str, str]:
    """Split *text* into a title and a body.

    The first non-blank line supplies the title:

    - If it starts with ``#``, the leading ``#`` characters are removed
      (*fallback_title* is used when nothing remains) and the body is
      everything after that line with its original formatting kept. When
      nothing follows the heading, the whole of *text* is used as the body.
    - Otherwise the line itself is the title and the body is the remaining
      non-blank lines, each stripped and joined with ``"\\n"``.

    Args:
        text: Raw text to split.
        fallback_title: Title used when *text* yields none.

    Returns:
        A dict with the keys ``"title"`` and ``"content"``.
    """
    if not text:
        return {"title": fallback_title, "content": ""}

    lines = [
        stripped
        for stripped in (raw.strip() for raw in text.splitlines())
        if stripped
    ]
    if not lines:
        # Whitespace-only input: hand the text back untouched.
        return {"title": fallback_title, "content": text}

    first = lines[0]
    if not first.startswith("#"):
        return {"title": first, "content": "\n".join(lines[1:]).strip()}

    title = first.lstrip("#").strip() or fallback_title
    # Slice the body from the end of the heading line. Searching the whole
    # text for the second line's text would match too early whenever that
    # text also occurs inside the heading (e.g. "# Plan" followed by "Plan").
    # Only whitespace precedes the heading, so the first match of `first`
    # is the heading itself.
    heading_end = text.find(first) + len(first)
    content = text[heading_end:].strip()
    return {"title": title, "content": content or text}
|
|
|
|
|
|
def _ensure_markdown(text: str, title: str) -> str:
    """Make sure *text* begins with a Markdown heading.

    - Empty *text* yields just the heading line ``# <title>``.
    - Text whose first non-whitespace character is ``#`` is returned as is.
    - Any other text gets ``# <title>`` and a blank line placed in front.
    """
    heading_line = f"# {title}\n"
    if not text:
        return heading_line

    has_heading = text.lstrip().startswith("#")
    return text if has_heading else f"{heading_line}\n{text}"
|
|
|
|
|
|
def _load_downloaded_text(folder: Path) -> str:
    """Return the text of the first usable result file under *folder*, or ``""``."""
    for preferred in (folder / "full.md", folder / "merged_text.txt"):
        if preferred.exists():
            return _read_text_file(preferred)

    # Neither well-known file exists: fall back to any .md/.txt file below the
    # folder. Sorting makes the pick deterministic, and the is_file() guard
    # skips directories that merely carry such a suffix.
    fallbacks = sorted(
        candidate
        for candidate in folder.rglob("*.*")
        if candidate.suffix.lower() in (".md", ".txt") and candidate.is_file()
    )
    return _read_text_file(fallbacks[0]) if fallbacks else ""


def parse_intent_file(input_path: str, save_dir: str = "planner\\mineru_result") -> Dict[str, Any]:
    """Parse a file imported for intent authoring into a title/content record.

    - ``.txt`` / ``.md`` / ``.markdown``: the file is read directly.
    - ``.pdf`` / image files: the file is parsed through MinerU and the raw
      MinerU response is returned together with the extracted text.

    Args:
        input_path: Path of the file to parse.
        save_dir: Base directory for downloaded MinerU results; a sub-folder
            named after the file stem is used. Only consulted when the
            ``MinerU_URL`` environment variable is not set.
            NOTE(review): the default uses a backslash separator, which
            presumably targets Windows - confirm behaviour on other systems.

    Returns:
        A dict with the keys ``"source"`` (the input path as a string),
        ``"type"`` (``"text"`` or ``"mineru"``), ``"title"``, ``"content"``
        and ``"raw_result"`` (``None`` for text files, otherwise the value
        returned by ``MinerUClient.parse_file``).

    Raises:
        FileNotFoundError: *input_path* does not exist or is not a file.
        ValueError: The file extension is not supported, or no MinerU
            credentials are configured (raised by ``_get_mineru_token``).
    """
    path = Path(input_path)
    if not path.is_file():  # also False when the path does not exist
        raise FileNotFoundError(f"输入文件不存在: {input_path}")

    suffix = path.suffix.lower()
    file_title = path.stem

    if suffix in TEXT_EXTS:
        parsed = _extract_title_and_content(_read_text_file(path), file_title)
        return {"source": str(path), "type": "text", **parsed, "raw_result": None}

    if suffix != ".pdf" and suffix not in IMAGE_EXTS:
        raise ValueError(f"不支持的文件类型: {path.name}")

    client = MinerUClient(token=_get_mineru_token())
    result_json = client.parse_file(str(path))

    if os.getenv("MinerU_URL"):
        # With MinerU_URL set, the text is read straight from the response
        # under results[<file stem>]["md_content"]. Every level is treated as
        # optional so a missing or null entry yields empty content instead of
        # an AttributeError.
        results = (result_json or {}).get("results") or {}
        entry = results.get(file_title) or {}
        content = entry.get("md_content") or ""
    else:
        # Otherwise the result is downloaded into a per-file folder and the
        # text is read from the files found there.
        result_dir = os.path.join(save_dir, file_title)
        folder = client.download_result(save_dir=result_dir)
        content = _load_downloaded_text(Path(folder))

    if suffix == ".pdf":
        # PDF output gets a heading (the file stem) when it lacks one, so the
        # title extraction below yields the file name rather than the first
        # line of body text.
        content = _ensure_markdown(content, file_title)

    parsed = _extract_title_and_content(content, file_title)
    return {
        "source": str(path),
        "type": "mineru",
        **parsed,
        "raw_result": result_json,
    }