| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562 |
- """Tests for :class:`DoclingRawClient`.
- Cover the contract guarantees that protect the sidecar pipeline:
- - the fixed pipeline constants (``pipeline=standard`` / ``target_type=zip``
- / ``to_formats=[json,md]`` / ``image_export_mode=referenced``) are sent
- on every upload, regardless of env;
- - terminal non-success states (``failure`` / ``partial_success`` /
- ``skipped``) abort the run **before** any result download;
- - ``DOCLING_OCR_LANG`` is omitted when empty so docling-serve falls back
- to its own default.
- Uses an in-process fake httpx client mirroring ``tests/parser/external/mineru/test_client.py``
- so we don't trip httpx's sync/async stream guard on multipart uploads.
- """
- from __future__ import annotations
- import io
- import json
- import zipfile
- from pathlib import Path
- from typing import Any
- import pytest
- from lightrag.parser.external.docling.client import (
- CONVERT_PATH,
- POLL_PATH,
- RESULT_PATH,
- DoclingRawClient,
- )
- # ---------------------------------------------------------------------------
- # Minimal httpx fake (no MockTransport — avoids the multipart encode path)
- # ---------------------------------------------------------------------------
- class _FakeResponse:
- def __init__(
- self,
- *,
- status_code: int = 200,
- text: str = "",
- content: bytes = b"",
- headers: dict[str, str] | None = None,
- ) -> None:
- self.status_code = status_code
- self.text = text
- self.content = content or text.encode("utf-8")
- self.headers = headers or {}
- def json(self) -> Any:
- return json.loads(self.text) if self.text else {}
- def raise_for_status(self) -> None:
- if self.status_code >= 400:
- raise RuntimeError(f"HTTP {self.status_code}")
- class _Recorder:
- def __init__(
- self,
- *,
- terminal_status: str,
- zip_bytes: bytes,
- task_id: str = "task-abc",
- submit_status_code: int = 200,
- submit_text: str | None = None,
- poll_status_code: int = 200,
- poll_text: str | None = None,
- result_status_code: int = 200,
- result_text: str | None = None,
- ) -> None:
- self.terminal_status = terminal_status
- self.zip_bytes = zip_bytes
- self.task_id = task_id
- self.submit_status_code = submit_status_code
- self.submit_text = submit_text
- self.poll_status_code = poll_status_code
- self.poll_text = poll_text
- self.result_status_code = result_status_code
- self.result_text = result_text
- self.post_calls: list[dict] = []
- self.get_calls: list[dict] = []
- self.result_calls = 0
- _CURRENT: dict[str, _Recorder] = {}
- class _FakeAsyncClient:
- def __init__(self, *_: Any, **__: Any) -> None:
- pass
- async def __aenter__(self) -> "_FakeAsyncClient":
- return self
- async def __aexit__(self, *_: Any) -> None:
- pass
- async def post(
- self,
- url: str,
- files: Any = None,
- data: Any = None,
- json: Any = None,
- headers: Any = None,
- ) -> _FakeResponse:
- recorder = _CURRENT["recorder"]
- # Production passes a file handle inside a `with` block — by the time
- # tests inspect `post_calls` it's already closed. Drain the stream
- # here so assertions can keep reading the payload as bytes.
- snapshot_files = files
- if files and "files" in files:
- name, payload, ctype = files["files"]
- if hasattr(payload, "read"):
- payload = payload.read()
- snapshot_files = {"files": (name, payload, ctype)}
- recorder.post_calls.append(
- {"url": url, "files": snapshot_files, "data": data, "json": json}
- )
- if CONVERT_PATH in url:
- if recorder.submit_status_code != 200:
- return _FakeResponse(
- status_code=recorder.submit_status_code,
- text=recorder.submit_text or "",
- )
- return _FakeResponse(
- status_code=200,
- text=json_dump({"task_id": recorder.task_id}),
- )
- raise AssertionError(f"unexpected POST {url}")
- async def get(
- self, url: str, params: Any = None, headers: Any = None
- ) -> _FakeResponse:
- recorder = _CURRENT["recorder"]
- recorder.get_calls.append({"url": url, "params": params})
- if POLL_PATH.format(task_id=recorder.task_id) in url:
- if recorder.poll_status_code != 200:
- return _FakeResponse(
- status_code=recorder.poll_status_code,
- text=recorder.poll_text or "",
- )
- payload: dict[str, Any] = {
- "task_id": recorder.task_id,
- "task_status": recorder.terminal_status,
- }
- if recorder.terminal_status != "success":
- payload["error_message"] = "synthetic-failure"
- return _FakeResponse(status_code=200, text=json_dump(payload))
- if RESULT_PATH.format(task_id=recorder.task_id) in url:
- recorder.result_calls += 1
- if recorder.result_status_code != 200:
- return _FakeResponse(
- status_code=recorder.result_status_code,
- text=recorder.result_text or "",
- )
- return _FakeResponse(
- status_code=200,
- content=recorder.zip_bytes,
- headers={"content-type": "application/zip"},
- )
- raise AssertionError(f"unexpected GET {url}")
- def json_dump(payload: Any) -> str:
- return json.dumps(payload)
- def _form_pairs(data: Any) -> list[tuple[str, str]]:
- """Normalize httpx form data into repeated ``(name, value)`` pairs.
- Production passes a mapping so httpx 0.28 keeps multipart ``files=`` on
- the async path. List values in that mapping represent repeated form keys.
- Older tests used tuple lists directly; accepting both keeps assertions
- focused on the wire contract instead of the container type.
- """
- if isinstance(data, dict):
- pairs: list[tuple[str, str]] = []
- for name, value in data.items():
- values = value if isinstance(value, list) else [value]
- pairs.extend((str(name), str(v)) for v in values)
- return pairs
- return [(str(name), str(value)) for name, value in data]
- def _fake_zip_with_main_json(stem: str) -> bytes:
- buf = io.BytesIO()
- with zipfile.ZipFile(buf, "w") as zf:
- zf.writestr(f"{stem}.json", b'{"schema_name": "DoclingDocument"}')
- zf.writestr(f"{stem}.md", b"# hello")
- return buf.getvalue()
- def _install_fake_httpx(monkeypatch: pytest.MonkeyPatch) -> None:
- """Replace ``httpx.AsyncClient`` and ``httpx.Timeout`` references in
- the docling client module with no-arg fakes."""
- monkeypatch.setattr(
- "lightrag.parser.external.docling.client.httpx.AsyncClient",
- _FakeAsyncClient,
- )
- monkeypatch.setattr(
- "lightrag.parser.external.docling.client.httpx.Timeout",
- lambda *a, **kw: None,
- )
- # ---------------------------------------------------------------------------
- # Fixtures
- # ---------------------------------------------------------------------------
- @pytest.fixture
- def source_pdf(tmp_path: Path) -> Path:
- p = tmp_path / "demo.pdf"
- p.write_bytes(b"%PDF-1.4 fake")
- return p
- @pytest.fixture(autouse=True)
- def docling_endpoint(monkeypatch: pytest.MonkeyPatch) -> None:
- monkeypatch.setenv("DOCLING_ENDPOINT", "http://docling.test")
- for name in (
- "DOCLING_DO_OCR",
- "DOCLING_FORCE_OCR",
- "DOCLING_OCR_ENGINE",
- "DOCLING_OCR_PRESET",
- "DOCLING_OCR_LANG",
- "DOCLING_DO_FORMULA_ENRICHMENT",
- "DOCLING_ENGINE_VERSION",
- ):
- monkeypatch.delenv(name, raising=False)
- # ---------------------------------------------------------------------------
- # Tests
- # ---------------------------------------------------------------------------
- async def test_docling_client_sends_fixed_constants(
- monkeypatch: pytest.MonkeyPatch,
- tmp_path: Path,
- source_pdf: Path,
- ) -> None:
- recorder = _Recorder(
- terminal_status="success",
- zip_bytes=_fake_zip_with_main_json("demo"),
- )
- _CURRENT["recorder"] = recorder
- _install_fake_httpx(monkeypatch)
- raw_dir = tmp_path / "demo.docling_raw"
- manifest = await DoclingRawClient().download_into(raw_dir, source_pdf)
- assert len(recorder.post_calls) == 1
- data = recorder.post_calls[0]["data"]
- field_map: dict[str, list[str]] = {}
- for name, value in _form_pairs(data):
- field_map.setdefault(name, []).append(value)
- assert field_map["pipeline"] == ["standard"]
- assert field_map["target_type"] == ["zip"]
- assert field_map["image_export_mode"] == ["referenced"]
- assert sorted(field_map["to_formats"]) == ["json", "md"]
- files = recorder.post_calls[0]["files"]
- assert "files" in files
- name, blob, ctype = files["files"]
- assert name == "demo.pdf"
- assert blob.startswith(b"%PDF-1.4")
- assert ctype == "application/octet-stream"
- assert manifest.task_id == recorder.task_id
- assert manifest.engine == "docling"
- assert manifest.extras["fixed_constants"]["pipeline"] == "standard"
- assert manifest.endpoint_signature == "http://docling.test"
- async def test_docling_client_partial_success_aborts(
- monkeypatch: pytest.MonkeyPatch,
- tmp_path: Path,
- source_pdf: Path,
- ) -> None:
- recorder = _Recorder(
- terminal_status="partial_success",
- zip_bytes=_fake_zip_with_main_json("demo"),
- )
- _CURRENT["recorder"] = recorder
- _install_fake_httpx(monkeypatch)
- with pytest.raises(RuntimeError) as excinfo:
- await DoclingRawClient().download_into(
- tmp_path / "demo.docling_raw", source_pdf
- )
- msg = str(excinfo.value)
- assert recorder.task_id in msg
- assert "partial_success" in msg
- assert "synthetic-failure" in msg
- assert recorder.result_calls == 0
- async def test_docling_client_failure_aborts(
- monkeypatch: pytest.MonkeyPatch,
- tmp_path: Path,
- source_pdf: Path,
- ) -> None:
- recorder = _Recorder(
- terminal_status="failure",
- zip_bytes=_fake_zip_with_main_json("demo"),
- )
- _CURRENT["recorder"] = recorder
- _install_fake_httpx(monkeypatch)
- with pytest.raises(RuntimeError):
- await DoclingRawClient().download_into(
- tmp_path / "demo.docling_raw", source_pdf
- )
- assert recorder.result_calls == 0
- async def test_docling_client_skipped_aborts(
- monkeypatch: pytest.MonkeyPatch,
- tmp_path: Path,
- source_pdf: Path,
- ) -> None:
- recorder = _Recorder(
- terminal_status="skipped",
- zip_bytes=_fake_zip_with_main_json("demo"),
- )
- _CURRENT["recorder"] = recorder
- _install_fake_httpx(monkeypatch)
- with pytest.raises(RuntimeError):
- await DoclingRawClient().download_into(
- tmp_path / "demo.docling_raw", source_pdf
- )
- assert recorder.result_calls == 0
- async def test_docling_client_upload_http_error_preserves_response_body(
- monkeypatch: pytest.MonkeyPatch,
- tmp_path: Path,
- source_pdf: Path,
- ) -> None:
- recorder = _Recorder(
- terminal_status="success",
- zip_bytes=_fake_zip_with_main_json("demo"),
- submit_status_code=400,
- submit_text=json_dump({"detail": "unsupported file type"}),
- )
- _CURRENT["recorder"] = recorder
- _install_fake_httpx(monkeypatch)
- with pytest.raises(RuntimeError) as excinfo:
- await DoclingRawClient().download_into(
- tmp_path / "demo.docling_raw", source_pdf
- )
- message = str(excinfo.value)
- assert "Docling upload for 'demo.pdf'" in message
- assert "HTTP 400" in message
- assert "unsupported file type" in message
- async def test_docling_client_poll_http_error_preserves_response_body(
- monkeypatch: pytest.MonkeyPatch,
- tmp_path: Path,
- source_pdf: Path,
- ) -> None:
- recorder = _Recorder(
- terminal_status="success",
- zip_bytes=_fake_zip_with_main_json("demo"),
- poll_status_code=503,
- poll_text=json_dump({"message": "queue unavailable"}),
- )
- _CURRENT["recorder"] = recorder
- _install_fake_httpx(monkeypatch)
- with pytest.raises(RuntimeError) as excinfo:
- await DoclingRawClient().download_into(
- tmp_path / "demo.docling_raw", source_pdf
- )
- message = str(excinfo.value)
- assert "Docling task task-abc poll" in message
- assert "HTTP 503" in message
- assert "queue unavailable" in message
- async def test_docling_client_result_redirect_treated_as_error(
- monkeypatch: pytest.MonkeyPatch,
- tmp_path: Path,
- source_pdf: Path,
- ) -> None:
- # docling-serve fronted by a misconfigured proxy could emit a 302 to a
- # CDN that httpx (default ``follow_redirects=False``) won't follow.
- # Without the explicit non-2xx guard the redirect body would fall into
- # the zip-decoder and surface as a cryptic "bad zip" error.
- recorder = _Recorder(
- terminal_status="success",
- zip_bytes=_fake_zip_with_main_json("demo"),
- result_status_code=302,
- result_text="",
- )
- _CURRENT["recorder"] = recorder
- _install_fake_httpx(monkeypatch)
- with pytest.raises(RuntimeError) as excinfo:
- await DoclingRawClient().download_into(
- tmp_path / "demo.docling_raw", source_pdf
- )
- message = str(excinfo.value)
- assert "Docling result task-abc download" in message
- assert "HTTP 302" in message
- async def test_docling_client_result_http_error_preserves_response_body(
- monkeypatch: pytest.MonkeyPatch,
- tmp_path: Path,
- source_pdf: Path,
- ) -> None:
- recorder = _Recorder(
- terminal_status="success",
- zip_bytes=_fake_zip_with_main_json("demo"),
- result_status_code=500,
- result_text="zip artifact missing",
- )
- _CURRENT["recorder"] = recorder
- _install_fake_httpx(monkeypatch)
- with pytest.raises(RuntimeError) as excinfo:
- await DoclingRawClient().download_into(
- tmp_path / "demo.docling_raw", source_pdf
- )
- message = str(excinfo.value)
- assert "Docling result task-abc download" in message
- assert "HTTP 500" in message
- assert "zip artifact missing" in message
- async def test_docling_client_ocr_lang_omitted_when_empty(
- monkeypatch: pytest.MonkeyPatch,
- tmp_path: Path,
- source_pdf: Path,
- ) -> None:
- recorder = _Recorder(
- terminal_status="success",
- zip_bytes=_fake_zip_with_main_json("demo"),
- )
- _CURRENT["recorder"] = recorder
- _install_fake_httpx(monkeypatch)
- await DoclingRawClient().download_into(tmp_path / "demo.docling_raw", source_pdf)
- data = recorder.post_calls[0]["data"]
- names = [name for name, _ in _form_pairs(data)]
- assert "ocr_lang" not in names
- async def test_docling_client_ocr_lang_sent_when_set(
- monkeypatch: pytest.MonkeyPatch,
- tmp_path: Path,
- source_pdf: Path,
- ) -> None:
- monkeypatch.setenv("DOCLING_OCR_LANG", '["en","zh"]')
- recorder = _Recorder(
- terminal_status="success",
- zip_bytes=_fake_zip_with_main_json("demo"),
- )
- _CURRENT["recorder"] = recorder
- _install_fake_httpx(monkeypatch)
- await DoclingRawClient().download_into(tmp_path / "demo.docling_raw", source_pdf)
- data = recorder.post_calls[0]["data"]
- langs = [v for name, v in _form_pairs(data) if name == "ocr_lang"]
- assert langs == ["en", "zh"]
- async def test_docling_client_ocr_lang_csv_form(
- monkeypatch: pytest.MonkeyPatch,
- tmp_path: Path,
- source_pdf: Path,
- ) -> None:
- """CSV fallback when value isn't valid JSON."""
- monkeypatch.setenv("DOCLING_OCR_LANG", "en, fr")
- recorder = _Recorder(
- terminal_status="success",
- zip_bytes=_fake_zip_with_main_json("demo"),
- )
- _CURRENT["recorder"] = recorder
- _install_fake_httpx(monkeypatch)
- await DoclingRawClient().download_into(tmp_path / "demo.docling_raw", source_pdf)
- data = recorder.post_calls[0]["data"]
- langs = [v for name, v in _form_pairs(data) if name == "ocr_lang"]
- assert langs == ["en", "fr"]
- async def test_docling_client_rejects_missing_endpoint(
- monkeypatch: pytest.MonkeyPatch,
- ) -> None:
- monkeypatch.setenv("DOCLING_ENDPOINT", "")
- with pytest.raises(ValueError, match="DOCLING_ENDPOINT"):
- DoclingRawClient()
- async def test_docling_client_strips_parser_hint_from_upload_filename(
- monkeypatch: pytest.MonkeyPatch, tmp_path: Path
- ) -> None:
- # Regression: a hinted source (``report.[docling].pdf``) used to cause
- # docling-serve to name its bundle JSON ``report.[docling].json``, which
- # the adapter (looking for ``report.json``) could not locate. The
- # pipeline now passes the canonical name as ``upload_filename`` so the
- # bundle is canonical-stem from the start.
- hinted = tmp_path / "report.[docling].pdf"
- hinted.write_bytes(b"%PDF-1.4 fake")
- # The fake zip mimics docling-serve responding with the *canonical* stem,
- # which is what would happen once we send the canonical filename.
- recorder = _Recorder(
- terminal_status="success",
- zip_bytes=_fake_zip_with_main_json("report"),
- )
- _CURRENT["recorder"] = recorder
- _install_fake_httpx(monkeypatch)
- raw_dir = tmp_path / "report.docling_raw"
- manifest = await DoclingRawClient().download_into(
- raw_dir, hinted, upload_filename="report.pdf"
- )
- name, _blob, _ctype = recorder.post_calls[0]["files"]["files"]
- assert name == "report.pdf"
- assert manifest.source_filename_at_parse == "report.pdf"
- assert manifest.critical_file.path == "report.json"
- assert (raw_dir / "report.json").is_file()
- async def test_docling_client_default_upload_filename_falls_back_to_source_name(
- monkeypatch: pytest.MonkeyPatch, tmp_path: Path, source_pdf: Path
- ) -> None:
- # Back-compat guard: callers that don't pass ``upload_filename`` (any
- # path other than the production pipeline) keep the legacy behavior of
- # using the on-disk source filename.
- recorder = _Recorder(
- terminal_status="success",
- zip_bytes=_fake_zip_with_main_json("demo"),
- )
- _CURRENT["recorder"] = recorder
- _install_fake_httpx(monkeypatch)
- await DoclingRawClient().download_into(tmp_path / "demo.docling_raw", source_pdf)
- name, _blob, _ctype = recorder.post_calls[0]["files"]["files"]
- assert name == "demo.pdf"
|