test_pipeline_analyze_multimodal.py 35 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984
  1. """End-to-end offline tests for the analyze_multimodal pipeline.
  2. Covers the contract introduced by the LR2 multimodal rewrite:
  3. 1. Drawings route to the VLM role; tables/equations route to the EXTRACT
  4. role. ``VLM_PROCESS_ENABLE=False`` (or a missing VLM role) is a hard
  5. failure for image-enabled documents.
  6. 2. ``llm_analyze_result`` uses the new ``status / message / analyze_time``
  7. shape with modality-specific required fields
  8. (``name/type/description`` for images, ``name/description`` for tables,
  9. ``name/equation/description`` for equations).
  10. 3. Each VLM/EXTRACT call carries ``_priority=DEFAULT_MM_ANALYSIS_PRIORITY``
  11. and ``image_inputs`` (not legacy ``messages``).
  12. 4. Analysis cache ids are written back to the sidecar item's
  13. ``llm_cache_list`` so document deletion can clean them up.
  14. 5. Images smaller than ``VLM_MIN_IMAGE_PIXEL`` (32 px) and unsupported
  15. formats are pre-emptively flagged ``status="skipped"`` without an LLM
  16. call.
  17. 6. Invalid model JSON is a hard failure — the sidecar carries
  18. ``status="failure"`` and :class:`MultimodalAnalysisError` propagates.
  19. """
  20. from __future__ import annotations
  21. import base64
  22. import json
  23. import logging
  24. import struct
  25. import zlib
  26. from pathlib import Path
  27. import numpy as np
  28. import pytest
  29. from lightrag import LightRAG, ROLES, RoleLLMConfig
  30. from lightrag.exceptions import MultimodalAnalysisError
  31. from lightrag.utils import EmbeddingFunc, Tokenizer
  32. @pytest.fixture
  33. def _propagate_lightrag_logger(monkeypatch):
  34. monkeypatch.setattr(logging.getLogger("lightrag"), "propagate", True)
  35. pytestmark = pytest.mark.offline
  36. def _png_bytes(width: int, height: int) -> bytes:
  37. """Build a minimal but parser-accepted PNG with the given dimensions."""
  38. signature = b"\x89PNG\r\n\x1a\n"
  39. ihdr_payload = struct.pack(">II", width, height) + b"\x08\x06\x00\x00\x00"
  40. ihdr_crc = zlib.crc32(b"IHDR" + ihdr_payload).to_bytes(4, "big")
  41. ihdr = struct.pack(">I", len(ihdr_payload)) + b"IHDR" + ihdr_payload + ihdr_crc
  42. idat_raw = b"\x00" * (width * height * 4 + height)
  43. idat_compressed = zlib.compress(idat_raw)
  44. idat_crc = zlib.crc32(b"IDAT" + idat_compressed).to_bytes(4, "big")
  45. idat = (
  46. struct.pack(">I", len(idat_compressed)) + b"IDAT" + idat_compressed + idat_crc
  47. )
  48. iend = b"\x00\x00\x00\x00IEND\xaeB`\x82"
  49. return signature + ihdr + idat + iend
  50. PNG_BYTES = _png_bytes(64, 64)
  51. TINY_PNG_BYTES = _png_bytes(8, 8)
  52. class _SimpleTokenizerImpl:
  53. def encode(self, content: str) -> list[int]:
  54. return [ord(ch) for ch in content]
  55. def decode(self, tokens: list[int]) -> str:
  56. return "".join(chr(t) for t in tokens)
  57. async def _mock_embedding(texts: list[str]) -> np.ndarray:
  58. return np.random.rand(len(texts), 8)
  59. def _make_vlm_mock(call_log: list[dict]):
  60. async def vlm_func(prompt, **kwargs):
  61. call_log.append({"prompt": prompt, "kwargs": dict(kwargs)})
  62. return json.dumps(
  63. {
  64. "name": "fig-1",
  65. "type": "Chart",
  66. "description": "concise figure description",
  67. }
  68. )
  69. return vlm_func
  70. def _make_extract_mock(call_log: list[dict]):
  71. async def extract_func(prompt, **kwargs):
  72. call_log.append({"prompt": prompt, "kwargs": dict(kwargs)})
  73. return json.dumps(
  74. {
  75. "name": "tbl-1",
  76. "description": "table summary",
  77. }
  78. )
  79. return extract_func
  80. def _build_rag(
  81. tmp_path: Path,
  82. *,
  83. vlm_process_enable: bool = True,
  84. vlm_func=None,
  85. extract_func=None,
  86. ) -> LightRAG:
  87. role_configs = {}
  88. for spec in ROLES:
  89. if spec.name == "vlm" and vlm_func is not None:
  90. role_configs[spec.name] = RoleLLMConfig(func=vlm_func)
  91. elif spec.name == "extract" and extract_func is not None:
  92. role_configs[spec.name] = RoleLLMConfig(func=extract_func)
  93. else:
  94. role_configs[spec.name] = RoleLLMConfig()
  95. base_func = vlm_func or extract_func
  96. return LightRAG(
  97. working_dir=str(tmp_path),
  98. workspace=f"vlm-pipeline-{tmp_path.name}",
  99. llm_model_func=base_func,
  100. embedding_func=EmbeddingFunc(
  101. embedding_dim=8,
  102. max_token_size=1024,
  103. func=_mock_embedding,
  104. ),
  105. tokenizer=Tokenizer("mock-tokenizer", _SimpleTokenizerImpl()),
  106. vlm_process_enable=vlm_process_enable,
  107. role_llm_configs=role_configs,
  108. )
  109. def _write_sidecar_fixtures(tmp_path: Path) -> tuple[str, dict, Path]:
  110. parsed_dir = tmp_path / "parsed"
  111. parsed_dir.mkdir()
  112. image_path = parsed_dir / "fig1.png"
  113. image_path.write_bytes(PNG_BYTES)
  114. blocks_path = parsed_dir / "doc.blocks.jsonl"
  115. blocks_path.write_text(
  116. json.dumps({"type": "meta", "doc_id": "doc-1"}) + "\n",
  117. encoding="utf-8",
  118. )
  119. sidecar_path = parsed_dir / "doc.drawings.json"
  120. sidecar_path.write_text(
  121. json.dumps(
  122. {
  123. "drawings": {
  124. "im-001": {
  125. "caption": "Figure 1",
  126. "path": str(image_path),
  127. }
  128. }
  129. }
  130. ),
  131. encoding="utf-8",
  132. )
  133. parsed_data = {"blocks_path": str(blocks_path)}
  134. return "doc-1", parsed_data, sidecar_path
