| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344 |
- """Docling raw bundle downloader.
- Talks to Docling Serve v1 over HTTP:
- - ``POST /v1/convert/file/async`` — multipart upload, returns ``task_id``,
- - ``GET /v1/status/poll/{task_id}?wait=5`` — long-poll for terminal state,
- - ``GET /v1/result/{task_id}`` — zip download (only on ``success``).
- The zip is extracted safely under ``raw_dir/`` (refusing path traversal /
- absolute entries). A success manifest is written atomically at the very
- end; mid-run crashes therefore leave the directory in a state the cache
- layer marks as invalid (no manifest → miss → re-download).
- Pipeline constants (``pipeline``, ``target_type``, ``to_formats``,
- ``image_export_mode``) are intentionally **not** env-driven — the sidecar
- flow depends on them — and are recorded inside the manifest so a future
- code change automatically invalidates pre-existing caches.
- """
- from __future__ import annotations
- import asyncio
- import json
- import os
- import time
- from pathlib import Path
- from typing import TYPE_CHECKING, Any
- from lightrag.parser.external._common import (
- env_bool,
- env_int,
- raise_for_status_with_detail,
- )
- from lightrag.parser.external._zip import safe_extract_zip
- from lightrag.parser.external.docling.cache import (
- compute_options_signature,
- current_endpoint_signature,
- snapshot_tunable_env,
- )
- from lightrag.parser.external.docling.manifest import (
- build_and_write_docling_manifest,
- select_main_json,
- )
- from lightrag.utils import logger
- if TYPE_CHECKING:
- import httpx
- else:
- try:
- import httpx
- except ImportError: # pragma: no cover
- httpx = None
# ---------------------------------------------------------------------------
# Fixed pipeline constants (NOT env-driven)
# ---------------------------------------------------------------------------
# Sent with every conversion request and recorded in the manifest, so a code
# change here invalidates previously cached bundles (see module docstring).
PIPELINE = "standard"
TARGET_TYPE = "zip"
TO_FORMATS: tuple[str, ...] = ("json", "md")
IMAGE_EXPORT_MODE = "referenced"

# Key order is kept stable: pipeline, target_type, to_formats,
# image_export_mode.
FIXED_CONSTANTS: dict[str, object] = dict(
    pipeline=PIPELINE,
    target_type=TARGET_TYPE,
    to_formats=list(TO_FORMATS),
    image_export_mode=IMAGE_EXPORT_MODE,
)

# docling-serve v1 routes; ``{task_id}`` is substituted per request.
CONVERT_PATH = "/v1/convert/file/async"
POLL_PATH = "/v1/status/poll/{task_id}"
RESULT_PATH = "/v1/result/{task_id}"

DEFAULT_POLL_WAIT_SECONDS = 5
# Worst-case polling budget: 240 polls * 5 s per poll ≈ 20 minutes.
DEFAULT_MAX_POLLS = 240

