test_pipeline_release_closure.py 133 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
732783279328032813282328332843285328632873288328932903291329232933294329532963297329832993300330133023303330433053306330733083309331033113312331333143315331633173318331933203321332233233324332533263327332833293330333133323333333433353336333733383339334033413342334333443345334633473348334933503351335233533354335533563357335833593360336133623363336433653366336733683369337033713372337333743375337633773378337933803381338233833384338533863387338833893390339133923393339433953396339733983399340034013402340334043405340634073408340934103411341234133414341534163417341834193420342134223423342434253426342734283429343034313432343334343435343634373438343934403441344234433444344534463447344834493450345134523453345434553456345734583459346034613462346334643465346634673468346934703471347234733474347534763477347834793480348134823483348434853486348734883489349034913492349334943495349634973498349935003501350235033504350535063507350835093510351135123513351435153516351735183519352035213522352335243525352635273528
  1. import asyncio
  2. import json
  3. from pathlib import Path
  4. import numpy as np
  5. import pytest
  6. from lightrag import LightRAG, ROLES, RoleLLMConfig
  7. from lightrag.base import DocStatus
  8. from lightrag.constants import (
  9. FULL_DOCS_FORMAT_PENDING_PARSE,
  10. PARSED_DIR_NAME,
  11. PARSER_ENGINE_MINERU,
  12. PARSER_ENGINE_NATIVE,
  13. )
  14. from lightrag.operate import (
  15. _get_relationship_vdb_timeout_seconds,
  16. _parse_mm_display_name,
  17. )
  18. from lightrag.parser.routing import (
  19. FilenameParserHintError,
  20. ParserRoutingConfigError,
  21. canonicalize_parser_hinted_basename,
  22. resolve_file_parser_engine,
  23. resolve_stored_document_parser_engine,
  24. validate_parser_routing_config,
  25. )
  26. from lightrag.utils import (
  27. EmbeddingFunc,
  28. Tokenizer,
  29. compute_mdhash_id,
  30. safe_vdb_operation_with_exception,
  31. )
  32. class _SimpleTokenizerImpl:
  33. def encode(self, content: str) -> list[int]:
  34. return [ord(ch) for ch in content]
  35. def decode(self, tokens: list[int]) -> str:
  36. return "".join(chr(t) for t in tokens)
  37. async def _mock_embedding(texts: list[str]) -> np.ndarray:
  38. return np.random.rand(len(texts), 32)
  39. async def _mock_llm(prompt, **kwargs):
  40. return '{"name":"x","summary":"s","detail_description":"d"}'
  41. _ROLE_FIELD_SUFFIXES = (
  42. ("_llm_model_func", "func"),
  43. ("_llm_model_kwargs", "kwargs"),
  44. ("_llm_model_max_async", "max_async"),
  45. ("_llm_timeout", "timeout"),
  46. )
  47. def _new_rag(tmp_path: Path, **kwargs) -> LightRAG:
  48. role_configs: dict[str, RoleLLMConfig] = {}
  49. for spec in ROLES:
  50. bucket = {}
  51. for suffix, target in _ROLE_FIELD_SUFFIXES:
  52. key = f"{spec.name}{suffix}"
  53. if key in kwargs:
  54. bucket[target] = kwargs.pop(key)
  55. if bucket:
  56. role_configs[spec.name] = RoleLLMConfig(**bucket)
  57. if role_configs:
  58. kwargs["role_llm_configs"] = role_configs
  59. # analyze_multimodal short-circuits when vlm_process_enable is False; this
  60. # helper drives several VLM-specific tests, so default the switch ON.
  61. kwargs.setdefault("vlm_process_enable", True)
  62. return LightRAG(
  63. working_dir=str(tmp_path),
  64. workspace=f"test-release-closure-{tmp_path.name}",
  65. llm_model_func=_mock_llm,
  66. embedding_func=EmbeddingFunc(
  67. embedding_dim=32,
  68. max_token_size=4096,
  69. func=_mock_embedding,
  70. ),
  71. tokenizer=Tokenizer("mock-tokenizer", _SimpleTokenizerImpl()),
  72. **kwargs,
  73. )
  74. @pytest.mark.offline
  75. def test_parse_engine_routing_by_filename_and_env(monkeypatch):
  76. monkeypatch.setenv("DOCLING_ENDPOINT", "http://fake-docling")
  77. assert (
  78. resolve_stored_document_parser_engine("a.[docling-iet].docx", {}) == "docling"
  79. )
  80. monkeypatch.setenv("MINERU_LOCAL_ENDPOINT", "http://fake-mineru")
  81. monkeypatch.setenv("LIGHTRAG_PARSER", "pdf:mineru-iet,*:native")
  82. assert resolve_stored_document_parser_engine("paper.pdf", {}) == "mineru"
  83. assert (
  84. resolve_stored_document_parser_engine("paper.pdf", {"parse_engine": "native"})
  85. == "legacy"
  86. )
  87. @pytest.mark.offline
  88. def test_parse_engine_rule_fallback_and_default_legacy(monkeypatch):
  89. monkeypatch.setenv("LIGHTRAG_PARSER", "pdf:native,*:legacy")
  90. assert resolve_stored_document_parser_engine("paper.pdf", {}) == "legacy"
  91. monkeypatch.setenv("LIGHTRAG_PARSER", "pptx:docling,*:legacy")
  92. monkeypatch.delenv("DOCLING_ENDPOINT", raising=False)
  93. assert resolve_stored_document_parser_engine("slides.pptx", {}) == "legacy"
  94. monkeypatch.delenv("LIGHTRAG_PARSER", raising=False)
  95. monkeypatch.delenv("MINERU_LOCAL_ENDPOINT", raising=False)
  96. assert resolve_stored_document_parser_engine("slides.pptx", {}) == "legacy"
  97. @pytest.mark.offline
  98. def test_canonicalize_parser_hinted_basename():
  99. assert canonicalize_parser_hinted_basename("abc.[native].docx") == "abc.docx"
  100. assert canonicalize_parser_hinted_basename("/tmp/a.b.[mineru].pdf") == "a.b.pdf"
  101. assert canonicalize_parser_hinted_basename("abc.[draft].docx") == "abc.[draft].docx"
  102. # Engine token is case-insensitive (normalize_parser_engine lower-cases).
  103. assert canonicalize_parser_hinted_basename("abc.[NATIVE].docx") == "abc.docx"
  104. # Engine sub-variants like "mineru-iet" normalize to a base engine.
  105. assert canonicalize_parser_hinted_basename("abc.[mineru-iet].pdf") == "abc.pdf"
  106. # No extension after the bracket: pattern requires ``.[engine].ext``.
  107. assert canonicalize_parser_hinted_basename("abc.[native]") == "abc.[native]"
  108. # Plain basename without any hint is returned unchanged.
  109. assert canonicalize_parser_hinted_basename("abc.docx") == "abc.docx"
  110. # Bracket without a leading dot is not a hint.
  111. assert canonicalize_parser_hinted_basename("[native].docx") == "[native].docx"
  112. # Nested hints: only the outermost segment is stripped.
  113. assert (
  114. canonicalize_parser_hinted_basename("name.[native].[mineru].pdf")
  115. == "name.[native].pdf"
  116. )
  117. # New options-only and engine+options forms strip cleanly too.
  118. assert canonicalize_parser_hinted_basename("foo.[-!].docx") == "foo.docx"
  119. assert canonicalize_parser_hinted_basename("foo.[native-iet].docx") == "foo.docx"
  120. assert canonicalize_parser_hinted_basename("foo.[mineru-R!].pdf") == "foo.pdf"
  121. # Options without the leading hyphen and unknown parsers are left alone.
  122. assert canonicalize_parser_hinted_basename("foo.[!].docx") == "foo.[!].docx"
  123. assert (
  124. canonicalize_parser_hinted_basename("foo.[native-].docx")
  125. == "foo.[native-].docx"
  126. )
  127. assert canonicalize_parser_hinted_basename("foo.[xyz].docx") == "foo.[xyz].docx"
  128. @pytest.mark.offline
  129. def test_filename_parser_directives_decodes_engine_and_options():
  130. from lightrag.parser.routing import filename_parser_directives
  131. assert filename_parser_directives("paper.[native-iet].docx") == ("native", "iet")
  132. assert filename_parser_directives("memo.[native-R!].md") == ("native", "R!")
  133. assert filename_parser_directives("report.[-!].pdf") == (None, "!")
  134. assert filename_parser_directives("doc.[mineru].docx") == ("mineru", "")
  135. assert filename_parser_directives("foo.docx") == (None, "")
  136. # Unsupported tokens and old options-only syntax stay unparsed.
  137. assert filename_parser_directives("foo.[!].docx") == (None, "")
  138. assert filename_parser_directives("foo.[draft].docx") == (None, "")
  139. @pytest.mark.offline
  140. def test_filename_hint_rejects_invalid_engine_qualified_options():
  141. """Engine-qualified hints with bad option chars must fail validation
  142. during parser directive resolution instead of silently falling back to
  143. parser rules/defaults.
  144. """
  145. from lightrag.parser.routing import (
  146. canonicalize_parser_hinted_basename,
  147. filename_parser_directives,
  148. resolve_file_parser_directives,
  149. )
  150. # Low-level helpers stay non-throwing for scan grouping/canonicalization.
  151. assert filename_parser_directives("foo.[native-FR].docx") == (None, "")
  152. assert filename_parser_directives("foo.[native-Q].docx") == (None, "")
  153. assert (
  154. canonicalize_parser_hinted_basename("foo.[native-FR].docx")
  155. == "foo.[native-FR].docx"
  156. )
  157. assert (
  158. canonicalize_parser_hinted_basename("foo.[native-Q].docx")
  159. == "foo.[native-Q].docx"
  160. )
  161. with pytest.raises(FilenameParserHintError, match="multiple chunking modes"):
  162. resolve_file_parser_directives("foo.[native-FR].docx")
  163. with pytest.raises(FilenameParserHintError, match="unsupported character"):
  164. resolve_file_parser_directives("foo.[native-Q].docx")
  165. with pytest.raises(FilenameParserHintError, match="options-only filename hints"):
  166. resolve_file_parser_directives("foo.[!].docx")
  167. with pytest.raises(FilenameParserHintError, match="options-only filename hints"):
  168. resolve_file_parser_directives("foo.[iet].docx")
  169. with pytest.raises(
  170. FilenameParserHintError, match="unsupported parser engine 'abc'"
  171. ):
  172. resolve_file_parser_directives("foo.[abc].docx")
  173. with pytest.raises(
  174. FilenameParserHintError, match="unsupported parser engine 'xyz'"
  175. ):
  176. resolve_file_parser_directives("foo.[xyz].docx")
  177. with pytest.raises(FilenameParserHintError, match="is empty"):
  178. resolve_file_parser_directives("foo.[].docx")
  179. with pytest.raises(FilenameParserHintError, match="empty process options"):
  180. resolve_file_parser_directives("foo.[-].docx")
  181. with pytest.raises(FilenameParserHintError, match="empty process options"):
  182. resolve_file_parser_directives("foo.[native-].docx")
  183. @pytest.mark.offline
  184. def test_filename_hint_missing_required_endpoint_rejects(monkeypatch):
  185. from lightrag.parser.routing import resolve_file_parser_directives
  186. monkeypatch.delenv("DOCLING_ENDPOINT", raising=False)
  187. with pytest.raises(FilenameParserHintError, match="requires DOCLING_ENDPOINT"):
  188. resolve_file_parser_directives("foo.[docling].docx")
  189. @pytest.mark.offline
  190. def test_parse_process_options_decodes_flags():
  191. from lightrag.parser.routing import parse_process_options
  192. opts = parse_process_options("iet")
  193. assert opts.images and opts.tables and opts.equations
  194. assert not opts.skip_kg
  195. assert opts.chunking == "F"
  196. opts = parse_process_options("R!")
  197. assert opts.skip_kg and opts.chunking == "R"
  198. assert not opts.images and not opts.tables and not opts.equations
  199. opts = parse_process_options("P")
  200. assert opts.chunking == "P"
  201. opts = parse_process_options("")
  202. assert not (opts.images or opts.tables or opts.equations or opts.skip_kg)
  203. assert opts.chunking == "F"
  204. @pytest.mark.offline
  205. def test_validate_process_options_rejects_invalid_combos():
  206. from lightrag.parser.routing import validate_process_options
  207. assert validate_process_options("iet") == []
  208. assert validate_process_options("R!") == []
  209. # F+R conflict is reported.
  210. errs = validate_process_options("FR")
  211. assert any("multiple chunking modes" in m for m in errs)
  212. # Lowercase chunking selectors are not valid.
  213. errs = validate_process_options("f")
  214. assert any("'f'" in m for m in errs)
  215. # Unknown chars are reported individually.
  216. errs = validate_process_options("xyz")
  217. assert sum(1 for m in errs if "unsupported character" in m) == 3
  218. @pytest.mark.offline
  219. def test_lightrag_parser_rule_supports_options_suffix(monkeypatch):
  220. monkeypatch.setenv("MINERU_LOCAL_ENDPOINT", "http://fake-mineru")
  221. monkeypatch.delenv("DOCLING_ENDPOINT", raising=False)
  222. # Valid options suffix passes validation.
  223. validate_parser_routing_config("docx:native-iet,*:legacy")
  224. # Invalid options suffix is rejected with the rule label and message.
  225. with pytest.raises(ParserRoutingConfigError, match="multiple chunking modes"):
  226. validate_parser_routing_config("docx:native-FR,*:legacy")
  227. with pytest.raises(ParserRoutingConfigError, match="unsupported character"):
  228. validate_parser_routing_config("docx:native-Q,*:legacy")
  229. @pytest.mark.offline
  230. def test_resolve_file_parser_directives_priority(monkeypatch):
  231. from lightrag.parser.routing import resolve_file_parser_directives
  232. monkeypatch.setenv("MINERU_LOCAL_ENDPOINT", "http://fake-mineru")
  233. monkeypatch.setenv("LIGHTRAG_PARSER", "docx:native-iet,*:legacy")
  234. # Filename hint takes precedence for engine and options.
  235. engine, options = resolve_file_parser_directives("paper.[native-R!].docx")
  236. assert engine == "native"
  237. assert options == "R!"
  238. # No filename hint → fall through to LIGHTRAG_PARSER defaults for both.
  239. engine, options = resolve_file_parser_directives("plain.docx")
  240. assert engine == "native"
  241. assert options == "iet"
  242. # Options-only hint keeps engine from rule but uses hinted options.
  243. engine, options = resolve_file_parser_directives("plain.[-!].docx")
  244. assert engine == "native"
  245. assert options == "!"
  246. @pytest.mark.offline
  247. def test_doc_status_metadata_carry_over_helper():
  248. """``doc_status_transition_metadata`` preserves long-lived per-doc fields
  249. and layers in any transition-specific extras passed via ``extra=``.
  250. Empty / missing carry-over fields are dropped, not written as null.
  251. """
  252. from lightrag.utils_pipeline import doc_status_transition_metadata
  253. class _StubStatusDoc:
  254. def __init__(self, metadata):
  255. self.metadata = metadata
  256. # Carries process_options forward.
  257. md = doc_status_transition_metadata(_StubStatusDoc({"process_options": "iet"}))
  258. assert md == {"process_options": "iet"}
  259. # Carries the internal pending-parse source basename forward for retries.
  260. md = doc_status_transition_metadata(
  261. _StubStatusDoc({"source_file_name": "demo.[mineru].pdf"})
  262. )
  263. assert md == {"source_file_name": "demo.[mineru].pdf"}
  264. # Layers in transition extras while keeping the carry-over.
  265. md = doc_status_transition_metadata(
  266. _StubStatusDoc({"process_options": "R!"}),
  267. extra={"processing_start_time": 12345},
  268. )
  269. assert md == {"process_options": "R!", "processing_start_time": 12345}
  270. # No carry-over when metadata is missing or empty.
  271. assert doc_status_transition_metadata(_StubStatusDoc({})) == {}
  272. assert doc_status_transition_metadata(None) == {}
  273. # Empty / None process_options are not written as null.
  274. assert doc_status_transition_metadata(_StubStatusDoc({"process_options": ""})) == {}
  275. assert (
  276. doc_status_transition_metadata(_StubStatusDoc({"process_options": None})) == {}
  277. )
  278. @pytest.mark.offline
  279. def test_carry_over_keys_grouped_by_stage():
  280. """Strict tuple-equality guard on ``_DOC_STATUS_METADATA_CARRY_OVER_KEYS``.
  281. The tuple order is the WebUI ``DocumentStatusDetailsDialog`` render order,
  282. so per-stage fields must stay grouped (parse-stage trio then analyze-stage
  283. trio). Locking the order here forces any future field addition to update
  284. this assertion alongside the tuple, preventing silent regressions in the
  285. dialog's timeline-ordered display.
  286. """
  287. from lightrag.utils_pipeline import _DOC_STATUS_METADATA_CARRY_OVER_KEYS
  288. assert _DOC_STATUS_METADATA_CARRY_OVER_KEYS == (
  289. "process_options",
  290. "source_file_name",
  291. "parse_warnings",
  292. "chunk_opts",
  293. "parsing_start_time",
  294. "parsing_end_time",
  295. "parse_stage_skipped",
  296. "analyzing_start_time",
  297. "analyzing_end_time",
  298. "analyzing_stage_skipped",
  299. )
  300. @pytest.mark.offline
  301. def test_carry_over_helper_propagates_end_times_and_skipped():
  302. """Stage-end timestamps and skipped flags must survive carry-over so the
  303. PROCESSING / PROCESSED / FAILED upserts keep them visible for post-mortem
  304. stage-duration analysis.
  305. """
  306. from lightrag.utils_pipeline import doc_status_transition_metadata
  307. class _StubStatusDoc:
  308. def __init__(self, metadata):
  309. self.metadata = metadata
  310. md = doc_status_transition_metadata(
  311. _StubStatusDoc(
  312. {
  313. "parsing_start_time": 1700000000,
  314. "parsing_end_time": 1700000010,
  315. "analyzing_start_time": 1700000020,
  316. "analyzing_end_time": 1700000050,
  317. }
  318. )
  319. )
  320. assert md == {
  321. "parsing_start_time": 1700000000,
  322. "parsing_end_time": 1700000010,
  323. "analyzing_start_time": 1700000020,
  324. "analyzing_end_time": 1700000050,
  325. }
  326. # Skipped flags (bool True) also survive carry-over.
  327. md = doc_status_transition_metadata(
  328. _StubStatusDoc(
  329. {
  330. "parse_stage_skipped": True,
  331. "analyzing_stage_skipped": True,
  332. }
  333. )
  334. )
  335. assert md == {
  336. "parse_stage_skipped": True,
  337. "analyzing_stage_skipped": True,
  338. }
  339. def _status_value_text(status):
  340. """Helper: extract the value of a DocStatus enum or raw status string."""
  341. if hasattr(status, "value"):
  342. return status.value
  343. return str(status)
  344. @pytest.mark.offline
  345. def test_doc_status_metadata_survives_processed_transition(tmp_path):
  346. """End-to-end: a document enqueued with process_options must keep
  347. ``metadata.process_options`` set in ``doc_status`` after the pipeline
  348. drives it all the way to PROCESSED. This exercises the full state
  349. machine (PENDING → PARSING → ANALYZING → PROCESSING → PROCESSED) and
  350. asserts the carry-over works at every transition.
  351. """
  352. async def _run():
  353. rag = _new_rag(tmp_path)
  354. await rag.initialize_storages()
  355. try:
  356. await rag.apipeline_enqueue_documents(
  357. "Some content body for chunking.",
  358. file_paths="metadata_carry.txt",
  359. track_id="track-md-carry",
  360. process_options="iet!",
  361. )
  362. doc_id = compute_mdhash_id("metadata_carry.txt", prefix="doc-")
  363. pending_status = await rag.doc_status.get_by_id(doc_id)
  364. assert pending_status is not None
  365. assert (pending_status.get("metadata") or {}).get(
  366. "process_options"
  367. ) == "iet!"
  368. # Run the pipeline through to PROCESSED.
  369. await rag.apipeline_process_enqueue_documents()
  370. final_status = await rag.doc_status.get_by_id(doc_id)
  371. assert final_status is not None
  372. assert _status_value_text(final_status.get("status")) == "processed"
  373. metadata = final_status.get("metadata") or {}
  374. assert metadata.get("process_options") == "iet!", (
  375. f"process_options dropped during state-machine transitions; "
  376. f"final metadata: {metadata!r}"
  377. )
  378. # parse_native on FULL_DOCS_FORMAT_RAW does not actually parse —
  379. # it passes content through verbatim — so the skip branch fires
  380. # and ``parsing_end_time`` stays absent. ``parse_stage_skipped``
  381. # is the cache-hit / no-parse-work sentinel (same field used by
  382. # parse_mineru / parse_docling for raw-bundle cache hits).
  383. assert isinstance(metadata.get("parsing_start_time"), int)
  384. assert metadata.get("parse_stage_skipped") is True
  385. assert "parsing_end_time" not in metadata
  386. # parse_native on raw content returns blocks_path="", which makes
  387. # analyze_multimodal take the "no blocks_path" early-return branch
  388. # and set analyzing_stage_skipped=True (no analyzing_end_time).
  389. assert isinstance(metadata.get("analyzing_start_time"), int)
  390. assert metadata.get("analyzing_stage_skipped") is True
  391. assert "analyzing_end_time" not in metadata
  392. finally:
  393. await rag.finalize_storages()
  394. asyncio.run(_run())
  395. @pytest.mark.offline
  396. def test_analyze_soft_failure_writes_neither_end_time_nor_skipped(tmp_path):
  397. """If ``analyze_multimodal`` returns without setting either the
  398. ``multimodal_processed`` (completion) or ``analyzing_stage_skipped``
  399. (user/config skip) sentinel — e.g. the generic ``except Exception``
  400. soft-swallow path or a malformed-sidecar early return — the worker must
  401. treat the attempt as a soft failure and leave BOTH fields absent. This
  402. distinguishes "analyze actually completed" from "analyze attempted but
  403. bailed" without falsely claiming a duration.
  404. """
  405. async def _run():
  406. rag = _new_rag(tmp_path)
  407. await rag.initialize_storages()
  408. try:
  409. original_analyze_multimodal = rag.analyze_multimodal
  410. async def _soft_failing_analyze(*args, **kwargs):
  411. result = await original_analyze_multimodal(*args, **kwargs)
  412. # Strip both sentinels: simulate analyze_multimodal returning
  413. # parsed_data after the generic except Exception soft-swallow.
  414. result.pop("analyzing_stage_skipped", None)
  415. result.pop("multimodal_processed", None)
  416. return result
  417. rag.analyze_multimodal = _soft_failing_analyze # type: ignore[assignment]
  418. await rag.apipeline_enqueue_documents(
  419. "Soft-fail body for analyze stage.",
  420. file_paths="analyze_soft_fail.txt",
  421. track_id="track-analyze-softfail",
  422. process_options="iet!",
  423. )
  424. doc_id = compute_mdhash_id("analyze_soft_fail.txt", prefix="doc-")
  425. await rag.apipeline_process_enqueue_documents()
  426. final_status = await rag.doc_status.get_by_id(doc_id)
  427. assert final_status is not None
  428. assert _status_value_text(final_status.get("status")) == "processed"
  429. metadata = final_status.get("metadata") or {}
  430. assert isinstance(metadata.get("analyzing_start_time"), int)
  431. assert "analyzing_end_time" not in metadata, (
  432. f"soft-failed analyze incorrectly stamped analyzing_end_time; "
  433. f"final metadata: {metadata!r}"
  434. )
  435. assert "analyzing_stage_skipped" not in metadata, (
  436. f"soft-failed analyze incorrectly stamped analyzing_stage_skipped; "
  437. f"final metadata: {metadata!r}"
  438. )
  439. finally:
  440. await rag.finalize_storages()
  441. asyncio.run(_run())
  442. @pytest.mark.offline
  443. def test_apipeline_enqueue_persists_process_options(tmp_path):
  444. async def _run():
  445. rag = _new_rag(tmp_path)
  446. await rag.initialize_storages()
  447. try:
  448. await rag.apipeline_enqueue_documents(
  449. "alpha body",
  450. file_paths="abc.[native-R!].docx",
  451. track_id="track-opts",
  452. process_options="R!",
  453. )
  454. doc_id = compute_mdhash_id("abc.docx", prefix="doc-")
  455. full_doc = await rag.full_docs.get_by_id(doc_id)
  456. assert full_doc is not None
  457. # full_docs stores the canonical (hint-stripped) basename only.
  458. assert full_doc["file_path"] == "abc.docx"
  459. assert "canonical_basename" not in full_doc
  460. assert full_doc.get("process_options") == "R!"
  461. status_doc = await rag.doc_status.get_by_id(doc_id)
  462. assert status_doc is not None
  463. metadata = (
  464. status_doc.get("metadata")
  465. if isinstance(status_doc, dict)
  466. else getattr(status_doc, "metadata", {})
  467. )
  468. assert metadata.get("process_options") == "R!"
  469. finally:
  470. await rag.finalize_storages()
  471. asyncio.run(_run())
  472. @pytest.mark.offline
  473. def test_purge_doc_chunks_and_kg_is_noop_for_empty_chunks(tmp_path):
  474. """``_purge_doc_chunks_and_kg`` with an empty chunk_ids set must be a
  475. no-op so callers (including the resume branch) can invoke it
  476. unconditionally without first checking for non-empty chunks_list.
  477. """
  478. async def _run():
  479. from lightrag.kg.shared_storage import (
  480. get_namespace_data,
  481. get_namespace_lock,
  482. )
  483. rag = _new_rag(tmp_path)
  484. await rag.initialize_storages()
  485. try:
  486. pipeline_status = await get_namespace_data(
  487. "pipeline_status", workspace=rag.workspace
  488. )
  489. pipeline_status_lock = get_namespace_lock(
  490. "pipeline_status", workspace=rag.workspace
  491. )
  492. # Empty set: must return immediately without touching storage.
  493. await rag._purge_doc_chunks_and_kg(
  494. "doc-empty",
  495. set(),
  496. pipeline_status=pipeline_status,
  497. pipeline_status_lock=pipeline_status_lock,
  498. )
  499. # No exceptions → success. Calling twice in a row is also fine
  500. # since the helper is idempotent on the empty input.
  501. await rag._purge_doc_chunks_and_kg(
  502. "doc-empty",
  503. set(),
  504. pipeline_status=pipeline_status,
  505. pipeline_status_lock=pipeline_status_lock,
  506. )
  507. finally:
  508. await rag.finalize_storages()
  509. asyncio.run(_run())
  510. @pytest.mark.offline
  511. def test_purge_doc_chunks_and_kg_clears_chunks_for_unknown_doc(tmp_path):
  512. """When the doc has chunk_ids but no graph contributions yet
  513. (full_entities / full_relations empty), the helper must still clear
  514. the chunks from chunks_vdb / text_chunks without raising. This
  515. exercises the resume path for documents whose previous run was
  516. interrupted between chunking and entity extraction.
  517. """
  518. async def _run():
  519. from lightrag.kg.shared_storage import (
  520. get_namespace_data,
  521. get_namespace_lock,
  522. )
  523. rag = _new_rag(tmp_path)
  524. await rag.initialize_storages()
  525. try:
  526. # Seed text_chunks + chunks_vdb with two stale chunks.
  527. await rag.text_chunks.upsert(
  528. {
  529. "doc-X-chunk-0": {
  530. "content": "stale chunk 0",
  531. "chunk_order_index": 0,
  532. "full_doc_id": "doc-X",
  533. "tokens": 4,
  534. "file_path": "x.txt",
  535. },
  536. "doc-X-chunk-1": {
  537. "content": "stale chunk 1",
  538. "chunk_order_index": 1,
  539. "full_doc_id": "doc-X",
  540. "tokens": 4,
  541. "file_path": "x.txt",
  542. },
  543. }
  544. )
  545. await rag.chunks_vdb.upsert(
  546. {
  547. "doc-X-chunk-0": {
  548. "content": "stale chunk 0",
  549. "chunk_order_index": 0,
  550. "full_doc_id": "doc-X",
  551. "tokens": 4,
  552. "file_path": "x.txt",
  553. },
  554. "doc-X-chunk-1": {
  555. "content": "stale chunk 1",
  556. "chunk_order_index": 1,
  557. "full_doc_id": "doc-X",
  558. "tokens": 4,
  559. "file_path": "x.txt",
  560. },
  561. }
  562. )
  563. await rag.text_chunks.index_done_callback()
  564. await rag.chunks_vdb.index_done_callback()
  565. pipeline_status = await get_namespace_data(
  566. "pipeline_status", workspace=rag.workspace
  567. )
  568. pipeline_status_lock = get_namespace_lock(
  569. "pipeline_status", workspace=rag.workspace
  570. )
  571. await rag._purge_doc_chunks_and_kg(
  572. "doc-X",
  573. {"doc-X-chunk-0", "doc-X-chunk-1"},
  574. pipeline_status=pipeline_status,
  575. pipeline_status_lock=pipeline_status_lock,
  576. )
  577. # Both chunks gone from text_chunks.
  578. remaining = await rag.text_chunks.get_by_ids(
  579. ["doc-X-chunk-0", "doc-X-chunk-1"]
  580. )
  581. assert remaining == [None, None]
  582. finally:
  583. await rag.finalize_storages()
  584. asyncio.run(_run())
  585. @pytest.mark.offline
  586. def test_resume_purges_old_chunks_when_content_already_extracted(tmp_path):
  587. """When ``apipeline_process_enqueue_documents`` picks up a document
  588. whose content is already extracted (full_docs.format=raw with content)
  589. and whose doc_status carries a non-empty chunks_list from a previous
  590. half-finished run, the resume branch must call
  591. ``_purge_doc_chunks_and_kg`` with the old chunk-IDs *before* the
  592. chunking and entity-extraction stages run. This test wraps the
  593. helper so we can assert it is invoked exactly once with the expected
  594. inputs, then bails out so we don't have to mock the whole VLM /
  595. entity-extract stack.
  596. """
  597. async def _run():
  598. rag = _new_rag(tmp_path)
  599. await rag.initialize_storages()
  600. try:
  601. doc_id = compute_mdhash_id("resume.txt", prefix="doc-")
  602. # Seed full_docs as if extraction already completed.
  603. await rag.full_docs.upsert(
  604. {
  605. doc_id: {
  606. "content": "previously extracted body",
  607. "file_path": "resume.txt",
  608. "parse_format": "raw",
  609. "parse_engine": "legacy",
  610. "content_hash": "deadbeef",
  611. }
  612. }
  613. )
  614. # Seed doc_status as PROCESSING with chunks_list from a prior
  615. # half-finished run so the resume branch has something to purge.
  616. stale_chunks = [f"{doc_id}-chunk-{i:03d}" for i in range(2)]
  617. await rag.doc_status.upsert(
  618. {
  619. doc_id: {
  620. "status": DocStatus.PROCESSING,
  621. "content_summary": "previously extracted body",
  622. "content_length": len("previously extracted body"),
  623. "created_at": "2026-01-01T00:00:00+00:00",
  624. "updated_at": "2026-01-01T00:00:01+00:00",
  625. "file_path": "resume.txt",
  626. "track_id": "track-resume",
  627. "content_hash": "deadbeef",
  628. "chunks_list": stale_chunks,
  629. "chunks_count": len(stale_chunks),
  630. }
  631. }
  632. )
  633. # Wrap the helper to record invocations, and raise after the call
  634. # so the test exits cleanly without exercising downstream stages.
  635. calls: list[tuple[str, set[str]]] = []
  636. original = rag._purge_doc_chunks_and_kg
  637. class _ResumePurged(Exception):
  638. pass
  639. async def _wrapped(doc_id_arg, chunk_ids_arg, **kwargs):
  640. calls.append((doc_id_arg, set(chunk_ids_arg)))
  641. # Run the real helper so the side-effects (chunks gone from
  642. # storage) are observable, then short-circuit.
  643. await original(doc_id_arg, chunk_ids_arg, **kwargs)
  644. raise _ResumePurged()
  645. rag._purge_doc_chunks_and_kg = _wrapped # type: ignore[method-assign]
  646. # Pipeline will pick up the PROCESSING document, hit the resume
  647. # branch, call our wrapped purge, and our wrapper raises.
  648. await rag.apipeline_process_enqueue_documents()
  649. # Helper was invoked exactly once with the stale chunk-IDs.
  650. assert len(calls) == 1
  651. invoked_doc_id, invoked_chunks = calls[0]
  652. assert invoked_doc_id == doc_id
  653. assert invoked_chunks == set(stale_chunks)
  654. finally:
  655. await rag.finalize_storages()
  656. asyncio.run(_run())
  657. @pytest.mark.offline
  658. def test_resume_skips_purge_when_chunks_list_empty(tmp_path):
  659. """If the doc was extracted but never chunked (chunks_list empty),
  660. the resume branch must NOT call the purge helper — there's nothing
  661. to clean up.
  662. """
  663. async def _run():
  664. rag = _new_rag(tmp_path)
  665. await rag.initialize_storages()
  666. try:
  667. doc_id = compute_mdhash_id("noskip.txt", prefix="doc-")
  668. await rag.full_docs.upsert(
  669. {
  670. doc_id: {
  671. "content": "fresh body",
  672. "file_path": "noskip.txt",
  673. "parse_format": "raw",
  674. "parse_engine": "legacy",
  675. "content_hash": "fresh",
  676. }
  677. }
  678. )
  679. await rag.doc_status.upsert(
  680. {
  681. doc_id: {
  682. "status": DocStatus.PARSING,
  683. "content_summary": "fresh body",
  684. "content_length": len("fresh body"),
  685. "created_at": "2026-01-01T00:00:00+00:00",
  686. "updated_at": "2026-01-01T00:00:01+00:00",
  687. "file_path": "noskip.txt",
  688. "track_id": "track-noskip",
  689. "content_hash": "fresh",
  690. "chunks_list": [],
  691. "chunks_count": 0,
  692. }
  693. }
  694. )
  695. calls: list[tuple[str, set[str]]] = []
  696. async def _spy(doc_id_arg, chunk_ids_arg, **kwargs):
  697. calls.append((doc_id_arg, set(chunk_ids_arg)))
  698. # Don't actually purge; just record the call and let the
  699. # pipeline continue past this test boundary.
  700. raise RuntimeError("test stop after purge check")
  701. rag._purge_doc_chunks_and_kg = _spy # type: ignore[method-assign]
  702. try:
  703. await rag.apipeline_process_enqueue_documents()
  704. except Exception:
  705. # Whether the pipeline reaches our spy or fails downstream
  706. # doesn't matter for this test; we only care that the spy
  707. # was NOT called for an empty chunks_list.
  708. pass
  709. assert (
  710. calls == []
  711. ), "purge helper should not be called when chunks_list is empty"
  712. finally:
  713. await rag.finalize_storages()
  714. asyncio.run(_run())
  715. @pytest.mark.offline
  716. def test_apipeline_enqueue_allows_concurrent_with_busy(tmp_path):
  717. """``busy=True`` no longer blocks enqueue. Concurrent processing is
  718. explicitly permitted — the running loop's request_pending mechanism
  719. picks up newly-enqueued docs after the current batch. Enqueue
  720. nudges request_pending so a freshly-arrived doc is never stranded
  721. when the call site does not subsequently invoke
  722. ``apipeline_process_enqueue_documents``.
  723. """
  724. async def _run():
  725. from lightrag.kg.shared_storage import (
  726. get_namespace_data,
  727. get_namespace_lock,
  728. )
  729. rag = _new_rag(tmp_path)
  730. await rag.initialize_storages()
  731. try:
  732. pipeline_status = await get_namespace_data(
  733. "pipeline_status", workspace=rag.workspace
  734. )
  735. pipeline_status_lock = get_namespace_lock(
  736. "pipeline_status", workspace=rag.workspace
  737. )
  738. # Simulate an in-flight indexing job.
  739. async with pipeline_status_lock:
  740. pipeline_status["busy"] = True
  741. pipeline_status["request_pending"] = False
  742. try:
  743. returned_track_id = await rag.apipeline_enqueue_documents(
  744. "concurrent with busy",
  745. file_paths="concurrent.txt",
  746. track_id="track-concurrent",
  747. )
  748. assert returned_track_id == "track-concurrent"
  749. # Enqueue nudged the running loop.
  750. assert pipeline_status.get("request_pending") is True
  751. finally:
  752. async with pipeline_status_lock:
  753. pipeline_status["busy"] = False
  754. pipeline_status["request_pending"] = False
  755. finally:
  756. await rag.finalize_storages()
  757. asyncio.run(_run())
  758. @pytest.mark.offline
  759. def test_apipeline_enqueue_rejects_when_scanning(tmp_path):
  760. """Scan is the only state that blocks new enqueues — scan reads
  761. doc_status to make classification decisions and would race with
  762. mid-flight writes. The last-line guard inside
  763. ``apipeline_enqueue_documents`` enforces this: HTTP endpoints catch
  764. it earlier and return 409, but core API callers must surface the
  765. invariant violation as a RuntimeError.
  766. """
  767. async def _run():
  768. from lightrag.kg.shared_storage import (
  769. get_namespace_data,
  770. get_namespace_lock,
  771. )
  772. rag = _new_rag(tmp_path)
  773. await rag.initialize_storages()
  774. try:
  775. pipeline_status = await get_namespace_data(
  776. "pipeline_status", workspace=rag.workspace
  777. )
  778. pipeline_status_lock = get_namespace_lock(
  779. "pipeline_status", workspace=rag.workspace
  780. )
  781. # Scan classification phase rejects. ``scanning_exclusive``
  782. # is the field that gates the enqueue last-line guard, not
  783. # plain ``scanning`` (which covers the whole scan lifecycle
  784. # including its processing phase, where concurrent enqueue
  785. # is allowed).
  786. async with pipeline_status_lock:
  787. pipeline_status["scanning"] = True
  788. pipeline_status["scanning_exclusive"] = True
  789. try:
  790. with pytest.raises(RuntimeError, match="scan is classifying"):
  791. await rag.apipeline_enqueue_documents(
  792. "should not enqueue",
  793. file_paths="scan.txt",
  794. track_id="track-scan",
  795. )
  796. finally:
  797. async with pipeline_status_lock:
  798. pipeline_status["scanning"] = False
  799. pipeline_status["scanning_exclusive"] = False
  800. # Scan processing phase (scanning=True, scanning_exclusive=False)
  801. # ALLOWS concurrent enqueue — same as upload-during-busy.
  802. async with pipeline_status_lock:
  803. pipeline_status["scanning"] = True
  804. pipeline_status["scanning_exclusive"] = False
  805. try:
  806. track_processing = await rag.apipeline_enqueue_documents(
  807. "allowed during scan processing",
  808. file_paths="scan_processing.txt",
  809. track_id="track-scan-processing",
  810. )
  811. assert track_processing == "track-scan-processing"
  812. finally:
  813. async with pipeline_status_lock:
  814. pipeline_status["scanning"] = False
  815. # When idle, the same call succeeds — proving the guard is the
  816. # only thing blocking, not some side effect of the test setup.
  817. await rag.apipeline_enqueue_documents(
  818. "now allowed",
  819. file_paths="ok.txt",
  820. track_id="track-ok",
  821. )
  822. finally:
  823. await rag.finalize_storages()
  824. asyncio.run(_run())
  825. @pytest.mark.offline
  826. def test_enqueue_during_busy_sets_request_pending(tmp_path):
  827. """While the processing loop is running (busy=True), a concurrent
  828. enqueue must set ``request_pending`` so the loop knows to scan
  829. doc_status again after its current batch. This is the mechanism
  830. that makes "upload while pipeline is busy" actually drain the new
  831. work — without it, freshly enqueued docs would be stranded until
  832. an unrelated trigger.
  833. """
  834. async def _run():
  835. from lightrag.kg.shared_storage import (
  836. get_namespace_data,
  837. get_namespace_lock,
  838. )
  839. rag = _new_rag(tmp_path)
  840. await rag.initialize_storages()
  841. try:
  842. pipeline_status = await get_namespace_data(
  843. "pipeline_status", workspace=rag.workspace
  844. )
  845. pipeline_status_lock = get_namespace_lock(
  846. "pipeline_status", workspace=rag.workspace
  847. )
  848. async with pipeline_status_lock:
  849. pipeline_status["busy"] = True
  850. pipeline_status["request_pending"] = False
  851. try:
  852. # First enqueue: nudges request_pending.
  853. await rag.apipeline_enqueue_documents(
  854. "first while busy",
  855. file_paths="first.txt",
  856. track_id="track-first",
  857. )
  858. assert pipeline_status.get("request_pending") is True
  859. # Second enqueue while busy: stays True (idempotent).
  860. async with pipeline_status_lock:
  861. pipeline_status["request_pending"] = False
  862. await rag.apipeline_enqueue_documents(
  863. "second while busy",
  864. file_paths="second.txt",
  865. track_id="track-second",
  866. )
  867. assert pipeline_status.get("request_pending") is True
  868. finally:
  869. async with pipeline_status_lock:
  870. pipeline_status["busy"] = False
  871. pipeline_status["request_pending"] = False
  872. # When idle, enqueue does NOT set request_pending — there is
  873. # no running loop to nudge.
  874. await rag.apipeline_enqueue_documents(
  875. "while idle",
  876. file_paths="idle.txt",
  877. track_id="track-idle",
  878. )
  879. assert pipeline_status.get("request_pending") is False
  880. finally:
  881. await rag.finalize_storages()
  882. asyncio.run(_run())
  883. @pytest.mark.offline
  884. def test_atomic_release_busy_or_consume_pending(tmp_path):
  885. """The loop-exit handoff is atomic via
  886. ``_atomic_release_busy_or_consume_pending``: the same critical
  887. section that reads ``request_pending`` also writes ``busy=False``.
  888. This closes the race where a concurrent enqueue could set
  889. ``request_pending=True`` between the loop's read of the flag and
  890. the finally block's ``busy=False`` write — leaving newly-enqueued
  891. docs stranded in PENDING with no running loop to consume them.
  892. The helper has two outcomes:
  893. * ``request_pending=True`` at entry → flag cleared, return False
  894. (caller must continue the loop, refetching doc_status).
  895. * ``request_pending=False`` at entry → ``busy`` cleared, return
  896. True (caller must break out without re-clearing busy).
  897. Tested directly because the closure pattern inside
  898. ``apipeline_process_enqueue_documents`` is otherwise hard to
  899. exercise from a unit test without orchestrating real concurrency.
  900. """
  901. async def _run():
  902. from lightrag.kg.shared_storage import (
  903. get_namespace_data,
  904. get_namespace_lock,
  905. )
  906. rag = _new_rag(tmp_path)
  907. await rag.initialize_storages()
  908. try:
  909. pipeline_status = await get_namespace_data(
  910. "pipeline_status", workspace=rag.workspace
  911. )
  912. pipeline_status_lock = get_namespace_lock(
  913. "pipeline_status", workspace=rag.workspace
  914. )
  915. # Case 1: simulate the race — request_pending was set by a
  916. # concurrent enqueue while busy=True. Helper must consume
  917. # the flag and return False (continue loop) rather than
  918. # silently exit.
  919. async with pipeline_status_lock:
  920. pipeline_status["busy"] = True
  921. pipeline_status["request_pending"] = True
  922. released = await rag._atomic_release_busy_or_consume_pending(
  923. pipeline_status, pipeline_status_lock
  924. )
  925. assert released is False
  926. assert pipeline_status["busy"] is True # NOT released
  927. assert pipeline_status["request_pending"] is False # consumed
  928. # Case 2: clean exit path — no concurrent enqueue. Helper
  929. # releases busy under the SAME lock so any post-call
  930. # enqueue can either see busy=False (and trigger its own
  931. # process pass) or had to set request_pending BEFORE this
  932. # call (handled by Case 1). No stranded flag possible.
  933. async with pipeline_status_lock:
  934. pipeline_status["busy"] = True
  935. pipeline_status["request_pending"] = False
  936. released = await rag._atomic_release_busy_or_consume_pending(
  937. pipeline_status, pipeline_status_lock
  938. )
  939. assert released is True
  940. assert pipeline_status["busy"] is False # released
  941. assert pipeline_status["request_pending"] is False
  942. finally:
  943. await rag.finalize_storages()
  944. asyncio.run(_run())
  945. @pytest.mark.offline
  946. def test_apipeline_enqueue_rejects_when_destructive_busy(tmp_path):
  947. """``destructive_busy`` (set by /documents/clear and per-doc delete)
  948. must reject enqueue at the last-line guard. These jobs DROP
  949. storages and remove input files; concurrent enqueue would write to
  950. storages mid-drop and silently lose the document. Note: this is
  951. different from plain ``busy=True`` (the processing loop), which is
  952. explicitly compatible with concurrent enqueue.
  953. """
  954. async def _run():
  955. from lightrag.kg.shared_storage import (
  956. get_namespace_data,
  957. get_namespace_lock,
  958. )
  959. rag = _new_rag(tmp_path)
  960. await rag.initialize_storages()
  961. try:
  962. pipeline_status = await get_namespace_data(
  963. "pipeline_status", workspace=rag.workspace
  964. )
  965. pipeline_status_lock = get_namespace_lock(
  966. "pipeline_status", workspace=rag.workspace
  967. )
  968. async with pipeline_status_lock:
  969. pipeline_status["busy"] = True
  970. pipeline_status["destructive_busy"] = True
  971. try:
  972. with pytest.raises(RuntimeError, match="clearing or deleting"):
  973. await rag.apipeline_enqueue_documents(
  974. "should not enqueue",
  975. file_paths="while_clearing.txt",
  976. track_id="track-clearing",
  977. )
  978. # ``from_scan`` does NOT bypass destructive_busy: scan
  979. # is also a writer and would race with the drop.
  980. with pytest.raises(RuntimeError, match="clearing or deleting"):
  981. await rag.apipeline_enqueue_documents(
  982. "should not enqueue",
  983. file_paths="while_clearing_scan.txt",
  984. track_id="track-clearing-scan",
  985. from_scan=True,
  986. )
  987. finally:
  988. async with pipeline_status_lock:
  989. pipeline_status["busy"] = False
  990. pipeline_status["destructive_busy"] = False
  991. finally:
  992. await rag.finalize_storages()
  993. asyncio.run(_run())