@pytest.mark.asyncio
async def test_vlm_process_enable_false_hard_fails_for_images(
    tmp_path, caplog, _propagate_lightrag_logger
):
    """With image analysis requested (``process_options="i"``) but
    ``vlm_process_enable=False``, analyze_multimodal must hard-fail the
    document rather than silently skipping it.

    The VLM mock must never be called, and the drawing's sidecar entry must
    record ``status="failure"`` with a message mentioning VLM.
    """
    # NOTE(review): ``caplog`` is requested but never asserted on here.
    call_log: list[dict] = []
    rag = _build_rag(
        tmp_path, vlm_process_enable=False, vlm_func=_make_vlm_mock(call_log)
    )
    await rag.initialize_storages()
    try:
        doc_id, parsed_data, sidecar_path = _write_sidecar_fixtures(tmp_path)
        with pytest.raises(MultimodalAnalysisError):
            await rag.analyze_multimodal(
                doc_id=doc_id,
                file_path="fixture.pdf",
                parsed_data=parsed_data,
                process_options="i",
            )
        # VLM mock must not be invoked when the master switch is off.
        assert call_log == []
        # The failure is persisted on the sidecar item, not only raised.
        payload = json.loads(sidecar_path.read_text(encoding="utf-8"))
        item = payload["drawings"]["im-001"]
        assert item["llm_analyze_result"]["status"] == "failure"
        assert "VLM" in item["llm_analyze_result"]["message"]
    finally:
        await rag.finalize_storages()
@pytest.mark.asyncio
async def test_vlm_disabled_then_enabled_reprocesses_item(tmp_path):
    """After the first run hard-failed under VLM=disabled, flipping the
    switch must re-invoke the VLM and overwrite the persisted failure."""
    call_log: list[dict] = []
    # One shared mock so call_log spans both LightRAG instances.
    vlm_func = _make_vlm_mock(call_log)
    rag_off = _build_rag(tmp_path, vlm_process_enable=False, vlm_func=vlm_func)
    await rag_off.initialize_storages()
    try:
        doc_id, parsed_data, sidecar_path = _write_sidecar_fixtures(tmp_path)
        with pytest.raises(MultimodalAnalysisError):
            await rag_off.analyze_multimodal(
                doc_id=doc_id,
                file_path="fixture.pdf",
                parsed_data=parsed_data,
                process_options="i",
            )
        assert call_log == []
    finally:
        await rag_off.finalize_storages()
    # Same tmp_path => same working_dir/workspace, so this instance sees the
    # sidecar that already carries the persisted failure from the first run.
    rag_on = _build_rag(tmp_path, vlm_process_enable=True, vlm_func=vlm_func)
    await rag_on.initialize_storages()
    try:
        await rag_on.analyze_multimodal(
            doc_id=doc_id,
            file_path="fixture.pdf",
            parsed_data=parsed_data,
            process_options="i",
        )
        assert len(call_log) == 1
        payload = json.loads(sidecar_path.read_text(encoding="utf-8"))
        result = payload["drawings"]["im-001"]["llm_analyze_result"]
        # Failure marker replaced by the new success-shaped result.
        assert result["status"] == "success"
        assert result["type"] == "Chart"
        assert result["description"] == "concise figure description"
        assert "analyze_time" in result
    finally:
        await rag_on.finalize_storages()
