client.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344
  1. """Docling raw bundle downloader.
  2. Talks to Docling Serve v1 over HTTP:
  3. - ``POST /v1/convert/file/async`` — multipart upload, returns ``task_id``,
  4. - ``GET /v1/status/poll/{task_id}?wait=5`` — long-poll for terminal state,
  5. - ``GET /v1/result/{task_id}`` — zip download (only on ``success``).
  6. The zip is extracted safely under ``raw_dir/`` (refusing path traversal /
  7. absolute entries). A success manifest is written atomically at the very
  8. end; mid-run crashes therefore leave the directory in a state the cache
  9. layer marks as invalid (no manifest → miss → re-download).
  10. Pipeline constants (``pipeline``, ``target_type``, ``to_formats``,
  11. ``image_export_mode``) are intentionally **not** env-driven — the sidecar
  12. flow depends on them — and are recorded inside the manifest so a future
  13. code change automatically invalidates pre-existing caches.
  14. """
  15. from __future__ import annotations
  16. import asyncio
  17. import json
  18. import os
  19. import time
  20. from pathlib import Path
  21. from typing import TYPE_CHECKING, Any
  22. from lightrag.parser.external._common import (
  23. env_bool,
  24. env_int,
  25. raise_for_status_with_detail,
  26. )
  27. from lightrag.parser.external._zip import safe_extract_zip
  28. from lightrag.parser.external.docling.cache import (
  29. compute_options_signature,
  30. current_endpoint_signature,
  31. snapshot_tunable_env,
  32. )
  33. from lightrag.parser.external.docling.manifest import (
  34. build_and_write_docling_manifest,
  35. select_main_json,
  36. )
  37. from lightrag.utils import logger
  38. if TYPE_CHECKING:
  39. import httpx
  40. else:
  41. try:
  42. import httpx
  43. except ImportError: # pragma: no cover
  44. httpx = None
  45. # ---------------------------------------------------------------------------
  46. # Fixed pipeline constants (NOT env-driven)
  47. # ---------------------------------------------------------------------------
  48. PIPELINE = "standard"
  49. TARGET_TYPE = "zip"
  50. TO_FORMATS: tuple[str, ...] = ("json", "md")
  51. IMAGE_EXPORT_MODE = "referenced"
  52. FIXED_CONSTANTS: dict[str, object] = {
  53. "pipeline": PIPELINE,
  54. "target_type": TARGET_TYPE,
  55. "to_formats": list(TO_FORMATS),
  56. "image_export_mode": IMAGE_EXPORT_MODE,
  57. }
  58. CONVERT_PATH = "/v1/convert/file/async"
  59. POLL_PATH = "/v1/status/poll/{task_id}"
  60. RESULT_PATH = "/v1/result/{task_id}"
  61. DEFAULT_POLL_WAIT_SECONDS = 5
  62. DEFAULT_MAX_POLLS = 240 # 240 * 5s long-poll ≈ 20 min worst case
  63. # ConversionStatus enum from the docling-serve OpenAPI
  64. SUCCESS_STATES = {"success"}
  65. FAILURE_STATES = {"failure", "partial_success", "skipped"}
  66. IN_PROGRESS_STATES = {"pending", "started"}
  67. class DoclingRawClient:
  68. """Downloads docling-serve bundles into ``raw_dir``.
  69. Construct once per parse call (cheap). Reads ``DOCLING_*`` envs at
  70. ``__init__`` time, so callers can flip env between calls and pick up
  71. the new values without holding a stale instance.
  72. """
  73. def __init__(self) -> None:
  74. self.endpoint = current_endpoint_signature()
  75. if not self.endpoint:
  76. raise ValueError("DOCLING_ENDPOINT is required")
  77. self.engine_version = os.getenv("DOCLING_ENGINE_VERSION", "").strip()
  78. self.do_ocr = env_bool("DOCLING_DO_OCR", True)
  79. self.force_ocr = env_bool("DOCLING_FORCE_OCR", True)
  80. self.ocr_engine = os.getenv("DOCLING_OCR_ENGINE", "auto").strip() or "auto"
  81. self.ocr_preset = os.getenv("DOCLING_OCR_PRESET", "auto").strip() or "auto"
  82. self.ocr_lang_raw = os.getenv("DOCLING_OCR_LANG", "").strip()
  83. self.do_formula_enrichment = env_bool("DOCLING_DO_FORMULA_ENRICHMENT", False)
  84. # Poll cadence: docling-serve's ``?wait=N`` is a server-side long-poll
  85. # window. ``DOCLING_POLL_INTERVAL_SECONDS`` sets that window; the
  86. # client does NOT add its own sleep between polls. ``DOCLING_MAX_POLLS``
  87. # bounds the total polling budget — exceeding it raises ``TimeoutError``.
  88. wait = env_int("DOCLING_POLL_INTERVAL_SECONDS", DEFAULT_POLL_WAIT_SECONDS)
  89. self.poll_wait_seconds = wait if wait > 0 else DEFAULT_POLL_WAIT_SECONDS
  90. max_polls = env_int("DOCLING_MAX_POLLS", DEFAULT_MAX_POLLS)
  91. self.max_poll_attempts = max_polls if max_polls > 0 else DEFAULT_MAX_POLLS
  92. # ------------------------------------------------------------------
  93. # Public API
  94. # ------------------------------------------------------------------
  95. async def download_into(
  96. self,
  97. raw_dir: Path,
  98. source_file_path: Path,
  99. *,
  100. upload_filename: str | None = None,
  101. ):
  102. """Upload, poll, download, extract, and write the manifest.
  103. ``upload_filename`` overrides the multipart filename sent to
  104. docling-serve (defaults to ``source_file_path.name``). The pipeline
  105. passes the canonical, hint-stripped document name here so the
  106. bundle's ``<stem>.json`` ends up canonical too — otherwise a file
  107. named ``report.[docling].pdf`` would produce ``report.[docling].json``
  108. inside the bundle, and the adapter (which only knows the canonical
  109. ``report.pdf``) would not be able to locate it via the preferred
  110. ``<stem>.json`` lookup.
  111. Pre-condition: caller cleared ``raw_dir`` (e.g. via
  112. :func:`lightrag.parser.external.clear_dir_contents`). This method
  113. does not clean the directory itself — keeping that explicit at the
  114. ``parse_docling`` entry point.
  115. """
  116. if httpx is None:
  117. raise RuntimeError(
  118. "httpx is required for Docling parsing but is not installed"
  119. )
  120. raw_dir.mkdir(parents=True, exist_ok=True)
  121. effective_filename = upload_filename or source_file_path.name
  122. timeout = httpx.Timeout(120.0, connect=30.0)
  123. async with httpx.AsyncClient(timeout=timeout) as client:
  124. task_id = await self._submit(
  125. client, source_file_path, filename=effective_filename
  126. )
  127. await self._poll_until_done(client, task_id)
  128. payload = await self._download_zip_bytes(client, task_id)
  129. safe_extract_zip(payload, raw_dir)
  130. # Defensive: confirm the main JSON exists before anyone reads the
  131. # bundle. Look it up by the *uploaded* filename's stem — that's
  132. # what docling-serve uses to name the JSON inside the zip.
  133. select_main_json(raw_dir, Path(effective_filename))
  134. options_signature = compute_options_signature(
  135. tunable_env=snapshot_tunable_env(),
  136. fixed_constants=FIXED_CONSTANTS,
  137. )
  138. return build_and_write_docling_manifest(
  139. raw_dir,
  140. source_file_path=source_file_path,
  141. task_id=task_id,
  142. endpoint_signature=self.endpoint,
  143. engine_version=self.engine_version,
  144. options_signature=options_signature,
  145. fixed_constants=FIXED_CONSTANTS,
  146. recorded_filename=effective_filename,
  147. )
  148. # ------------------------------------------------------------------
  149. # Upload + poll + download
  150. # ------------------------------------------------------------------
  151. def _build_multipart_data(self) -> dict[str, str | list[str]]:
  152. """Form fields (everything except the file payload).
  153. Returns a ``dict`` (not a list of tuples): httpx ≥ 0.28 short-circuits
  154. non-``Mapping`` ``data`` into raw-content encoding and ignores
  155. ``files=`` entirely, producing a sync-only stream that an
  156. ``AsyncClient`` then rejects. List-valued entries are emitted as
  157. repeated form keys by ``MultipartStream``, matching docling-serve's
  158. pydantic ``List[Enum]`` form parsing. ``ocr_lang`` is omitted entirely
  159. when empty so the engine uses its own default.
  160. """
  161. data: dict[str, str | list[str]] = {
  162. "pipeline": PIPELINE,
  163. "target_type": TARGET_TYPE,
  164. "image_export_mode": IMAGE_EXPORT_MODE,
  165. "do_ocr": _bool_form(self.do_ocr),
  166. "force_ocr": _bool_form(self.force_ocr),
  167. "ocr_engine": self.ocr_engine,
  168. "ocr_preset": self.ocr_preset,
  169. "do_formula_enrichment": _bool_form(self.do_formula_enrichment),
  170. "to_formats": list(TO_FORMATS),
  171. }
  172. if self.ocr_lang_raw:
  173. langs = _parse_ocr_lang(self.ocr_lang_raw)
  174. if langs:
  175. data["ocr_lang"] = langs
  176. return data
  177. async def _submit(
  178. self,
  179. client: "httpx.AsyncClient",
  180. source_file_path: Path,
  181. *,
  182. filename: str,
  183. ) -> str:
  184. url = f"{self.endpoint}{CONVERT_PATH}"
  185. # Hand httpx a file object so its MultipartStream reads the body in
  186. # chunks instead of materializing the whole PDF/PPTX in worker memory.
  187. # With ``max_parallel_parse_docling > 1`` a per-doc bytes copy can
  188. # OOM the worker before docling-serve ever sees the request.
  189. with source_file_path.open("rb") as fh:
  190. files = {"files": (filename, fh, "application/octet-stream")}
  191. resp = await client.post(
  192. url, data=self._build_multipart_data(), files=files
  193. )
  194. raise_for_status_with_detail(resp, f"Docling upload for {filename!r}")
  195. payload = resp.json() if resp.text else {}
  196. task_id = str(payload.get("task_id") or payload.get("id") or "").strip()
  197. if not task_id:
  198. raise RuntimeError(f"Docling upload response missing task_id: {payload!r}")
  199. return task_id
  200. async def _poll_until_done(
  201. self,
  202. client: "httpx.AsyncClient",
  203. task_id: str,
  204. ) -> None:
  205. url = f"{self.endpoint}{POLL_PATH.format(task_id=task_id)}"
  206. params = {"wait": self.poll_wait_seconds}
  207. for _ in range(self.max_poll_attempts):
  208. iteration_started = time.monotonic()
  209. resp = await client.get(url, params=params)
  210. raise_for_status_with_detail(resp, f"Docling task {task_id} poll")
  211. payload = resp.json() if resp.text else {}
  212. status = str(
  213. payload.get("task_status") or payload.get("status") or ""
  214. ).lower()
  215. if status in SUCCESS_STATES:
  216. return
  217. if status in FAILURE_STATES:
  218. raise RuntimeError(_format_failure(task_id, status, payload))
  219. if status not in IN_PROGRESS_STATES:
  220. # Unknown status: keep polling, but surface it so operators notice.
  221. logger.warning(
  222. "[docling] unknown task status %r for task %s; continuing to poll",
  223. status,
  224. task_id,
  225. )
  226. # The intended cadence is one poll per ``poll_wait_seconds`` — the
  227. # design relies on docling-serve's ``?wait=N`` long-polling for
  228. # that. Some deployments return immediately instead, which would
  229. # burn through ``max_poll_attempts`` in milliseconds and fail
  230. # with a spurious timeout. Cap each iteration at the configured
  231. # interval ourselves so the total budget holds either way.
  232. elapsed = time.monotonic() - iteration_started
  233. remaining = self.poll_wait_seconds - elapsed
  234. if remaining > 0:
  235. await asyncio.sleep(remaining)
  236. raise TimeoutError(f"Docling task {task_id} polling timeout")
  237. async def _download_zip_bytes(
  238. self,
  239. client: "httpx.AsyncClient",
  240. task_id: str,
  241. ) -> bytes:
  242. url = f"{self.endpoint}{RESULT_PATH.format(task_id=task_id)}"
  243. resp = await client.get(url)
  244. raise_for_status_with_detail(resp, f"Docling result {task_id} download")
  245. ctype = resp.headers.get("content-type", "")
  246. if "zip" not in ctype.lower():
  247. raise RuntimeError(
  248. f"Docling result {task_id} returned non-zip content-type "
  249. f"{ctype!r}; body prefix={resp.text[:400]!r}"
  250. )
  251. return resp.content
  252. # ---------------------------------------------------------------------------
  253. # Helpers
  254. # ---------------------------------------------------------------------------
  255. def _bool_form(v: bool) -> str:
  256. return "true" if v else "false"
  257. def _parse_ocr_lang(raw: str) -> list[str]:
  258. """Best-effort parser for ``DOCLING_OCR_LANG``.
  259. Accepts a JSON array (``["en","zh"]``) or a comma-separated list
  260. (``en,zh``). Returns a list of stripped non-empty strings; empty in →
  261. empty out.
  262. """
  263. try:
  264. parsed = json.loads(raw)
  265. except json.JSONDecodeError:
  266. parsed = None
  267. if isinstance(parsed, list):
  268. return [str(x).strip() for x in parsed if str(x).strip()]
  269. return [item.strip() for item in raw.split(",") if item.strip()]
  270. def _format_failure(task_id: str, status: str, payload: Any) -> str:
  271. if isinstance(payload, dict):
  272. err = (
  273. payload.get("error_message")
  274. or payload.get("error")
  275. or payload.get("message")
  276. or "<no error_message>"
  277. )
  278. else:
  279. err = "<no error_message>"
  280. truncated = json.dumps(payload, ensure_ascii=False)[:400]
  281. return f"Docling task {task_id} ended in {status}: {err}; payload={truncated}"
  282. __all__ = [
  283. "DoclingRawClient",
  284. "CONVERT_PATH",
  285. "DEFAULT_MAX_POLLS",
  286. "DEFAULT_POLL_WAIT_SECONDS",
  287. "FIXED_CONSTANTS",
  288. "IMAGE_EXPORT_MODE",
  289. "PIPELINE",
  290. "POLL_PATH",
  291. "RESULT_PATH",
  292. "SUCCESS_STATES",
  293. "FAILURE_STATES",
  294. "TARGET_TYPE",
  295. "TO_FORMATS",
  296. ]