# Task states as named by docling-serve's ConversionStatus enum. Only
# ``success`` leads to a result download; every other terminal state is
# treated as a failure by ``DoclingRawClient``.
SUCCESS_STATES = {"success"}
FAILURE_STATES = {"failure", "partial_success", "skipped"}
IN_PROGRESS_STATES = {"pending", "started"}
class DoclingRawClient:
    """Downloads docling-serve bundles into ``raw_dir``.

    Construct once per parse call (cheap). Reads ``DOCLING_*`` envs at
    ``__init__`` time, so callers can flip env between calls and pick up
    the new values without holding a stale instance.

    Raises:
        ValueError: from ``__init__`` when no Docling endpoint is configured.
    """

    def __init__(self) -> None:
        self.endpoint = current_endpoint_signature()
        if not self.endpoint:
            raise ValueError("DOCLING_ENDPOINT is required")
        self.engine_version = os.getenv("DOCLING_ENGINE_VERSION", "").strip()
        self.do_ocr = env_bool("DOCLING_DO_OCR", True)
        self.force_ocr = env_bool("DOCLING_FORCE_OCR", True)
        self.ocr_engine = os.getenv("DOCLING_OCR_ENGINE", "auto").strip() or "auto"
        self.ocr_preset = os.getenv("DOCLING_OCR_PRESET", "auto").strip() or "auto"
        self.ocr_lang_raw = os.getenv("DOCLING_OCR_LANG", "").strip()
        self.do_formula_enrichment = env_bool("DOCLING_DO_FORMULA_ENRICHMENT", False)
        # Poll cadence: ``DOCLING_POLL_INTERVAL_SECONDS`` is sent to
        # docling-serve as the ``?wait=N`` server-side long-poll window, and
        # it is also the minimum wall-clock length of one poll iteration: if
        # the server answers early, ``_poll_until_done`` sleeps the remainder.
        # ``DOCLING_MAX_POLLS`` bounds the number of polls; exhausting it
        # raises ``TimeoutError``. Non-positive values fall back to defaults.
        wait = env_int("DOCLING_POLL_INTERVAL_SECONDS", DEFAULT_POLL_WAIT_SECONDS)
        self.poll_wait_seconds = wait if wait > 0 else DEFAULT_POLL_WAIT_SECONDS
        max_polls = env_int("DOCLING_MAX_POLLS", DEFAULT_MAX_POLLS)
        self.max_poll_attempts = max_polls if max_polls > 0 else DEFAULT_MAX_POLLS

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def download_into(
        self,
        raw_dir: Path,
        source_file_path: Path,
        *,
        upload_filename: str | None = None,
    ):
        """Upload, poll, download, extract, and write the manifest.

        ``upload_filename`` overrides the multipart filename sent to
        docling-serve (defaults to ``source_file_path.name``). The pipeline
        passes the canonical, hint-stripped document name here so the
        bundle's ``<stem>.json`` ends up canonical too. Otherwise a file
        named ``report.[docling].pdf`` would produce ``report.[docling].json``
        inside the bundle. The adapter only knows the canonical
        ``report.pdf``, so it could not locate that file via the preferred
        ``<stem>.json`` lookup.

        Pre-condition: caller cleared ``raw_dir`` (e.g. via
        :func:`lightrag.parser.external.clear_dir_contents`). This method
        does not clean the directory itself; that step is kept explicit at
        the ``parse_docling`` entry point.

        Returns:
            Whatever ``build_and_write_docling_manifest`` returns (presumably
            the written manifest; confirm against the manifest module).

        Raises:
            RuntimeError: httpx is missing, the task failed, or a response
                was malformed.
            TimeoutError: the polling budget was exhausted.
        """
        if httpx is None:
            raise RuntimeError(
                "httpx is required for Docling parsing but is not installed"
            )
        raw_dir.mkdir(parents=True, exist_ok=True)
        effective_filename = upload_filename or source_file_path.name
        timeout = httpx.Timeout(120.0, connect=30.0)
        async with httpx.AsyncClient(timeout=timeout) as client:
            task_id = await self._submit(
                client, source_file_path, filename=effective_filename
            )
            await self._poll_until_done(client, task_id)
            payload = await self._download_zip_bytes(client, task_id)
        safe_extract_zip(payload, raw_dir)
        # Defensive: confirm the main JSON exists before anyone reads the
        # bundle. Look it up by the *uploaded* filename's stem, because
        # docling-serve uses that stem to name the JSON inside the zip.
        select_main_json(raw_dir, Path(effective_filename))
        options_signature = compute_options_signature(
            tunable_env=snapshot_tunable_env(),
            fixed_constants=FIXED_CONSTANTS,
        )
        return build_and_write_docling_manifest(
            raw_dir,
            source_file_path=source_file_path,
            task_id=task_id,
            endpoint_signature=self.endpoint,
            engine_version=self.engine_version,
            options_signature=options_signature,
            fixed_constants=FIXED_CONSTANTS,
            recorded_filename=effective_filename,
        )

    # ------------------------------------------------------------------
    # Upload + poll + download
    # ------------------------------------------------------------------

    def _build_multipart_data(self) -> dict[str, str | list[str]]:
        """Form fields (everything except the file payload).

        Returns a ``dict``, not a list of tuples. httpx ≥ 0.28 short-circuits
        non-``Mapping`` ``data`` into raw-content encoding and ignores
        ``files=`` entirely. That produces a sync-only stream, which an
        ``AsyncClient`` then rejects. ``MultipartStream`` emits list-valued
        entries as repeated form keys, which matches docling-serve's
        pydantic ``List[Enum]`` form parsing. ``ocr_lang`` is omitted entirely
        when empty so the engine uses its own default.
        """
        data: dict[str, str | list[str]] = {
            "pipeline": PIPELINE,
            "target_type": TARGET_TYPE,
            "image_export_mode": IMAGE_EXPORT_MODE,
            "do_ocr": _bool_form(self.do_ocr),
            "force_ocr": _bool_form(self.force_ocr),
            "ocr_engine": self.ocr_engine,
            "ocr_preset": self.ocr_preset,
            "do_formula_enrichment": _bool_form(self.do_formula_enrichment),
            "to_formats": list(TO_FORMATS),
        }
        if self.ocr_lang_raw:
            langs = _parse_ocr_lang(self.ocr_lang_raw)
            if langs:
                data["ocr_lang"] = langs
        return data

    @staticmethod
    def _json_object(resp: "httpx.Response", context: str) -> dict[str, Any]:
        """Decode ``resp`` as a JSON object; an empty body decodes to ``{}``.

        Raises ``RuntimeError`` when the body is valid JSON but not an object.
        Without this check the caller would fail later with an opaque
        ``AttributeError`` on ``.get``.
        """
        payload = resp.json() if resp.text else {}
        if not isinstance(payload, dict):
            raise RuntimeError(
                f"{context} returned unexpected JSON (expected an object): "
                f"{json.dumps(payload, ensure_ascii=False)[:400]}"
            )
        return payload

    async def _submit(
        self,
        client: "httpx.AsyncClient",
        source_file_path: Path,
        *,
        filename: str,
    ) -> str:
        """Upload ``source_file_path`` and return docling-serve's task id."""
        url = f"{self.endpoint}{CONVERT_PATH}"
        # Pass httpx a file object so its MultipartStream reads the body in
        # chunks instead of materializing the whole PDF/PPTX in worker memory.
        # With ``max_parallel_parse_docling > 1``, a per-doc bytes copy can
        # OOM the worker before docling-serve ever sees the request.
        with source_file_path.open("rb") as fh:
            files = {"files": (filename, fh, "application/octet-stream")}
            resp = await client.post(
                url, data=self._build_multipart_data(), files=files
            )
        raise_for_status_with_detail(resp, f"Docling upload for {filename!r}")
        payload = self._json_object(resp, f"Docling upload for {filename!r}")
        task_id = str(payload.get("task_id") or payload.get("id") or "").strip()
        if not task_id:
            raise RuntimeError(f"Docling upload response missing task_id: {payload!r}")
        return task_id

    async def _poll_until_done(
        self,
        client: "httpx.AsyncClient",
        task_id: str,
    ) -> None:
        """Poll ``task_id`` until it succeeds.

        Raises:
            RuntimeError: the task reached a failure state.
            TimeoutError: ``max_poll_attempts`` polls passed without success.
        """
        url = f"{self.endpoint}{POLL_PATH.format(task_id=task_id)}"
        params = {"wait": self.poll_wait_seconds}
        # The read timeout must outlast the server-side long-poll window.
        # Otherwise a large ``DOCLING_POLL_INTERVAL_SECONDS`` would surface as
        # an ``httpx.ReadTimeout`` instead of a task status.
        poll_timeout = httpx.Timeout(
            max(120.0, self.poll_wait_seconds + 30.0), connect=30.0
        )
        last_attempt = self.max_poll_attempts - 1
        for attempt in range(self.max_poll_attempts):
            iteration_started = time.monotonic()
            resp = await client.get(url, params=params, timeout=poll_timeout)
            raise_for_status_with_detail(resp, f"Docling task {task_id} poll")
            payload = self._json_object(resp, f"Docling task {task_id} poll")
            status = str(
                payload.get("task_status") or payload.get("status") or ""
            ).lower()
            if status in SUCCESS_STATES:
                return
            if status in FAILURE_STATES:
                raise RuntimeError(_format_failure(task_id, status, payload))
            if status not in IN_PROGRESS_STATES:
                # Unknown status: keep polling, but surface it so operators notice.
                logger.warning(
                    "[docling] unknown task status %r for task %s; continuing to poll",
                    status,
                    task_id,
                )
            if attempt == last_attempt:
                # Budget exhausted: don't sleep just to raise afterwards.
                break
            # The intended cadence is one poll per ``poll_wait_seconds``, and
            # the design relies on docling-serve's ``?wait=N`` long-polling for
            # that. Some deployments return immediately instead, which would
            # burn through ``max_poll_attempts`` in milliseconds and fail with
            # a spurious timeout. Pad each iteration up to the configured
            # interval so the total budget holds either way.
            elapsed = time.monotonic() - iteration_started
            remaining = self.poll_wait_seconds - elapsed
            if remaining > 0:
                await asyncio.sleep(remaining)
        raise TimeoutError(f"Docling task {task_id} polling timeout")

    async def _download_zip_bytes(
        self,
        client: "httpx.AsyncClient",
        task_id: str,
    ) -> bytes:
        """Fetch the result bundle; raise ``RuntimeError`` if it is not a zip."""
        url = f"{self.endpoint}{RESULT_PATH.format(task_id=task_id)}"
        resp = await client.get(url)
        raise_for_status_with_detail(resp, f"Docling result {task_id} download")
        ctype = resp.headers.get("content-type", "")
        if "zip" not in ctype.lower():
            raise RuntimeError(
                f"Docling result {task_id} returned non-zip content-type "
                f"{ctype!r}; body prefix={resp.text[:400]!r}"
            )
        return resp.content