@pytest.mark.asyncio
async def test_vlm_call_carries_image_inputs(tmp_path):
    """Sanity check the call kwargs: image_inputs (not legacy `messages`)
    must be present, and the call is non-streaming (``stream=False``).

    The ``_priority`` argument is consumed by the role wrapper before
    reaching the raw model func, so it is not observable on the mock — see
    ``priority_limit_async_func_call`` in lightrag.utils."""
    call_log: list[dict] = []
    rag = _build_rag(
        tmp_path, vlm_process_enable=True, vlm_func=_make_vlm_mock(call_log)
    )
    await rag.initialize_storages()
    try:
        doc_id, parsed_data, _ = _write_sidecar_fixtures(tmp_path)
        await rag.analyze_multimodal(
            doc_id=doc_id,
            file_path="fixture.pdf",
            parsed_data=parsed_data,
            process_options="i",
        )
        assert len(call_log) == 1
        kwargs = call_log[0]["kwargs"]
        assert kwargs.get("stream") is False
        assert kwargs.get("image_inputs") is not None
        assert "messages" not in kwargs
        # _priority is consumed by the wrapper (see lightrag.utils
        # priority_limit_async_func_call); not observable on the raw mock.
        assert "_priority" not in kwargs
    finally:
        await rag.finalize_storages()
@pytest.mark.asyncio
async def test_vlm_cache_hit_on_second_run(tmp_path):
    """The first run calls the VLM once and persists exactly one
    ``default:analysis:`` entry in the on-disk LLM response cache.

    That entry's ``original_prompt`` carries a ``<vlm_images>`` block without
    the raw base64 image bytes. The cache id is recorded in the sidecar
    item's ``llm_cache_list``. A second run on the same document must not
    call the VLM again.
    """
    call_log: list[dict] = []
    rag = _build_rag(
        tmp_path, vlm_process_enable=True, vlm_func=_make_vlm_mock(call_log)
    )
    await rag.initialize_storages()
    try:
        doc_id, parsed_data, sidecar_path = _write_sidecar_fixtures(tmp_path)
        await rag.analyze_multimodal(
            doc_id=doc_id,
            file_path="fixture.pdf",
            parsed_data=parsed_data,
            process_options="i",
        )
        assert len(call_log) == 1
        # Presumably flushes pending cache writes so the JSON file below is current.
        await rag.llm_response_cache.index_done_callback()
        cache_file = (
            Path(rag.working_dir) / rag.workspace / "kv_store_llm_response_cache.json"
        )
        cache_blob = json.loads(cache_file.read_text(encoding="utf-8"))
        analysis_keys = [
            k for k in cache_blob.keys() if k.startswith("default:analysis:")
        ]
        assert len(analysis_keys) == 1
        cache_id = analysis_keys[0]
        entry = cache_blob[cache_id]
        original_prompt = entry["original_prompt"]
        assert "<vlm_images>" in original_prompt
        # Raw base64 must NOT be embedded in the audit block.
        raw_b64 = base64.b64encode(PNG_BYTES).decode("ascii")
        assert raw_b64 not in original_prompt
        # Cache id was written back to the sidecar item for delete-time cleanup.
        payload = json.loads(sidecar_path.read_text(encoding="utf-8"))
        assert cache_id in payload["drawings"]["im-001"]["llm_cache_list"]
        # Re-run: analyze_multimodal always recomputes for enabled modalities,
        # but the cache key matches so the VLM is not called again.
        await rag.analyze_multimodal(
            doc_id=doc_id,
            file_path="fixture.pdf",
            parsed_data=parsed_data,
            process_options="i",
        )
        assert len(call_log) == 1
    finally:
        await rag.finalize_storages()
@pytest.mark.asyncio
async def test_image_path_resolved_relative_to_sidecar_dir(tmp_path):
    """Sidecar image paths are relative to parsed_dir.

    The pipeline must resolve them against the sidecar's directory (not the
    process CWD); only if that fails may the item be marked skipped. Here
    resolution succeeds, so the VLM is called and the result is a success.
    """
    call_log: list[dict] = []
    rag = _build_rag(
        tmp_path, vlm_process_enable=True, vlm_func=_make_vlm_mock(call_log)
    )
    await rag.initialize_storages()
    try:
        parsed_dir = tmp_path / "parsed"
        parsed_dir.mkdir()
        assets_dir = parsed_dir / "doc.blocks.assets"
        assets_dir.mkdir()
        (assets_dir / "image1.png").write_bytes(PNG_BYTES)
        blocks_path = parsed_dir / "doc.blocks.jsonl"
        blocks_path.write_text(
            json.dumps({"type": "meta", "doc_id": "doc-1"}) + "\n",
            encoding="utf-8",
        )
        sidecar_path = parsed_dir / "doc.drawings.json"
        sidecar_path.write_text(
            json.dumps(
                {
                    "drawings": {
                        "im-001": {
                            "caption": "Figure 1",
                            # Relative to parsed_dir, where the sidecar lives.
                            "path": "doc.blocks.assets/image1.png",
                        }
                    }
                }
            ),
            encoding="utf-8",
        )
        await rag.analyze_multimodal(
            doc_id="doc-1",
            file_path="fixture.pdf",
            parsed_data={"blocks_path": str(blocks_path)},
            process_options="i",
        )
        assert len(call_log) == 1
        assert call_log[0]["kwargs"].get("image_inputs") is not None
        payload = json.loads(sidecar_path.read_text(encoding="utf-8"))
        result = payload["drawings"]["im-001"]["llm_analyze_result"]
        assert result["status"] == "success"
    finally:
        await rag.finalize_storages()