@pytest.mark.offline
def test_concurrent_enqueue_dedupes_same_content_different_filenames(tmp_path):
    """Concurrent enqueues of identical content must not both become PENDING.

    Two ``apipeline_enqueue_documents`` calls with the same content but
    different filenames run concurrently. The dedup-and-upsert critical
    section is serialised by a workspace-scoped lock, so the second call
    always sees the first's upserted row and is recorded as
    ``duplicate_kind=content_hash``.

    The race only matters now that concurrent enqueue is permitted
    (busy=True doesn't block, and neither does scan's processing phase).
    Without the lock, two enqueues can both read doc_status before either
    upserts. Both then miss the content_hash dedup and both write PENDING,
    bypassing the dedup that is supposed to land one of them as FAILED.

    Determinism trick: ``get_existing_doc_by_content_hash`` is patched to
    yield via ``asyncio.sleep(0)`` before reading. This forces the asyncio
    scheduler to interleave the two coroutines at the dedup read, so without
    the serialising lock both would miss the existing row. With the lock,
    the second coroutine waits until the first has finished upserting, then
    sees the row.
    """

    async def _run():
        import lightrag.pipeline as pipeline_module

        rag = _new_rag(tmp_path)
        await rag.initialize_storages()
        try:
            original = pipeline_module.get_existing_doc_by_content_hash

            async def yielding_get_by_content_hash(doc_status, content_hash):
                # Yield to the event loop so the SECOND enqueue gets a
                # chance to run its dedup read before we proceed. This
                # is the exact interleaving the lock must defeat.
                await asyncio.sleep(0)
                return await original(doc_status, content_hash)

            # NOTE(review): local import; could move to the module imports.
            import unittest.mock

            # The patch is only needed while the two enqueues race.
            with unittest.mock.patch.object(
                pipeline_module,
                "get_existing_doc_by_content_hash",
                yielding_get_by_content_hash,
            ):
                # Same content, two distinct filenames, so the basename
                # dedup misses and the content_hash dedup is the gate.
                shared_content = "shared content for dedup race"
                results = await asyncio.gather(
                    rag.apipeline_enqueue_documents(
                        shared_content,
                        file_paths="first.txt",
                        track_id="track-first",
                    ),
                    rag.apipeline_enqueue_documents(
                        shared_content,
                        file_paths="second.txt",
                        track_id="track-second",
                    ),
                )
            # The first call enqueues the doc and returns its track_id.
            # The second call sees the upserted row inside the serialised
            # dedup section, finds zero unique docs, and returns None (the
            # "no new unique docs" early-exit path). Its duplicate record
            # is still written to doc_status as FAILED.
            assert results[0] == "track-first"
            assert results[1] is None
            # Exactly ONE PENDING doc should exist for this content, not
            # two. The second enqueue must have been recorded as a
            # content_hash duplicate (FAILED with metadata).
            pending_docs = await rag.doc_status.get_docs_by_statuses(
                [DocStatus.PENDING]
            )
            assert len(pending_docs) == 1, (
                f"Expected exactly 1 PENDING doc after concurrent enqueue, "
                f"got {len(pending_docs)}: {list(pending_docs.keys())}"
            )
            # The duplicate record (FAILED + duplicate_kind=content_hash)
            # carries the second filename and a metadata pointer back to
            # the original. Rows are read via attribute access here
            # (``d.metadata``), unlike the dict-style ``get_by_id`` rows.
            failed_docs = await rag.doc_status.get_docs_by_statuses([DocStatus.FAILED])
            duplicate_records = [
                d
                for d in failed_docs.values()
                if (
                    getattr(d, "metadata", None)
                    and d.metadata.get("duplicate_kind") == "content_hash"
                )
            ]
            assert len(duplicate_records) == 1, (
                f"Expected exactly 1 content_hash-duplicate FAILED row, "
                f"got {len(duplicate_records)}"
            )
            dup = duplicate_records[0]
            assert dup.metadata["is_duplicate"] is True
            assert dup.metadata["original_doc_id"]
        finally:
            await rag.finalize_storages()

    asyncio.run(_run())
  1087. @pytest.mark.offline
  1088. def test_apipeline_enqueue_from_scan_bypasses_scanning_guard(tmp_path):
  1089. """The scan-owned background task sets ``scanning=True`` itself, so its
  1090. own enqueue calls must be allowed through. External callers (without
  1091. ``from_scan=True``) remain blocked. ``busy=True`` no longer rejects
  1092. enqueue (concurrent processing is permitted under the new contract),
  1093. so it is not exercised here.
  1094. """
  1095. async def _run():
  1096. from lightrag.kg.shared_storage import (
  1097. get_namespace_data,
  1098. get_namespace_lock,
  1099. )
  1100. rag = _new_rag(tmp_path)
  1101. await rag.initialize_storages()
  1102. try:
  1103. pipeline_status = await get_namespace_data(
  1104. "pipeline_status", workspace=rag.workspace
  1105. )
  1106. pipeline_status_lock = get_namespace_lock(
  1107. "pipeline_status", workspace=rag.workspace
  1108. )
  1109. # Scan classification phase: scanning_exclusive=True, but
  1110. # ``from_scan=True`` lifts the guard so scan can enqueue
  1111. # files it just discovered. Non-scan callers are still
  1112. # rejected.
  1113. async with pipeline_status_lock:
  1114. pipeline_status["scanning"] = True
  1115. pipeline_status["scanning_exclusive"] = True
  1116. try:
  1117. returned_track_id = await rag.apipeline_enqueue_documents(
  1118. "scan-owned content",
  1119. file_paths="scan_owned.txt",
  1120. track_id="track-scan-owned",
  1121. from_scan=True,
  1122. )
  1123. assert returned_track_id == "track-scan-owned"
  1124. with pytest.raises(RuntimeError, match="scan is classifying"):
  1125. await rag.apipeline_enqueue_documents(
  1126. "external content",
  1127. file_paths="external.txt",
  1128. track_id="track-external",
  1129. )
  1130. finally:
  1131. async with pipeline_status_lock:
  1132. pipeline_status["scanning"] = False
  1133. pipeline_status["scanning_exclusive"] = False
  1134. finally:
  1135. await rag.finalize_storages()
  1136. asyncio.run(_run())
  1137. @pytest.mark.offline
  1138. def test_analyze_multimodal_overwrites_already_analyzed_items(tmp_path):
  1139. """Re-running analyze_multimodal recomputes enabled modalities and
  1140. overwrites any prior ``llm_analyze_result`` from the sidecar.
  1141. """
  1142. async def _run():
  1143. vlm_calls = {"n": 0}
  1144. extract_calls = {"n": 0}
  1145. async def _vlm(_prompt, **_kwargs):
  1146. vlm_calls["n"] += 1
  1147. return json.dumps(
  1148. {
  1149. "name": "Image",
  1150. "type": "Chart",
  1151. "description": "details",
  1152. }
  1153. )
  1154. async def _extract(_prompt, **_kwargs):
  1155. extract_calls["n"] += 1
  1156. return json.dumps(
  1157. {
  1158. "name": "Item",
  1159. "description": "table content summary",
  1160. }
  1161. )
  1162. rag = _new_rag(
  1163. tmp_path,
  1164. vlm_llm_model_func=_vlm,
  1165. extract_llm_model_func=_extract,
  1166. )
  1167. await rag.initialize_storages()
  1168. # Minimal blocks file with valid meta.
  1169. blocks = tmp_path / "demo.blocks.jsonl"
  1170. blocks.write_text(
  1171. "\n".join(
  1172. [
  1173. json.dumps({"type": "meta", "format_version": "1.0"}),
  1174. json.dumps({"type": "content", "content": "body"}),
  1175. ]
  1176. )
  1177. + "\n",
  1178. encoding="utf-8",
  1179. )
  1180. # 64x64 PNG so the image-pixel skip guard does NOT short-circuit
  1181. # before the VLM call.
  1182. img_path = tmp_path / "img1.png"
  1183. import struct
  1184. import zlib
  1185. def _png_bytes(w: int, h: int) -> bytes:
  1186. sig = b"\x89PNG\r\n\x1a\n"
  1187. ihdr = struct.pack(">II", w, h) + b"\x08\x06\x00\x00\x00"
  1188. crc = zlib.crc32(b"IHDR" + ihdr).to_bytes(4, "big")
  1189. ihdr_chunk = struct.pack(">I", len(ihdr)) + b"IHDR" + ihdr + crc
  1190. idat_payload = b"\x00" * (w * h * 4 + h)
  1191. compressed = zlib.compress(idat_payload)
  1192. crc_idat = zlib.crc32(b"IDAT" + compressed).to_bytes(4, "big")
  1193. idat_chunk = (
  1194. struct.pack(">I", len(compressed)) + b"IDAT" + compressed + crc_idat
  1195. )
  1196. iend_chunk = b"\x00\x00\x00\x00IEND\xaeB`\x82"
  1197. return sig + ihdr_chunk + idat_chunk + iend_chunk
  1198. img_path.write_bytes(_png_bytes(64, 64))
  1199. # Drawings sidecar with ONE item already analyzed (status=success).
  1200. drawings = tmp_path / "demo.drawings.json"
  1201. drawings.write_text(
  1202. json.dumps(
  1203. {
  1204. "version": "1.0",
  1205. "drawings": {
  1206. "id1": {
  1207. "id": "id1",
  1208. "caption": "fig1",
  1209. "path": str(img_path),
  1210. "llm_analyze_result": {
  1211. "name": "Existing",
  1212. "type": "Photo",
  1213. "description": "kept as-is",
  1214. "analyze_time": 1700000000,
  1215. "status": "success",
  1216. "message": "",
  1217. },
  1218. }
  1219. },
  1220. }
  1221. ),
  1222. encoding="utf-8",
  1223. )
  1224. # Tables sidecar with one fresh item (no prior result).
  1225. tables = tmp_path / "demo.tables.json"
  1226. tables.write_text(
  1227. json.dumps(
  1228. {
  1229. "version": "1.0",
  1230. "tables": {
  1231. "tbl1": {
  1232. "id": "tbl1",
  1233. "caption": "tbl",
  1234. "content": "Header|Row",
  1235. }
  1236. },
  1237. }
  1238. ),
  1239. encoding="utf-8",
  1240. )
  1241. parsed = {
  1242. "doc_id": "doc-1",
  1243. "file_path": "demo.pdf",
  1244. "blocks_path": str(blocks),
  1245. "content": "body",
  1246. }
  1247. await rag.analyze_multimodal("doc-1", "demo.pdf", parsed, process_options="it")
  1248. drawings_payload = json.loads(drawings.read_text(encoding="utf-8"))
  1249. existing = drawings_payload["drawings"]["id1"]["llm_analyze_result"]
  1250. # Existing result was overwritten by the new VLM result.
  1251. assert existing["name"] == "Image"
  1252. assert existing["description"] == "details"
  1253. assert existing["status"] == "success"
  1254. tables_payload = json.loads(tables.read_text(encoding="utf-8"))
  1255. new_result = tables_payload["tables"]["tbl1"]["llm_analyze_result"]
  1256. assert new_result["name"] == "Item"
  1257. assert new_result["status"] == "success"
  1258. # Drawings are recomputed through VLM; tables take the EXTRACT role
  1259. # (per design §3.1), not VLM.
  1260. assert vlm_calls["n"] == 1
  1261. assert extract_calls["n"] == 1
  1262. asyncio.run(_run())
  1263. @pytest.mark.offline
  1264. def test_enqueue_dedupes_by_filename_and_content_hash(tmp_path):
  1265. async def _run():
  1266. rag = _new_rag(tmp_path)
  1267. await rag.initialize_storages()
  1268. try:
  1269. # Distinct filenames with distinct content both get enqueued.
  1270. await rag.apipeline_enqueue_documents(
  1271. ["alpha body", "beta body"],
  1272. file_paths=["first.txt", "second.txt"],
  1273. track_id="track-a",
  1274. )
  1275. first_id = compute_mdhash_id("first.txt", prefix="doc-")
  1276. second_id = compute_mdhash_id("second.txt", prefix="doc-")
  1277. first_doc = await rag.full_docs.get_by_id(first_id)
  1278. second_doc = await rag.full_docs.get_by_id(second_id)
  1279. assert first_doc is not None
  1280. assert second_doc is not None
  1281. assert first_doc.get("content_hash")
  1282. assert second_doc.get("content_hash")
  1283. assert first_doc["content_hash"] != second_doc["content_hash"]
  1284. # Same filename basename with new content is rejected (filename dedup).
  1285. await rag.apipeline_enqueue_documents(
  1286. "changed content",
  1287. file_paths="/tmp/first.txt",
  1288. track_id="track-b",
  1289. )
  1290. first_doc = await rag.full_docs.get_by_id(first_id)
  1291. assert first_doc["content"] == "alpha body"
  1292. # New filename but same content as an existing doc is rejected
  1293. # (content_hash dedup).
  1294. await rag.apipeline_enqueue_documents(
  1295. "alpha body",
  1296. file_paths="third.txt",
  1297. track_id="track-c",
  1298. )
  1299. third_id = compute_mdhash_id("third.txt", prefix="doc-")
  1300. assert await rag.full_docs.get_by_id(third_id) is None
  1301. failed_docs = await rag.doc_status.get_docs_by_status(DocStatus.FAILED)
  1302. kinds = {
  1303. getattr(doc, "metadata", {}).get("duplicate_kind")
  1304. for doc in failed_docs.values()
  1305. if getattr(doc, "metadata", {}).get("is_duplicate")
  1306. }
  1307. assert {"filename", "content_hash"}.issubset(kinds)
  1308. finally:
  1309. await rag.finalize_storages()
  1310. asyncio.run(_run())
  1311. @pytest.mark.offline
  1312. def test_enqueue_dedupes_parser_hinted_filename_variants(tmp_path):
  1313. async def _run():
  1314. rag = _new_rag(tmp_path)
  1315. await rag.initialize_storages()
  1316. try:
  1317. await rag.apipeline_enqueue_documents(
  1318. "alpha body",
  1319. file_paths="abc.docx",
  1320. track_id="track-a",
  1321. )
  1322. first_id = compute_mdhash_id("abc.docx", prefix="doc-")
  1323. first_doc = await rag.full_docs.get_by_id(first_id)
  1324. assert first_doc is not None
  1325. # ``file_path`` is the canonical (hint-stripped) basename and
  1326. # serves as the dedup key — no separate ``canonical_basename``
  1327. # field is written.
  1328. assert first_doc["file_path"] == "abc.docx"
  1329. assert "canonical_basename" not in first_doc
  1330. await rag.apipeline_enqueue_documents(
  1331. "changed body",
  1332. file_paths="/tmp/abc.[native].docx",
  1333. track_id="track-b",
  1334. )
  1335. assert (await rag.full_docs.get_by_id(first_id))["content"] == "alpha body"
  1336. failed_docs = await rag.doc_status.get_docs_by_status(DocStatus.FAILED)
  1337. # The duplicate record stores the canonical basename — hint is
  1338. # not preserved anywhere in the new schema.
  1339. assert any(
  1340. getattr(doc, "metadata", {}).get("duplicate_kind") == "filename"
  1341. and getattr(doc, "file_path", "") == "abc.docx"
  1342. for doc in failed_docs.values()
  1343. )
  1344. finally:
  1345. await rag.finalize_storages()
  1346. asyncio.run(_run())
  1347. @pytest.mark.offline
  1348. def test_delete_result_uses_canonical_file_path(tmp_path):
  1349. async def _run():
  1350. rag = _new_rag(tmp_path)
  1351. await rag.initialize_storages()
  1352. try:
  1353. await rag.apipeline_enqueue_documents(
  1354. "",
  1355. file_paths=str(tmp_path / "abc.[native].docx"),
  1356. docs_format=FULL_DOCS_FORMAT_PENDING_PARSE,
  1357. parse_engine=PARSER_ENGINE_NATIVE,
  1358. track_id="track-delete-source",
  1359. )
  1360. doc_id = compute_mdhash_id("abc.docx", prefix="doc-")
  1361. result = await rag.adelete_by_doc_id(doc_id)
  1362. assert result.status == "success"
  1363. # New schema: file_path is the canonical (hint-stripped)
  1364. # basename; the ``source_path`` field is no longer carried on
  1365. # DeletionResult.
  1366. assert result.file_path == "abc.docx"
  1367. assert not hasattr(result, "source_path")
  1368. finally:
  1369. await rag.finalize_storages()
  1370. asyncio.run(_run())
  1371. @pytest.mark.offline
  1372. def test_enqueue_without_file_paths_uses_content_ids(tmp_path):
  1373. async def _run():
  1374. rag = _new_rag(tmp_path)
  1375. await rag.initialize_storages()
  1376. try:
  1377. docs = ["alpha without source", "beta without source"]
  1378. await rag.apipeline_enqueue_documents(docs, track_id="track-no-source")
  1379. for content in docs:
  1380. doc_id = compute_mdhash_id(content, prefix="doc-")
  1381. full_doc = await rag.full_docs.get_by_id(doc_id)
  1382. status = await rag.doc_status.get_by_id(doc_id)
  1383. assert full_doc is not None
  1384. assert status is not None
  1385. assert full_doc["content"] == content
  1386. assert full_doc["file_path"] == "unknown_source"
  1387. assert status["file_path"] == "unknown_source"
  1388. assert full_doc.get("content_hash")
  1389. assert status.get("content_hash") == full_doc.get("content_hash")
  1390. failed_docs = await rag.doc_status.get_docs_by_status(DocStatus.FAILED)
  1391. duplicate_failures = [
  1392. doc
  1393. for doc in failed_docs.values()
  1394. if getattr(doc, "track_id", "") == "track-no-source"
  1395. and getattr(doc, "metadata", {}).get("is_duplicate")
  1396. ]
  1397. assert duplicate_failures == []
  1398. finally:
  1399. await rag.finalize_storages()
  1400. asyncio.run(_run())
  1401. @pytest.mark.offline
  1402. def test_legacy_empty_file_paths_do_not_block_unsourced_insert(tmp_path):
  1403. async def _run():
  1404. rag = _new_rag(tmp_path)
  1405. await rag.initialize_storages()
  1406. try:
  1407. await rag.doc_status.upsert(
  1408. {
  1409. "legacy-empty": {
  1410. "status": DocStatus.PROCESSED,
  1411. "content_summary": "legacy empty",
  1412. "content_length": 0,
  1413. "file_path": "",
  1414. "track_id": "legacy",
  1415. "created_at": "2025-01-01T00:00:00+00:00",
  1416. "updated_at": "2025-01-01T00:00:00+00:00",
  1417. "chunks_list": [],
  1418. },
  1419. "legacy-no-file": {
  1420. "status": DocStatus.PROCESSED,
  1421. "content_summary": "legacy no-file",
  1422. "content_length": 0,
  1423. "file_path": "no-file-path",
  1424. "track_id": "legacy",
  1425. "created_at": "2025-01-01T00:00:00+00:00",
  1426. "updated_at": "2025-01-01T00:00:00+00:00",
  1427. "chunks_list": [],
  1428. },
  1429. }
  1430. )
  1431. content = "fresh unsourced body"
  1432. await rag.apipeline_enqueue_documents(content, track_id="track-fresh")
  1433. doc_id = compute_mdhash_id(content, prefix="doc-")
  1434. full_doc = await rag.full_docs.get_by_id(doc_id)
  1435. status = await rag.doc_status.get_by_id(doc_id)
  1436. assert full_doc is not None
  1437. assert status is not None
  1438. assert full_doc["file_path"] == "unknown_source"
  1439. assert status["file_path"] == "unknown_source"
  1440. finally:
  1441. await rag.finalize_storages()
  1442. asyncio.run(_run())
  1443. @pytest.mark.offline
  1444. def test_basename_lookup_requires_canonical_stored_file_path(tmp_path):
  1445. async def _run():
  1446. rag = _new_rag(tmp_path)
  1447. await rag.initialize_storages()
  1448. try:
  1449. # Storage does not normalize or apply legacy basename fallback.
  1450. # Business-layer writes must persist the canonical basename.
  1451. noncanonical_id = "doc-noncanonical-1"
  1452. await rag.doc_status.upsert(
  1453. {
  1454. noncanonical_id: {
  1455. "status": DocStatus.PROCESSED,
  1456. "content_summary": "noncanonical",
  1457. "content_length": 7,
  1458. "file_path": "/inputs/legacy.txt",
  1459. "track_id": "noncanonical-track",
  1460. "created_at": "2025-01-01T00:00:00+00:00",
  1461. "updated_at": "2025-01-01T00:00:00+00:00",
  1462. "chunks_list": [],
  1463. }
  1464. }
  1465. )
  1466. match = await rag.doc_status.get_doc_by_file_basename("legacy.txt")
  1467. assert match is None
  1468. # Re-enqueueing through the business path stores the canonical
  1469. # basename and is not blocked by a noncanonical storage row.
  1470. await rag.apipeline_enqueue_documents(
  1471. "fresh body",
  1472. file_paths="legacy.txt",
  1473. track_id="track-x",
  1474. )
  1475. new_id = compute_mdhash_id("legacy.txt", prefix="doc-")
  1476. full_doc = await rag.full_docs.get_by_id(new_id)
  1477. status = await rag.doc_status.get_by_id(new_id)
  1478. assert full_doc is not None
  1479. assert status is not None
  1480. assert full_doc["file_path"] == "legacy.txt"
  1481. assert status["file_path"] == "legacy.txt"
  1482. finally:
  1483. await rag.finalize_storages()
  1484. asyncio.run(_run())
  1485. @pytest.mark.offline
  1486. def test_content_hash_lookup_via_storage(tmp_path):
  1487. async def _run():
  1488. rag = _new_rag(tmp_path)
  1489. await rag.initialize_storages()
  1490. try:
  1491. await rag.apipeline_enqueue_documents(
  1492. "shared body",
  1493. file_paths="alpha.txt",
  1494. track_id="track-a",
  1495. )
  1496. alpha_id = compute_mdhash_id("alpha.txt", prefix="doc-")
  1497. alpha_full = await rag.full_docs.get_by_id(alpha_id)
  1498. assert alpha_full is not None
  1499. content_hash = alpha_full["content_hash"]
  1500. match = await rag.doc_status.get_doc_by_content_hash(content_hash)
  1501. assert match is not None
  1502. doc_id, _ = match
  1503. assert doc_id == alpha_id
  1504. finally:
  1505. await rag.finalize_storages()
  1506. asyncio.run(_run())
  1507. @pytest.mark.offline
  1508. def test_lightrag_format_uses_blocks_file_hash(tmp_path, monkeypatch):
  1509. async def _run():
  1510. input_dir = tmp_path / "input"
  1511. parsed_dir = input_dir / "__parsed__"
  1512. parsed_dir.mkdir(parents=True)
  1513. monkeypatch.setenv("INPUT_DIR", str(input_dir))
  1514. rag = _new_rag(tmp_path / "work")
  1515. rag.workspace = "test-pending-parse-duplicate"
  1516. await rag.initialize_storages()
  1517. try:
  1518. blocks_path = parsed_dir / "doc.blocks.jsonl"
  1519. blocks_path.write_text(
  1520. json.dumps({"type": "header"})
  1521. + "\n"
  1522. + json.dumps({"type": "content", "text": "hello"})
  1523. + "\n",
  1524. encoding="utf-8",
  1525. )
  1526. # Enqueue twice with different filenames pointing at the same
  1527. # blocks file: the second one must be rejected as content_hash dup.
  1528. # ``content`` arg is ignored on the LIGHTRAG path — the LightRAG
  1529. # Document file is read to derive both content_hash and the
  1530. # ``{{LRdoc}}`` summary — so any string here is fine.
  1531. await rag.apipeline_enqueue_documents(
  1532. "",
  1533. file_paths="first.lightrag",
  1534. docs_format="lightrag",
  1535. lightrag_document_paths="__parsed__/doc.blocks.jsonl",
  1536. track_id="track-a",
  1537. )
  1538. await rag.apipeline_enqueue_documents(
  1539. "",
  1540. file_paths="second.lightrag",
  1541. docs_format="lightrag",
  1542. lightrag_document_paths="__parsed__/doc.blocks.jsonl",
  1543. track_id="track-b",
  1544. )
  1545. second_id = compute_mdhash_id("second.lightrag", prefix="doc-")
  1546. assert await rag.full_docs.get_by_id(second_id) is None
  1547. failed = await rag.doc_status.get_docs_by_status(DocStatus.FAILED)
  1548. kinds = {
  1549. getattr(doc, "metadata", {}).get("duplicate_kind")
  1550. for doc in failed.values()
  1551. if getattr(doc, "metadata", {}).get("is_duplicate")
  1552. }
  1553. assert "content_hash" in kinds
  1554. finally:
  1555. await rag.finalize_storages()
  1556. asyncio.run(_run())
  1557. @pytest.mark.offline
  1558. def test_persist_parsed_full_docs_syncs_hash_to_doc_status(tmp_path):
  1559. async def _run():
  1560. rag = _new_rag(tmp_path)
  1561. await rag.initialize_storages()
  1562. try:
  1563. # Enqueue a pending_parse record: no content_hash should exist yet.
  1564. await rag.apipeline_enqueue_documents(
  1565. "",
  1566. file_paths="pending.txt",
  1567. docs_format="pending_parse",
  1568. track_id="track-pending",
  1569. )
  1570. doc_id = compute_mdhash_id("pending.txt", prefix="doc-")
  1571. full_doc = await rag.full_docs.get_by_id(doc_id)
  1572. assert full_doc is not None
  1573. assert full_doc.get("content_hash") in (None, "")
  1574. status_pre = await rag.doc_status.get_by_id(doc_id)
  1575. assert (status_pre or {}).get("content_hash") in (None, "")
  1576. # Simulate a parse_* completion that converts the record to RAW.
  1577. content = "extracted body text"
  1578. await rag._persist_parsed_full_docs(
  1579. doc_id,
  1580. {
  1581. "content": content,
  1582. "file_path": "pending.txt",
  1583. "parse_format": "raw",
  1584. "parse_engine": "native",
  1585. },
  1586. )
  1587. full_doc = await rag.full_docs.get_by_id(doc_id)
  1588. status_post = await rag.doc_status.get_by_id(doc_id)
  1589. expected_hash = compute_mdhash_id(content, prefix="")
  1590. assert full_doc["content_hash"] == expected_hash
  1591. assert (status_post or {}).get("content_hash") == expected_hash
  1592. # The hash should be queryable via the storage index.
  1593. match = await rag.doc_status.get_doc_by_content_hash(expected_hash)
  1594. assert match is not None
  1595. assert match[0] == doc_id
  1596. finally:
  1597. await rag.finalize_storages()
  1598. asyncio.run(_run())
  1599. @pytest.mark.offline
  1600. def test_persist_parsed_full_docs_preserves_pending_metadata(tmp_path):
  1601. """``_persist_parsed_full_docs`` must keep process_options seeded at
  1602. enqueue time so downstream stages (analyze_multimodal, chunking
  1603. selection, KG-skip) still see the user's original opt-ins after the
  1604. parse-result record overwrites the pending_parse row.
  1605. """
  1606. async def _run():
  1607. rag = _new_rag(tmp_path)
  1608. await rag.initialize_storages()
  1609. try:
  1610. await rag.apipeline_enqueue_documents(
  1611. "",
  1612. file_paths="report.[native-iet!].docx",
  1613. docs_format=FULL_DOCS_FORMAT_PENDING_PARSE,
  1614. parse_engine=PARSER_ENGINE_NATIVE,
  1615. process_options="iet!",
  1616. track_id="track-merge",
  1617. )
  1618. doc_id = compute_mdhash_id("report.docx", prefix="doc-")
  1619. pre = await rag.full_docs.get_by_id(doc_id)
  1620. assert pre is not None
  1621. assert pre.get("process_options") == "iet!"
  1622. assert "canonical_basename" not in pre
  1623. assert pre.get("file_path") == "report.docx"
  1624. # Simulate a parse_* completion: pass only the fresh fields the
  1625. # parsers actually emit and verify that pre-existing metadata
  1626. # survives the upsert.
  1627. await rag._persist_parsed_full_docs(
  1628. doc_id,
  1629. {
  1630. "content": "extracted body",
  1631. "file_path": "report.docx",
  1632. "parse_format": "raw",
  1633. "parse_engine": PARSER_ENGINE_NATIVE,
  1634. "update_time": 12345,
  1635. },
  1636. )
  1637. post = await rag.full_docs.get_by_id(doc_id)
  1638. assert post is not None
  1639. # Parser-supplied fields take precedence...
  1640. assert post["content"] == "extracted body"
  1641. assert post["parse_format"] == "raw"
  1642. # ...while metadata seeded at enqueue time is preserved.
  1643. assert post.get("process_options") == "iet!"
  1644. assert post.get("file_path") == "report.docx"
  1645. # And content_hash is freshly computed from the parsed body.
  1646. assert post["content_hash"] == compute_mdhash_id(
  1647. "extracted body", prefix=""
  1648. )
  1649. finally:
  1650. await rag.finalize_storages()
  1651. asyncio.run(_run())
  1652. @pytest.mark.offline
  1653. def test_state_machine_upsert_preserves_content_hash(tmp_path):
  1654. async def _run():
  1655. rag = _new_rag(tmp_path)
  1656. await rag.initialize_storages()
  1657. try:
  1658. await rag.apipeline_enqueue_documents(
  1659. "raw body",
  1660. file_paths="alpha.txt",
  1661. track_id="track-state",
  1662. )
  1663. doc_id = compute_mdhash_id("alpha.txt", prefix="doc-")
  1664. initial_hash = (await rag.doc_status.get_by_id(doc_id))["content_hash"]
  1665. assert initial_hash
  1666. # Simulate the production state-machine upsert pattern: read
  1667. # status_doc, then write a new payload that includes content_hash.
  1668. status_doc = (await rag.doc_status.get_docs_by_status(DocStatus.PENDING))[
  1669. doc_id
  1670. ]
  1671. for next_status in (
  1672. DocStatus.PARSING,
  1673. DocStatus.ANALYZING,
  1674. DocStatus.PROCESSED,
  1675. ):
  1676. await rag.doc_status.upsert(
  1677. {
  1678. doc_id: {
  1679. "status": next_status,
  1680. "content_summary": status_doc.content_summary,
  1681. "content_length": status_doc.content_length,
  1682. "created_at": status_doc.created_at,
  1683. "updated_at": "now",
  1684. "file_path": status_doc.file_path,
  1685. "track_id": status_doc.track_id,
  1686. "content_hash": status_doc.content_hash,
  1687. }
  1688. }
  1689. )
  1690. current = await rag.doc_status.get_by_id(doc_id)
  1691. assert current.get("content_hash") == initial_hash
  1692. # And the index lookup must still return this doc.
  1693. match = await rag.doc_status.get_doc_by_content_hash(initial_hash)
  1694. assert match is not None and match[0] == doc_id
  1695. finally:
  1696. await rag.finalize_storages()
  1697. asyncio.run(_run())
  1698. @pytest.mark.offline
  1699. def test_pending_parse_duplicate_hash_fails_and_archives_source(tmp_path, monkeypatch):
  1700. """Two PENDING_PARSE docx files with identical extracted bodies must be
  1701. detected as content_hash duplicates and the loser archived.
  1702. ``content_hash`` for LIGHTRAG-format docs is the MD5 of the normalized
  1703. ``merged_text`` (sidecar item ids and ``<base>.blocks.assets/`` prefixes
  1704. stripped via :func:`normalize_merged_text_for_hash`), so identical
  1705. bodies under different filenames produce the same hash and
  1706. ``_mark_duplicate_after_parse`` fires.
  1707. """
  1708. async def _run():
  1709. input_dir = tmp_path / "input"
  1710. input_dir.mkdir()
  1711. monkeypatch.setenv("INPUT_DIR", str(input_dir))
  1712. rag = _new_rag(tmp_path / "work")
  1713. await rag.initialize_storages()
  1714. try:
  1715. from datetime import datetime, timezone
  1716. import lightrag.lightrag as lightrag_module
  1717. import lightrag.pipeline as pipeline_module
  1718. class _FrozenDateTime(datetime):
  1719. @classmethod
  1720. def now(cls, tz=None): # noqa: D401
  1721. return datetime(2026, 1, 1, tzinfo=tz or timezone.utc)
  1722. monkeypatch.setattr(lightrag_module, "datetime", _FrozenDateTime)
  1723. monkeypatch.setattr(pipeline_module, "datetime", _FrozenDateTime)
  1724. # Both docx files emit the same blocks list, so combined with the
  1725. # frozen datetime the resulting .blocks.jsonl bytes are equal.
  1726. stable_block = {
  1727. "uuid": "p1",
  1728. "uuid_end": "p1",
  1729. "heading": "",
  1730. "content": "same extracted body",
  1731. "type": "text",
  1732. "parent_headings": [],
  1733. "level": 0,
  1734. "table_chunk_role": "none",
  1735. }
  1736. monkeypatch.setattr(
  1737. "lightrag.parser.docx.parse_document.extract_docx_blocks",
  1738. lambda *args, **kwargs: [dict(stable_block)],
  1739. )
  1740. # First original docx: enqueue, parse, mark PROCESSED.
  1741. original_path = input_dir / "original.docx"
  1742. original_path.write_bytes(b"original docx bytes")
  1743. await rag.apipeline_enqueue_documents(
  1744. "",
  1745. file_paths=str(original_path),
  1746. docs_format=FULL_DOCS_FORMAT_PENDING_PARSE,
  1747. parse_engine=PARSER_ENGINE_NATIVE,
  1748. track_id="track-original",
  1749. )
  1750. await rag.apipeline_process_enqueue_documents()
  1751. original_id = compute_mdhash_id("original.docx", prefix="doc-")
  1752. original_status = await rag.doc_status.get_by_id(original_id)
  1753. assert original_status is not None
  1754. original_status["status"] = DocStatus.PROCESSED
  1755. await rag.doc_status.upsert({original_id: original_status})
  1756. # Second docx: distinct filename so filename dedup misses, but
  1757. # content_hash should match the first because content_list +
  1758. # frozen datetime → identical .blocks.jsonl bytes.
  1759. source_path = input_dir / "duplicate.docx"
  1760. source_path.write_bytes(b"docx bytes")
  1761. async def _fail_extract(*args, **kwargs):
  1762. raise AssertionError("duplicate document should not reach extraction")
  1763. monkeypatch.setattr(rag, "_process_extract_entities", _fail_extract)
  1764. await rag.apipeline_enqueue_documents(
  1765. "",
  1766. file_paths=str(source_path),
  1767. docs_format=FULL_DOCS_FORMAT_PENDING_PARSE,
  1768. parse_engine=PARSER_ENGINE_NATIVE,
  1769. track_id="track-dup",
  1770. )
  1771. await rag.apipeline_process_enqueue_documents()
  1772. duplicate_id = compute_mdhash_id("duplicate.docx", prefix="doc-")
  1773. duplicate_status = await rag.doc_status.get_by_id(duplicate_id)
  1774. assert duplicate_status["status"] == DocStatus.FAILED
  1775. assert duplicate_status["metadata"]["is_duplicate"] is True
  1776. assert duplicate_status["metadata"]["duplicate_kind"] == "content_hash"
  1777. assert duplicate_status["metadata"]["original_doc_id"] == original_id
  1778. assert not source_path.exists()
  1779. assert (input_dir / PARSED_DIR_NAME / source_path.name).exists()
  1780. finally:
  1781. await rag.finalize_storages()
  1782. asyncio.run(_run())
  1783. @pytest.mark.offline
  1784. def test_parser_routing_accepts_semicolon_rules(monkeypatch):
  1785. monkeypatch.setenv("MINERU_LOCAL_ENDPOINT", "http://fake-mineru")
  1786. monkeypatch.setenv("DOCLING_ENDPOINT", "http://fake-docling")
  1787. rules = "*:mineru;html:docling"
  1788. validate_parser_routing_config(rules)
  1789. assert resolve_file_parser_engine("paper.pdf", parser_rules=rules) == "mineru"
  1790. assert resolve_file_parser_engine("index.html", parser_rules=rules) == "docling"
  1791. assert resolve_file_parser_engine("notes.txt", parser_rules=rules) == "legacy"
  1792. @pytest.mark.offline
  1793. def test_parser_routing_validation_requires_external_endpoints(monkeypatch):
  1794. monkeypatch.delenv("MINERU_LOCAL_ENDPOINT", raising=False)
  1795. monkeypatch.setenv("DOCLING_ENDPOINT", "http://fake-docling")
  1796. with pytest.raises(ParserRoutingConfigError, match="MINERU_LOCAL_ENDPOINT"):
  1797. validate_parser_routing_config("*:mineru;html:docling")
  1798. monkeypatch.setenv("MINERU_LOCAL_ENDPOINT", "http://fake-mineru")
  1799. monkeypatch.delenv("DOCLING_ENDPOINT", raising=False)
  1800. with pytest.raises(ParserRoutingConfigError, match="DOCLING_ENDPOINT"):
  1801. validate_parser_routing_config("*:mineru;html:docling")
  1802. @pytest.mark.offline
  1803. def test_parser_routing_validation_honors_mineru_api_mode(monkeypatch):
  1804. monkeypatch.setenv("DOCLING_ENDPOINT", "http://fake-docling")
  1805. monkeypatch.setenv("MINERU_API_MODE", "official")
  1806. monkeypatch.delenv("MINERU_API_TOKEN", raising=False)
  1807. with pytest.raises(ParserRoutingConfigError, match="MINERU_API_TOKEN"):
  1808. validate_parser_routing_config("pdf:mineru")
  1809. monkeypatch.setenv("MINERU_API_TOKEN", "token")
  1810. validate_parser_routing_config("pdf:mineru")
  1811. monkeypatch.setenv("MINERU_API_MODE", "local")
  1812. monkeypatch.delenv("MINERU_LOCAL_ENDPOINT", raising=False)
  1813. with pytest.raises(ParserRoutingConfigError, match="MINERU_LOCAL_ENDPOINT"):
  1814. validate_parser_routing_config("pdf:mineru")
  1815. monkeypatch.setenv("MINERU_LOCAL_ENDPOINT", "http://fake-local")
  1816. validate_parser_routing_config("pdf:mineru")
  1817. @pytest.mark.offline
  1818. def test_parser_routing_validation_rejects_invalid_rules(monkeypatch):
  1819. monkeypatch.setenv("MINERU_LOCAL_ENDPOINT", "http://fake-mineru")
  1820. with pytest.raises(ParserRoutingConfigError, match=r"\*\.pdf"):
  1821. validate_parser_routing_config("*.pdf:mineru")
  1822. with pytest.raises(ParserRoutingConfigError, match="unsupported parser engine"):
  1823. validate_parser_routing_config("pdf:unknown")
  1824. with pytest.raises(ParserRoutingConfigError, match="does not match any suffix"):
  1825. validate_parser_routing_config("pdf:native")
  1826. @pytest.mark.offline
  1827. def test_three_phase_status_flow(tmp_path, monkeypatch):
  1828. async def _run():
  1829. rag = _new_rag(tmp_path)
  1830. await rag.initialize_storages()
  1831. async def _fake_extract(*args, **kwargs):
  1832. return []
  1833. async def _fake_merge(*args, **kwargs):
  1834. return None
  1835. async def _fake_parse_native(doc_id, file_path, content_data):
  1836. return {
  1837. "doc_id": doc_id,
  1838. "file_path": file_path,
  1839. "parse_format": "raw",
  1840. "content": "hello world",
  1841. "blocks_path": "",
  1842. }
  1843. async def _fake_analyze(doc_id, file_path, parsed_data, **kwargs):
  1844. parsed_data["multimodal_processed"] = True
  1845. return parsed_data
  1846. monkeypatch.setattr(rag, "_process_extract_entities", _fake_extract)
  1847. monkeypatch.setattr("lightrag.pipeline.merge_nodes_and_edges", _fake_merge)
  1848. monkeypatch.setattr(rag, "parse_native", _fake_parse_native)
  1849. monkeypatch.setattr(rag, "analyze_multimodal", _fake_analyze)
  1850. status_seq: list[str] = []
  1851. original_upsert = rag.doc_status.upsert
  1852. async def _record_upsert(data):
  1853. for _, val in data.items():
  1854. if isinstance(val, dict) and "status" in val:
  1855. status_seq.append(str(val["status"]))
  1856. return await original_upsert(data)
  1857. monkeypatch.setattr(rag.doc_status, "upsert", _record_upsert)
  1858. await rag.apipeline_enqueue_documents("sample text", file_paths="s.txt")
  1859. await rag.apipeline_process_enqueue_documents()
  1860. joined = " ".join(status_seq)
  1861. assert "DocStatus.PARSING" in joined
  1862. assert "DocStatus.ANALYZING" in joined
  1863. assert "DocStatus.PROCESSING" in joined
  1864. assert "DocStatus.PROCESSED" in joined
  1865. await rag.finalize_storages()
  1866. asyncio.run(_run())
  1867. @pytest.mark.offline
  1868. def test_analyze_multimodal_invalid_json_hard_fails(tmp_path):
  1869. """An invalid VLM response is a hard failure under the new contract:
  1870. the sidecar item gets status='failure' and MultimodalAnalysisError
  1871. bubbles up so the document fails (no silent conservative fallback)."""
  1872. async def _run():
  1873. calls = {"n": 0}
  1874. async def _broken_vlm(prompt, **kwargs):
  1875. calls["n"] += 1
  1876. return "not-json"
  1877. rag = _new_rag(tmp_path, vlm_llm_model_func=_broken_vlm)
  1878. await rag.initialize_storages()
  1879. # 64x64 PNG so the image-pixel skip guard does NOT short-circuit
  1880. # before the VLM call.
  1881. img_path = tmp_path / "img1.png"
  1882. import struct
  1883. import zlib
  1884. def _png_bytes(w: int, h: int) -> bytes:
  1885. sig = b"\x89PNG\r\n\x1a\n"
  1886. ihdr = struct.pack(">II", w, h) + b"\x08\x06\x00\x00\x00"
  1887. crc = zlib.crc32(b"IHDR" + ihdr).to_bytes(4, "big")
  1888. ihdr_chunk = struct.pack(">I", len(ihdr)) + b"IHDR" + ihdr + crc
  1889. idat_payload = b"\x00" * (w * h * 4 + h)
  1890. compressed = zlib.compress(idat_payload)
  1891. crc_idat = zlib.crc32(b"IDAT" + compressed).to_bytes(4, "big")
  1892. idat_chunk = (
  1893. struct.pack(">I", len(compressed)) + b"IDAT" + compressed + crc_idat
  1894. )
  1895. iend_chunk = b"\x00\x00\x00\x00IEND\xaeB`\x82"
  1896. return sig + ihdr_chunk + idat_chunk + iend_chunk
  1897. img_path.write_bytes(_png_bytes(64, 64))
  1898. blocks = tmp_path / "demo.blocks.jsonl"
  1899. blocks.write_text(
  1900. "\n".join(
  1901. [
  1902. json.dumps({"type": "meta", "format_version": "1.0"}),
  1903. json.dumps({"type": "content", "content": "body"}),
  1904. ]
  1905. )
  1906. + "\n",
  1907. encoding="utf-8",
  1908. )
  1909. drawings = tmp_path / "demo.drawings.json"
  1910. drawings.write_text(
  1911. json.dumps(
  1912. {
  1913. "version": "1.0",
  1914. "drawings": {
  1915. "id1": {
  1916. "id": "id1",
  1917. "caption": "图1 测试图",
  1918. "footnotes": [],
  1919. "path": str(img_path),
  1920. }
  1921. },
  1922. },
  1923. ensure_ascii=False,
  1924. ),
  1925. encoding="utf-8",
  1926. )
  1927. parsed = {
  1928. "doc_id": "doc-1",
  1929. "file_path": "demo.pdf",
  1930. "blocks_path": str(blocks),
  1931. "content": "body",
  1932. }
  1933. from lightrag.exceptions import MultimodalAnalysisError
  1934. with pytest.raises(MultimodalAnalysisError):
  1935. await rag.analyze_multimodal(
  1936. "doc-1", "demo.pdf", parsed, process_options="i"
  1937. )
  1938. drawings_payload = json.loads(drawings.read_text(encoding="utf-8"))
  1939. result = drawings_payload["drawings"]["id1"]["llm_analyze_result"]
  1940. # No retry: VLM mock called exactly once.
  1941. assert calls["n"] == 1
  1942. # Sidecar carries a failure marker so a re-run sees the prior failure
  1943. # and does not silently consume it.
  1944. assert result["status"] == "failure"
  1945. assert "missing or invalid field" in result["message"]
  1946. asyncio.run(_run())
  1947. @pytest.mark.offline
  1948. def test_analyze_multimodal_uses_effective_vlm_max_async_when_role_none(tmp_path):
  1949. async def _run():
  1950. rag = _new_rag(
  1951. tmp_path,
  1952. llm_model_max_async=3,
  1953. vlm_llm_model_max_async=None,
  1954. )
  1955. blocks = tmp_path / "demo.blocks.jsonl"
  1956. blocks.write_text(
  1957. json.dumps({"type": "meta", "format_version": "1.0"}) + "\n",
  1958. encoding="utf-8",
  1959. )
  1960. parsed = {
  1961. "doc_id": "doc-1",
  1962. "file_path": "demo.pdf",
  1963. "blocks_path": str(blocks),
  1964. "content": "body",
  1965. }
  1966. result = await rag.analyze_multimodal(
  1967. "doc-1", "demo.pdf", parsed, process_options="ite"
  1968. )
  1969. assert result["multimodal_processed"] is True
  1970. asyncio.run(_run())
  1971. @pytest.mark.offline
  1972. def test_safe_vdb_operation_times_out_with_context():
  1973. async def _run():
  1974. async def _hang():
  1975. await asyncio.sleep(0.2)
  1976. with pytest.raises(TimeoutError) as exc_info:
  1977. await safe_vdb_operation_with_exception(
  1978. operation=_hang,
  1979. operation_name="relationship_upsert",
  1980. entity_name="A->B",
  1981. max_retries=1,
  1982. retry_delay=0,
  1983. timeout_seconds=0.05,
  1984. )
  1985. assert "relationship_upsert" in str(exc_info.value)
  1986. assert "A->B" in str(exc_info.value)
  1987. assert "timeout" in str(exc_info.value).lower()
  1988. asyncio.run(_run())
  1989. @pytest.mark.offline
  1990. def test_relationship_vdb_timeout_has_120s_floor():
  1991. assert _get_relationship_vdb_timeout_seconds({}) == 120.0
  1992. assert (
  1993. _get_relationship_vdb_timeout_seconds({"default_embedding_timeout": 10})
  1994. == 120.0
  1995. )
  1996. assert (
  1997. _get_relationship_vdb_timeout_seconds({"default_embedding_timeout": 50})
  1998. == 150.0
  1999. )
  2000. @pytest.mark.offline
  2001. def test_analyze_multimodal_unknown_image_type_folds_to_other(tmp_path):
  2002. """Model output with an out-of-enum ``type`` is folded to ``Other``
  2003. instead of failing the document (per design §3.4)."""
  2004. async def _run():
  2005. async def _vlm(_prompt, **_kwargs):
  2006. return json.dumps(
  2007. {
  2008. "name": "Figure A",
  2009. "type": "diagram", # not in IMAGE_TYPE_ENUM
  2010. "description": "details",
  2011. },
  2012. ensure_ascii=False,
  2013. )
  2014. rag = _new_rag(tmp_path, vlm_llm_model_func=_vlm)
  2015. await rag.initialize_storages()
  2016. import struct
  2017. import zlib
  2018. def _png(w, h):
  2019. sig = b"\x89PNG\r\n\x1a\n"
  2020. ihdr = struct.pack(">II", w, h) + b"\x08\x06\x00\x00\x00"
  2021. crc = zlib.crc32(b"IHDR" + ihdr).to_bytes(4, "big")
  2022. return sig + struct.pack(">I", len(ihdr)) + b"IHDR" + ihdr + crc
  2023. img_path = tmp_path / "img1.png"
  2024. img_path.write_bytes(_png(64, 64))
  2025. blocks = tmp_path / "demo.blocks.jsonl"
  2026. blocks.write_text(
  2027. "\n".join(
  2028. [
  2029. json.dumps({"type": "meta", "format_version": "1.0"}),
  2030. json.dumps({"type": "content", "content": "body"}),
  2031. ]
  2032. )
  2033. + "\n",
  2034. encoding="utf-8",
  2035. )
  2036. drawings = tmp_path / "demo.drawings.json"
  2037. drawings.write_text(
  2038. json.dumps(
  2039. {
  2040. "version": "1.0",
  2041. "drawings": {
  2042. "id1": {
  2043. "id": "id1",
  2044. "caption": "图1 测试图",
  2045. "footnotes": [],
  2046. "path": str(img_path),
  2047. }
  2048. },
  2049. },
  2050. ensure_ascii=False,
  2051. ),
  2052. encoding="utf-8",
  2053. )
  2054. parsed = {
  2055. "doc_id": "doc-1",
  2056. "file_path": "demo.pdf",
  2057. "blocks_path": str(blocks),
  2058. "content": "body",
  2059. }
  2060. await rag.analyze_multimodal("doc-1", "demo.pdf", parsed, process_options="i")
  2061. payload = json.loads(drawings.read_text(encoding="utf-8"))
  2062. result = payload["drawings"]["id1"]["llm_analyze_result"]
  2063. assert result["status"] == "success"
  2064. assert result["type"] == "Other"
  2065. assert result["description"] == "details"
  2066. assert "analyze_time" in result
  2067. asyncio.run(_run())
  2068. @pytest.mark.offline
  2069. def test_analyze_multimodal_skips_tiny_image_without_vlm_call(tmp_path):
  2070. """Images smaller than VLM_MIN_IMAGE_PIXEL (default 32px) are flagged
  2071. status=skipped without invoking the VLM."""
  2072. async def _run():
  2073. calls = {"n": 0}
  2074. async def _vlm(_prompt, **_kwargs):
  2075. calls["n"] += 1
  2076. return "{}"
  2077. rag = _new_rag(tmp_path, vlm_llm_model_func=_vlm)
  2078. await rag.initialize_storages()
  2079. # 1x1 PNG.
  2080. img_path = tmp_path / "tiny.png"
  2081. img_path.write_bytes(
  2082. b"\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01\x00\x00\x00\x01"
  2083. b"\x08\x06\x00\x00\x00\x1f\x15\xc4\x89\x00\x00\x00\rIDATx\x9cc`\x00\x00"
  2084. b"\x00\x02\x00\x01\xe2!\xbc3\x00\x00\x00\x00IEND\xaeB`\x82"
  2085. )
  2086. blocks = tmp_path / "demo.blocks.jsonl"
  2087. blocks.write_text(
  2088. "\n".join(
  2089. [
  2090. json.dumps({"type": "meta", "format_version": "1.0"}),
  2091. json.dumps({"type": "content", "content": "body"}),
  2092. ]
  2093. )
  2094. + "\n",
  2095. encoding="utf-8",
  2096. )
  2097. drawings = tmp_path / "demo.drawings.json"
  2098. drawings.write_text(
  2099. json.dumps(
  2100. {
  2101. "version": "1.0",
  2102. "drawings": {
  2103. "id1": {
  2104. "id": "id1",
  2105. "caption": "tiny",
  2106. "path": str(img_path),
  2107. }
  2108. },
  2109. }
  2110. ),
  2111. encoding="utf-8",
  2112. )
  2113. parsed = {
  2114. "doc_id": "doc-1",
  2115. "file_path": "demo.pdf",
  2116. "blocks_path": str(blocks),
  2117. "content": "body",
  2118. }
  2119. await rag.analyze_multimodal("doc-1", "demo.pdf", parsed, process_options="i")
  2120. payload = json.loads(drawings.read_text(encoding="utf-8"))
  2121. result = payload["drawings"]["id1"]["llm_analyze_result"]
  2122. assert result["status"] == "skipped"
  2123. assert "smaller than" in result["message"]
  2124. assert calls["n"] == 0
  2125. asyncio.run(_run())
  2126. @pytest.mark.offline
  2127. def test_write_lightrag_document_preserves_headings_and_table_dimensions(
  2128. tmp_path, monkeypatch
  2129. ):
  2130. async def _run():
  2131. monkeypatch.setenv("INPUT_DIR", str(tmp_path))
  2132. rag = _new_rag(tmp_path)
  2133. await rag.initialize_storages()
  2134. source_path = tmp_path / "demo.docx"
  2135. source_path.write_bytes(b"docx bytes")
  2136. content_list = [
  2137. {"type": "section_header", "text": "第一章 绪论", "level": 1},
  2138. {"type": "section_header", "text": "1.1 研究背景", "level": 2},
  2139. {"type": "text", "text": "这是正文段落。"},
  2140. {
  2141. "type": "table",
  2142. "table_caption": ["表1 指标说明"],
  2143. "table_body": {
  2144. "num_rows": 2,
  2145. "num_cols": 3,
  2146. "grid": [
  2147. [{"text": "符号"}, {"text": "含义"}, {"text": "单位"}],
  2148. [{"text": "A"}, {"text": "面积"}, {"text": "m2"}],
  2149. ],
  2150. },
  2151. },
  2152. {
  2153. "type": "image",
  2154. "img_path": "/tmp/a.png",
  2155. "image_caption": ["图1 架构图"],
  2156. },
  2157. ]
  2158. parsed = await rag._write_lightrag_document_from_content_list(
  2159. doc_id="doc-1",
  2160. file_path="demo.docx",
  2161. content_list=content_list,
  2162. engine="docling",
  2163. )
  2164. blocks_path = Path(parsed["blocks_path"])
  2165. assert blocks_path == (
  2166. tmp_path / PARSED_DIR_NAME / "demo.docx.parsed" / "demo.blocks.jsonl"
  2167. )
  2168. assert not source_path.exists()
  2169. assert (tmp_path / PARSED_DIR_NAME / source_path.name).exists()
  2170. blocks = [
  2171. json.loads(line)
  2172. for line in blocks_path.read_text(encoding="utf-8").splitlines()
  2173. ]
  2174. content_blocks = blocks[1:]
  2175. body_block = next(x for x in content_blocks if x["content"] == "这是正文段落。")
  2176. table_block = next(
  2177. x for x in content_blocks if 'refid="tb-1-0001"' in x["content"]
  2178. )
  2179. image_block = next(
  2180. x for x in content_blocks if 'id="im-1-0001"' in x["content"]
  2181. )
  2182. assert body_block["heading"] == "1.1 研究背景"
  2183. assert body_block["parent_headings"] == ["第一章 绪论"]
  2184. assert table_block["heading"] == "1.1 研究背景"
  2185. assert image_block["heading"] == "1.1 研究背景"
  2186. base = str(blocks_path)[: -len(".blocks.jsonl")]
  2187. tables = json.loads(Path(base + ".tables.json").read_text(encoding="utf-8"))
  2188. table_entry = tables["tables"]["tb-1-0001"]
  2189. assert table_entry["heading"] == "1.1 研究背景"
  2190. assert table_entry["dimension"] == [2, 3]
  2191. assert table_entry["format"] == "json"
  2192. assert json.loads(table_entry["content"]) == [
  2193. ["符号", "含义", "单位"],
  2194. ["A", "面积", "m2"],
  2195. ]
  2196. drawings = json.loads(Path(base + ".drawings.json").read_text(encoding="utf-8"))
  2197. assert drawings["drawings"]["im-1-0001"]["heading"] == "1.1 研究背景"
  2198. full_doc = await rag.full_docs.get_by_id("doc-1")
  2199. expected_sidecar_dir = (
  2200. tmp_path / PARSED_DIR_NAME / "demo.docx.parsed"
  2201. ).resolve()
  2202. assert full_doc["sidecar_location"].startswith("file://")
  2203. assert full_doc["sidecar_location"].endswith("/")
  2204. assert str(expected_sidecar_dir) in full_doc["sidecar_location"]
  2205. await rag.finalize_storages()
  2206. asyncio.run(_run())
  2207. @pytest.mark.offline
  2208. def test_write_lightrag_document_strips_parser_hint_from_artifact_names(
  2209. tmp_path, monkeypatch
  2210. ):
  2211. async def _run():
  2212. monkeypatch.setenv("INPUT_DIR", str(tmp_path))
  2213. rag = _new_rag(tmp_path)
  2214. await rag.initialize_storages()
  2215. try:
  2216. source_path = tmp_path / "demo.[native].docx"
  2217. source_path.write_bytes(b"docx bytes")
  2218. parsed = await rag._write_lightrag_document_from_content_list(
  2219. doc_id="doc-hinted",
  2220. file_path="demo.[native].docx",
  2221. content_list=[{"type": "text", "text": "body"}],
  2222. engine="native",
  2223. )
  2224. blocks_path = Path(parsed["blocks_path"])
  2225. assert blocks_path == (
  2226. tmp_path / PARSED_DIR_NAME / "demo.docx.parsed" / "demo.blocks.jsonl"
  2227. )
  2228. assert not source_path.exists()
  2229. assert (tmp_path / PARSED_DIR_NAME / source_path.name).exists()
  2230. full_doc = await rag.full_docs.get_by_id("doc-hinted")
  2231. expected_sidecar_dir = (
  2232. tmp_path / PARSED_DIR_NAME / "demo.docx.parsed"
  2233. ).resolve()
  2234. assert full_doc["sidecar_location"].startswith("file://")
  2235. assert full_doc["sidecar_location"].endswith("/")
  2236. assert str(expected_sidecar_dir) in full_doc["sidecar_location"]
  2237. finally:
  2238. await rag.finalize_storages()
  2239. asyncio.run(_run())
  2240. @pytest.mark.offline
  2241. def test_analyze_multimodal_table_without_image_uses_textual_analysis(tmp_path):
  2242. async def _run():
  2243. # Tables now route to the EXTRACT role, not VLM (per design §3.1).
  2244. async def _extract(_prompt, **_kwargs):
  2245. return json.dumps(
  2246. {
  2247. "name": "model_benchmark_metrics",
  2248. "description": "表格包含三列,分别为符号、代表意义和单位,列出了 A、F、e 等符号。",
  2249. },
  2250. ensure_ascii=False,
  2251. )
  2252. async def _vlm_unused(_prompt, **_kwargs):
  2253. raise AssertionError("VLM must not be called for tables")
  2254. rag = _new_rag(
  2255. tmp_path,
  2256. vlm_llm_model_func=_vlm_unused,
  2257. extract_llm_model_func=_extract,
  2258. )
  2259. await rag.initialize_storages()
  2260. blocks = tmp_path / "demo.blocks.jsonl"
  2261. blocks.write_text(
  2262. "\n".join(
  2263. [
  2264. json.dumps({"type": "meta", "format_version": "1.0"}),
  2265. json.dumps({"type": "content", "content": "body"}),
  2266. ]
  2267. )
  2268. + "\n",
  2269. encoding="utf-8",
  2270. )
  2271. tables = tmp_path / "demo.tables.json"
  2272. tables.write_text(
  2273. json.dumps(
  2274. {
  2275. "version": "1.0",
  2276. "tables": {
  2277. "id1": {
  2278. "id": "id1",
  2279. "caption": "表1 指标说明",
  2280. "footnotes": ["单位:国际标准单位"],
  2281. "content": "<table><tr><th>符号</th><th>代表意义</th><th>单位</th></tr><tr><td>A</td><td>面积</td><td>m2</td></tr></table>",
  2282. }
  2283. },
  2284. },
  2285. ensure_ascii=False,
  2286. ),
  2287. encoding="utf-8",
  2288. )
  2289. parsed = {
  2290. "doc_id": "doc-1",
  2291. "file_path": "demo.pdf",
  2292. "blocks_path": str(blocks),
  2293. "content": "body",
  2294. }
  2295. await rag.analyze_multimodal("doc-1", "demo.pdf", parsed, process_options="t")
  2296. payload = json.loads(tables.read_text(encoding="utf-8"))
  2297. result = payload["tables"]["id1"]["llm_analyze_result"]
  2298. assert result["status"] == "success"
  2299. assert result["name"] == "model_benchmark_metrics"
  2300. assert "符号、代表意义和单位" in result["description"]
  2301. # Cache id was written back so document delete can clean it up.
  2302. assert any(
  2303. cid.startswith("default:analysis:")
  2304. for cid in payload["tables"]["id1"].get("llm_cache_list", [])
  2305. )
  2306. asyncio.run(_run())
  2307. @pytest.mark.offline
  2308. def test_parser_source_resolver_finds_hint_variant_by_canonical_name(
  2309. tmp_path, monkeypatch
  2310. ):
  2311. input_dir = tmp_path / "input"
  2312. input_dir.mkdir()
  2313. monkeypatch.setenv("INPUT_DIR", str(input_dir))
  2314. hinted = input_dir / "demo.[mineru].pdf"
  2315. hinted.write_bytes(b"fake-pdf")
  2316. rag = _new_rag(tmp_path / "work")
  2317. resolved = rag._resolve_source_file_for_parser(
  2318. "demo.pdf",
  2319. parser_engine=PARSER_ENGINE_MINERU,
  2320. )
  2321. assert Path(resolved) == hinted
  2322. @pytest.mark.offline
  2323. def test_parser_source_resolver_prefers_exact_canonical_file(tmp_path, monkeypatch):
  2324. input_dir = tmp_path / "input"
  2325. input_dir.mkdir()
  2326. monkeypatch.setenv("INPUT_DIR", str(input_dir))
  2327. exact = input_dir / "demo.pdf"
  2328. hinted = input_dir / "demo.[mineru].pdf"
  2329. exact.write_bytes(b"canonical")
  2330. hinted.write_bytes(b"hinted")
  2331. rag = _new_rag(tmp_path / "work")
  2332. resolved = rag._resolve_source_file_for_parser(
  2333. "demo.pdf",
  2334. parser_engine=PARSER_ENGINE_MINERU,
  2335. )
  2336. assert Path(resolved) == exact
  2337. @pytest.mark.offline
  2338. def test_parse_mineru_to_lightrag_document(tmp_path, monkeypatch):
  2339. """End-to-end: parse_mineru routes through MinerURawClient + sidecar
  2340. writer and produces spec-compliant *.parsed/ + *.mineru_raw/ artifacts.
  2341. With the unified pipeline (introduced alongside the MinerU raw bundle
  2342. cache), the MinerU download choreography happens inside
  2343. :meth:`MinerURawClient.download_into`. We stub that method directly.
  2344. """
  2345. from lightrag.parser.external.mineru import compute_size_and_hash
  2346. from lightrag.parser.external.mineru.cache import current_mineru_options_signature
  2347. from lightrag.parser.external.mineru.client import MinerURawClient
  2348. from lightrag.parser.external.mineru.manifest import (
  2349. Manifest,
  2350. ManifestFile,
  2351. write_manifest,
  2352. )
  2353. async def _run():
  2354. input_dir = tmp_path / "input"
  2355. input_dir.mkdir()
  2356. monkeypatch.setenv("INPUT_DIR", str(input_dir))
  2357. rag = _new_rag(tmp_path / "work")
  2358. await rag.initialize_storages()
  2359. src_file = input_dir / "demo.pdf"
  2360. src_file.write_bytes(b"fake-pdf")
  2361. async def _fake_download(self, raw_dir, source_file_path, **_kwargs):
  2362. assert source_file_path == src_file
  2363. raw_dir.mkdir(parents=True, exist_ok=True)
  2364. content_list = [
  2365. {"type": "text", "text": "第一段正文"},
  2366. {
  2367. "type": "image",
  2368. "img_path": "assets/img1.png",
  2369. "image_caption": ["图1 架构图"],
  2370. "image_footnote": ["示意图"],
  2371. },
  2372. {
  2373. "type": "table",
  2374. "table_body": "<table><tr><td>A</td></tr></table>",
  2375. "table_caption": ["表1 指标"],
  2376. "table_footnote": ["单位:%"],
  2377. },
  2378. {"type": "equation", "text": "$$E=mc^2$$"},
  2379. ]
  2380. (raw_dir / "content_list.json").write_text(
  2381. json.dumps(content_list, ensure_ascii=False),
  2382. encoding="utf-8",
  2383. )
  2384. (raw_dir / "assets").mkdir()
  2385. (raw_dir / "assets" / "img1.png").write_bytes(b"\x89PNGfake")
  2386. src_size, src_hash = compute_size_and_hash(source_file_path)
  2387. crit_size, crit_hash = compute_size_and_hash(raw_dir / "content_list.json")
  2388. manifest = Manifest(
  2389. source_content_hash=src_hash,
  2390. source_size_bytes=src_size,
  2391. source_filename_at_parse=source_file_path.name,
  2392. critical_file=ManifestFile(
  2393. path="content_list.json", size=crit_size, sha256=crit_hash
  2394. ),
  2395. files=[
  2396. ManifestFile(
  2397. path="assets/img1.png",
  2398. size=(raw_dir / "assets" / "img1.png").stat().st_size,
  2399. )
  2400. ],
  2401. total_size_bytes=crit_size,
  2402. task_id="fake-task",
  2403. api_mode="local",
  2404. options_signature=current_mineru_options_signature(),
  2405. )
  2406. write_manifest(raw_dir, manifest)
  2407. return manifest
  2408. monkeypatch.setattr(MinerURawClient, "download_into", _fake_download)
  2409. monkeypatch.setenv("MINERU_LOCAL_ENDPOINT", "http://fake-mineru")
  2410. parsed = await rag.parse_mineru(
  2411. doc_id="doc-1",
  2412. file_path=str(src_file),
  2413. content_data={"content": ""},
  2414. )
  2415. assert parsed["parse_format"] == "lightrag"
  2416. assert parsed["blocks_path"]
  2417. blocks_path = Path(parsed["blocks_path"])
  2418. assert blocks_path.exists()
  2419. lines = blocks_path.read_text(encoding="utf-8").splitlines()
  2420. meta = json.loads(lines[0])
  2421. assert meta["type"] == "meta"
  2422. assert meta["format"] == "lightrag"
  2423. assert meta["drawing_file"] is True
  2424. assert meta["table_file"] is True
  2425. assert meta["equation_file"] is True
  2426. base = str(blocks_path)[: -len(".blocks.jsonl")]
  2427. drawings = json.loads(Path(base + ".drawings.json").read_text(encoding="utf-8"))
  2428. tables = json.loads(Path(base + ".tables.json").read_text(encoding="utf-8"))
  2429. equations = json.loads(
  2430. Path(base + ".equations.json").read_text(encoding="utf-8")
  2431. )
  2432. assert drawings["drawings"]
  2433. assert tables["tables"]
  2434. assert equations["equations"]
  2435. full_doc = await rag.full_docs.get_by_id("doc-1")
  2436. assert full_doc["parse_format"] == "lightrag"
  2437. # Per docs/FileProcessingConfiguration-zh.md spec, ``content`` is now
  2438. # ``{{LRdoc}}`` followed by a leading-text summary of the document.
  2439. assert full_doc["content"].startswith("{{LRdoc}}")
  2440. assert full_doc["sidecar_location"].startswith("file://")
  2441. assert full_doc["sidecar_location"].endswith("/")
  2442. assert str(blocks_path.parent.resolve()) in full_doc["sidecar_location"]
  2443. await rag.finalize_storages()
  2444. asyncio.run(_run())
  2445. @pytest.mark.offline
  2446. def test_parse_mineru_uses_hint_source_and_canonical_upload_name(tmp_path, monkeypatch):
  2447. from lightrag.parser.external.mineru import compute_size_and_hash
  2448. from lightrag.parser.external.mineru.cache import current_mineru_options_signature
  2449. from lightrag.parser.external.mineru.client import MinerURawClient
  2450. from lightrag.parser.external.mineru.manifest import (
  2451. Manifest,
  2452. ManifestFile,
  2453. write_manifest,
  2454. )
  2455. async def _run():
  2456. input_dir = tmp_path / "input"
  2457. input_dir.mkdir()
  2458. monkeypatch.setenv("INPUT_DIR", str(input_dir))
  2459. monkeypatch.setenv("MINERU_LOCAL_ENDPOINT", "http://fake-mineru")
  2460. rag = _new_rag(tmp_path / "work")
  2461. await rag.initialize_storages()
  2462. hinted_name = "LightRAG - Simple and Fast RAG.[mineru].pdf"
  2463. canonical_name = "LightRAG - Simple and Fast RAG.pdf"
  2464. src_file = input_dir / hinted_name
  2465. src_file.write_bytes(b"fake-pdf")
  2466. async def _fake_download(self, raw_dir, source_file_path, **kwargs):
  2467. assert source_file_path == src_file
  2468. assert kwargs.get("upload_name") == canonical_name
  2469. raw_dir.mkdir(parents=True, exist_ok=True)
  2470. content_list = [{"type": "text", "text": "第一段正文"}]
  2471. content_path = raw_dir / "content_list.json"
  2472. content_path.write_text(
  2473. json.dumps(content_list, ensure_ascii=False),
  2474. encoding="utf-8",
  2475. )
  2476. src_size, src_hash = compute_size_and_hash(source_file_path)
  2477. crit_size, crit_hash = compute_size_and_hash(content_path)
  2478. manifest = Manifest(
  2479. source_content_hash=src_hash,
  2480. source_size_bytes=src_size,
  2481. source_filename_at_parse=kwargs.get("upload_name"),
  2482. critical_file=ManifestFile(
  2483. path="content_list.json",
  2484. size=crit_size,
  2485. sha256=crit_hash,
  2486. ),
  2487. files=[],
  2488. total_size_bytes=crit_size,
  2489. task_id="fake-task",
  2490. api_mode="local",
  2491. options_signature=current_mineru_options_signature(),
  2492. )
  2493. write_manifest(raw_dir, manifest)
  2494. return manifest
  2495. monkeypatch.setattr(MinerURawClient, "download_into", _fake_download)
  2496. await rag.apipeline_enqueue_documents(
  2497. "",
  2498. file_paths=str(src_file),
  2499. track_id="track-hint",
  2500. docs_format=FULL_DOCS_FORMAT_PENDING_PARSE,
  2501. parse_engine=PARSER_ENGINE_MINERU,
  2502. )
  2503. doc_id = compute_mdhash_id(canonical_name, prefix="doc-")
  2504. status = await rag.doc_status.get_by_id(doc_id)
  2505. assert status is not None
  2506. assert status["file_path"] == canonical_name
  2507. assert status["metadata"]["source_file_name"] == hinted_name
  2508. content_data = await rag.full_docs.get_by_id(doc_id)
  2509. assert content_data is not None
  2510. content_data["source_file_name"] = status["metadata"]["source_file_name"]
  2511. parsed = await rag.parse_mineru(
  2512. doc_id=doc_id,
  2513. file_path=status["file_path"],
  2514. content_data=content_data,
  2515. )
  2516. blocks_path = Path(parsed["blocks_path"])
  2517. expected_parsed_dir = input_dir / PARSED_DIR_NAME / f"{canonical_name}.parsed"
  2518. expected_raw_dir = (
  2519. input_dir
  2520. / PARSED_DIR_NAME
  2521. / ("LightRAG - Simple and Fast RAG.pdf.mineru_raw")
  2522. )
  2523. archived_source = input_dir / PARSED_DIR_NAME / hinted_name
  2524. assert blocks_path.parent == expected_parsed_dir
  2525. assert expected_raw_dir.is_dir()
  2526. assert not src_file.exists()
  2527. assert archived_source.is_file()
  2528. await rag.finalize_storages()
  2529. asyncio.run(_run())
  2530. @pytest.mark.offline
  2531. def test_mm_chunks_and_modality_relations_from_sidecars(tmp_path):
  2532. async def _run():
  2533. rag = _new_rag(tmp_path)
  2534. await rag.initialize_storages()
  2535. blocks = tmp_path / "demo.blocks.jsonl"
  2536. blocks.write_text(
  2537. "\n".join(
  2538. [
  2539. json.dumps(
  2540. {
  2541. "type": "meta",
  2542. "format": "lightrag",
  2543. "version": "1.0",
  2544. "doc_id": "doc-1",
  2545. },
  2546. ensure_ascii=False,
  2547. ),
  2548. json.dumps(
  2549. {
  2550. "type": "content",
  2551. "blockid": "b1",
  2552. "format": "plain_text",
  2553. "content": "正文",
  2554. },
  2555. ensure_ascii=False,
  2556. ),
  2557. ]
  2558. )
  2559. + "\n",
  2560. encoding="utf-8",
  2561. )
  2562. drawings = tmp_path / "demo.drawings.json"
  2563. drawings.write_text(
  2564. json.dumps(
  2565. {
  2566. "version": "1.0",
  2567. "drawings": {
  2568. "d1": {
  2569. "id": "d1",
  2570. "heading": "章节A",
  2571. "caption": "图1 架构",
  2572. "llm_cache_list": [
  2573. "default:analysis:abc123",
  2574. ],
  2575. "llm_analyze_result": {
  2576. "name": "系统架构图",
  2577. "type": "Chart",
  2578. "description": "模块交互关系",
  2579. "analyze_time": 1700000000,
  2580. "status": "success",
  2581. "message": "",
  2582. },
  2583. }
  2584. },
  2585. },
  2586. ensure_ascii=False,
  2587. ),
  2588. encoding="utf-8",
  2589. )
  2590. mm_chunks = rag._build_mm_chunks_from_sidecars(
  2591. doc_id="doc-1",
  2592. file_path="demo.pdf",
  2593. blocks_path=str(blocks),
  2594. base_order_index=2,
  2595. )
  2596. assert len(mm_chunks) == 1
  2597. chunk = mm_chunks[0]
  2598. # New nested schema: heading + sidecar + llm_cache_list merge.
  2599. assert chunk["content"].startswith("[Image Name]")
  2600. assert "[Image Type]Chart" in chunk["content"]
  2601. assert chunk["sidecar"] == {
  2602. "type": "drawing",
  2603. "id": "d1",
  2604. "refs": [{"type": "drawing", "id": "d1"}],
  2605. }
  2606. assert chunk["heading"] == {
  2607. "level": 0,
  2608. "heading": "章节A",
  2609. "parent_headings": [],
  2610. }
  2611. assert chunk["llm_cache_list"] == ["default:analysis:abc123"]
  2612. # Multimodal entity injection now lives in
  2613. # operate.extract_entities._process_single_content; this test only
  2614. # covers chunk assembly. The companion regression below
  2615. # (test_parse_mm_display_name_matches_chunk_format) pins the
  2616. # builder/consumer format contract.
  2617. await rag.finalize_storages()
  2618. asyncio.run(_run())
  2619. @pytest.mark.offline
  2620. def test_parse_mm_display_name_matches_chunk_format():
  2621. """Pin the builder/consumer contract: the chunk content emitted by
  2622. ``_build_mm_chunks_from_sidecars`` must parse cleanly via
  2623. ``operate._parse_mm_display_name`` for all three modalities.
  2624. Regression for the case where the renderer's label format diverged
  2625. from the consumer's regex and display names silently fell back to
  2626. sidecar ids.
  2627. """
  2628. drawing_content = (
  2629. "[Image Name]系统架构图\n[Image Type]Chart\n\n模块交互关系\n\n"
  2630. "[Image Footnotes]脚注1; 脚注2"
  2631. )
  2632. assert _parse_mm_display_name(drawing_content, "d1") == "系统架构图"
  2633. table_content = "[Table Name]性能对比表\n\n各方法的指标对比"
  2634. assert _parse_mm_display_name(table_content, "t1") == "性能对比表"
  2635. equation_content = "E = mc^2\n[Equation Name]质能方程\n\n爱因斯坦的质能等效公式"
  2636. assert _parse_mm_display_name(equation_content, "e1") == "质能方程"
  2637. # Fallbacks: missing marker, empty content, marker with blank name.
  2638. assert _parse_mm_display_name("no marker here", "fallback-id") == "fallback-id"
  2639. assert _parse_mm_display_name("", "fallback-id") == "fallback-id"
  2640. assert _parse_mm_display_name("[Image Name] ", "fallback-id") == "fallback-id"
  2641. @pytest.mark.offline
  2642. def test_parse_mm_display_name_on_real_builder_output(tmp_path):
  2643. """End-to-end pin: feed actual chunks from
  2644. ``_build_mm_chunks_from_sidecars`` for all three modalities straight
  2645. into the consumer's parser, and require the analysis ``name`` field
  2646. to come back. This locks the bidirectional builder/consumer contract
  2647. so that renaming the ``[Image|Table|Equation] Name`` label without
  2648. updating the regex in ``operate._parse_mm_display_name`` immediately
  2649. breaks here instead of silently degrading relation descriptions.
  2650. """
  2651. async def _run():
  2652. rag = _new_rag(tmp_path)
  2653. await rag.initialize_storages()
  2654. blocks = tmp_path / "demo.pdf.blocks.jsonl"
  2655. blocks.write_text("", encoding="utf-8")
  2656. heading = {"level": 1, "heading": "章节A", "parent_headings": []}
  2657. (tmp_path / "demo.pdf.drawings.json").write_text(
  2658. json.dumps(
  2659. {
  2660. "drawings": {
  2661. "d1": {
  2662. "heading": heading,
  2663. "footnotes": [],
  2664. "llm_cache_list": [],
  2665. "llm_analyze_result": {
  2666. "name": "系统架构图",
  2667. "type": "Chart",
  2668. "description": "模块交互关系",
  2669. "analyze_time": 1700000000,
  2670. "status": "success",
  2671. "message": "",
  2672. },
  2673. }
  2674. }
  2675. },
  2676. ensure_ascii=False,
  2677. ),
  2678. encoding="utf-8",
  2679. )
  2680. (tmp_path / "demo.pdf.tables.json").write_text(
  2681. json.dumps(
  2682. {
  2683. "tables": {
  2684. "t1": {
  2685. "heading": heading,
  2686. "footnotes": [],
  2687. "llm_cache_list": [],
  2688. "llm_analyze_result": {
  2689. "name": "性能对比表",
  2690. "description": "各方法的指标对比",
  2691. "analyze_time": 1700000001,
  2692. "status": "success",
  2693. "message": "",
  2694. },
  2695. }
  2696. }
  2697. },
  2698. ensure_ascii=False,
  2699. ),
  2700. encoding="utf-8",
  2701. )
  2702. (tmp_path / "demo.pdf.equations.json").write_text(
  2703. json.dumps(
  2704. {
  2705. "equations": {
  2706. "e1": {
  2707. "heading": heading,
  2708. "footnotes": [],
  2709. "llm_cache_list": [],
  2710. "llm_analyze_result": {
  2711. "name": "质能方程",
  2712. "equation": "E = mc^2",
  2713. "description": "爱因斯坦的质能等效公式",
  2714. "analyze_time": 1700000002,
  2715. "status": "success",
  2716. "message": "",
  2717. },
  2718. }
  2719. }
  2720. },
  2721. ensure_ascii=False,
  2722. ),
  2723. encoding="utf-8",
  2724. )
  2725. mm_chunks = rag._build_mm_chunks_from_sidecars(
  2726. doc_id="doc-1",
  2727. file_path="demo.pdf",
  2728. blocks_path=str(blocks),
  2729. base_order_index=0,
  2730. process_options="ite",
  2731. )
  2732. # Index by sidecar type for stable assertions independent of
  2733. # iteration order in the builder.
  2734. by_type = {chunk["sidecar"]["type"]: chunk for chunk in mm_chunks}
  2735. assert set(by_type.keys()) == {"drawing", "table", "equation"}
  2736. expected = {
  2737. "drawing": ("d1", "系统架构图"),
  2738. "table": ("t1", "性能对比表"),
  2739. "equation": ("e1", "质能方程"),
  2740. }
  2741. for kind, (sidecar_id, name) in expected.items():
  2742. display = _parse_mm_display_name(by_type[kind]["content"], sidecar_id)
  2743. assert display == name, (
  2744. f"{kind}: parser failed to extract '{name}' from builder "
  2745. f"output, got '{display}' (content: {by_type[kind]['content']!r})"
  2746. )
  2747. await rag.finalize_storages()
  2748. asyncio.run(_run())
  2749. @pytest.mark.offline
  2750. def test_parse_mineru_empty_service_result_raises_without_fallback(
  2751. tmp_path, monkeypatch
  2752. ):
  2753. """When MinerU produces no content_list.json the IR builder raises and the
  2754. pipeline propagates the error — no silent fallback to raw text.
  2755. With the unified pipeline, an "empty result" surfaces as a missing
  2756. critical file inside ``*.mineru_raw/``; the IR builder's
  2757. ``normalize_from_workdir`` raises :class:`FileNotFoundError` and the
  2758. parse fails fast.
  2759. """
  2760. from lightrag.parser.external.mineru.client import MinerURawClient
  2761. async def _run():
  2762. rag = _new_rag(tmp_path)
  2763. await rag.initialize_storages()
  2764. src_file = tmp_path / "demo.pdf"
  2765. src_file.write_bytes(b"fake-pdf")
  2766. async def _fake_download(self, raw_dir, source_file_path, **_kwargs):
  2767. # Simulate a "MinerU returned nothing useful" bundle: dir is
  2768. # touched but no content_list.json is produced.
  2769. raw_dir.mkdir(parents=True, exist_ok=True)
  2770. monkeypatch.setattr(MinerURawClient, "download_into", _fake_download)
  2771. monkeypatch.setenv("MINERU_LOCAL_ENDPOINT", "http://fake")
  2772. with pytest.raises(FileNotFoundError, match="content_list.json"):
  2773. await rag.parse_mineru(
  2774. doc_id="doc-local-1",
  2775. file_path=str(src_file),
  2776. content_data={"content": "native fallback content"},
  2777. )
  2778. await rag.finalize_storages()
  2779. asyncio.run(_run())
  2780. @pytest.mark.offline
  2781. def test_build_chunks_dict_preserves_existing_llm_cache_list():
  2782. """Regression: build_chunks_dict_from_chunking_result must not overwrite
  2783. a chunk's pre-existing llm_cache_list — multimodal chunks arrive with
  2784. analysis cache ids already attached so document deletion can clean
  2785. them up via the per-chunk llm_cache_list."""
  2786. from lightrag.utils_pipeline import build_chunks_dict_from_chunking_result
  2787. chunking_result = [
  2788. {
  2789. "chunk_order_index": 0,
  2790. "content": "first chunk",
  2791. "tokens": 4,
  2792. },
  2793. {
  2794. "chunk_id": "doc-1-mm-drawing-000",
  2795. "chunk_order_index": 1,
  2796. "content": "second chunk",
  2797. "tokens": 6,
  2798. "llm_cache_list": [
  2799. "default:analysis:abc",
  2800. "default:analysis:abc", # dedup verification
  2801. "default:analysis:def",
  2802. ],
  2803. },
  2804. ]
  2805. chunks = build_chunks_dict_from_chunking_result(
  2806. chunking_result, doc_id="doc-1", file_path="demo.pdf"
  2807. )
  2808. # Order is chunking_result order; locate by chunk_id.
  2809. mm_chunk = next(
  2810. v for v in chunks.values() if v.get("chunk_id") == "doc-1-mm-drawing-000"
  2811. )
  2812. text_chunk = next(
  2813. v for v in chunks.values() if v.get("chunk_id") != "doc-1-mm-drawing-000"
  2814. )
  2815. assert mm_chunk["llm_cache_list"] == [
  2816. "default:analysis:abc",
  2817. "default:analysis:def",
  2818. ]
  2819. # Plain text chunks still start with an empty list (no pre-existing ids).
  2820. assert text_chunk["llm_cache_list"] == []
  2821. @pytest.mark.offline
  2822. def test_build_mm_chunks_respects_process_options_filter(tmp_path):
  2823. """Regression: _build_mm_chunks_from_sidecars must gate sidecar reads
  2824. by the active process_options. A document re-processed after opting
  2825. out of i/t/e MUST NOT pick up stale success results from a prior pass.
  2826. """
  2827. async def _run():
  2828. rag = _new_rag(tmp_path)
  2829. await rag.initialize_storages()
  2830. blocks = tmp_path / "demo.blocks.jsonl"
  2831. blocks.write_text(
  2832. "\n".join(
  2833. [
  2834. json.dumps({"type": "meta", "format_version": "1.0"}),
  2835. json.dumps({"type": "content", "content": "body"}),
  2836. ]
  2837. )
  2838. + "\n",
  2839. encoding="utf-8",
  2840. )
  2841. # Both modalities carry a stale ``success`` from a prior pass.
  2842. drawings = tmp_path / "demo.drawings.json"
  2843. drawings.write_text(
  2844. json.dumps(
  2845. {
  2846. "drawings": {
  2847. "d1": {
  2848. "id": "d1",
  2849. "llm_analyze_result": {
  2850. "name": "old",
  2851. "type": "Chart",
  2852. "description": "stale drawing",
  2853. "analyze_time": 1700000000,
  2854. "status": "success",
  2855. "message": "",
  2856. },
  2857. }
  2858. }
  2859. }
  2860. ),
  2861. encoding="utf-8",
  2862. )
  2863. tables = tmp_path / "demo.tables.json"
  2864. tables.write_text(
  2865. json.dumps(
  2866. {
  2867. "tables": {
  2868. "t1": {
  2869. "id": "t1",
  2870. "llm_analyze_result": {
  2871. "name": "old",
  2872. "description": "stale table",
  2873. "analyze_time": 1700000000,
  2874. "status": "success",
  2875. "message": "",
  2876. },
  2877. }
  2878. }
  2879. }
  2880. ),
  2881. encoding="utf-8",
  2882. )
  2883. # process_options="t" → only tables are considered; the drawing
  2884. # success entry must NOT generate a chunk.
  2885. only_tables = rag._build_mm_chunks_from_sidecars(
  2886. doc_id="doc-1",
  2887. file_path="demo.pdf",
  2888. blocks_path=str(blocks),
  2889. base_order_index=0,
  2890. process_options="t",
  2891. )
  2892. assert len(only_tables) == 1
  2893. assert only_tables[0]["sidecar"]["type"] == "table"
  2894. # Empty/None process_options → no modalities active → no chunks.
  2895. none_active = rag._build_mm_chunks_from_sidecars(
  2896. doc_id="doc-1",
  2897. file_path="demo.pdf",
  2898. blocks_path=str(blocks),
  2899. base_order_index=0,
  2900. process_options="",
  2901. )
  2902. assert none_active == []
  2903. # Backwards-compat: callers that pass process_options=None see
  2904. # every modality (legacy behaviour for ad-hoc unit tests).
  2905. legacy = rag._build_mm_chunks_from_sidecars(
  2906. doc_id="doc-1",
  2907. file_path="demo.pdf",
  2908. blocks_path=str(blocks),
  2909. base_order_index=0,
  2910. )
  2911. assert {ch["sidecar"]["type"] for ch in legacy} == {"drawing", "table"}
  2912. await rag.finalize_storages()
  2913. asyncio.run(_run())
  2914. @pytest.mark.offline
  2915. def test_strip_internal_multimodal_markup_cleans_table_id():
  2916. """Regression: parser-emitted ``<table id="tb-...">`` tags must have
  2917. their internal id stripped before the entity-extraction prompt sees
  2918. them. ``format`` / ``caption`` and the row body stay verbatim so the
  2919. extractor still recognizes the structured element."""
  2920. from lightrag.chunk_schema import (
  2921. strip_internal_multimodal_markup_for_extraction,
  2922. )
  2923. source = (
  2924. '<table id="tb-1-0001" format="json" caption="Indicator metrics">'
  2925. '[["a","b"],["1","2"]]'
  2926. "</table>"
  2927. )
  2928. cleaned = strip_internal_multimodal_markup_for_extraction(source)
  2929. assert "tb-1-0001" not in cleaned
  2930. assert 'format="json"' in cleaned
  2931. assert 'caption="Indicator metrics"' in cleaned
  2932. # Row body preserved.
  2933. assert '[["a","b"],["1","2"]]' in cleaned
  2934. @pytest.mark.offline
  2935. def test_strip_internal_multimodal_markup_cite_default_unwraps():
  2936. """Default (keep_cite_tag=False) is the entity-extraction path: the
  2937. ``<cite>`` wrapper is stripped so the extractor does not surface it
  2938. as a structural entity — only the visible label survives.
  2939. The surrounding-context path overrides this via keep_cite_tag=True
  2940. (verified in tests/pipeline/test_multimodal_surrounding_context.py); this
  2941. test pins the default to prevent regressions on the extraction
  2942. path when callers refactor the function signature.
  2943. """
  2944. from lightrag.chunk_schema import (
  2945. strip_internal_multimodal_markup_for_extraction,
  2946. )
  2947. source = (
  2948. 'see <cite type="table" refid="tb-1-0001">表 1</cite> and '
  2949. '<cite type="equation" refid="eq-1-0002">公式 2</cite> for details.'
  2950. )
  2951. cleaned = strip_internal_multimodal_markup_for_extraction(source)
  2952. # Wrappers and ids both gone; visible labels survive as plain text.
  2953. assert "<cite" not in cleaned
  2954. assert "refid=" not in cleaned
  2955. assert "tb-1-0001" not in cleaned
  2956. assert "eq-1-0002" not in cleaned
  2957. assert "表 1" in cleaned
  2958. assert "公式 2" in cleaned
  2959. @pytest.mark.offline
  2960. def test_strip_internal_multimodal_markup_cite_keep_tag_strips_refid_only():
  2961. """keep_cite_tag=True (surrounding-context path): preserve the
  2962. ``<cite type="…">label</cite>`` wrapper but drop the parser-
  2963. internal ``refid``. Other identifier transformations
  2964. (``<table id=…>`` / ``<drawing id=…/>`` / ``<equation id=…>``) are
  2965. unaffected by the flag and still apply."""
  2966. from lightrag.chunk_schema import (
  2967. strip_internal_multimodal_markup_for_extraction,
  2968. )
  2969. source = (
  2970. 'see <cite type="table" refid="tb-1-0001">表 1</cite>; '
  2971. '<drawing id="im-1" path="a.png" src="a" caption="Fig" />'
  2972. )
  2973. cleaned = strip_internal_multimodal_markup_for_extraction(
  2974. source, keep_cite_tag=True
  2975. )
  2976. assert '<cite type="table">表 1</cite>' in cleaned
  2977. assert "refid=" not in cleaned
  2978. assert "tb-1-0001" not in cleaned
  2979. # Non-cite cleaning still applies in this mode.
  2980. assert '<drawing caption="Fig" />' in cleaned
  2981. assert 'id="im-1"' not in cleaned
  2982. assert "path=" not in cleaned
  2983. @pytest.mark.offline
  2984. def test_reinsert_without_process_options_skips_stale_mm_chunks(tmp_path):
  2985. """Regression for the call-site fallback in process_single_document.
  2986. A document re-inserted without ``process_options`` is signalled by a
  2987. missing / falsy ``content_data["process_options"]`` field. The
  2988. pipeline must pass ``""`` (not ``None``) to
  2989. ``_build_mm_chunks_from_sidecars`` so the builder honors the
  2990. "no modalities" contract: stale ``status="success"`` sidecar entries
  2991. from an earlier i/t/e pass MUST NOT be re-indexed.
  2992. The new builder happens to handle ``None`` by enabling every
  2993. modality for ad-hoc callers (unit tests), so this test pins the
  2994. call-site behaviour rather than the helper's default — passing the
  2995. same falsy value via ``or ""`` makes the intent explicit.
  2996. """
  2997. async def _run():
  2998. rag = _new_rag(tmp_path)
  2999. await rag.initialize_storages()
  3000. blocks = tmp_path / "demo.blocks.jsonl"
  3001. blocks.write_text(
  3002. "\n".join(
  3003. [
  3004. json.dumps({"type": "meta", "format_version": "1.0"}),
  3005. json.dumps({"type": "content", "content": "body"}),
  3006. ]
  3007. )
  3008. + "\n",
  3009. encoding="utf-8",
  3010. )
  3011. # Stale success from an earlier multimodal run.
  3012. drawings = tmp_path / "demo.drawings.json"
  3013. drawings.write_text(
  3014. json.dumps(
  3015. {
  3016. "drawings": {
  3017. "d1": {
  3018. "id": "d1",
  3019. "llm_analyze_result": {
  3020. "name": "old",
  3021. "type": "Chart",
  3022. "description": "stale drawing",
  3023. "analyze_time": 1700000000,
  3024. "status": "success",
  3025. "message": "",
  3026. },
  3027. }
  3028. }
  3029. }
  3030. ),
  3031. encoding="utf-8",
  3032. )
  3033. # Simulate the call-site contract: ``content_data`` has no
  3034. # ``process_options`` key, so ``.get(...) or ""`` yields "".
  3035. content_data: dict[str, str] = {}
  3036. effective = (content_data or {}).get("process_options") or ""
  3037. assert effective == ""
  3038. mm_chunks = rag._build_mm_chunks_from_sidecars(
  3039. doc_id="doc-1",
  3040. file_path="demo.pdf",
  3041. blocks_path=str(blocks),
  3042. base_order_index=0,
  3043. process_options=effective,
  3044. )
  3045. assert mm_chunks == []
  3046. await rag.finalize_storages()
  3047. asyncio.run(_run())