| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984 |
- """End-to-end offline tests for the analyze_multimodal pipeline.
- Covers the contract introduced by the LR2 multimodal rewrite:
- 1. Drawings route to the VLM role; tables/equations route to the EXTRACT
- role. ``VLM_PROCESS_ENABLE=False`` (or a missing VLM role) is a hard
- failure for image-enabled documents.
- 2. ``llm_analyze_result`` uses the new ``status / message / analyze_time``
- shape with modality-specific required fields
- (``name/type/description`` for images, ``name/description`` for tables,
- ``name/equation/description`` for equations).
- 3. Each VLM/EXTRACT call carries ``_priority=DEFAULT_MM_ANALYSIS_PRIORITY``
- and ``image_inputs`` (not legacy ``messages``).
- 4. Analysis cache ids are written back to the sidecar item's
- ``llm_cache_list`` so document deletion can clean them up.
- 5. Images smaller than ``VLM_MIN_IMAGE_PIXEL`` (32 px) and unsupported
- formats are pre-emptively flagged ``status="skipped"`` without an LLM
- call.
- 6. Invalid model JSON is a hard failure — the sidecar carries
- ``status="failure"`` and :class:`MultimodalAnalysisError` propagates.
- """
- from __future__ import annotations
- import base64
- import json
- import logging
- import struct
- import zlib
- from pathlib import Path
- import numpy as np
- import pytest
- from lightrag import LightRAG, ROLES, RoleLLMConfig
- from lightrag.exceptions import MultimodalAnalysisError
- from lightrag.utils import EmbeddingFunc, Tokenizer
- @pytest.fixture
- def _propagate_lightrag_logger(monkeypatch):
- monkeypatch.setattr(logging.getLogger("lightrag"), "propagate", True)
- pytestmark = pytest.mark.offline
- def _png_bytes(width: int, height: int) -> bytes:
- """Build a minimal but parser-accepted PNG with the given dimensions."""
- signature = b"\x89PNG\r\n\x1a\n"
- ihdr_payload = struct.pack(">II", width, height) + b"\x08\x06\x00\x00\x00"
- ihdr_crc = zlib.crc32(b"IHDR" + ihdr_payload).to_bytes(4, "big")
- ihdr = struct.pack(">I", len(ihdr_payload)) + b"IHDR" + ihdr_payload + ihdr_crc
- idat_raw = b"\x00" * (width * height * 4 + height)
- idat_compressed = zlib.compress(idat_raw)
- idat_crc = zlib.crc32(b"IDAT" + idat_compressed).to_bytes(4, "big")
- idat = (
- struct.pack(">I", len(idat_compressed)) + b"IDAT" + idat_compressed + idat_crc
- )
- iend = b"\x00\x00\x00\x00IEND\xaeB`\x82"
- return signature + ihdr + idat + iend
- PNG_BYTES = _png_bytes(64, 64)
- TINY_PNG_BYTES = _png_bytes(8, 8)
- class _SimpleTokenizerImpl:
- def encode(self, content: str) -> list[int]:
- return [ord(ch) for ch in content]
- def decode(self, tokens: list[int]) -> str:
- return "".join(chr(t) for t in tokens)
- async def _mock_embedding(texts: list[str]) -> np.ndarray:
- return np.random.rand(len(texts), 8)
- def _make_vlm_mock(call_log: list[dict]):
- async def vlm_func(prompt, **kwargs):
- call_log.append({"prompt": prompt, "kwargs": dict(kwargs)})
- return json.dumps(
- {
- "name": "fig-1",
- "type": "Chart",
- "description": "concise figure description",
- }
- )
- return vlm_func
- def _make_extract_mock(call_log: list[dict]):
- async def extract_func(prompt, **kwargs):
- call_log.append({"prompt": prompt, "kwargs": dict(kwargs)})
- return json.dumps(
- {
- "name": "tbl-1",
- "description": "table summary",
- }
- )
- return extract_func
- def _build_rag(
- tmp_path: Path,
- *,
- vlm_process_enable: bool = True,
- vlm_func=None,
- extract_func=None,
- ) -> LightRAG:
- role_configs = {}
- for spec in ROLES:
- if spec.name == "vlm" and vlm_func is not None:
- role_configs[spec.name] = RoleLLMConfig(func=vlm_func)
- elif spec.name == "extract" and extract_func is not None:
- role_configs[spec.name] = RoleLLMConfig(func=extract_func)
- else:
- role_configs[spec.name] = RoleLLMConfig()
- base_func = vlm_func or extract_func
- return LightRAG(
- working_dir=str(tmp_path),
- workspace=f"vlm-pipeline-{tmp_path.name}",
- llm_model_func=base_func,
- embedding_func=EmbeddingFunc(
- embedding_dim=8,
- max_token_size=1024,
- func=_mock_embedding,
- ),
- tokenizer=Tokenizer("mock-tokenizer", _SimpleTokenizerImpl()),
- vlm_process_enable=vlm_process_enable,
- role_llm_configs=role_configs,
- )
- def _write_sidecar_fixtures(tmp_path: Path) -> tuple[str, dict, Path]:
- parsed_dir = tmp_path / "parsed"
- parsed_dir.mkdir()
- image_path = parsed_dir / "fig1.png"
- image_path.write_bytes(PNG_BYTES)
- blocks_path = parsed_dir / "doc.blocks.jsonl"
- blocks_path.write_text(
- json.dumps({"type": "meta", "doc_id": "doc-1"}) + "\n",
- encoding="utf-8",
- )
- sidecar_path = parsed_dir / "doc.drawings.json"
- sidecar_path.write_text(
- json.dumps(
- {
- "drawings": {
- "im-001": {
- "caption": "Figure 1",
- "path": str(image_path),
- }
- }
- }
- ),
- encoding="utf-8",
- )
- parsed_data = {"blocks_path": str(blocks_path)}
- return "doc-1", parsed_data, sidecar_path
- @pytest.mark.asyncio
- async def test_vlm_process_enable_false_hard_fails_for_images(
- tmp_path, caplog, _propagate_lightrag_logger
- ):
- """With i opted-in but VLM disabled, analyze_multimodal must hard-fail
- the document rather than silently skipping."""
- call_log: list[dict] = []
- rag = _build_rag(
- tmp_path, vlm_process_enable=False, vlm_func=_make_vlm_mock(call_log)
- )
- await rag.initialize_storages()
- try:
- doc_id, parsed_data, sidecar_path = _write_sidecar_fixtures(tmp_path)
- with pytest.raises(MultimodalAnalysisError):
- await rag.analyze_multimodal(
- doc_id=doc_id,
- file_path="fixture.pdf",
- parsed_data=parsed_data,
- process_options="i",
- )
- # VLM mock must not be invoked when the master switch is off.
- assert call_log == []
- payload = json.loads(sidecar_path.read_text(encoding="utf-8"))
- item = payload["drawings"]["im-001"]
- assert item["llm_analyze_result"]["status"] == "failure"
- assert "VLM" in item["llm_analyze_result"]["message"]
- finally:
- await rag.finalize_storages()
- @pytest.mark.asyncio
- async def test_vlm_disabled_then_enabled_reprocesses_item(tmp_path):
- """After the first run hard-failed under VLM=disabled, flipping the
- switch must re-invoke the VLM and overwrite the persisted failure."""
- call_log: list[dict] = []
- vlm_func = _make_vlm_mock(call_log)
- rag_off = _build_rag(tmp_path, vlm_process_enable=False, vlm_func=vlm_func)
- await rag_off.initialize_storages()
- try:
- doc_id, parsed_data, sidecar_path = _write_sidecar_fixtures(tmp_path)
- with pytest.raises(MultimodalAnalysisError):
- await rag_off.analyze_multimodal(
- doc_id=doc_id,
- file_path="fixture.pdf",
- parsed_data=parsed_data,
- process_options="i",
- )
- assert call_log == []
- finally:
- await rag_off.finalize_storages()
- rag_on = _build_rag(tmp_path, vlm_process_enable=True, vlm_func=vlm_func)
- await rag_on.initialize_storages()
- try:
- await rag_on.analyze_multimodal(
- doc_id=doc_id,
- file_path="fixture.pdf",
- parsed_data=parsed_data,
- process_options="i",
- )
- assert len(call_log) == 1
- payload = json.loads(sidecar_path.read_text(encoding="utf-8"))
- result = payload["drawings"]["im-001"]["llm_analyze_result"]
- assert result["status"] == "success"
- assert result["type"] == "Chart"
- assert result["description"] == "concise figure description"
- assert "analyze_time" in result
- finally:
- await rag_on.finalize_storages()
- @pytest.mark.asyncio
- async def test_vlm_call_carries_image_inputs(tmp_path):
- """Sanity check the call kwargs: image_inputs (not legacy `messages`)
- must be present. The ``_priority`` argument is consumed by the role
- wrapper before reaching the raw model func, so it is not observable on
- the mock — see ``priority_limit_async_func_call`` in lightrag.utils."""
- call_log: list[dict] = []
- rag = _build_rag(
- tmp_path, vlm_process_enable=True, vlm_func=_make_vlm_mock(call_log)
- )
- await rag.initialize_storages()
- try:
- doc_id, parsed_data, _ = _write_sidecar_fixtures(tmp_path)
- await rag.analyze_multimodal(
- doc_id=doc_id,
- file_path="fixture.pdf",
- parsed_data=parsed_data,
- process_options="i",
- )
- assert len(call_log) == 1
- kwargs = call_log[0]["kwargs"]
- assert kwargs.get("stream") is False
- assert kwargs.get("image_inputs") is not None
- assert "messages" not in kwargs
- # _priority is consumed by the wrapper (see lightrag.utils
- # priority_limit_async_func_call); not observable on the raw mock.
- assert "_priority" not in kwargs
- finally:
- await rag.finalize_storages()
- @pytest.mark.asyncio
- async def test_vlm_cache_hit_on_second_run(tmp_path):
- call_log: list[dict] = []
- rag = _build_rag(
- tmp_path, vlm_process_enable=True, vlm_func=_make_vlm_mock(call_log)
- )
- await rag.initialize_storages()
- try:
- doc_id, parsed_data, sidecar_path = _write_sidecar_fixtures(tmp_path)
- await rag.analyze_multimodal(
- doc_id=doc_id,
- file_path="fixture.pdf",
- parsed_data=parsed_data,
- process_options="i",
- )
- assert len(call_log) == 1
- await rag.llm_response_cache.index_done_callback()
- cache_file = (
- Path(rag.working_dir) / rag.workspace / "kv_store_llm_response_cache.json"
- )
- cache_blob = json.loads(cache_file.read_text(encoding="utf-8"))
- analysis_keys = [
- k for k in cache_blob.keys() if k.startswith("default:analysis:")
- ]
- assert len(analysis_keys) == 1
- cache_id = analysis_keys[0]
- entry = cache_blob[cache_id]
- original_prompt = entry["original_prompt"]
- assert "<vlm_images>" in original_prompt
- # Raw base64 must NOT be embedded in the audit block.
- raw_b64 = base64.b64encode(PNG_BYTES).decode("ascii")
- assert raw_b64 not in original_prompt
- # Cache id was written back to the sidecar item for delete-time cleanup.
- payload = json.loads(sidecar_path.read_text(encoding="utf-8"))
- assert cache_id in payload["drawings"]["im-001"]["llm_cache_list"]
- # Re-run: analyze_multimodal always recomputes for enabled modalities,
- # but the cache key matches so the VLM is not called again.
- await rag.analyze_multimodal(
- doc_id=doc_id,
- file_path="fixture.pdf",
- parsed_data=parsed_data,
- process_options="i",
- )
- assert len(call_log) == 1
- finally:
- await rag.finalize_storages()
- @pytest.mark.asyncio
- async def test_image_path_resolved_relative_to_sidecar_dir(tmp_path):
- """Sidecar paths are parsed_dir-relative; the pipeline must resolve
- them against the sidecar directory before falling back to skipped."""
- call_log: list[dict] = []
- rag = _build_rag(
- tmp_path, vlm_process_enable=True, vlm_func=_make_vlm_mock(call_log)
- )
- await rag.initialize_storages()
- try:
- parsed_dir = tmp_path / "parsed"
- parsed_dir.mkdir()
- assets_dir = parsed_dir / "doc.blocks.assets"
- assets_dir.mkdir()
- (assets_dir / "image1.png").write_bytes(PNG_BYTES)
- blocks_path = parsed_dir / "doc.blocks.jsonl"
- blocks_path.write_text(
- json.dumps({"type": "meta", "doc_id": "doc-1"}) + "\n",
- encoding="utf-8",
- )
- sidecar_path = parsed_dir / "doc.drawings.json"
- sidecar_path.write_text(
- json.dumps(
- {
- "drawings": {
- "im-001": {
- "caption": "Figure 1",
- "path": "doc.blocks.assets/image1.png",
- }
- }
- }
- ),
- encoding="utf-8",
- )
- await rag.analyze_multimodal(
- doc_id="doc-1",
- file_path="fixture.pdf",
- parsed_data={"blocks_path": str(blocks_path)},
- process_options="i",
- )
- assert len(call_log) == 1
- assert call_log[0]["kwargs"].get("image_inputs") is not None
- payload = json.loads(sidecar_path.read_text(encoding="utf-8"))
- result = payload["drawings"]["im-001"]["llm_analyze_result"]
- assert result["status"] == "success"
- finally:
- await rag.finalize_storages()
- @pytest.mark.asyncio
- async def test_unsupported_vector_format_writes_skipped(tmp_path):
- """WMF/EMF/SVG and other non-raster formats short-circuit to
- status=skipped without calling the VLM."""
- call_log: list[dict] = []
- rag = _build_rag(
- tmp_path, vlm_process_enable=True, vlm_func=_make_vlm_mock(call_log)
- )
- await rag.initialize_storages()
- try:
- parsed_dir = tmp_path / "parsed"
- parsed_dir.mkdir()
- wmf_path = parsed_dir / "image1.wmf"
- wmf_path.write_bytes(b"WMF\x00fake-content-bytes")
- blocks_path = parsed_dir / "doc.blocks.jsonl"
- blocks_path.write_text(
- json.dumps({"type": "meta", "doc_id": "doc-1"}) + "\n",
- encoding="utf-8",
- )
- sidecar_path = parsed_dir / "doc.drawings.json"
- sidecar_path.write_text(
- json.dumps(
- {
- "drawings": {
- "im-001": {
- "caption": "vector diagram",
- "path": str(wmf_path),
- "format": "wmf",
- }
- }
- }
- ),
- encoding="utf-8",
- )
- await rag.analyze_multimodal(
- doc_id="doc-1",
- file_path="fixture.docx",
- parsed_data={"blocks_path": str(blocks_path)},
- process_options="i",
- )
- assert call_log == []
- payload = json.loads(sidecar_path.read_text(encoding="utf-8"))
- result = payload["drawings"]["im-001"]["llm_analyze_result"]
- assert result["status"] == "skipped"
- assert "unsupported image format" in result["message"]
- finally:
- await rag.finalize_storages()
- @pytest.mark.asyncio
- async def test_tiny_image_writes_skipped_without_vlm_call(tmp_path):
- """Image smaller than VLM_MIN_IMAGE_PIXEL (32px) is decorative; skip
- without calling VLM."""
- call_log: list[dict] = []
- rag = _build_rag(
- tmp_path, vlm_process_enable=True, vlm_func=_make_vlm_mock(call_log)
- )
- await rag.initialize_storages()
- try:
- parsed_dir = tmp_path / "parsed"
- parsed_dir.mkdir()
- img_path = parsed_dir / "tiny.png"
- img_path.write_bytes(TINY_PNG_BYTES)
- blocks_path = parsed_dir / "doc.blocks.jsonl"
- blocks_path.write_text(
- json.dumps({"type": "meta", "doc_id": "doc-1"}) + "\n",
- encoding="utf-8",
- )
- sidecar_path = parsed_dir / "doc.drawings.json"
- sidecar_path.write_text(
- json.dumps(
- {
- "drawings": {
- "im-001": {
- "caption": "tiny icon",
- "path": str(img_path),
- }
- }
- }
- ),
- encoding="utf-8",
- )
- await rag.analyze_multimodal(
- doc_id="doc-1",
- file_path="fixture.pdf",
- parsed_data={"blocks_path": str(blocks_path)},
- process_options="i",
- )
- assert call_log == []
- payload = json.loads(sidecar_path.read_text(encoding="utf-8"))
- result = payload["drawings"]["im-001"]["llm_analyze_result"]
- assert result["status"] == "skipped"
- assert "smaller than" in result["message"]
- finally:
- await rag.finalize_storages()
- @pytest.mark.asyncio
- async def test_invalid_vlm_response_hard_fails(tmp_path):
- """Invalid model JSON propagates MultimodalAnalysisError and lands a
- status=failure marker on the sidecar so re-runs don't silently
- consume the failure."""
- call_log: list[dict] = []
- async def vlm_func(prompt, **kwargs):
- call_log.append({"prompt": prompt, "kwargs": dict(kwargs)})
- return "not-json"
- rag = _build_rag(tmp_path, vlm_process_enable=True, vlm_func=vlm_func)
- await rag.initialize_storages()
- try:
- doc_id, parsed_data, sidecar_path = _write_sidecar_fixtures(tmp_path)
- with pytest.raises(MultimodalAnalysisError):
- await rag.analyze_multimodal(
- doc_id=doc_id,
- file_path="fixture.pdf",
- parsed_data=parsed_data,
- process_options="i",
- )
- assert len(call_log) == 1
- await rag.llm_response_cache.index_done_callback()
- cache_file = (
- Path(rag.working_dir) / rag.workspace / "kv_store_llm_response_cache.json"
- )
- if cache_file.exists():
- cache_blob = json.loads(cache_file.read_text(encoding="utf-8"))
- analysis_keys = [
- k for k in cache_blob.keys() if k.startswith("default:analysis:")
- ]
- assert (
- analysis_keys == []
- ), f"invalid VLM response was cached: {analysis_keys}"
- payload = json.loads(sidecar_path.read_text(encoding="utf-8"))
- item = payload["drawings"]["im-001"]
- assert item["llm_analyze_result"]["status"] == "failure"
- finally:
- await rag.finalize_storages()
- @pytest.mark.asyncio
- async def test_table_routes_to_extract_role_not_vlm(tmp_path):
- """Per design §3.1 tables (and equations) must hit the EXTRACT role,
- not VLM. The mocks below assert exactly that."""
- extract_log: list[dict] = []
- vlm_log: list[dict] = []
- rag = _build_rag(
- tmp_path,
- vlm_process_enable=True,
- vlm_func=_make_vlm_mock(vlm_log),
- extract_func=_make_extract_mock(extract_log),
- )
- await rag.initialize_storages()
- try:
- parsed_dir = tmp_path / "parsed"
- parsed_dir.mkdir()
- blocks_path = parsed_dir / "doc.blocks.jsonl"
- blocks_path.write_text(
- json.dumps({"type": "meta", "doc_id": "doc-1"}) + "\n",
- encoding="utf-8",
- )
- tables_path = parsed_dir / "doc.tables.json"
- tables_path.write_text(
- json.dumps(
- {
- "tables": {
- "tb-001": {
- "caption": "Table 1",
- "content": "<table><tr><td>A</td></tr></table>",
- }
- }
- }
- ),
- encoding="utf-8",
- )
- await rag.analyze_multimodal(
- doc_id="doc-1",
- file_path="fixture.pdf",
- parsed_data={"blocks_path": str(blocks_path)},
- process_options="t",
- )
- assert vlm_log == []
- assert len(extract_log) == 1
- kwargs = extract_log[0]["kwargs"]
- assert kwargs.get("response_format") == {"type": "json_object"}
- payload = json.loads(tables_path.read_text(encoding="utf-8"))
- result = payload["tables"]["tb-001"]["llm_analyze_result"]
- assert result["status"] == "success"
- assert "default:analysis:" in payload["tables"]["tb-001"]["llm_cache_list"][0]
- finally:
- await rag.finalize_storages()
- @pytest.mark.asyncio
- async def test_invalid_json_with_trailing_comma_is_repaired(tmp_path):
- """Slightly malformed VLM JSON (trailing comma) must be repaired via
- ``json_repair`` instead of hard-failing the document — mirrors the
- extraction-side repair contract in operate._process_json_extraction_result.
- """
- call_log: list[dict] = []
- async def vlm_func(prompt, **kwargs):
- call_log.append({"prompt": prompt, "kwargs": dict(kwargs)})
- # Trailing comma after "description" — strict json.loads would reject.
- return '{"name": "fig-1", "type": "Chart", ' '"description": "ok",}'
- rag = _build_rag(tmp_path, vlm_process_enable=True, vlm_func=vlm_func)
- await rag.initialize_storages()
- try:
- doc_id, parsed_data, sidecar_path = _write_sidecar_fixtures(tmp_path)
- await rag.analyze_multimodal(
- doc_id=doc_id,
- file_path="fixture.pdf",
- parsed_data=parsed_data,
- process_options="i",
- )
- assert len(call_log) == 1
- payload = json.loads(sidecar_path.read_text(encoding="utf-8"))
- result = payload["drawings"]["im-001"]["llm_analyze_result"]
- assert result["status"] == "success"
- assert result["name"] == "fig-1"
- assert result["type"] == "Chart"
- assert result["description"] == "ok"
- finally:
- await rag.finalize_storages()
- @pytest.mark.asyncio
- async def test_analyze_worker_marks_doc_failed_on_multimodal_error(tmp_path):
- """When analyze_multimodal raises MultimodalAnalysisError, the worker
- must upsert DocStatus.FAILED with a diagnostic error_msg instead of
- letting the document stay stuck in ANALYZING."""
- import asyncio
- from dataclasses import asdict
- from lightrag.base import DocProcessingStatus, DocStatus
- from lightrag.pipeline import _BatchRunContext
- async def vlm_func(prompt, **kwargs):
- return ""
- rag = _build_rag(tmp_path, vlm_process_enable=True, vlm_func=vlm_func)
- await rag.initialize_storages()
- try:
- doc_id = "doc-fail-1"
- file_path = "demo.pdf"
- status_doc = DocProcessingStatus(
- content_summary="",
- content_length=0,
- file_path=file_path,
- status=DocStatus.PENDING,
- created_at="2026-05-14T00:00:00Z",
- updated_at="2026-05-14T00:00:00Z",
- track_id="t",
- content_hash="h",
- )
- await rag.doc_status.upsert({doc_id: asdict(status_doc)})
- async def _raise_mm_error(**_kwargs):
- from lightrag.exceptions import MultimodalAnalysisError
- raise MultimodalAnalysisError("forced failure for test")
- # Patch instance method so the worker's call site picks the mock.
- rag.analyze_multimodal = _raise_mm_error # type: ignore[assignment]
- ctx = _BatchRunContext(
- pipeline_status={
- "latest_message": "",
- "history_messages": [],
- "cancellation_requested": False,
- },
- pipeline_status_lock=asyncio.Lock(),
- semaphore=asyncio.Semaphore(1),
- total_files=1,
- q_native=asyncio.Queue(),
- q_mineru=asyncio.Queue(),
- q_docling=asyncio.Queue(),
- q_analyze=asyncio.Queue(),
- q_process=asyncio.Queue(),
- )
- worker = asyncio.create_task(rag._analyze_worker(ctx))
- await ctx.q_analyze.put((doc_id, status_doc, {"content": "body"}))
- await ctx.q_analyze.join()
- worker.cancel()
- try:
- await worker
- except asyncio.CancelledError:
- pass
- refreshed = await rag.doc_status.get_by_id(doc_id)
- # doc_status backends return either a dict or a dataclass-style obj.
- if not isinstance(refreshed, dict):
- refreshed = asdict(refreshed)
- assert refreshed["status"] == DocStatus.FAILED
- assert "forced failure for test" in (refreshed.get("error_msg") or "")
- # The worker must NOT advance to q_process when the analyze step
- # raises — otherwise process_single_document would run on a
- # half-baked document.
- assert ctx.q_process.empty()
- finally:
- await rag.finalize_storages()
- @pytest.mark.asyncio
- async def test_analysis_cache_respects_disabled_flag(tmp_path):
- """When enable_llm_cache_for_entity_extract is False, analyze_multimodal
- must NOT persist the analysis response and MUST NOT attach a cache id
- to sidecar item.llm_cache_list — otherwise document deletion would try
- to clean up cache entries that were never written."""
- call_log: list[dict] = []
- rag = LightRAG(
- working_dir=str(tmp_path),
- workspace=f"vlm-pipeline-cache-{tmp_path.name}",
- llm_model_func=_make_vlm_mock(call_log),
- embedding_func=EmbeddingFunc(
- embedding_dim=8,
- max_token_size=1024,
- func=_mock_embedding,
- ),
- tokenizer=Tokenizer("mock-tokenizer", _SimpleTokenizerImpl()),
- vlm_process_enable=True,
- # Disable the analysis cache (same flag handle_cache uses for mode="default").
- enable_llm_cache_for_entity_extract=False,
- role_llm_configs={
- spec.name: (
- RoleLLMConfig(func=_make_vlm_mock(call_log))
- if spec.name == "vlm"
- else RoleLLMConfig()
- )
- for spec in ROLES
- },
- )
- await rag.initialize_storages()
- try:
- doc_id, parsed_data, sidecar_path = _write_sidecar_fixtures(tmp_path)
- await rag.analyze_multimodal(
- doc_id=doc_id,
- file_path="fixture.pdf",
- parsed_data=parsed_data,
- process_options="i",
- )
- await rag.llm_response_cache.index_done_callback()
- cache_file = (
- Path(rag.working_dir) / rag.workspace / "kv_store_llm_response_cache.json"
- )
- if cache_file.exists():
- cache_blob = json.loads(cache_file.read_text(encoding="utf-8"))
- assert not any(
- k.startswith("default:analysis:") for k in cache_blob.keys()
- ), "analysis cache must not be written when the flag is off"
- payload = json.loads(sidecar_path.read_text(encoding="utf-8"))
- item = payload["drawings"]["im-001"]
- # Analysis still succeeded — only the cache side-effects are gated.
- assert item["llm_analyze_result"]["status"] == "success"
- # No cache id may be attached when nothing was written.
- assert item.get("llm_cache_list", []) == []
- finally:
- await rag.finalize_storages()
- @pytest.mark.asyncio
- async def test_oversized_table_content_truncated_to_extract_budget(tmp_path):
- """A sidecar table whose ``content`` alone exceeds the EXTRACT input
- cap must be trimmed before reaching the LLM. The captured prompt
- must (a) fit within ``DEFAULT_MAX_EXTRACT_INPUT_TOKENS``, (b) still
- wrap the trimmed body in a ``<table>`` tag, and (c) include the
- truncation marker so the LLM is told the body is incomplete.
- """
- from lightrag.constants import DEFAULT_MAX_EXTRACT_INPUT_TOKENS
- extract_log: list[dict] = []
- rag = _build_rag(
- tmp_path,
- vlm_process_enable=False,
- extract_func=_make_extract_mock(extract_log),
- )
- await rag.initialize_storages()
- try:
- parsed_dir = tmp_path / "parsed"
- parsed_dir.mkdir()
- blocks_path = parsed_dir / "doc.blocks.jsonl"
- blocks_path.write_text(
- json.dumps({"type": "meta", "doc_id": "doc-big"}) + "\n",
- encoding="utf-8",
- )
- # Build a JSON-format table whose total token count is well above
- # DEFAULT_MAX_EXTRACT_INPUT_TOKENS so the trim path must engage.
- # Each row is small; the row count is the lever.
- rows = [[f"r{i}c0", f"r{i}c1"] for i in range(8000)]
- big_table = '<table id="tb-big" format="json">' + json.dumps(rows) + "</table>"
- original_tokens = len(rag.tokenizer.encode(big_table))
- assert original_tokens > DEFAULT_MAX_EXTRACT_INPUT_TOKENS
- tables_path = parsed_dir / "doc.tables.json"
- tables_path.write_text(
- json.dumps(
- {
- "tables": {
- "tb-big": {
- "caption": "huge table",
- "content": big_table,
- }
- }
- }
- ),
- encoding="utf-8",
- )
- await rag.analyze_multimodal(
- doc_id="doc-big",
- file_path="fixture.pdf",
- parsed_data={"blocks_path": str(blocks_path)},
- process_options="t",
- )
- assert len(extract_log) == 1
- sent_prompt = extract_log[0]["prompt"]
- prompt_tokens = len(rag.tokenizer.encode(sent_prompt))
- # Whole prompt fits within the EXTRACT input cap.
- assert prompt_tokens <= DEFAULT_MAX_EXTRACT_INPUT_TOKENS, (
- f"prompt of {prompt_tokens} tokens exceeds "
- f"{DEFAULT_MAX_EXTRACT_INPUT_TOKENS}"
- )
- # Truncation marker present, <table> tag still closed inside the
- # CONTENT section.
- assert (
- "<!-- content truncated from " in sent_prompt
- and "head preserved -->" in sent_prompt
- )
- # Head rows preserved, last rows dropped.
- assert "r0c0" in sent_prompt
- assert "r7999c0" not in sent_prompt
- # Analysis still succeeded — trimming is transparent to status.
- payload = json.loads(tables_path.read_text(encoding="utf-8"))
- assert payload["tables"]["tb-big"]["llm_analyze_result"]["status"] == "success"
- finally:
- await rag.finalize_storages()
- @pytest.mark.asyncio
- async def test_max_extract_input_tokens_env_var_lowers_cap_and_logs_warning(
- tmp_path, caplog, _propagate_lightrag_logger, monkeypatch
- ):
- """``MAX_EXTRACT_INPUT_TOKENS`` env var overrides the compile-time
- default, and any truncation emits a WARNING-level log line so
- operators see when sidecar bodies are being cut for the EXTRACT call.
- """
- # Cap well below the default 20480 so a modest table triggers trimming,
- # but still comfortably above the ~3980-char prompt template frame so
- # `content_budget` stays positive.
- monkeypatch.setenv("MAX_EXTRACT_INPUT_TOKENS", "8000")
- extract_log: list[dict] = []
- rag = _build_rag(
- tmp_path,
- vlm_process_enable=False,
- extract_func=_make_extract_mock(extract_log),
- )
- await rag.initialize_storages()
- try:
- parsed_dir = tmp_path / "parsed"
- parsed_dir.mkdir()
- blocks_path = parsed_dir / "doc.blocks.jsonl"
- blocks_path.write_text(
- json.dumps({"type": "meta", "doc_id": "doc-mid"}) + "\n",
- encoding="utf-8",
- )
- # Table sized comfortably between the env cap (8000) and the
- # compile-time default (20480) — would NOT trim under the default,
- # MUST trim under the env override.
- rows = [[f"r{i}c0", f"r{i}c1"] for i in range(800)]
- mid_table = '<table id="tb-mid" format="json">' + json.dumps(rows) + "</table>"
- original_tokens = len(rag.tokenizer.encode(mid_table))
- assert 8000 < original_tokens < 20480
- tables_path = parsed_dir / "doc.tables.json"
- tables_path.write_text(
- json.dumps(
- {
- "tables": {
- "tb-mid": {
- "caption": "mid table",
- "content": mid_table,
- }
- }
- }
- ),
- encoding="utf-8",
- )
- with caplog.at_level(logging.WARNING, logger="lightrag.pipeline"):
- await rag.analyze_multimodal(
- doc_id="doc-mid",
- file_path="fixture.pdf",
- parsed_data={"blocks_path": str(blocks_path)},
- process_options="t",
- )
- assert len(extract_log) == 1
- sent_prompt = extract_log[0]["prompt"]
- # Env cap honored.
- assert len(rag.tokenizer.encode(sent_prompt)) <= 8000
- assert "<!-- content truncated from " in sent_prompt
- # WARNING-level log line was emitted naming this item.
- warning_records = [
- r
- for r in caplog.records
- if r.levelname == "WARNING"
- and "[analyze_multimodal]" in r.getMessage()
- and "tb-mid" in r.getMessage()
- and "content trimmed" in r.getMessage()
- ]
- assert (
- warning_records
- ), "expected a WARNING-level log line announcing content truncation"
- finally:
- await rag.finalize_storages()
- @pytest.mark.asyncio
- async def test_extract_cap_below_prompt_frame_fails_item_without_llm_call(
- tmp_path, monkeypatch
- ):
- """When ``MAX_EXTRACT_INPUT_TOKENS`` sits below the prompt template's
- own ``frame_tokens + SAFETY_BUFFER``, ``content_budget`` goes
- non-positive — no content trim can bring the request under the cap
- because the frame itself overflows. The pipeline must refuse to
- invoke the LLM and fail this item via :class:`MultimodalAnalysisError`,
- so the sidecar records ``status="failure"`` and operators get a clear
- actionable signal (raise the cap). Guards the P1 regression where
- marker-replacement merely shrank ``content`` while leaving the
- over-cap frame intact."""
- # The table_analysis prompt frame is ~3980 chars; pick a cap small
- # enough that the frame alone overflows.
- monkeypatch.setenv("MAX_EXTRACT_INPUT_TOKENS", "1000")
- extract_log: list[dict] = []
- rag = _build_rag(
- tmp_path,
- vlm_process_enable=False,
- extract_func=_make_extract_mock(extract_log),
- )
- await rag.initialize_storages()
- try:
- parsed_dir = tmp_path / "parsed"
- parsed_dir.mkdir()
- blocks_path = parsed_dir / "doc.blocks.jsonl"
- blocks_path.write_text(
- json.dumps({"type": "meta", "doc_id": "doc-tight"}) + "\n",
- encoding="utf-8",
- )
- tables_path = parsed_dir / "doc.tables.json"
- tables_path.write_text(
- json.dumps(
- {
- "tables": {
- "tb-tight": {
- "content": (
- '<table id="tb-tight" format="json">'
- '[["A","B"]]</table>'
- )
- }
- }
- }
- ),
- encoding="utf-8",
- )
- with pytest.raises(MultimodalAnalysisError) as excinfo:
- await rag.analyze_multimodal(
- doc_id="doc-tight",
- file_path="fixture.pdf",
- parsed_data={"blocks_path": str(blocks_path)},
- process_options="t",
- )
- # Critical: the LLM mock must NOT have been invoked — we refused
- # to send an over-cap prompt rather than letting the provider
- # reject it with context_length_exceeded.
- assert extract_log == [], (
- f"EXTRACT must not be called when frame exceeds cap; "
- f"got {len(extract_log)} call(s)"
- )
- msg = str(excinfo.value)
- assert "table/tb-tight" in msg
- assert "MAX_EXTRACT_INPUT_TOKENS" in msg
- assert "1000" in msg
- # Sidecar records status=failure for the item, so operators can
- # spot it and re-run after raising the cap.
- payload = json.loads(tables_path.read_text(encoding="utf-8"))
- item = payload["tables"]["tb-tight"]
- assert item["llm_analyze_result"]["status"] == "failure"
- assert "MAX_EXTRACT_INPUT_TOKENS" in item["llm_analyze_result"]["message"]
- finally:
- await rag.finalize_storages()
|