@pytest.mark.asyncio
async def test_unsupported_vector_format_writes_skipped(tmp_path):
    """WMF/EMF/SVG and other non-raster formats short-circuit to
    status=skipped without calling the VLM.

    Exercised with a fake ``.wmf`` file declared as ``format: "wmf"``. The
    sidecar message must mention "unsupported image format".
    """
    call_log: list[dict] = []
    rag = _build_rag(
        tmp_path, vlm_process_enable=True, vlm_func=_make_vlm_mock(call_log)
    )
    await rag.initialize_storages()
    try:
        parsed_dir = tmp_path / "parsed"
        parsed_dir.mkdir()
        wmf_path = parsed_dir / "image1.wmf"
        # Placeholder bytes only; the item should be rejected before any decode.
        wmf_path.write_bytes(b"WMF\x00fake-content-bytes")
        blocks_path = parsed_dir / "doc.blocks.jsonl"
        blocks_path.write_text(
            json.dumps({"type": "meta", "doc_id": "doc-1"}) + "\n",
            encoding="utf-8",
        )
        sidecar_path = parsed_dir / "doc.drawings.json"
        sidecar_path.write_text(
            json.dumps(
                {
                    "drawings": {
                        "im-001": {
                            "caption": "vector diagram",
                            "path": str(wmf_path),
                            "format": "wmf",
                        }
                    }
                }
            ),
            encoding="utf-8",
        )
        await rag.analyze_multimodal(
            doc_id="doc-1",
            file_path="fixture.docx",
            parsed_data={"blocks_path": str(blocks_path)},
            process_options="i",
        )
        assert call_log == []
        payload = json.loads(sidecar_path.read_text(encoding="utf-8"))
        result = payload["drawings"]["im-001"]["llm_analyze_result"]
        assert result["status"] == "skipped"
        assert "unsupported image format" in result["message"]
    finally:
        await rag.finalize_storages()
@pytest.mark.asyncio
async def test_tiny_image_writes_skipped_without_vlm_call(tmp_path):
    """An image smaller than VLM_MIN_IMAGE_PIXEL (32 px) is treated as
    decorative.

    It must be skipped without calling the VLM. The 8x8 ``TINY_PNG_BYTES``
    is used, and the sidecar message must contain "smaller than".
    """
    call_log: list[dict] = []
    rag = _build_rag(
        tmp_path, vlm_process_enable=True, vlm_func=_make_vlm_mock(call_log)
    )
    await rag.initialize_storages()
    try:
        parsed_dir = tmp_path / "parsed"
        parsed_dir.mkdir()
        img_path = parsed_dir / "tiny.png"
        img_path.write_bytes(TINY_PNG_BYTES)
        blocks_path = parsed_dir / "doc.blocks.jsonl"
        blocks_path.write_text(
            json.dumps({"type": "meta", "doc_id": "doc-1"}) + "\n",
            encoding="utf-8",
        )
        sidecar_path = parsed_dir / "doc.drawings.json"
        sidecar_path.write_text(
            json.dumps(
                {
                    "drawings": {
                        "im-001": {
                            "caption": "tiny icon",
                            "path": str(img_path),
                        }
                    }
                }
            ),
            encoding="utf-8",
        )
        await rag.analyze_multimodal(
            doc_id="doc-1",
            file_path="fixture.pdf",
            parsed_data={"blocks_path": str(blocks_path)},
            process_options="i",
        )
        assert call_log == []
        payload = json.loads(sidecar_path.read_text(encoding="utf-8"))
        result = payload["drawings"]["im-001"]["llm_analyze_result"]
        assert result["status"] == "skipped"
        assert "smaller than" in result["message"]
    finally:
        await rag.finalize_storages()