- # ---------------------------------------------------------------------------
- # Helpers
- # ---------------------------------------------------------------------------
- def _bool_form(v: bool) -> str:
- return "true" if v else "false"
- def _parse_ocr_lang(raw: str) -> list[str]:
- """Best-effort parser for ``DOCLING_OCR_LANG``.
- Accepts a JSON array (``["en","zh"]``) or a comma-separated list
- (``en,zh``). Returns a list of stripped non-empty strings; empty in →
- empty out.
- """
- try:
- parsed = json.loads(raw)
- except json.JSONDecodeError:
- parsed = None
- if isinstance(parsed, list):
- return [str(x).strip() for x in parsed if str(x).strip()]
- return [item.strip() for item in raw.split(",") if item.strip()]
- def _format_failure(task_id: str, status: str, payload: Any) -> str:
- if isinstance(payload, dict):
- err = (
- payload.get("error_message")
- or payload.get("error")
- or payload.get("message")
- or "<no error_message>"
- )
- else:
- err = "<no error_message>"
- truncated = json.dumps(payload, ensure_ascii=False)[:400]
- return f"Docling task {task_id} ended in {status}: {err}; payload={truncated}"
# Public surface of this module. All three task-state groups are exported
# together so callers can classify statuses without reaching for private names.
__all__ = [
    "DoclingRawClient",
    "CONVERT_PATH",
    "DEFAULT_MAX_POLLS",
    "DEFAULT_POLL_WAIT_SECONDS",
    "FIXED_CONSTANTS",
    "IMAGE_EXPORT_MODE",
    "PIPELINE",
    "POLL_PATH",
    "RESULT_PATH",
    "SUCCESS_STATES",
    "FAILURE_STATES",
    "IN_PROGRESS_STATES",
    "TARGET_TYPE",
    "TO_FORMATS",
]
|