@pytest.mark.asyncio
async def test_invalid_vlm_response_hard_fails(tmp_path):
    """Invalid model JSON propagates MultimodalAnalysisError.

    It also lands a ``status=failure`` marker on the sidecar, so re-runs
    don't silently consume the failure. The bad response must not be
    written to the analysis cache.
    """
    call_log: list[dict] = []

    async def vlm_func(prompt, **kwargs):
        call_log.append({"prompt": prompt, "kwargs": dict(kwargs)})
        # Not JSON at all, so the result cannot be parsed.
        return "not-json"

    rag = _build_rag(tmp_path, vlm_process_enable=True, vlm_func=vlm_func)
    await rag.initialize_storages()
    try:
        doc_id, parsed_data, sidecar_path = _write_sidecar_fixtures(tmp_path)
        with pytest.raises(MultimodalAnalysisError):
            await rag.analyze_multimodal(
                doc_id=doc_id,
                file_path="fixture.pdf",
                parsed_data=parsed_data,
                process_options="i",
            )
        assert len(call_log) == 1
        await rag.llm_response_cache.index_done_callback()
        cache_file = (
            Path(rag.working_dir) / rag.workspace / "kv_store_llm_response_cache.json"
        )
        # The cache file may legitimately not exist if nothing was written.
        if cache_file.exists():
            cache_blob = json.loads(cache_file.read_text(encoding="utf-8"))
            analysis_keys = [
                k for k in cache_blob.keys() if k.startswith("default:analysis:")
            ]
            assert (
                analysis_keys == []
            ), f"invalid VLM response was cached: {analysis_keys}"
        payload = json.loads(sidecar_path.read_text(encoding="utf-8"))
        item = payload["drawings"]["im-001"]
        assert item["llm_analyze_result"]["status"] == "failure"
    finally:
        await rag.finalize_storages()
@pytest.mark.asyncio
async def test_table_routes_to_extract_role_not_vlm(tmp_path):
    """Per design §3.1, tables (and equations) must hit the EXTRACT role,
    not VLM. The two separate mocks below assert exactly that.

    The EXTRACT call must request JSON output. Its cache id must be recorded
    on the table's sidecar item.
    """
    extract_log: list[dict] = []
    vlm_log: list[dict] = []
    rag = _build_rag(
        tmp_path,
        vlm_process_enable=True,
        vlm_func=_make_vlm_mock(vlm_log),
        extract_func=_make_extract_mock(extract_log),
    )
    await rag.initialize_storages()
    try:
        parsed_dir = tmp_path / "parsed"
        parsed_dir.mkdir()
        blocks_path = parsed_dir / "doc.blocks.jsonl"
        blocks_path.write_text(
            json.dumps({"type": "meta", "doc_id": "doc-1"}) + "\n",
            encoding="utf-8",
        )
        tables_path = parsed_dir / "doc.tables.json"
        tables_path.write_text(
            json.dumps(
                {
                    "tables": {
                        "tb-001": {
                            "caption": "Table 1",
                            "content": "<table><tr><td>A</td></tr></table>",
                        }
                    }
                }
            ),
            encoding="utf-8",
        )
        await rag.analyze_multimodal(
            doc_id="doc-1",
            file_path="fixture.pdf",
            parsed_data={"blocks_path": str(blocks_path)},
            process_options="t",
        )
        assert vlm_log == []
        assert len(extract_log) == 1
        kwargs = extract_log[0]["kwargs"]
        assert kwargs.get("response_format") == {"type": "json_object"}
        payload = json.loads(tables_path.read_text(encoding="utf-8"))
        result = payload["tables"]["tb-001"]["llm_analyze_result"]
        assert result["status"] == "success"
        # Substring check on the first recorded cache id only.
        assert "default:analysis:" in payload["tables"]["tb-001"]["llm_cache_list"][0]
    finally:
        await rag.finalize_storages()
@pytest.mark.asyncio
async def test_invalid_json_with_trailing_comma_is_repaired(tmp_path):
    """Slightly malformed VLM JSON (a trailing comma) must be repaired via
    ``json_repair`` instead of hard-failing the document.

    This mirrors the extraction-side repair contract in
    operate._process_json_extraction_result.
    """
    call_log: list[dict] = []

    async def vlm_func(prompt, **kwargs):
        call_log.append({"prompt": prompt, "kwargs": dict(kwargs)})
        # Trailing comma after "description" — strict json.loads would reject.
        return '{"name": "fig-1", "type": "Chart", ' '"description": "ok",}'

    rag = _build_rag(tmp_path, vlm_process_enable=True, vlm_func=vlm_func)
    await rag.initialize_storages()
    try:
        doc_id, parsed_data, sidecar_path = _write_sidecar_fixtures(tmp_path)
        await rag.analyze_multimodal(
            doc_id=doc_id,
            file_path="fixture.pdf",
            parsed_data=parsed_data,
            process_options="i",
        )
        assert len(call_log) == 1
        payload = json.loads(sidecar_path.read_text(encoding="utf-8"))
        result = payload["drawings"]["im-001"]["llm_analyze_result"]
        # All fields from the malformed payload survive the repair.
        assert result["status"] == "success"
        assert result["name"] == "fig-1"
        assert result["type"] == "Chart"
        assert result["description"] == "ok"
    finally:
        await rag.finalize_storages()
  535. @pytest.mark.asyncio
  536. async def test_analyze_worker_marks_doc_failed_on_multimodal_error(tmp_path):
  537. """When analyze_multimodal raises MultimodalAnalysisError, the worker
  538. must upsert DocStatus.FAILED with a diagnostic error_msg instead of
  539. letting the document stay stuck in ANALYZING."""
  540. import asyncio
  541. from dataclasses import asdict
  542. from lightrag.base import DocProcessingStatus, DocStatus
  543. from lightrag.pipeline import _BatchRunContext
  544. async def vlm_func(prompt, **kwargs):
  545. return ""
  546. rag = _build_rag(tmp_path, vlm_process_enable=True, vlm_func=vlm_func)
  547. await rag.initialize_storages()
  548. try:
  549. doc_id = "doc-fail-1"
  550. file_path = "demo.pdf"
  551. status_doc = DocProcessingStatus(
  552. content_summary="",
  553. content_length=0,
  554. file_path=file_path,
  555. status=DocStatus.PENDING,
  556. created_at="2026-05-14T00:00:00Z",
  557. updated_at="2026-05-14T00:00:00Z",
  558. track_id="t",
  559. content_hash="h",
  560. )
  561. await rag.doc_status.upsert({doc_id: asdict(status_doc)})
  562. async def _raise_mm_error(**_kwargs):
  563. from lightrag.exceptions import MultimodalAnalysisError
  564. raise MultimodalAnalysisError("forced failure for test")
  565. # Patch instance method so the worker's call site picks the mock.
  566. rag.analyze_multimodal = _raise_mm_error # type: ignore[assignment]
  567. ctx = _BatchRunContext(
  568. pipeline_status={
  569. "latest_message": "",
  570. "history_messages": [],
  571. "cancellation_requested": False,
  572. },
  573. pipeline_status_lock=asyncio.Lock(),
  574. semaphore=asyncio.Semaphore(1),
  575. total_files=1,
  576. q_native=asyncio.Queue(),
  577. q_mineru=asyncio.Queue(),
  578. q_docling=asyncio.Queue(),
  579. q_analyze=asyncio.Queue(),
  580. q_process=asyncio.Queue(),
  581. )
  582. worker = asyncio.create_task(rag._analyze_worker(ctx))
  583. await ctx.q_analyze.put((doc_id, status_doc, {"content": "body"}))
  584. await ctx.q_analyze.join()
  585. worker.cancel()
  586. try:
  587. await worker
  588. except asyncio.CancelledError:
  589. pass
  590. refreshed = await rag.doc_status.get_by_id(doc_id)
  591. # doc_status backends return either a dict or a dataclass-style obj.
  592. if not isinstance(refreshed, dict):
  593. refreshed = asdict(refreshed)
  594. assert refreshed["status"] == DocStatus.FAILED
  595. assert "forced failure for test" in (refreshed.get("error_msg") or "")
  596. # The worker must NOT advance to q_process when the analyze step
  597. # raises — otherwise process_single_document would run on a
  598. # half-baked document.
  599. assert ctx.q_process.empty()
  600. finally:
  601. await rag.finalize_storages()
@pytest.mark.asyncio
async def test_analysis_cache_respects_disabled_flag(tmp_path):
    """When enable_llm_cache_for_entity_extract is False, analyze_multimodal
    must NOT persist the analysis response.

    It also MUST NOT attach a cache id to the sidecar item's
    ``llm_cache_list``; otherwise document deletion would try to clean up
    cache entries that were never written.
    """
    # NOTE(review): call_log is populated by the mocks but never asserted on.
    call_log: list[dict] = []
    # Built inline rather than via _build_rag: that helper has no parameter
    # for enable_llm_cache_for_entity_extract.
    rag = LightRAG(
        working_dir=str(tmp_path),
        workspace=f"vlm-pipeline-cache-{tmp_path.name}",
        llm_model_func=_make_vlm_mock(call_log),
        embedding_func=EmbeddingFunc(
            embedding_dim=8,
            max_token_size=1024,
            func=_mock_embedding,
        ),
        tokenizer=Tokenizer("mock-tokenizer", _SimpleTokenizerImpl()),
        vlm_process_enable=True,
        # Disable the analysis cache (same flag handle_cache uses for mode="default").
        enable_llm_cache_for_entity_extract=False,
        role_llm_configs={
            spec.name: (
                RoleLLMConfig(func=_make_vlm_mock(call_log))
                if spec.name == "vlm"
                else RoleLLMConfig()
            )
            for spec in ROLES
        },
    )
    await rag.initialize_storages()
    try:
        doc_id, parsed_data, sidecar_path = _write_sidecar_fixtures(tmp_path)
        await rag.analyze_multimodal(
            doc_id=doc_id,
            file_path="fixture.pdf",
            parsed_data=parsed_data,
            process_options="i",
        )
        await rag.llm_response_cache.index_done_callback()
        cache_file = (
            Path(rag.working_dir) / rag.workspace / "kv_store_llm_response_cache.json"
        )
        # A missing cache file trivially satisfies "nothing was written".
        if cache_file.exists():
            cache_blob = json.loads(cache_file.read_text(encoding="utf-8"))
            assert not any(
                k.startswith("default:analysis:") for k in cache_blob.keys()
            ), "analysis cache must not be written when the flag is off"
        payload = json.loads(sidecar_path.read_text(encoding="utf-8"))
        item = payload["drawings"]["im-001"]
        # Analysis still succeeded — only the cache side-effects are gated.
        assert item["llm_analyze_result"]["status"] == "success"
        # No cache id may be attached when nothing was written.
        assert item.get("llm_cache_list", []) == []
    finally:
        await rag.finalize_storages()
@pytest.mark.asyncio
async def test_oversized_table_content_truncated_to_extract_budget(tmp_path):
    """A sidecar table whose ``content`` alone exceeds the EXTRACT input
    cap must be trimmed before reaching the LLM.

    The captured prompt must:

    (a) fit within ``DEFAULT_MAX_EXTRACT_INPUT_TOKENS``;
    (b) still wrap the trimmed body in a ``<table>`` tag;
    (c) include the truncation marker, so the LLM is told the body is
        incomplete.
    """
    from lightrag.constants import DEFAULT_MAX_EXTRACT_INPUT_TOKENS

    extract_log: list[dict] = []
    rag = _build_rag(
        tmp_path,
        vlm_process_enable=False,
        extract_func=_make_extract_mock(extract_log),
    )
    await rag.initialize_storages()
    try:
        parsed_dir = tmp_path / "parsed"
        parsed_dir.mkdir()
        blocks_path = parsed_dir / "doc.blocks.jsonl"
        blocks_path.write_text(
            json.dumps({"type": "meta", "doc_id": "doc-big"}) + "\n",
            encoding="utf-8",
        )
        # Build a JSON-format table whose total token count is well above
        # DEFAULT_MAX_EXTRACT_INPUT_TOKENS so the trim path must engage.
        # Each row is small; the row count is the lever.
        rows = [[f"r{i}c0", f"r{i}c1"] for i in range(8000)]
        big_table = '<table id="tb-big" format="json">' + json.dumps(rows) + "</table>"
        # Tokens are counted with the test tokenizer (one token per character).
        original_tokens = len(rag.tokenizer.encode(big_table))
        assert original_tokens > DEFAULT_MAX_EXTRACT_INPUT_TOKENS
        tables_path = parsed_dir / "doc.tables.json"
        tables_path.write_text(
            json.dumps(
                {
                    "tables": {
                        "tb-big": {
                            "caption": "huge table",
                            "content": big_table,
                        }
                    }
                }
            ),
            encoding="utf-8",
        )
        await rag.analyze_multimodal(
            doc_id="doc-big",
            file_path="fixture.pdf",
            parsed_data={"blocks_path": str(blocks_path)},
            process_options="t",
        )
        assert len(extract_log) == 1
        sent_prompt = extract_log[0]["prompt"]
        prompt_tokens = len(rag.tokenizer.encode(sent_prompt))
        # Whole prompt fits within the EXTRACT input cap.
        assert prompt_tokens <= DEFAULT_MAX_EXTRACT_INPUT_TOKENS, (
            f"prompt of {prompt_tokens} tokens exceeds "
            f"{DEFAULT_MAX_EXTRACT_INPUT_TOKENS}"
        )
        # Truncation marker present (docstring point (c)).
        # NOTE(review): the <table> wrapping/closure from point (b) is not
        # asserted anywhere in this test — consider adding a check.
        assert (
            "<!-- content truncated from " in sent_prompt
            and "head preserved -->" in sent_prompt
        )
        # Head rows preserved, last rows dropped.
        assert "r0c0" in sent_prompt
        assert "r7999c0" not in sent_prompt
        # Analysis still succeeded — trimming is transparent to status.
        payload = json.loads(tables_path.read_text(encoding="utf-8"))
        assert payload["tables"]["tb-big"]["llm_analyze_result"]["status"] == "success"
    finally:
        await rag.finalize_storages()
  730. @pytest.mark.asyncio
  731. async def test_max_extract_input_tokens_env_var_lowers_cap_and_logs_warning(
  732. tmp_path, caplog, _propagate_lightrag_logger, monkeypatch
  733. ):
  734. """``MAX_EXTRACT_INPUT_TOKENS`` env var overrides the compile-time
  735. default, and any truncation emits a WARNING-level log line so
  736. operators see when sidecar bodies are being cut for the EXTRACT call.
  737. """
  738. # Cap well below the default 20480 so a modest table triggers trimming,
  739. # but still comfortably above the ~3980-char prompt template frame so
  740. # `content_budget` stays positive.
  741. monkeypatch.setenv("MAX_EXTRACT_INPUT_TOKENS", "8000")
  742. extract_log: list[dict] = []
  743. rag = _build_rag(
  744. tmp_path,
  745. vlm_process_enable=False,
  746. extract_func=_make_extract_mock(extract_log),
  747. )
  748. await rag.initialize_storages()
  749. try:
  750. parsed_dir = tmp_path / "parsed"
  751. parsed_dir.mkdir()
  752. blocks_path = parsed_dir / "doc.blocks.jsonl"
  753. blocks_path.write_text(
  754. json.dumps({"type": "meta", "doc_id": "doc-mid"}) + "\n",
  755. encoding="utf-8",
  756. )
  757. # Table sized comfortably between the env cap (8000) and the
  758. # compile-time default (20480) — would NOT trim under the default,
  759. # MUST trim under the env override.
  760. rows = [[f"r{i}c0", f"r{i}c1"] for i in range(800)]
  761. mid_table = '<table id="tb-mid" format="json">' + json.dumps(rows) + "</table>"
  762. original_tokens = len(rag.tokenizer.encode(mid_table))
  763. assert 8000 < original_tokens < 20480
  764. tables_path = parsed_dir / "doc.tables.json"
  765. tables_path.write_text(
  766. json.dumps(
  767. {
  768. "tables": {
  769. "tb-mid": {
  770. "caption": "mid table",
  771. "content": mid_table,
  772. }
  773. }
  774. }
  775. ),
  776. encoding="utf-8",
  777. )
  778. with caplog.at_level(logging.WARNING, logger="lightrag.pipeline"):
  779. await rag.analyze_multimodal(
  780. doc_id="doc-mid",
  781. file_path="fixture.pdf",
  782. parsed_data={"blocks_path": str(blocks_path)},
  783. process_options="t",
  784. )
  785. assert len(extract_log) == 1
  786. sent_prompt = extract_log[0]["prompt"]
  787. # Env cap honored.
  788. assert len(rag.tokenizer.encode(sent_prompt)) <= 8000
  789. assert "<!-- content truncated from " in sent_prompt
  790. # WARNING-level log line was emitted naming this item.
  791. warning_records = [
  792. r
  793. for r in caplog.records
  794. if r.levelname == "WARNING"
  795. and "[analyze_multimodal]" in r.getMessage()
  796. and "tb-mid" in r.getMessage()
  797. and "content trimmed" in r.getMessage()
  798. ]
  799. assert (
  800. warning_records
  801. ), "expected a WARNING-level log line announcing content truncation"
  802. finally:
  803. await rag.finalize_storages()
  804. @pytest.mark.asyncio
  805. async def test_extract_cap_below_prompt_frame_fails_item_without_llm_call(
  806. tmp_path, monkeypatch
  807. ):
  808. """When ``MAX_EXTRACT_INPUT_TOKENS`` sits below the prompt template's
  809. own ``frame_tokens + SAFETY_BUFFER``, ``content_budget`` goes
  810. non-positive — no content trim can bring the request under the cap
  811. because the frame itself overflows. The pipeline must refuse to
  812. invoke the LLM and fail this item via :class:`MultimodalAnalysisError`,
  813. so the sidecar records ``status="failure"`` and operators get a clear
  814. actionable signal (raise the cap). Guards the P1 regression where
  815. marker-replacement merely shrank ``content`` while leaving the
  816. over-cap frame intact."""
  817. # The table_analysis prompt frame is ~3980 chars; pick a cap small
  818. # enough that the frame alone overflows.
  819. monkeypatch.setenv("MAX_EXTRACT_INPUT_TOKENS", "1000")
  820. extract_log: list[dict] = []
  821. rag = _build_rag(
  822. tmp_path,
  823. vlm_process_enable=False,
  824. extract_func=_make_extract_mock(extract_log),
  825. )
  826. await rag.initialize_storages()
  827. try:
  828. parsed_dir = tmp_path / "parsed"
  829. parsed_dir.mkdir()
  830. blocks_path = parsed_dir / "doc.blocks.jsonl"
  831. blocks_path.write_text(
  832. json.dumps({"type": "meta", "doc_id": "doc-tight"}) + "\n",
  833. encoding="utf-8",
  834. )
  835. tables_path = parsed_dir / "doc.tables.json"
  836. tables_path.write_text(
  837. json.dumps(
  838. {
  839. "tables": {
  840. "tb-tight": {
  841. "content": (
  842. '<table id="tb-tight" format="json">'
  843. '[["A","B"]]</table>'
  844. )
  845. }
  846. }
  847. }
  848. ),
  849. encoding="utf-8",
  850. )
  851. with pytest.raises(MultimodalAnalysisError) as excinfo:
  852. await rag.analyze_multimodal(
  853. doc_id="doc-tight",
  854. file_path="fixture.pdf",
  855. parsed_data={"blocks_path": str(blocks_path)},
  856. process_options="t",
  857. )
  858. # Critical: the LLM mock must NOT have been invoked — we refused
  859. # to send an over-cap prompt rather than letting the provider
  860. # reject it with context_length_exceeded.
  861. assert extract_log == [], (
  862. f"EXTRACT must not be called when frame exceeds cap; "
  863. f"got {len(extract_log)} call(s)"
  864. )
  865. msg = str(excinfo.value)
  866. assert "table/tb-tight" in msg
  867. assert "MAX_EXTRACT_INPUT_TOKENS" in msg
  868. assert "1000" in msg
  869. # Sidecar records status=failure for the item, so operators can
  870. # spot it and re-run after raising the cap.
  871. payload = json.loads(tables_path.read_text(encoding="utf-8"))
  872. item = payload["tables"]["tb-tight"]
  873. assert item["llm_analyze_result"]["status"] == "failure"
  874. assert "MAX_EXTRACT_INPUT_TOKENS" in item["llm_analyze_result"]["message"]
  875. finally:
  876. await rag.finalize_storages()