| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643264426452646264726482649265026512652265326542655265626572658265926602661266226632664266526662667266826692670267126722673267426752676267726782679268026812682268326842685268626872688268926902691269226932694269526962697269826992700270127022703270427052706270727082709271027112712271327142715271627172718271927202721272227232724272527262727272827292730273127322733273427352736273727382739274027412742274327442745274627472748274927502751275227532754275527562757275827592760276127622763276427652766276727682769277027712772277327742775277627772778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327732783279328032813282328332843285328632873288328932903291329232933294329532963297329832993300330133023303330433053306330733083309331033113312331333143315331633173318331933203321332233233324332533263327332833293330333133323333333433353336333733383339334033413342334333443345334633473348334933503351335233533354335533563357335833593360336133623363336433653366336733683369337033713372337333743375337633773378337933803381338233833384338533863387338833893390339133923393339433953396339733983399340034013402340334043405340634073408340934103411341234133414341534163417341834193420342134223423342434253426342734283429343034313432343334343435343634373438343934403441344234433444344534463447344834493450345134523453345434553456345734583459346034613462346334643465346634673468346934703471347234733474347534763477347834793480348134823483348434853486348734883489349034913492349334943495349634973498349935003501350235033504350535063507350835093510351135123513351435153516351735183519352035213522352335243525352635273528 |
- import asyncio
- import json
- from pathlib import Path
- import numpy as np
- import pytest
- from lightrag import LightRAG, ROLES, RoleLLMConfig
- from lightrag.base import DocStatus
- from lightrag.constants import (
- FULL_DOCS_FORMAT_PENDING_PARSE,
- PARSED_DIR_NAME,
- PARSER_ENGINE_MINERU,
- PARSER_ENGINE_NATIVE,
- )
- from lightrag.operate import (
- _get_relationship_vdb_timeout_seconds,
- _parse_mm_display_name,
- )
- from lightrag.parser.routing import (
- FilenameParserHintError,
- ParserRoutingConfigError,
- canonicalize_parser_hinted_basename,
- resolve_file_parser_engine,
- resolve_stored_document_parser_engine,
- validate_parser_routing_config,
- )
- from lightrag.utils import (
- EmbeddingFunc,
- Tokenizer,
- compute_mdhash_id,
- safe_vdb_operation_with_exception,
- )
- class _SimpleTokenizerImpl:
- def encode(self, content: str) -> list[int]:
- return [ord(ch) for ch in content]
- def decode(self, tokens: list[int]) -> str:
- return "".join(chr(t) for t in tokens)
- async def _mock_embedding(texts: list[str]) -> np.ndarray:
- return np.random.rand(len(texts), 32)
- async def _mock_llm(prompt, **kwargs):
- return '{"name":"x","summary":"s","detail_description":"d"}'
- _ROLE_FIELD_SUFFIXES = (
- ("_llm_model_func", "func"),
- ("_llm_model_kwargs", "kwargs"),
- ("_llm_model_max_async", "max_async"),
- ("_llm_timeout", "timeout"),
- )
- def _new_rag(tmp_path: Path, **kwargs) -> LightRAG:
- role_configs: dict[str, RoleLLMConfig] = {}
- for spec in ROLES:
- bucket = {}
- for suffix, target in _ROLE_FIELD_SUFFIXES:
- key = f"{spec.name}{suffix}"
- if key in kwargs:
- bucket[target] = kwargs.pop(key)
- if bucket:
- role_configs[spec.name] = RoleLLMConfig(**bucket)
- if role_configs:
- kwargs["role_llm_configs"] = role_configs
- # analyze_multimodal short-circuits when vlm_process_enable is False; this
- # helper drives several VLM-specific tests, so default the switch ON.
- kwargs.setdefault("vlm_process_enable", True)
- return LightRAG(
- working_dir=str(tmp_path),
- workspace=f"test-release-closure-{tmp_path.name}",
- llm_model_func=_mock_llm,
- embedding_func=EmbeddingFunc(
- embedding_dim=32,
- max_token_size=4096,
- func=_mock_embedding,
- ),
- tokenizer=Tokenizer("mock-tokenizer", _SimpleTokenizerImpl()),
- **kwargs,
- )
- @pytest.mark.offline
- def test_parse_engine_routing_by_filename_and_env(monkeypatch):
- monkeypatch.setenv("DOCLING_ENDPOINT", "http://fake-docling")
- assert (
- resolve_stored_document_parser_engine("a.[docling-iet].docx", {}) == "docling"
- )
- monkeypatch.setenv("MINERU_LOCAL_ENDPOINT", "http://fake-mineru")
- monkeypatch.setenv("LIGHTRAG_PARSER", "pdf:mineru-iet,*:native")
- assert resolve_stored_document_parser_engine("paper.pdf", {}) == "mineru"
- assert (
- resolve_stored_document_parser_engine("paper.pdf", {"parse_engine": "native"})
- == "legacy"
- )
- @pytest.mark.offline
- def test_parse_engine_rule_fallback_and_default_legacy(monkeypatch):
- monkeypatch.setenv("LIGHTRAG_PARSER", "pdf:native,*:legacy")
- assert resolve_stored_document_parser_engine("paper.pdf", {}) == "legacy"
- monkeypatch.setenv("LIGHTRAG_PARSER", "pptx:docling,*:legacy")
- monkeypatch.delenv("DOCLING_ENDPOINT", raising=False)
- assert resolve_stored_document_parser_engine("slides.pptx", {}) == "legacy"
- monkeypatch.delenv("LIGHTRAG_PARSER", raising=False)
- monkeypatch.delenv("MINERU_LOCAL_ENDPOINT", raising=False)
- assert resolve_stored_document_parser_engine("slides.pptx", {}) == "legacy"
- @pytest.mark.offline
- def test_canonicalize_parser_hinted_basename():
- assert canonicalize_parser_hinted_basename("abc.[native].docx") == "abc.docx"
- assert canonicalize_parser_hinted_basename("/tmp/a.b.[mineru].pdf") == "a.b.pdf"
- assert canonicalize_parser_hinted_basename("abc.[draft].docx") == "abc.[draft].docx"
- # Engine token is case-insensitive (normalize_parser_engine lower-cases).
- assert canonicalize_parser_hinted_basename("abc.[NATIVE].docx") == "abc.docx"
- # Engine sub-variants like "mineru-iet" normalize to a base engine.
- assert canonicalize_parser_hinted_basename("abc.[mineru-iet].pdf") == "abc.pdf"
- # No extension after the bracket: pattern requires ``.[engine].ext``.
- assert canonicalize_parser_hinted_basename("abc.[native]") == "abc.[native]"
- # Plain basename without any hint is returned unchanged.
- assert canonicalize_parser_hinted_basename("abc.docx") == "abc.docx"
- # Bracket without a leading dot is not a hint.
- assert canonicalize_parser_hinted_basename("[native].docx") == "[native].docx"
- # Nested hints: only the outermost segment is stripped.
- assert (
- canonicalize_parser_hinted_basename("name.[native].[mineru].pdf")
- == "name.[native].pdf"
- )
- # New options-only and engine+options forms strip cleanly too.
- assert canonicalize_parser_hinted_basename("foo.[-!].docx") == "foo.docx"
- assert canonicalize_parser_hinted_basename("foo.[native-iet].docx") == "foo.docx"
- assert canonicalize_parser_hinted_basename("foo.[mineru-R!].pdf") == "foo.pdf"
- # Options without the leading hyphen and unknown parsers are left alone.
- assert canonicalize_parser_hinted_basename("foo.[!].docx") == "foo.[!].docx"
- assert (
- canonicalize_parser_hinted_basename("foo.[native-].docx")
- == "foo.[native-].docx"
- )
- assert canonicalize_parser_hinted_basename("foo.[xyz].docx") == "foo.[xyz].docx"
- @pytest.mark.offline
- def test_filename_parser_directives_decodes_engine_and_options():
- from lightrag.parser.routing import filename_parser_directives
- assert filename_parser_directives("paper.[native-iet].docx") == ("native", "iet")
- assert filename_parser_directives("memo.[native-R!].md") == ("native", "R!")
- assert filename_parser_directives("report.[-!].pdf") == (None, "!")
- assert filename_parser_directives("doc.[mineru].docx") == ("mineru", "")
- assert filename_parser_directives("foo.docx") == (None, "")
- # Unsupported tokens and old options-only syntax stay unparsed.
- assert filename_parser_directives("foo.[!].docx") == (None, "")
- assert filename_parser_directives("foo.[draft].docx") == (None, "")
- @pytest.mark.offline
- def test_filename_hint_rejects_invalid_engine_qualified_options():
- """Engine-qualified hints with bad option chars must fail validation
- during parser directive resolution instead of silently falling back to
- parser rules/defaults.
- """
- from lightrag.parser.routing import (
- canonicalize_parser_hinted_basename,
- filename_parser_directives,
- resolve_file_parser_directives,
- )
- # Low-level helpers stay non-throwing for scan grouping/canonicalization.
- assert filename_parser_directives("foo.[native-FR].docx") == (None, "")
- assert filename_parser_directives("foo.[native-Q].docx") == (None, "")
- assert (
- canonicalize_parser_hinted_basename("foo.[native-FR].docx")
- == "foo.[native-FR].docx"
- )
- assert (
- canonicalize_parser_hinted_basename("foo.[native-Q].docx")
- == "foo.[native-Q].docx"
- )
- with pytest.raises(FilenameParserHintError, match="multiple chunking modes"):
- resolve_file_parser_directives("foo.[native-FR].docx")
- with pytest.raises(FilenameParserHintError, match="unsupported character"):
- resolve_file_parser_directives("foo.[native-Q].docx")
- with pytest.raises(FilenameParserHintError, match="options-only filename hints"):
- resolve_file_parser_directives("foo.[!].docx")
- with pytest.raises(FilenameParserHintError, match="options-only filename hints"):
- resolve_file_parser_directives("foo.[iet].docx")
- with pytest.raises(
- FilenameParserHintError, match="unsupported parser engine 'abc'"
- ):
- resolve_file_parser_directives("foo.[abc].docx")
- with pytest.raises(
- FilenameParserHintError, match="unsupported parser engine 'xyz'"
- ):
- resolve_file_parser_directives("foo.[xyz].docx")
- with pytest.raises(FilenameParserHintError, match="is empty"):
- resolve_file_parser_directives("foo.[].docx")
- with pytest.raises(FilenameParserHintError, match="empty process options"):
- resolve_file_parser_directives("foo.[-].docx")
- with pytest.raises(FilenameParserHintError, match="empty process options"):
- resolve_file_parser_directives("foo.[native-].docx")
- @pytest.mark.offline
- def test_filename_hint_missing_required_endpoint_rejects(monkeypatch):
- from lightrag.parser.routing import resolve_file_parser_directives
- monkeypatch.delenv("DOCLING_ENDPOINT", raising=False)
- with pytest.raises(FilenameParserHintError, match="requires DOCLING_ENDPOINT"):
- resolve_file_parser_directives("foo.[docling].docx")
- @pytest.mark.offline
- def test_parse_process_options_decodes_flags():
- from lightrag.parser.routing import parse_process_options
- opts = parse_process_options("iet")
- assert opts.images and opts.tables and opts.equations
- assert not opts.skip_kg
- assert opts.chunking == "F"
- opts = parse_process_options("R!")
- assert opts.skip_kg and opts.chunking == "R"
- assert not opts.images and not opts.tables and not opts.equations
- opts = parse_process_options("P")
- assert opts.chunking == "P"
- opts = parse_process_options("")
- assert not (opts.images or opts.tables or opts.equations or opts.skip_kg)
- assert opts.chunking == "F"
- @pytest.mark.offline
- def test_validate_process_options_rejects_invalid_combos():
- from lightrag.parser.routing import validate_process_options
- assert validate_process_options("iet") == []
- assert validate_process_options("R!") == []
- # F+R conflict is reported.
- errs = validate_process_options("FR")
- assert any("multiple chunking modes" in m for m in errs)
- # Lowercase chunking selectors are not valid.
- errs = validate_process_options("f")
- assert any("'f'" in m for m in errs)
- # Unknown chars are reported individually.
- errs = validate_process_options("xyz")
- assert sum(1 for m in errs if "unsupported character" in m) == 3
- @pytest.mark.offline
- def test_lightrag_parser_rule_supports_options_suffix(monkeypatch):
- monkeypatch.setenv("MINERU_LOCAL_ENDPOINT", "http://fake-mineru")
- monkeypatch.delenv("DOCLING_ENDPOINT", raising=False)
- # Valid options suffix passes validation.
- validate_parser_routing_config("docx:native-iet,*:legacy")
- # Invalid options suffix is rejected with the rule label and message.
- with pytest.raises(ParserRoutingConfigError, match="multiple chunking modes"):
- validate_parser_routing_config("docx:native-FR,*:legacy")
- with pytest.raises(ParserRoutingConfigError, match="unsupported character"):
- validate_parser_routing_config("docx:native-Q,*:legacy")
- @pytest.mark.offline
- def test_resolve_file_parser_directives_priority(monkeypatch):
- from lightrag.parser.routing import resolve_file_parser_directives
- monkeypatch.setenv("MINERU_LOCAL_ENDPOINT", "http://fake-mineru")
- monkeypatch.setenv("LIGHTRAG_PARSER", "docx:native-iet,*:legacy")
- # Filename hint takes precedence for engine and options.
- engine, options = resolve_file_parser_directives("paper.[native-R!].docx")
- assert engine == "native"
- assert options == "R!"
- # No filename hint → fall through to LIGHTRAG_PARSER defaults for both.
- engine, options = resolve_file_parser_directives("plain.docx")
- assert engine == "native"
- assert options == "iet"
- # Options-only hint keeps engine from rule but uses hinted options.
- engine, options = resolve_file_parser_directives("plain.[-!].docx")
- assert engine == "native"
- assert options == "!"
- @pytest.mark.offline
- def test_doc_status_metadata_carry_over_helper():
- """``doc_status_transition_metadata`` preserves long-lived per-doc fields
- and layers in any transition-specific extras passed via ``extra=``.
- Empty / missing carry-over fields are dropped, not written as null.
- """
- from lightrag.utils_pipeline import doc_status_transition_metadata
- class _StubStatusDoc:
- def __init__(self, metadata):
- self.metadata = metadata
- # Carries process_options forward.
- md = doc_status_transition_metadata(_StubStatusDoc({"process_options": "iet"}))
- assert md == {"process_options": "iet"}
- # Carries the internal pending-parse source basename forward for retries.
- md = doc_status_transition_metadata(
- _StubStatusDoc({"source_file_name": "demo.[mineru].pdf"})
- )
- assert md == {"source_file_name": "demo.[mineru].pdf"}
- # Layers in transition extras while keeping the carry-over.
- md = doc_status_transition_metadata(
- _StubStatusDoc({"process_options": "R!"}),
- extra={"processing_start_time": 12345},
- )
- assert md == {"process_options": "R!", "processing_start_time": 12345}
- # No carry-over when metadata is missing or empty.
- assert doc_status_transition_metadata(_StubStatusDoc({})) == {}
- assert doc_status_transition_metadata(None) == {}
- # Empty / None process_options are not written as null.
- assert doc_status_transition_metadata(_StubStatusDoc({"process_options": ""})) == {}
- assert (
- doc_status_transition_metadata(_StubStatusDoc({"process_options": None})) == {}
- )
- @pytest.mark.offline
- def test_carry_over_keys_grouped_by_stage():
- """Strict tuple-equality guard on ``_DOC_STATUS_METADATA_CARRY_OVER_KEYS``.
- The tuple order is the WebUI ``DocumentStatusDetailsDialog`` render order,
- so per-stage fields must stay grouped (parse-stage trio then analyze-stage
- trio). Locking the order here forces any future field addition to update
- this assertion alongside the tuple, preventing silent regressions in the
- dialog's timeline-ordered display.
- """
- from lightrag.utils_pipeline import _DOC_STATUS_METADATA_CARRY_OVER_KEYS
- assert _DOC_STATUS_METADATA_CARRY_OVER_KEYS == (
- "process_options",
- "source_file_name",
- "parse_warnings",
- "chunk_opts",
- "parsing_start_time",
- "parsing_end_time",
- "parse_stage_skipped",
- "analyzing_start_time",
- "analyzing_end_time",
- "analyzing_stage_skipped",
- )
- @pytest.mark.offline
- def test_carry_over_helper_propagates_end_times_and_skipped():
- """Stage-end timestamps and skipped flags must survive carry-over so the
- PROCESSING / PROCESSED / FAILED upserts keep them visible for post-mortem
- stage-duration analysis.
- """
- from lightrag.utils_pipeline import doc_status_transition_metadata
- class _StubStatusDoc:
- def __init__(self, metadata):
- self.metadata = metadata
- md = doc_status_transition_metadata(
- _StubStatusDoc(
- {
- "parsing_start_time": 1700000000,
- "parsing_end_time": 1700000010,
- "analyzing_start_time": 1700000020,
- "analyzing_end_time": 1700000050,
- }
- )
- )
- assert md == {
- "parsing_start_time": 1700000000,
- "parsing_end_time": 1700000010,
- "analyzing_start_time": 1700000020,
- "analyzing_end_time": 1700000050,
- }
- # Skipped flags (bool True) also survive carry-over.
- md = doc_status_transition_metadata(
- _StubStatusDoc(
- {
- "parse_stage_skipped": True,
- "analyzing_stage_skipped": True,
- }
- )
- )
- assert md == {
- "parse_stage_skipped": True,
- "analyzing_stage_skipped": True,
- }
- def _status_value_text(status):
- """Helper: extract the value of a DocStatus enum or raw status string."""
- if hasattr(status, "value"):
- return status.value
- return str(status)
- @pytest.mark.offline
- def test_doc_status_metadata_survives_processed_transition(tmp_path):
- """End-to-end: a document enqueued with process_options must keep
- ``metadata.process_options`` set in ``doc_status`` after the pipeline
- drives it all the way to PROCESSED. This exercises the full state
- machine (PENDING → PARSING → ANALYZING → PROCESSING → PROCESSED) and
- asserts the carry-over works at every transition.
- """
- async def _run():
- rag = _new_rag(tmp_path)
- await rag.initialize_storages()
- try:
- await rag.apipeline_enqueue_documents(
- "Some content body for chunking.",
- file_paths="metadata_carry.txt",
- track_id="track-md-carry",
- process_options="iet!",
- )
- doc_id = compute_mdhash_id("metadata_carry.txt", prefix="doc-")
- pending_status = await rag.doc_status.get_by_id(doc_id)
- assert pending_status is not None
- assert (pending_status.get("metadata") or {}).get(
- "process_options"
- ) == "iet!"
- # Run the pipeline through to PROCESSED.
- await rag.apipeline_process_enqueue_documents()
- final_status = await rag.doc_status.get_by_id(doc_id)
- assert final_status is not None
- assert _status_value_text(final_status.get("status")) == "processed"
- metadata = final_status.get("metadata") or {}
- assert metadata.get("process_options") == "iet!", (
- f"process_options dropped during state-machine transitions; "
- f"final metadata: {metadata!r}"
- )
- # parse_native on FULL_DOCS_FORMAT_RAW does not actually parse —
- # it passes content through verbatim — so the skip branch fires
- # and ``parsing_end_time`` stays absent. ``parse_stage_skipped``
- # is the cache-hit / no-parse-work sentinel (same field used by
- # parse_mineru / parse_docling for raw-bundle cache hits).
- assert isinstance(metadata.get("parsing_start_time"), int)
- assert metadata.get("parse_stage_skipped") is True
- assert "parsing_end_time" not in metadata
- # parse_native on raw content returns blocks_path="", which makes
- # analyze_multimodal take the "no blocks_path" early-return branch
- # and set analyzing_stage_skipped=True (no analyzing_end_time).
- assert isinstance(metadata.get("analyzing_start_time"), int)
- assert metadata.get("analyzing_stage_skipped") is True
- assert "analyzing_end_time" not in metadata
- finally:
- await rag.finalize_storages()
- asyncio.run(_run())
- @pytest.mark.offline
- def test_analyze_soft_failure_writes_neither_end_time_nor_skipped(tmp_path):
- """If ``analyze_multimodal`` returns without setting either the
- ``multimodal_processed`` (completion) or ``analyzing_stage_skipped``
- (user/config skip) sentinel — e.g. the generic ``except Exception``
- soft-swallow path or a malformed-sidecar early return — the worker must
- treat the attempt as a soft failure and leave BOTH fields absent. This
- distinguishes "analyze actually completed" from "analyze attempted but
- bailed" without falsely claiming a duration.
- """
- async def _run():
- rag = _new_rag(tmp_path)
- await rag.initialize_storages()
- try:
- original_analyze_multimodal = rag.analyze_multimodal
- async def _soft_failing_analyze(*args, **kwargs):
- result = await original_analyze_multimodal(*args, **kwargs)
- # Strip both sentinels: simulate analyze_multimodal returning
- # parsed_data after the generic except Exception soft-swallow.
- result.pop("analyzing_stage_skipped", None)
- result.pop("multimodal_processed", None)
- return result
- rag.analyze_multimodal = _soft_failing_analyze # type: ignore[assignment]
- await rag.apipeline_enqueue_documents(
- "Soft-fail body for analyze stage.",
- file_paths="analyze_soft_fail.txt",
- track_id="track-analyze-softfail",
- process_options="iet!",
- )
- doc_id = compute_mdhash_id("analyze_soft_fail.txt", prefix="doc-")
- await rag.apipeline_process_enqueue_documents()
- final_status = await rag.doc_status.get_by_id(doc_id)
- assert final_status is not None
- assert _status_value_text(final_status.get("status")) == "processed"
- metadata = final_status.get("metadata") or {}
- assert isinstance(metadata.get("analyzing_start_time"), int)
- assert "analyzing_end_time" not in metadata, (
- f"soft-failed analyze incorrectly stamped analyzing_end_time; "
- f"final metadata: {metadata!r}"
- )
- assert "analyzing_stage_skipped" not in metadata, (
- f"soft-failed analyze incorrectly stamped analyzing_stage_skipped; "
- f"final metadata: {metadata!r}"
- )
- finally:
- await rag.finalize_storages()
- asyncio.run(_run())
- @pytest.mark.offline
- def test_apipeline_enqueue_persists_process_options(tmp_path):
- async def _run():
- rag = _new_rag(tmp_path)
- await rag.initialize_storages()
- try:
- await rag.apipeline_enqueue_documents(
- "alpha body",
- file_paths="abc.[native-R!].docx",
- track_id="track-opts",
- process_options="R!",
- )
- doc_id = compute_mdhash_id("abc.docx", prefix="doc-")
- full_doc = await rag.full_docs.get_by_id(doc_id)
- assert full_doc is not None
- # full_docs stores the canonical (hint-stripped) basename only.
- assert full_doc["file_path"] == "abc.docx"
- assert "canonical_basename" not in full_doc
- assert full_doc.get("process_options") == "R!"
- status_doc = await rag.doc_status.get_by_id(doc_id)
- assert status_doc is not None
- metadata = (
- status_doc.get("metadata")
- if isinstance(status_doc, dict)
- else getattr(status_doc, "metadata", {})
- )
- assert metadata.get("process_options") == "R!"
- finally:
- await rag.finalize_storages()
- asyncio.run(_run())
- @pytest.mark.offline
- def test_purge_doc_chunks_and_kg_is_noop_for_empty_chunks(tmp_path):
- """``_purge_doc_chunks_and_kg`` with an empty chunk_ids set must be a
- no-op so callers (including the resume branch) can invoke it
- unconditionally without first checking for non-empty chunks_list.
- """
- async def _run():
- from lightrag.kg.shared_storage import (
- get_namespace_data,
- get_namespace_lock,
- )
- rag = _new_rag(tmp_path)
- await rag.initialize_storages()
- try:
- pipeline_status = await get_namespace_data(
- "pipeline_status", workspace=rag.workspace
- )
- pipeline_status_lock = get_namespace_lock(
- "pipeline_status", workspace=rag.workspace
- )
- # Empty set: must return immediately without touching storage.
- await rag._purge_doc_chunks_and_kg(
- "doc-empty",
- set(),
- pipeline_status=pipeline_status,
- pipeline_status_lock=pipeline_status_lock,
- )
- # No exceptions → success. Calling twice in a row is also fine
- # since the helper is idempotent on the empty input.
- await rag._purge_doc_chunks_and_kg(
- "doc-empty",
- set(),
- pipeline_status=pipeline_status,
- pipeline_status_lock=pipeline_status_lock,
- )
- finally:
- await rag.finalize_storages()
- asyncio.run(_run())
- @pytest.mark.offline
- def test_purge_doc_chunks_and_kg_clears_chunks_for_unknown_doc(tmp_path):
- """When the doc has chunk_ids but no graph contributions yet
- (full_entities / full_relations empty), the helper must still clear
- the chunks from chunks_vdb / text_chunks without raising. This
- exercises the resume path for documents whose previous run was
- interrupted between chunking and entity extraction.
- """
- async def _run():
- from lightrag.kg.shared_storage import (
- get_namespace_data,
- get_namespace_lock,
- )
- rag = _new_rag(tmp_path)
- await rag.initialize_storages()
- try:
- # Seed text_chunks + chunks_vdb with two stale chunks.
- await rag.text_chunks.upsert(
- {
- "doc-X-chunk-0": {
- "content": "stale chunk 0",
- "chunk_order_index": 0,
- "full_doc_id": "doc-X",
- "tokens": 4,
- "file_path": "x.txt",
- },
- "doc-X-chunk-1": {
- "content": "stale chunk 1",
- "chunk_order_index": 1,
- "full_doc_id": "doc-X",
- "tokens": 4,
- "file_path": "x.txt",
- },
- }
- )
- await rag.chunks_vdb.upsert(
- {
- "doc-X-chunk-0": {
- "content": "stale chunk 0",
- "chunk_order_index": 0,
- "full_doc_id": "doc-X",
- "tokens": 4,
- "file_path": "x.txt",
- },
- "doc-X-chunk-1": {
- "content": "stale chunk 1",
- "chunk_order_index": 1,
- "full_doc_id": "doc-X",
- "tokens": 4,
- "file_path": "x.txt",
- },
- }
- )
- await rag.text_chunks.index_done_callback()
- await rag.chunks_vdb.index_done_callback()
- pipeline_status = await get_namespace_data(
- "pipeline_status", workspace=rag.workspace
- )
- pipeline_status_lock = get_namespace_lock(
- "pipeline_status", workspace=rag.workspace
- )
- await rag._purge_doc_chunks_and_kg(
- "doc-X",
- {"doc-X-chunk-0", "doc-X-chunk-1"},
- pipeline_status=pipeline_status,
- pipeline_status_lock=pipeline_status_lock,
- )
- # Both chunks gone from text_chunks.
- remaining = await rag.text_chunks.get_by_ids(
- ["doc-X-chunk-0", "doc-X-chunk-1"]
- )
- assert remaining == [None, None]
- finally:
- await rag.finalize_storages()
- asyncio.run(_run())
- @pytest.mark.offline
- def test_resume_purges_old_chunks_when_content_already_extracted(tmp_path):
- """When ``apipeline_process_enqueue_documents`` picks up a document
- whose content is already extracted (full_docs.format=raw with content)
- and whose doc_status carries a non-empty chunks_list from a previous
- half-finished run, the resume branch must call
- ``_purge_doc_chunks_and_kg`` with the old chunk-IDs *before* the
- chunking and entity-extraction stages run. This test wraps the
- helper so we can assert it is invoked exactly once with the expected
- inputs, then bails out so we don't have to mock the whole VLM /
- entity-extract stack.
- """
- async def _run():
- rag = _new_rag(tmp_path)
- await rag.initialize_storages()
- try:
- doc_id = compute_mdhash_id("resume.txt", prefix="doc-")
- # Seed full_docs as if extraction already completed.
- await rag.full_docs.upsert(
- {
- doc_id: {
- "content": "previously extracted body",
- "file_path": "resume.txt",
- "parse_format": "raw",
- "parse_engine": "legacy",
- "content_hash": "deadbeef",
- }
- }
- )
- # Seed doc_status as PROCESSING with chunks_list from a prior
- # half-finished run so the resume branch has something to purge.
- stale_chunks = [f"{doc_id}-chunk-{i:03d}" for i in range(2)]
- await rag.doc_status.upsert(
- {
- doc_id: {
- "status": DocStatus.PROCESSING,
- "content_summary": "previously extracted body",
- "content_length": len("previously extracted body"),
- "created_at": "2026-01-01T00:00:00+00:00",
- "updated_at": "2026-01-01T00:00:01+00:00",
- "file_path": "resume.txt",
- "track_id": "track-resume",
- "content_hash": "deadbeef",
- "chunks_list": stale_chunks,
- "chunks_count": len(stale_chunks),
- }
- }
- )
- # Wrap the helper to record invocations, and raise after the call
- # so the test exits cleanly without exercising downstream stages.
- calls: list[tuple[str, set[str]]] = []
- original = rag._purge_doc_chunks_and_kg
- class _ResumePurged(Exception):
- pass
- async def _wrapped(doc_id_arg, chunk_ids_arg, **kwargs):
- calls.append((doc_id_arg, set(chunk_ids_arg)))
- # Run the real helper so the side-effects (chunks gone from
- # storage) are observable, then short-circuit.
- await original(doc_id_arg, chunk_ids_arg, **kwargs)
- raise _ResumePurged()
- rag._purge_doc_chunks_and_kg = _wrapped # type: ignore[method-assign]
- # Pipeline will pick up the PROCESSING document, hit the resume
- # branch, call our wrapped purge, and our wrapper raises.
- await rag.apipeline_process_enqueue_documents()
- # Helper was invoked exactly once with the stale chunk-IDs.
- assert len(calls) == 1
- invoked_doc_id, invoked_chunks = calls[0]
- assert invoked_doc_id == doc_id
- assert invoked_chunks == set(stale_chunks)
- finally:
- await rag.finalize_storages()
- asyncio.run(_run())
- @pytest.mark.offline
- def test_resume_skips_purge_when_chunks_list_empty(tmp_path):
- """If the doc was extracted but never chunked (chunks_list empty),
- the resume branch must NOT call the purge helper — there's nothing
- to clean up.
- """
- async def _run():
- rag = _new_rag(tmp_path)
- await rag.initialize_storages()
- try:
- doc_id = compute_mdhash_id("noskip.txt", prefix="doc-")
- await rag.full_docs.upsert(
- {
- doc_id: {
- "content": "fresh body",
- "file_path": "noskip.txt",
- "parse_format": "raw",
- "parse_engine": "legacy",
- "content_hash": "fresh",
- }
- }
- )
- await rag.doc_status.upsert(
- {
- doc_id: {
- "status": DocStatus.PARSING,
- "content_summary": "fresh body",
- "content_length": len("fresh body"),
- "created_at": "2026-01-01T00:00:00+00:00",
- "updated_at": "2026-01-01T00:00:01+00:00",
- "file_path": "noskip.txt",
- "track_id": "track-noskip",
- "content_hash": "fresh",
- "chunks_list": [],
- "chunks_count": 0,
- }
- }
- )
- calls: list[tuple[str, set[str]]] = []
- async def _spy(doc_id_arg, chunk_ids_arg, **kwargs):
- calls.append((doc_id_arg, set(chunk_ids_arg)))
- # Don't actually purge; just record the call and let the
- # pipeline continue past this test boundary.
- raise RuntimeError("test stop after purge check")
- rag._purge_doc_chunks_and_kg = _spy # type: ignore[method-assign]
- try:
- await rag.apipeline_process_enqueue_documents()
- except Exception:
- # Whether the pipeline reaches our spy or fails downstream
- # doesn't matter for this test; we only care that the spy
- # was NOT called for an empty chunks_list.
- pass
- assert (
- calls == []
- ), "purge helper should not be called when chunks_list is empty"
- finally:
- await rag.finalize_storages()
- asyncio.run(_run())
- @pytest.mark.offline
- def test_apipeline_enqueue_allows_concurrent_with_busy(tmp_path):
- """``busy=True`` no longer blocks enqueue. Concurrent processing is
- explicitly permitted — the running loop's request_pending mechanism
- picks up newly-enqueued docs after the current batch. Enqueue
- nudges request_pending so a freshly-arrived doc is never stranded
- when the call site does not subsequently invoke
- ``apipeline_process_enqueue_documents``.
- """
- async def _run():
- from lightrag.kg.shared_storage import (
- get_namespace_data,
- get_namespace_lock,
- )
- rag = _new_rag(tmp_path)
- await rag.initialize_storages()
- try:
- pipeline_status = await get_namespace_data(
- "pipeline_status", workspace=rag.workspace
- )
- pipeline_status_lock = get_namespace_lock(
- "pipeline_status", workspace=rag.workspace
- )
- # Simulate an in-flight indexing job.
- async with pipeline_status_lock:
- pipeline_status["busy"] = True
- pipeline_status["request_pending"] = False
- try:
- returned_track_id = await rag.apipeline_enqueue_documents(
- "concurrent with busy",
- file_paths="concurrent.txt",
- track_id="track-concurrent",
- )
- assert returned_track_id == "track-concurrent"
- # Enqueue nudged the running loop.
- assert pipeline_status.get("request_pending") is True
- finally:
- async with pipeline_status_lock:
- pipeline_status["busy"] = False
- pipeline_status["request_pending"] = False
- finally:
- await rag.finalize_storages()
- asyncio.run(_run())
- @pytest.mark.offline
- def test_apipeline_enqueue_rejects_when_scanning(tmp_path):
- """Scan is the only state that blocks new enqueues — scan reads
- doc_status to make classification decisions and would race with
- mid-flight writes. The last-line guard inside
- ``apipeline_enqueue_documents`` enforces this: HTTP endpoints catch
- it earlier and return 409, but core API callers must surface the
- invariant violation as a RuntimeError.
- """
- async def _run():
- from lightrag.kg.shared_storage import (
- get_namespace_data,
- get_namespace_lock,
- )
- rag = _new_rag(tmp_path)
- await rag.initialize_storages()
- try:
- pipeline_status = await get_namespace_data(
- "pipeline_status", workspace=rag.workspace
- )
- pipeline_status_lock = get_namespace_lock(
- "pipeline_status", workspace=rag.workspace
- )
- # Scan classification phase rejects. ``scanning_exclusive``
- # is the field that gates the enqueue last-line guard, not
- # plain ``scanning`` (which covers the whole scan lifecycle
- # including its processing phase, where concurrent enqueue
- # is allowed).
- async with pipeline_status_lock:
- pipeline_status["scanning"] = True
- pipeline_status["scanning_exclusive"] = True
- try:
- with pytest.raises(RuntimeError, match="scan is classifying"):
- await rag.apipeline_enqueue_documents(
- "should not enqueue",
- file_paths="scan.txt",
- track_id="track-scan",
- )
- finally:
- async with pipeline_status_lock:
- pipeline_status["scanning"] = False
- pipeline_status["scanning_exclusive"] = False
- # Scan processing phase (scanning=True, scanning_exclusive=False)
- # ALLOWS concurrent enqueue — same as upload-during-busy.
- async with pipeline_status_lock:
- pipeline_status["scanning"] = True
- pipeline_status["scanning_exclusive"] = False
- try:
- track_processing = await rag.apipeline_enqueue_documents(
- "allowed during scan processing",
- file_paths="scan_processing.txt",
- track_id="track-scan-processing",
- )
- assert track_processing == "track-scan-processing"
- finally:
- async with pipeline_status_lock:
- pipeline_status["scanning"] = False
- # When idle, the same call succeeds — proving the guard is the
- # only thing blocking, not some side effect of the test setup.
- await rag.apipeline_enqueue_documents(
- "now allowed",
- file_paths="ok.txt",
- track_id="track-ok",
- )
- finally:
- await rag.finalize_storages()
- asyncio.run(_run())
- @pytest.mark.offline
- def test_enqueue_during_busy_sets_request_pending(tmp_path):
- """While the processing loop is running (busy=True), a concurrent
- enqueue must set ``request_pending`` so the loop knows to scan
- doc_status again after its current batch. This is the mechanism
- that makes "upload while pipeline is busy" actually drain the new
- work — without it, freshly enqueued docs would be stranded until
- an unrelated trigger.
- """
- async def _run():
- from lightrag.kg.shared_storage import (
- get_namespace_data,
- get_namespace_lock,
- )
- rag = _new_rag(tmp_path)
- await rag.initialize_storages()
- try:
- pipeline_status = await get_namespace_data(
- "pipeline_status", workspace=rag.workspace
- )
- pipeline_status_lock = get_namespace_lock(
- "pipeline_status", workspace=rag.workspace
- )
- async with pipeline_status_lock:
- pipeline_status["busy"] = True
- pipeline_status["request_pending"] = False
- try:
- # First enqueue: nudges request_pending.
- await rag.apipeline_enqueue_documents(
- "first while busy",
- file_paths="first.txt",
- track_id="track-first",
- )
- assert pipeline_status.get("request_pending") is True
- # Second enqueue while busy: stays True (idempotent).
- async with pipeline_status_lock:
- pipeline_status["request_pending"] = False
- await rag.apipeline_enqueue_documents(
- "second while busy",
- file_paths="second.txt",
- track_id="track-second",
- )
- assert pipeline_status.get("request_pending") is True
- finally:
- async with pipeline_status_lock:
- pipeline_status["busy"] = False
- pipeline_status["request_pending"] = False
- # When idle, enqueue does NOT set request_pending — there is
- # no running loop to nudge.
- await rag.apipeline_enqueue_documents(
- "while idle",
- file_paths="idle.txt",
- track_id="track-idle",
- )
- assert pipeline_status.get("request_pending") is False
- finally:
- await rag.finalize_storages()
- asyncio.run(_run())
- @pytest.mark.offline
- def test_atomic_release_busy_or_consume_pending(tmp_path):
- """The loop-exit handoff is atomic via
- ``_atomic_release_busy_or_consume_pending``: the same critical
- section that reads ``request_pending`` also writes ``busy=False``.
- This closes the race where a concurrent enqueue could set
- ``request_pending=True`` between the loop's read of the flag and
- the finally block's ``busy=False`` write — leaving newly-enqueued
- docs stranded in PENDING with no running loop to consume them.
- The helper has two outcomes:
- * ``request_pending=True`` at entry → flag cleared, return False
- (caller must continue the loop, refetching doc_status).
- * ``request_pending=False`` at entry → ``busy`` cleared, return
- True (caller must break out without re-clearing busy).
- Tested directly because the closure pattern inside
- ``apipeline_process_enqueue_documents`` is otherwise hard to
- exercise from a unit test without orchestrating real concurrency.
- """
- async def _run():
- from lightrag.kg.shared_storage import (
- get_namespace_data,
- get_namespace_lock,
- )
- rag = _new_rag(tmp_path)
- await rag.initialize_storages()
- try:
- pipeline_status = await get_namespace_data(
- "pipeline_status", workspace=rag.workspace
- )
- pipeline_status_lock = get_namespace_lock(
- "pipeline_status", workspace=rag.workspace
- )
- # Case 1: simulate the race — request_pending was set by a
- # concurrent enqueue while busy=True. Helper must consume
- # the flag and return False (continue loop) rather than
- # silently exit.
- async with pipeline_status_lock:
- pipeline_status["busy"] = True
- pipeline_status["request_pending"] = True
- released = await rag._atomic_release_busy_or_consume_pending(
- pipeline_status, pipeline_status_lock
- )
- assert released is False
- assert pipeline_status["busy"] is True # NOT released
- assert pipeline_status["request_pending"] is False # consumed
- # Case 2: clean exit path — no concurrent enqueue. Helper
- # releases busy under the SAME lock so any post-call
- # enqueue can either see busy=False (and trigger its own
- # process pass) or had to set request_pending BEFORE this
- # call (handled by Case 1). No stranded flag possible.
- async with pipeline_status_lock:
- pipeline_status["busy"] = True
- pipeline_status["request_pending"] = False
- released = await rag._atomic_release_busy_or_consume_pending(
- pipeline_status, pipeline_status_lock
- )
- assert released is True
- assert pipeline_status["busy"] is False # released
- assert pipeline_status["request_pending"] is False
- finally:
- await rag.finalize_storages()
- asyncio.run(_run())
- @pytest.mark.offline
- def test_apipeline_enqueue_rejects_when_destructive_busy(tmp_path):
- """``destructive_busy`` (set by /documents/clear and per-doc delete)
- must reject enqueue at the last-line guard. These jobs DROP
- storages and remove input files; concurrent enqueue would write to
- storages mid-drop and silently lose the document. Note: this is
- different from plain ``busy=True`` (the processing loop), which is
- explicitly compatible with concurrent enqueue.
- """
- async def _run():
- from lightrag.kg.shared_storage import (
- get_namespace_data,
- get_namespace_lock,
- )
- rag = _new_rag(tmp_path)
- await rag.initialize_storages()
- try:
- pipeline_status = await get_namespace_data(
- "pipeline_status", workspace=rag.workspace
- )
- pipeline_status_lock = get_namespace_lock(
- "pipeline_status", workspace=rag.workspace
- )
- async with pipeline_status_lock:
- pipeline_status["busy"] = True
- pipeline_status["destructive_busy"] = True
- try:
- with pytest.raises(RuntimeError, match="clearing or deleting"):
- await rag.apipeline_enqueue_documents(
- "should not enqueue",
- file_paths="while_clearing.txt",
- track_id="track-clearing",
- )
- # ``from_scan`` does NOT bypass destructive_busy: scan
- # is also a writer and would race with the drop.
- with pytest.raises(RuntimeError, match="clearing or deleting"):
- await rag.apipeline_enqueue_documents(
- "should not enqueue",
- file_paths="while_clearing_scan.txt",
- track_id="track-clearing-scan",
- from_scan=True,
- )
- finally:
- async with pipeline_status_lock:
- pipeline_status["busy"] = False
- pipeline_status["destructive_busy"] = False
- finally:
- await rag.finalize_storages()
- asyncio.run(_run())
- @pytest.mark.offline
- def test_concurrent_enqueue_dedupes_same_content_different_filenames(tmp_path):
- """Two concurrent ``apipeline_enqueue_documents`` calls with the
- same content but different filenames must not both end up as
- PENDING. The dedup-and-upsert critical section is serialised by
- a workspace-scoped lock so the second call always sees the first's
- upserted row and is recorded as ``duplicate_kind=content_hash``.
- The race only matters now that concurrent enqueue is permitted
- (busy=True doesn't block, scan's processing phase doesn't block).
- Without the lock, two enqueues can both read doc_status before
- either upserts, both miss the content_hash dedup, and both write
- PENDING — bypassing the dedup that's supposed to land one of them
- as FAILED.
- Determinism trick: patch ``get_existing_doc_by_content_hash`` to
- yield via ``asyncio.sleep(0)`` before reading. This guarantees the
- asyncio scheduler interleaves the two coroutines at the dedup
- read, so without the serialise lock both would miss the existing
- row. With the lock, the second coroutine waits until the first
- has finished upserting, then sees the row.
- """
- async def _run():
- import lightrag.pipeline as pipeline_module
- rag = _new_rag(tmp_path)
- await rag.initialize_storages()
- try:
- original = pipeline_module.get_existing_doc_by_content_hash
- async def yielding_get_by_content_hash(doc_status, content_hash):
- # Yield to the event loop so the SECOND enqueue gets a
- # chance to run its dedup read before we proceed. This
- # is the exact interleaving the lock must defeat.
- await asyncio.sleep(0)
- return await original(doc_status, content_hash)
- import unittest.mock
- with unittest.mock.patch.object(
- pipeline_module,
- "get_existing_doc_by_content_hash",
- yielding_get_by_content_hash,
- ):
- # Same content, two distinct filenames so the basename
- # dedup misses and the content_hash dedup is the gate.
- shared_content = "shared content for dedup race"
- results = await asyncio.gather(
- rag.apipeline_enqueue_documents(
- shared_content,
- file_paths="first.txt",
- track_id="track-first",
- ),
- rag.apipeline_enqueue_documents(
- shared_content,
- file_paths="second.txt",
- track_id="track-second",
- ),
- )
- # First call enqueues the doc and returns its track_id.
- # Second call sees the upserted row inside the
- # serialised dedup section, finds zero unique docs, and
- # returns None (the existing "no new unique docs"
- # early-exit path). The duplicate record is still
- # written to doc_status as FAILED.
- assert results[0] == "track-first"
- assert results[1] is None
- # Exactly ONE PENDING doc should exist for this content,
- # not two. The second enqueue must have been recorded as a
- # content_hash duplicate (FAILED with metadata).
- pending_docs = await rag.doc_status.get_docs_by_statuses(
- [DocStatus.PENDING]
- )
- assert len(pending_docs) == 1, (
- f"Expected exactly 1 PENDING doc after concurrent enqueue, "
- f"got {len(pending_docs)}: {list(pending_docs.keys())}"
- )
- # The duplicate record (FAILED + duplicate_kind=content_hash)
- # carries the second filename and the metadata pointer back
- # to the original.
- failed_docs = await rag.doc_status.get_docs_by_statuses([DocStatus.FAILED])
- duplicate_records = [
- d
- for d in failed_docs.values()
- if (
- getattr(d, "metadata", None)
- and d.metadata.get("duplicate_kind") == "content_hash"
- )
- ]
- assert len(duplicate_records) == 1, (
- f"Expected exactly 1 content_hash-duplicate FAILED row, "
- f"got {len(duplicate_records)}"
- )
- dup = duplicate_records[0]
- assert dup.metadata["is_duplicate"] is True
- assert dup.metadata["original_doc_id"]
- finally:
- await rag.finalize_storages()
- asyncio.run(_run())
- @pytest.mark.offline
- def test_apipeline_enqueue_from_scan_bypasses_scanning_guard(tmp_path):
- """The scan-owned background task sets ``scanning=True`` itself, so its
- own enqueue calls must be allowed through. External callers (without
- ``from_scan=True``) remain blocked. ``busy=True`` no longer rejects
- enqueue (concurrent processing is permitted under the new contract),
- so it is not exercised here.
- """
- async def _run():
- from lightrag.kg.shared_storage import (
- get_namespace_data,
- get_namespace_lock,
- )
- rag = _new_rag(tmp_path)
- await rag.initialize_storages()
- try:
- pipeline_status = await get_namespace_data(
- "pipeline_status", workspace=rag.workspace
- )
- pipeline_status_lock = get_namespace_lock(
- "pipeline_status", workspace=rag.workspace
- )
- # Scan classification phase: scanning_exclusive=True, but
- # ``from_scan=True`` lifts the guard so scan can enqueue
- # files it just discovered. Non-scan callers are still
- # rejected.
- async with pipeline_status_lock:
- pipeline_status["scanning"] = True
- pipeline_status["scanning_exclusive"] = True
- try:
- returned_track_id = await rag.apipeline_enqueue_documents(
- "scan-owned content",
- file_paths="scan_owned.txt",
- track_id="track-scan-owned",
- from_scan=True,
- )
- assert returned_track_id == "track-scan-owned"
- with pytest.raises(RuntimeError, match="scan is classifying"):
- await rag.apipeline_enqueue_documents(
- "external content",
- file_paths="external.txt",
- track_id="track-external",
- )
- finally:
- async with pipeline_status_lock:
- pipeline_status["scanning"] = False
- pipeline_status["scanning_exclusive"] = False
- finally:
- await rag.finalize_storages()
- asyncio.run(_run())
- @pytest.mark.offline
- def test_analyze_multimodal_overwrites_already_analyzed_items(tmp_path):
- """Re-running analyze_multimodal recomputes enabled modalities and
- overwrites any prior ``llm_analyze_result`` from the sidecar.
- """
- async def _run():
- vlm_calls = {"n": 0}
- extract_calls = {"n": 0}
- async def _vlm(_prompt, **_kwargs):
- vlm_calls["n"] += 1
- return json.dumps(
- {
- "name": "Image",
- "type": "Chart",
- "description": "details",
- }
- )
- async def _extract(_prompt, **_kwargs):
- extract_calls["n"] += 1
- return json.dumps(
- {
- "name": "Item",
- "description": "table content summary",
- }
- )
- rag = _new_rag(
- tmp_path,
- vlm_llm_model_func=_vlm,
- extract_llm_model_func=_extract,
- )
- await rag.initialize_storages()
- # Minimal blocks file with valid meta.
- blocks = tmp_path / "demo.blocks.jsonl"
- blocks.write_text(
- "\n".join(
- [
- json.dumps({"type": "meta", "format_version": "1.0"}),
- json.dumps({"type": "content", "content": "body"}),
- ]
- )
- + "\n",
- encoding="utf-8",
- )
- # 64x64 PNG so the image-pixel skip guard does NOT short-circuit
- # before the VLM call.
- img_path = tmp_path / "img1.png"
- import struct
- import zlib
- def _png_bytes(w: int, h: int) -> bytes:
- sig = b"\x89PNG\r\n\x1a\n"
- ihdr = struct.pack(">II", w, h) + b"\x08\x06\x00\x00\x00"
- crc = zlib.crc32(b"IHDR" + ihdr).to_bytes(4, "big")
- ihdr_chunk = struct.pack(">I", len(ihdr)) + b"IHDR" + ihdr + crc
- idat_payload = b"\x00" * (w * h * 4 + h)
- compressed = zlib.compress(idat_payload)
- crc_idat = zlib.crc32(b"IDAT" + compressed).to_bytes(4, "big")
- idat_chunk = (
- struct.pack(">I", len(compressed)) + b"IDAT" + compressed + crc_idat
- )
- iend_chunk = b"\x00\x00\x00\x00IEND\xaeB`\x82"
- return sig + ihdr_chunk + idat_chunk + iend_chunk
- img_path.write_bytes(_png_bytes(64, 64))
- # Drawings sidecar with ONE item already analyzed (status=success).
- drawings = tmp_path / "demo.drawings.json"
- drawings.write_text(
- json.dumps(
- {
- "version": "1.0",
- "drawings": {
- "id1": {
- "id": "id1",
- "caption": "fig1",
- "path": str(img_path),
- "llm_analyze_result": {
- "name": "Existing",
- "type": "Photo",
- "description": "kept as-is",
- "analyze_time": 1700000000,
- "status": "success",
- "message": "",
- },
- }
- },
- }
- ),
- encoding="utf-8",
- )
- # Tables sidecar with one fresh item (no prior result).
- tables = tmp_path / "demo.tables.json"
- tables.write_text(
- json.dumps(
- {
- "version": "1.0",
- "tables": {
- "tbl1": {
- "id": "tbl1",
- "caption": "tbl",
- "content": "Header|Row",
- }
- },
- }
- ),
- encoding="utf-8",
- )
- parsed = {
- "doc_id": "doc-1",
- "file_path": "demo.pdf",
- "blocks_path": str(blocks),
- "content": "body",
- }
- await rag.analyze_multimodal("doc-1", "demo.pdf", parsed, process_options="it")
- drawings_payload = json.loads(drawings.read_text(encoding="utf-8"))
- existing = drawings_payload["drawings"]["id1"]["llm_analyze_result"]
- # Existing result was overwritten by the new VLM result.
- assert existing["name"] == "Image"
- assert existing["description"] == "details"
- assert existing["status"] == "success"
- tables_payload = json.loads(tables.read_text(encoding="utf-8"))
- new_result = tables_payload["tables"]["tbl1"]["llm_analyze_result"]
- assert new_result["name"] == "Item"
- assert new_result["status"] == "success"
- # Drawings are recomputed through VLM; tables take the EXTRACT role
- # (per design §3.1), not VLM.
- assert vlm_calls["n"] == 1
- assert extract_calls["n"] == 1
- asyncio.run(_run())
- @pytest.mark.offline
- def test_enqueue_dedupes_by_filename_and_content_hash(tmp_path):
- async def _run():
- rag = _new_rag(tmp_path)
- await rag.initialize_storages()
- try:
- # Distinct filenames with distinct content both get enqueued.
- await rag.apipeline_enqueue_documents(
- ["alpha body", "beta body"],
- file_paths=["first.txt", "second.txt"],
- track_id="track-a",
- )
- first_id = compute_mdhash_id("first.txt", prefix="doc-")
- second_id = compute_mdhash_id("second.txt", prefix="doc-")
- first_doc = await rag.full_docs.get_by_id(first_id)
- second_doc = await rag.full_docs.get_by_id(second_id)
- assert first_doc is not None
- assert second_doc is not None
- assert first_doc.get("content_hash")
- assert second_doc.get("content_hash")
- assert first_doc["content_hash"] != second_doc["content_hash"]
- # Same filename basename with new content is rejected (filename dedup).
- await rag.apipeline_enqueue_documents(
- "changed content",
- file_paths="/tmp/first.txt",
- track_id="track-b",
- )
- first_doc = await rag.full_docs.get_by_id(first_id)
- assert first_doc["content"] == "alpha body"
- # New filename but same content as an existing doc is rejected
- # (content_hash dedup).
- await rag.apipeline_enqueue_documents(
- "alpha body",
- file_paths="third.txt",
- track_id="track-c",
- )
- third_id = compute_mdhash_id("third.txt", prefix="doc-")
- assert await rag.full_docs.get_by_id(third_id) is None
- failed_docs = await rag.doc_status.get_docs_by_status(DocStatus.FAILED)
- kinds = {
- getattr(doc, "metadata", {}).get("duplicate_kind")
- for doc in failed_docs.values()
- if getattr(doc, "metadata", {}).get("is_duplicate")
- }
- assert {"filename", "content_hash"}.issubset(kinds)
- finally:
- await rag.finalize_storages()
- asyncio.run(_run())
- @pytest.mark.offline
- def test_enqueue_dedupes_parser_hinted_filename_variants(tmp_path):
- async def _run():
- rag = _new_rag(tmp_path)
- await rag.initialize_storages()
- try:
- await rag.apipeline_enqueue_documents(
- "alpha body",
- file_paths="abc.docx",
- track_id="track-a",
- )
- first_id = compute_mdhash_id("abc.docx", prefix="doc-")
- first_doc = await rag.full_docs.get_by_id(first_id)
- assert first_doc is not None
- # ``file_path`` is the canonical (hint-stripped) basename and
- # serves as the dedup key — no separate ``canonical_basename``
- # field is written.
- assert first_doc["file_path"] == "abc.docx"
- assert "canonical_basename" not in first_doc
- await rag.apipeline_enqueue_documents(
- "changed body",
- file_paths="/tmp/abc.[native].docx",
- track_id="track-b",
- )
- assert (await rag.full_docs.get_by_id(first_id))["content"] == "alpha body"
- failed_docs = await rag.doc_status.get_docs_by_status(DocStatus.FAILED)
- # The duplicate record stores the canonical basename — hint is
- # not preserved anywhere in the new schema.
- assert any(
- getattr(doc, "metadata", {}).get("duplicate_kind") == "filename"
- and getattr(doc, "file_path", "") == "abc.docx"
- for doc in failed_docs.values()
- )
- finally:
- await rag.finalize_storages()
- asyncio.run(_run())
- @pytest.mark.offline
- def test_delete_result_uses_canonical_file_path(tmp_path):
- async def _run():
- rag = _new_rag(tmp_path)
- await rag.initialize_storages()
- try:
- await rag.apipeline_enqueue_documents(
- "",
- file_paths=str(tmp_path / "abc.[native].docx"),
- docs_format=FULL_DOCS_FORMAT_PENDING_PARSE,
- parse_engine=PARSER_ENGINE_NATIVE,
- track_id="track-delete-source",
- )
- doc_id = compute_mdhash_id("abc.docx", prefix="doc-")
- result = await rag.adelete_by_doc_id(doc_id)
- assert result.status == "success"
- # New schema: file_path is the canonical (hint-stripped)
- # basename; the ``source_path`` field is no longer carried on
- # DeletionResult.
- assert result.file_path == "abc.docx"
- assert not hasattr(result, "source_path")
- finally:
- await rag.finalize_storages()
- asyncio.run(_run())
- @pytest.mark.offline
- def test_enqueue_without_file_paths_uses_content_ids(tmp_path):
- async def _run():
- rag = _new_rag(tmp_path)
- await rag.initialize_storages()
- try:
- docs = ["alpha without source", "beta without source"]
- await rag.apipeline_enqueue_documents(docs, track_id="track-no-source")
- for content in docs:
- doc_id = compute_mdhash_id(content, prefix="doc-")
- full_doc = await rag.full_docs.get_by_id(doc_id)
- status = await rag.doc_status.get_by_id(doc_id)
- assert full_doc is not None
- assert status is not None
- assert full_doc["content"] == content
- assert full_doc["file_path"] == "unknown_source"
- assert status["file_path"] == "unknown_source"
- assert full_doc.get("content_hash")
- assert status.get("content_hash") == full_doc.get("content_hash")
- failed_docs = await rag.doc_status.get_docs_by_status(DocStatus.FAILED)
- duplicate_failures = [
- doc
- for doc in failed_docs.values()
- if getattr(doc, "track_id", "") == "track-no-source"
- and getattr(doc, "metadata", {}).get("is_duplicate")
- ]
- assert duplicate_failures == []
- finally:
- await rag.finalize_storages()
- asyncio.run(_run())
- @pytest.mark.offline
- def test_legacy_empty_file_paths_do_not_block_unsourced_insert(tmp_path):
- async def _run():
- rag = _new_rag(tmp_path)
- await rag.initialize_storages()
- try:
- await rag.doc_status.upsert(
- {
- "legacy-empty": {
- "status": DocStatus.PROCESSED,
- "content_summary": "legacy empty",
- "content_length": 0,
- "file_path": "",
- "track_id": "legacy",
- "created_at": "2025-01-01T00:00:00+00:00",
- "updated_at": "2025-01-01T00:00:00+00:00",
- "chunks_list": [],
- },
- "legacy-no-file": {
- "status": DocStatus.PROCESSED,
- "content_summary": "legacy no-file",
- "content_length": 0,
- "file_path": "no-file-path",
- "track_id": "legacy",
- "created_at": "2025-01-01T00:00:00+00:00",
- "updated_at": "2025-01-01T00:00:00+00:00",
- "chunks_list": [],
- },
- }
- )
- content = "fresh unsourced body"
- await rag.apipeline_enqueue_documents(content, track_id="track-fresh")
- doc_id = compute_mdhash_id(content, prefix="doc-")
- full_doc = await rag.full_docs.get_by_id(doc_id)
- status = await rag.doc_status.get_by_id(doc_id)
- assert full_doc is not None
- assert status is not None
- assert full_doc["file_path"] == "unknown_source"
- assert status["file_path"] == "unknown_source"
- finally:
- await rag.finalize_storages()
- asyncio.run(_run())
- @pytest.mark.offline
- def test_basename_lookup_requires_canonical_stored_file_path(tmp_path):
- async def _run():
- rag = _new_rag(tmp_path)
- await rag.initialize_storages()
- try:
- # Storage does not normalize or apply legacy basename fallback.
- # Business-layer writes must persist the canonical basename.
- noncanonical_id = "doc-noncanonical-1"
- await rag.doc_status.upsert(
- {
- noncanonical_id: {
- "status": DocStatus.PROCESSED,
- "content_summary": "noncanonical",
- "content_length": 7,
- "file_path": "/inputs/legacy.txt",
- "track_id": "noncanonical-track",
- "created_at": "2025-01-01T00:00:00+00:00",
- "updated_at": "2025-01-01T00:00:00+00:00",
- "chunks_list": [],
- }
- }
- )
- match = await rag.doc_status.get_doc_by_file_basename("legacy.txt")
- assert match is None
- # Re-enqueueing through the business path stores the canonical
- # basename and is not blocked by a noncanonical storage row.
- await rag.apipeline_enqueue_documents(
- "fresh body",
- file_paths="legacy.txt",
- track_id="track-x",
- )
- new_id = compute_mdhash_id("legacy.txt", prefix="doc-")
- full_doc = await rag.full_docs.get_by_id(new_id)
- status = await rag.doc_status.get_by_id(new_id)
- assert full_doc is not None
- assert status is not None
- assert full_doc["file_path"] == "legacy.txt"
- assert status["file_path"] == "legacy.txt"
- finally:
- await rag.finalize_storages()
- asyncio.run(_run())
- @pytest.mark.offline
- def test_content_hash_lookup_via_storage(tmp_path):
- async def _run():
- rag = _new_rag(tmp_path)
- await rag.initialize_storages()
- try:
- await rag.apipeline_enqueue_documents(
- "shared body",
- file_paths="alpha.txt",
- track_id="track-a",
- )
- alpha_id = compute_mdhash_id("alpha.txt", prefix="doc-")
- alpha_full = await rag.full_docs.get_by_id(alpha_id)
- assert alpha_full is not None
- content_hash = alpha_full["content_hash"]
- match = await rag.doc_status.get_doc_by_content_hash(content_hash)
- assert match is not None
- doc_id, _ = match
- assert doc_id == alpha_id
- finally:
- await rag.finalize_storages()
- asyncio.run(_run())
- @pytest.mark.offline
- def test_lightrag_format_uses_blocks_file_hash(tmp_path, monkeypatch):
- async def _run():
- input_dir = tmp_path / "input"
- parsed_dir = input_dir / "__parsed__"
- parsed_dir.mkdir(parents=True)
- monkeypatch.setenv("INPUT_DIR", str(input_dir))
- rag = _new_rag(tmp_path / "work")
- rag.workspace = "test-pending-parse-duplicate"
- await rag.initialize_storages()
- try:
- blocks_path = parsed_dir / "doc.blocks.jsonl"
- blocks_path.write_text(
- json.dumps({"type": "header"})
- + "\n"
- + json.dumps({"type": "content", "text": "hello"})
- + "\n",
- encoding="utf-8",
- )
- # Enqueue twice with different filenames pointing at the same
- # blocks file: the second one must be rejected as content_hash dup.
- # ``content`` arg is ignored on the LIGHTRAG path — the LightRAG
- # Document file is read to derive both content_hash and the
- # ``{{LRdoc}}`` summary — so any string here is fine.
- await rag.apipeline_enqueue_documents(
- "",
- file_paths="first.lightrag",
- docs_format="lightrag",
- lightrag_document_paths="__parsed__/doc.blocks.jsonl",
- track_id="track-a",
- )
- await rag.apipeline_enqueue_documents(
- "",
- file_paths="second.lightrag",
- docs_format="lightrag",
- lightrag_document_paths="__parsed__/doc.blocks.jsonl",
- track_id="track-b",
- )
- second_id = compute_mdhash_id("second.lightrag", prefix="doc-")
- assert await rag.full_docs.get_by_id(second_id) is None
- failed = await rag.doc_status.get_docs_by_status(DocStatus.FAILED)
- kinds = {
- getattr(doc, "metadata", {}).get("duplicate_kind")
- for doc in failed.values()
- if getattr(doc, "metadata", {}).get("is_duplicate")
- }
- assert "content_hash" in kinds
- finally:
- await rag.finalize_storages()
- asyncio.run(_run())
- @pytest.mark.offline
- def test_persist_parsed_full_docs_syncs_hash_to_doc_status(tmp_path):
- async def _run():
- rag = _new_rag(tmp_path)
- await rag.initialize_storages()
- try:
- # Enqueue a pending_parse record: no content_hash should exist yet.
- await rag.apipeline_enqueue_documents(
- "",
- file_paths="pending.txt",
- docs_format="pending_parse",
- track_id="track-pending",
- )
- doc_id = compute_mdhash_id("pending.txt", prefix="doc-")
- full_doc = await rag.full_docs.get_by_id(doc_id)
- assert full_doc is not None
- assert full_doc.get("content_hash") in (None, "")
- status_pre = await rag.doc_status.get_by_id(doc_id)
- assert (status_pre or {}).get("content_hash") in (None, "")
- # Simulate a parse_* completion that converts the record to RAW.
- content = "extracted body text"
- await rag._persist_parsed_full_docs(
- doc_id,
- {
- "content": content,
- "file_path": "pending.txt",
- "parse_format": "raw",
- "parse_engine": "native",
- },
- )
- full_doc = await rag.full_docs.get_by_id(doc_id)
- status_post = await rag.doc_status.get_by_id(doc_id)
- expected_hash = compute_mdhash_id(content, prefix="")
- assert full_doc["content_hash"] == expected_hash
- assert (status_post or {}).get("content_hash") == expected_hash
- # The hash should be queryable via the storage index.
- match = await rag.doc_status.get_doc_by_content_hash(expected_hash)
- assert match is not None
- assert match[0] == doc_id
- finally:
- await rag.finalize_storages()
- asyncio.run(_run())
- @pytest.mark.offline
- def test_persist_parsed_full_docs_preserves_pending_metadata(tmp_path):
- """``_persist_parsed_full_docs`` must keep process_options seeded at
- enqueue time so downstream stages (analyze_multimodal, chunking
- selection, KG-skip) still see the user's original opt-ins after the
- parse-result record overwrites the pending_parse row.
- """
- async def _run():
- rag = _new_rag(tmp_path)
- await rag.initialize_storages()
- try:
- await rag.apipeline_enqueue_documents(
- "",
- file_paths="report.[native-iet!].docx",
- docs_format=FULL_DOCS_FORMAT_PENDING_PARSE,
- parse_engine=PARSER_ENGINE_NATIVE,
- process_options="iet!",
- track_id="track-merge",
- )
- doc_id = compute_mdhash_id("report.docx", prefix="doc-")
- pre = await rag.full_docs.get_by_id(doc_id)
- assert pre is not None
- assert pre.get("process_options") == "iet!"
- assert "canonical_basename" not in pre
- assert pre.get("file_path") == "report.docx"
- # Simulate a parse_* completion: pass only the fresh fields the
- # parsers actually emit and verify that pre-existing metadata
- # survives the upsert.
- await rag._persist_parsed_full_docs(
- doc_id,
- {
- "content": "extracted body",
- "file_path": "report.docx",
- "parse_format": "raw",
- "parse_engine": PARSER_ENGINE_NATIVE,
- "update_time": 12345,
- },
- )
- post = await rag.full_docs.get_by_id(doc_id)
- assert post is not None
- # Parser-supplied fields take precedence...
- assert post["content"] == "extracted body"
- assert post["parse_format"] == "raw"
- # ...while metadata seeded at enqueue time is preserved.
- assert post.get("process_options") == "iet!"
- assert post.get("file_path") == "report.docx"
- # And content_hash is freshly computed from the parsed body.
- assert post["content_hash"] == compute_mdhash_id(
- "extracted body", prefix=""
- )
- finally:
- await rag.finalize_storages()
- asyncio.run(_run())
- @pytest.mark.offline
- def test_state_machine_upsert_preserves_content_hash(tmp_path):
- async def _run():
- rag = _new_rag(tmp_path)
- await rag.initialize_storages()
- try:
- await rag.apipeline_enqueue_documents(
- "raw body",
- file_paths="alpha.txt",
- track_id="track-state",
- )
- doc_id = compute_mdhash_id("alpha.txt", prefix="doc-")
- initial_hash = (await rag.doc_status.get_by_id(doc_id))["content_hash"]
- assert initial_hash
- # Simulate the production state-machine upsert pattern: read
- # status_doc, then write a new payload that includes content_hash.
- status_doc = (await rag.doc_status.get_docs_by_status(DocStatus.PENDING))[
- doc_id
- ]
- for next_status in (
- DocStatus.PARSING,
- DocStatus.ANALYZING,
- DocStatus.PROCESSED,
- ):
- await rag.doc_status.upsert(
- {
- doc_id: {
- "status": next_status,
- "content_summary": status_doc.content_summary,
- "content_length": status_doc.content_length,
- "created_at": status_doc.created_at,
- "updated_at": "now",
- "file_path": status_doc.file_path,
- "track_id": status_doc.track_id,
- "content_hash": status_doc.content_hash,
- }
- }
- )
- current = await rag.doc_status.get_by_id(doc_id)
- assert current.get("content_hash") == initial_hash
- # And the index lookup must still return this doc.
- match = await rag.doc_status.get_doc_by_content_hash(initial_hash)
- assert match is not None and match[0] == doc_id
- finally:
- await rag.finalize_storages()
- asyncio.run(_run())
- @pytest.mark.offline
- def test_pending_parse_duplicate_hash_fails_and_archives_source(tmp_path, monkeypatch):
- """Two PENDING_PARSE docx files with identical extracted bodies must be
- detected as content_hash duplicates and the loser archived.
- ``content_hash`` for LIGHTRAG-format docs is the MD5 of the normalized
- ``merged_text`` (sidecar item ids and ``<base>.blocks.assets/`` prefixes
- stripped via :func:`normalize_merged_text_for_hash`), so identical
- bodies under different filenames produce the same hash and
- ``_mark_duplicate_after_parse`` fires.
- """
- async def _run():
- input_dir = tmp_path / "input"
- input_dir.mkdir()
- monkeypatch.setenv("INPUT_DIR", str(input_dir))
- rag = _new_rag(tmp_path / "work")
- await rag.initialize_storages()
- try:
- from datetime import datetime, timezone
- import lightrag.lightrag as lightrag_module
- import lightrag.pipeline as pipeline_module
- class _FrozenDateTime(datetime):
- @classmethod
- def now(cls, tz=None): # noqa: D401
- return datetime(2026, 1, 1, tzinfo=tz or timezone.utc)
- monkeypatch.setattr(lightrag_module, "datetime", _FrozenDateTime)
- monkeypatch.setattr(pipeline_module, "datetime", _FrozenDateTime)
- # Both docx files emit the same blocks list, so combined with the
- # frozen datetime the resulting .blocks.jsonl bytes are equal.
- stable_block = {
- "uuid": "p1",
- "uuid_end": "p1",
- "heading": "",
- "content": "same extracted body",
- "type": "text",
- "parent_headings": [],
- "level": 0,
- "table_chunk_role": "none",
- }
- monkeypatch.setattr(
- "lightrag.parser.docx.parse_document.extract_docx_blocks",
- lambda *args, **kwargs: [dict(stable_block)],
- )
- # First original docx: enqueue, parse, mark PROCESSED.
- original_path = input_dir / "original.docx"
- original_path.write_bytes(b"original docx bytes")
- await rag.apipeline_enqueue_documents(
- "",
- file_paths=str(original_path),
- docs_format=FULL_DOCS_FORMAT_PENDING_PARSE,
- parse_engine=PARSER_ENGINE_NATIVE,
- track_id="track-original",
- )
- await rag.apipeline_process_enqueue_documents()
- original_id = compute_mdhash_id("original.docx", prefix="doc-")
- original_status = await rag.doc_status.get_by_id(original_id)
- assert original_status is not None
- original_status["status"] = DocStatus.PROCESSED
- await rag.doc_status.upsert({original_id: original_status})
- # Second docx: distinct filename so filename dedup misses, but
- # content_hash should match the first because content_list +
- # frozen datetime → identical .blocks.jsonl bytes.
- source_path = input_dir / "duplicate.docx"
- source_path.write_bytes(b"docx bytes")
- async def _fail_extract(*args, **kwargs):
- raise AssertionError("duplicate document should not reach extraction")
- monkeypatch.setattr(rag, "_process_extract_entities", _fail_extract)
- await rag.apipeline_enqueue_documents(
- "",
- file_paths=str(source_path),
- docs_format=FULL_DOCS_FORMAT_PENDING_PARSE,
- parse_engine=PARSER_ENGINE_NATIVE,
- track_id="track-dup",
- )
- await rag.apipeline_process_enqueue_documents()
- duplicate_id = compute_mdhash_id("duplicate.docx", prefix="doc-")
- duplicate_status = await rag.doc_status.get_by_id(duplicate_id)
- assert duplicate_status["status"] == DocStatus.FAILED
- assert duplicate_status["metadata"]["is_duplicate"] is True
- assert duplicate_status["metadata"]["duplicate_kind"] == "content_hash"
- assert duplicate_status["metadata"]["original_doc_id"] == original_id
- assert not source_path.exists()
- assert (input_dir / PARSED_DIR_NAME / source_path.name).exists()
- finally:
- await rag.finalize_storages()
- asyncio.run(_run())
- @pytest.mark.offline
- def test_parser_routing_accepts_semicolon_rules(monkeypatch):
- monkeypatch.setenv("MINERU_LOCAL_ENDPOINT", "http://fake-mineru")
- monkeypatch.setenv("DOCLING_ENDPOINT", "http://fake-docling")
- rules = "*:mineru;html:docling"
- validate_parser_routing_config(rules)
- assert resolve_file_parser_engine("paper.pdf", parser_rules=rules) == "mineru"
- assert resolve_file_parser_engine("index.html", parser_rules=rules) == "docling"
- assert resolve_file_parser_engine("notes.txt", parser_rules=rules) == "legacy"
- @pytest.mark.offline
- def test_parser_routing_validation_requires_external_endpoints(monkeypatch):
- monkeypatch.delenv("MINERU_LOCAL_ENDPOINT", raising=False)
- monkeypatch.setenv("DOCLING_ENDPOINT", "http://fake-docling")
- with pytest.raises(ParserRoutingConfigError, match="MINERU_LOCAL_ENDPOINT"):
- validate_parser_routing_config("*:mineru;html:docling")
- monkeypatch.setenv("MINERU_LOCAL_ENDPOINT", "http://fake-mineru")
- monkeypatch.delenv("DOCLING_ENDPOINT", raising=False)
- with pytest.raises(ParserRoutingConfigError, match="DOCLING_ENDPOINT"):
- validate_parser_routing_config("*:mineru;html:docling")
- @pytest.mark.offline
- def test_parser_routing_validation_honors_mineru_api_mode(monkeypatch):
- monkeypatch.setenv("DOCLING_ENDPOINT", "http://fake-docling")
- monkeypatch.setenv("MINERU_API_MODE", "official")
- monkeypatch.delenv("MINERU_API_TOKEN", raising=False)
- with pytest.raises(ParserRoutingConfigError, match="MINERU_API_TOKEN"):
- validate_parser_routing_config("pdf:mineru")
- monkeypatch.setenv("MINERU_API_TOKEN", "token")
- validate_parser_routing_config("pdf:mineru")
- monkeypatch.setenv("MINERU_API_MODE", "local")
- monkeypatch.delenv("MINERU_LOCAL_ENDPOINT", raising=False)
- with pytest.raises(ParserRoutingConfigError, match="MINERU_LOCAL_ENDPOINT"):
- validate_parser_routing_config("pdf:mineru")
- monkeypatch.setenv("MINERU_LOCAL_ENDPOINT", "http://fake-local")
- validate_parser_routing_config("pdf:mineru")
- @pytest.mark.offline
- def test_parser_routing_validation_rejects_invalid_rules(monkeypatch):
- monkeypatch.setenv("MINERU_LOCAL_ENDPOINT", "http://fake-mineru")
- with pytest.raises(ParserRoutingConfigError, match=r"\*\.pdf"):
- validate_parser_routing_config("*.pdf:mineru")
- with pytest.raises(ParserRoutingConfigError, match="unsupported parser engine"):
- validate_parser_routing_config("pdf:unknown")
- with pytest.raises(ParserRoutingConfigError, match="does not match any suffix"):
- validate_parser_routing_config("pdf:native")
- @pytest.mark.offline
- def test_three_phase_status_flow(tmp_path, monkeypatch):
- async def _run():
- rag = _new_rag(tmp_path)
- await rag.initialize_storages()
- async def _fake_extract(*args, **kwargs):
- return []
- async def _fake_merge(*args, **kwargs):
- return None
- async def _fake_parse_native(doc_id, file_path, content_data):
- return {
- "doc_id": doc_id,
- "file_path": file_path,
- "parse_format": "raw",
- "content": "hello world",
- "blocks_path": "",
- }
- async def _fake_analyze(doc_id, file_path, parsed_data, **kwargs):
- parsed_data["multimodal_processed"] = True
- return parsed_data
- monkeypatch.setattr(rag, "_process_extract_entities", _fake_extract)
- monkeypatch.setattr("lightrag.pipeline.merge_nodes_and_edges", _fake_merge)
- monkeypatch.setattr(rag, "parse_native", _fake_parse_native)
- monkeypatch.setattr(rag, "analyze_multimodal", _fake_analyze)
- status_seq: list[str] = []
- original_upsert = rag.doc_status.upsert
- async def _record_upsert(data):
- for _, val in data.items():
- if isinstance(val, dict) and "status" in val:
- status_seq.append(str(val["status"]))
- return await original_upsert(data)
- monkeypatch.setattr(rag.doc_status, "upsert", _record_upsert)
- await rag.apipeline_enqueue_documents("sample text", file_paths="s.txt")
- await rag.apipeline_process_enqueue_documents()
- joined = " ".join(status_seq)
- assert "DocStatus.PARSING" in joined
- assert "DocStatus.ANALYZING" in joined
- assert "DocStatus.PROCESSING" in joined
- assert "DocStatus.PROCESSED" in joined
- await rag.finalize_storages()
- asyncio.run(_run())
- @pytest.mark.offline
- def test_analyze_multimodal_invalid_json_hard_fails(tmp_path):
- """An invalid VLM response is a hard failure under the new contract:
- the sidecar item gets status='failure' and MultimodalAnalysisError
- bubbles up so the document fails (no silent conservative fallback)."""
- async def _run():
- calls = {"n": 0}
- async def _broken_vlm(prompt, **kwargs):
- calls["n"] += 1
- return "not-json"
- rag = _new_rag(tmp_path, vlm_llm_model_func=_broken_vlm)
- await rag.initialize_storages()
- # 64x64 PNG so the image-pixel skip guard does NOT short-circuit
- # before the VLM call.
- img_path = tmp_path / "img1.png"
- import struct
- import zlib
- def _png_bytes(w: int, h: int) -> bytes:
- sig = b"\x89PNG\r\n\x1a\n"
- ihdr = struct.pack(">II", w, h) + b"\x08\x06\x00\x00\x00"
- crc = zlib.crc32(b"IHDR" + ihdr).to_bytes(4, "big")
- ihdr_chunk = struct.pack(">I", len(ihdr)) + b"IHDR" + ihdr + crc
- idat_payload = b"\x00" * (w * h * 4 + h)
- compressed = zlib.compress(idat_payload)
- crc_idat = zlib.crc32(b"IDAT" + compressed).to_bytes(4, "big")
- idat_chunk = (
- struct.pack(">I", len(compressed)) + b"IDAT" + compressed + crc_idat
- )
- iend_chunk = b"\x00\x00\x00\x00IEND\xaeB`\x82"
- return sig + ihdr_chunk + idat_chunk + iend_chunk
- img_path.write_bytes(_png_bytes(64, 64))
- blocks = tmp_path / "demo.blocks.jsonl"
- blocks.write_text(
- "\n".join(
- [
- json.dumps({"type": "meta", "format_version": "1.0"}),
- json.dumps({"type": "content", "content": "body"}),
- ]
- )
- + "\n",
- encoding="utf-8",
- )
- drawings = tmp_path / "demo.drawings.json"
- drawings.write_text(
- json.dumps(
- {
- "version": "1.0",
- "drawings": {
- "id1": {
- "id": "id1",
- "caption": "图1 测试图",
- "footnotes": [],
- "path": str(img_path),
- }
- },
- },
- ensure_ascii=False,
- ),
- encoding="utf-8",
- )
- parsed = {
- "doc_id": "doc-1",
- "file_path": "demo.pdf",
- "blocks_path": str(blocks),
- "content": "body",
- }
- from lightrag.exceptions import MultimodalAnalysisError
- with pytest.raises(MultimodalAnalysisError):
- await rag.analyze_multimodal(
- "doc-1", "demo.pdf", parsed, process_options="i"
- )
- drawings_payload = json.loads(drawings.read_text(encoding="utf-8"))
- result = drawings_payload["drawings"]["id1"]["llm_analyze_result"]
- # No retry: VLM mock called exactly once.
- assert calls["n"] == 1
- # Sidecar carries a failure marker so a re-run sees the prior failure
- # and does not silently consume it.
- assert result["status"] == "failure"
- assert "missing or invalid field" in result["message"]
- asyncio.run(_run())
- @pytest.mark.offline
- def test_analyze_multimodal_uses_effective_vlm_max_async_when_role_none(tmp_path):
- async def _run():
- rag = _new_rag(
- tmp_path,
- llm_model_max_async=3,
- vlm_llm_model_max_async=None,
- )
- blocks = tmp_path / "demo.blocks.jsonl"
- blocks.write_text(
- json.dumps({"type": "meta", "format_version": "1.0"}) + "\n",
- encoding="utf-8",
- )
- parsed = {
- "doc_id": "doc-1",
- "file_path": "demo.pdf",
- "blocks_path": str(blocks),
- "content": "body",
- }
- result = await rag.analyze_multimodal(
- "doc-1", "demo.pdf", parsed, process_options="ite"
- )
- assert result["multimodal_processed"] is True
- asyncio.run(_run())
- @pytest.mark.offline
- def test_safe_vdb_operation_times_out_with_context():
- async def _run():
- async def _hang():
- await asyncio.sleep(0.2)
- with pytest.raises(TimeoutError) as exc_info:
- await safe_vdb_operation_with_exception(
- operation=_hang,
- operation_name="relationship_upsert",
- entity_name="A->B",
- max_retries=1,
- retry_delay=0,
- timeout_seconds=0.05,
- )
- assert "relationship_upsert" in str(exc_info.value)
- assert "A->B" in str(exc_info.value)
- assert "timeout" in str(exc_info.value).lower()
- asyncio.run(_run())
- @pytest.mark.offline
- def test_relationship_vdb_timeout_has_120s_floor():
- assert _get_relationship_vdb_timeout_seconds({}) == 120.0
- assert (
- _get_relationship_vdb_timeout_seconds({"default_embedding_timeout": 10})
- == 120.0
- )
- assert (
- _get_relationship_vdb_timeout_seconds({"default_embedding_timeout": 50})
- == 150.0
- )
- @pytest.mark.offline
- def test_analyze_multimodal_unknown_image_type_folds_to_other(tmp_path):
- """Model output with an out-of-enum ``type`` is folded to ``Other``
- instead of failing the document (per design §3.4)."""
- async def _run():
- async def _vlm(_prompt, **_kwargs):
- return json.dumps(
- {
- "name": "Figure A",
- "type": "diagram", # not in IMAGE_TYPE_ENUM
- "description": "details",
- },
- ensure_ascii=False,
- )
- rag = _new_rag(tmp_path, vlm_llm_model_func=_vlm)
- await rag.initialize_storages()
- import struct
- import zlib
- def _png(w, h):
- sig = b"\x89PNG\r\n\x1a\n"
- ihdr = struct.pack(">II", w, h) + b"\x08\x06\x00\x00\x00"
- crc = zlib.crc32(b"IHDR" + ihdr).to_bytes(4, "big")
- return sig + struct.pack(">I", len(ihdr)) + b"IHDR" + ihdr + crc
- img_path = tmp_path / "img1.png"
- img_path.write_bytes(_png(64, 64))
- blocks = tmp_path / "demo.blocks.jsonl"
- blocks.write_text(
- "\n".join(
- [
- json.dumps({"type": "meta", "format_version": "1.0"}),
- json.dumps({"type": "content", "content": "body"}),
- ]
- )
- + "\n",
- encoding="utf-8",
- )
- drawings = tmp_path / "demo.drawings.json"
- drawings.write_text(
- json.dumps(
- {
- "version": "1.0",
- "drawings": {
- "id1": {
- "id": "id1",
- "caption": "图1 测试图",
- "footnotes": [],
- "path": str(img_path),
- }
- },
- },
- ensure_ascii=False,
- ),
- encoding="utf-8",
- )
- parsed = {
- "doc_id": "doc-1",
- "file_path": "demo.pdf",
- "blocks_path": str(blocks),
- "content": "body",
- }
- await rag.analyze_multimodal("doc-1", "demo.pdf", parsed, process_options="i")
- payload = json.loads(drawings.read_text(encoding="utf-8"))
- result = payload["drawings"]["id1"]["llm_analyze_result"]
- assert result["status"] == "success"
- assert result["type"] == "Other"
- assert result["description"] == "details"
- assert "analyze_time" in result
- asyncio.run(_run())
- @pytest.mark.offline
- def test_analyze_multimodal_skips_tiny_image_without_vlm_call(tmp_path):
- """Images smaller than VLM_MIN_IMAGE_PIXEL (default 32px) are flagged
- status=skipped without invoking the VLM."""
- async def _run():
- calls = {"n": 0}
- async def _vlm(_prompt, **_kwargs):
- calls["n"] += 1
- return "{}"
- rag = _new_rag(tmp_path, vlm_llm_model_func=_vlm)
- await rag.initialize_storages()
- # 1x1 PNG.
- img_path = tmp_path / "tiny.png"
- img_path.write_bytes(
- b"\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01\x00\x00\x00\x01"
- b"\x08\x06\x00\x00\x00\x1f\x15\xc4\x89\x00\x00\x00\rIDATx\x9cc`\x00\x00"
- b"\x00\x02\x00\x01\xe2!\xbc3\x00\x00\x00\x00IEND\xaeB`\x82"
- )
- blocks = tmp_path / "demo.blocks.jsonl"
- blocks.write_text(
- "\n".join(
- [
- json.dumps({"type": "meta", "format_version": "1.0"}),
- json.dumps({"type": "content", "content": "body"}),
- ]
- )
- + "\n",
- encoding="utf-8",
- )
- drawings = tmp_path / "demo.drawings.json"
- drawings.write_text(
- json.dumps(
- {
- "version": "1.0",
- "drawings": {
- "id1": {
- "id": "id1",
- "caption": "tiny",
- "path": str(img_path),
- }
- },
- }
- ),
- encoding="utf-8",
- )
- parsed = {
- "doc_id": "doc-1",
- "file_path": "demo.pdf",
- "blocks_path": str(blocks),
- "content": "body",
- }
- await rag.analyze_multimodal("doc-1", "demo.pdf", parsed, process_options="i")
- payload = json.loads(drawings.read_text(encoding="utf-8"))
- result = payload["drawings"]["id1"]["llm_analyze_result"]
- assert result["status"] == "skipped"
- assert "smaller than" in result["message"]
- assert calls["n"] == 0
- asyncio.run(_run())
- @pytest.mark.offline
- def test_write_lightrag_document_preserves_headings_and_table_dimensions(
- tmp_path, monkeypatch
- ):
- async def _run():
- monkeypatch.setenv("INPUT_DIR", str(tmp_path))
- rag = _new_rag(tmp_path)
- await rag.initialize_storages()
- source_path = tmp_path / "demo.docx"
- source_path.write_bytes(b"docx bytes")
- content_list = [
- {"type": "section_header", "text": "第一章 绪论", "level": 1},
- {"type": "section_header", "text": "1.1 研究背景", "level": 2},
- {"type": "text", "text": "这是正文段落。"},
- {
- "type": "table",
- "table_caption": ["表1 指标说明"],
- "table_body": {
- "num_rows": 2,
- "num_cols": 3,
- "grid": [
- [{"text": "符号"}, {"text": "含义"}, {"text": "单位"}],
- [{"text": "A"}, {"text": "面积"}, {"text": "m2"}],
- ],
- },
- },
- {
- "type": "image",
- "img_path": "/tmp/a.png",
- "image_caption": ["图1 架构图"],
- },
- ]
- parsed = await rag._write_lightrag_document_from_content_list(
- doc_id="doc-1",
- file_path="demo.docx",
- content_list=content_list,
- engine="docling",
- )
- blocks_path = Path(parsed["blocks_path"])
- assert blocks_path == (
- tmp_path / PARSED_DIR_NAME / "demo.docx.parsed" / "demo.blocks.jsonl"
- )
- assert not source_path.exists()
- assert (tmp_path / PARSED_DIR_NAME / source_path.name).exists()
- blocks = [
- json.loads(line)
- for line in blocks_path.read_text(encoding="utf-8").splitlines()
- ]
- content_blocks = blocks[1:]
- body_block = next(x for x in content_blocks if x["content"] == "这是正文段落。")
- table_block = next(
- x for x in content_blocks if 'refid="tb-1-0001"' in x["content"]
- )
- image_block = next(
- x for x in content_blocks if 'id="im-1-0001"' in x["content"]
- )
- assert body_block["heading"] == "1.1 研究背景"
- assert body_block["parent_headings"] == ["第一章 绪论"]
- assert table_block["heading"] == "1.1 研究背景"
- assert image_block["heading"] == "1.1 研究背景"
- base = str(blocks_path)[: -len(".blocks.jsonl")]
- tables = json.loads(Path(base + ".tables.json").read_text(encoding="utf-8"))
- table_entry = tables["tables"]["tb-1-0001"]
- assert table_entry["heading"] == "1.1 研究背景"
- assert table_entry["dimension"] == [2, 3]
- assert table_entry["format"] == "json"
- assert json.loads(table_entry["content"]) == [
- ["符号", "含义", "单位"],
- ["A", "面积", "m2"],
- ]
- drawings = json.loads(Path(base + ".drawings.json").read_text(encoding="utf-8"))
- assert drawings["drawings"]["im-1-0001"]["heading"] == "1.1 研究背景"
- full_doc = await rag.full_docs.get_by_id("doc-1")
- expected_sidecar_dir = (
- tmp_path / PARSED_DIR_NAME / "demo.docx.parsed"
- ).resolve()
- assert full_doc["sidecar_location"].startswith("file://")
- assert full_doc["sidecar_location"].endswith("/")
- assert str(expected_sidecar_dir) in full_doc["sidecar_location"]
- await rag.finalize_storages()
- asyncio.run(_run())
- @pytest.mark.offline
- def test_write_lightrag_document_strips_parser_hint_from_artifact_names(
- tmp_path, monkeypatch
- ):
- async def _run():
- monkeypatch.setenv("INPUT_DIR", str(tmp_path))
- rag = _new_rag(tmp_path)
- await rag.initialize_storages()
- try:
- source_path = tmp_path / "demo.[native].docx"
- source_path.write_bytes(b"docx bytes")
- parsed = await rag._write_lightrag_document_from_content_list(
- doc_id="doc-hinted",
- file_path="demo.[native].docx",
- content_list=[{"type": "text", "text": "body"}],
- engine="native",
- )
- blocks_path = Path(parsed["blocks_path"])
- assert blocks_path == (
- tmp_path / PARSED_DIR_NAME / "demo.docx.parsed" / "demo.blocks.jsonl"
- )
- assert not source_path.exists()
- assert (tmp_path / PARSED_DIR_NAME / source_path.name).exists()
- full_doc = await rag.full_docs.get_by_id("doc-hinted")
- expected_sidecar_dir = (
- tmp_path / PARSED_DIR_NAME / "demo.docx.parsed"
- ).resolve()
- assert full_doc["sidecar_location"].startswith("file://")
- assert full_doc["sidecar_location"].endswith("/")
- assert str(expected_sidecar_dir) in full_doc["sidecar_location"]
- finally:
- await rag.finalize_storages()
- asyncio.run(_run())
- @pytest.mark.offline
- def test_analyze_multimodal_table_without_image_uses_textual_analysis(tmp_path):
- async def _run():
- # Tables now route to the EXTRACT role, not VLM (per design §3.1).
- async def _extract(_prompt, **_kwargs):
- return json.dumps(
- {
- "name": "model_benchmark_metrics",
- "description": "表格包含三列,分别为符号、代表意义和单位,列出了 A、F、e 等符号。",
- },
- ensure_ascii=False,
- )
- async def _vlm_unused(_prompt, **_kwargs):
- raise AssertionError("VLM must not be called for tables")
- rag = _new_rag(
- tmp_path,
- vlm_llm_model_func=_vlm_unused,
- extract_llm_model_func=_extract,
- )
- await rag.initialize_storages()
- blocks = tmp_path / "demo.blocks.jsonl"
- blocks.write_text(
- "\n".join(
- [
- json.dumps({"type": "meta", "format_version": "1.0"}),
- json.dumps({"type": "content", "content": "body"}),
- ]
- )
- + "\n",
- encoding="utf-8",
- )
- tables = tmp_path / "demo.tables.json"
- tables.write_text(
- json.dumps(
- {
- "version": "1.0",
- "tables": {
- "id1": {
- "id": "id1",
- "caption": "表1 指标说明",
- "footnotes": ["单位:国际标准单位"],
- "content": "<table><tr><th>符号</th><th>代表意义</th><th>单位</th></tr><tr><td>A</td><td>面积</td><td>m2</td></tr></table>",
- }
- },
- },
- ensure_ascii=False,
- ),
- encoding="utf-8",
- )
- parsed = {
- "doc_id": "doc-1",
- "file_path": "demo.pdf",
- "blocks_path": str(blocks),
- "content": "body",
- }
- await rag.analyze_multimodal("doc-1", "demo.pdf", parsed, process_options="t")
- payload = json.loads(tables.read_text(encoding="utf-8"))
- result = payload["tables"]["id1"]["llm_analyze_result"]
- assert result["status"] == "success"
- assert result["name"] == "model_benchmark_metrics"
- assert "符号、代表意义和单位" in result["description"]
- # Cache id was written back so document delete can clean it up.
- assert any(
- cid.startswith("default:analysis:")
- for cid in payload["tables"]["id1"].get("llm_cache_list", [])
- )
- asyncio.run(_run())
- @pytest.mark.offline
- def test_parser_source_resolver_finds_hint_variant_by_canonical_name(
- tmp_path, monkeypatch
- ):
- input_dir = tmp_path / "input"
- input_dir.mkdir()
- monkeypatch.setenv("INPUT_DIR", str(input_dir))
- hinted = input_dir / "demo.[mineru].pdf"
- hinted.write_bytes(b"fake-pdf")
- rag = _new_rag(tmp_path / "work")
- resolved = rag._resolve_source_file_for_parser(
- "demo.pdf",
- parser_engine=PARSER_ENGINE_MINERU,
- )
- assert Path(resolved) == hinted
- @pytest.mark.offline
- def test_parser_source_resolver_prefers_exact_canonical_file(tmp_path, monkeypatch):
- input_dir = tmp_path / "input"
- input_dir.mkdir()
- monkeypatch.setenv("INPUT_DIR", str(input_dir))
- exact = input_dir / "demo.pdf"
- hinted = input_dir / "demo.[mineru].pdf"
- exact.write_bytes(b"canonical")
- hinted.write_bytes(b"hinted")
- rag = _new_rag(tmp_path / "work")
- resolved = rag._resolve_source_file_for_parser(
- "demo.pdf",
- parser_engine=PARSER_ENGINE_MINERU,
- )
- assert Path(resolved) == exact
- @pytest.mark.offline
- def test_parse_mineru_to_lightrag_document(tmp_path, monkeypatch):
- """End-to-end: parse_mineru routes through MinerURawClient + sidecar
- writer and produces spec-compliant *.parsed/ + *.mineru_raw/ artifacts.
- With the unified pipeline (introduced alongside the MinerU raw bundle
- cache), the MinerU download choreography happens inside
- :meth:`MinerURawClient.download_into`. We stub that method directly.
- """
- from lightrag.parser.external.mineru import compute_size_and_hash
- from lightrag.parser.external.mineru.cache import current_mineru_options_signature
- from lightrag.parser.external.mineru.client import MinerURawClient
- from lightrag.parser.external.mineru.manifest import (
- Manifest,
- ManifestFile,
- write_manifest,
- )
- async def _run():
- input_dir = tmp_path / "input"
- input_dir.mkdir()
- monkeypatch.setenv("INPUT_DIR", str(input_dir))
- rag = _new_rag(tmp_path / "work")
- await rag.initialize_storages()
- src_file = input_dir / "demo.pdf"
- src_file.write_bytes(b"fake-pdf")
- async def _fake_download(self, raw_dir, source_file_path, **_kwargs):
- assert source_file_path == src_file
- raw_dir.mkdir(parents=True, exist_ok=True)
- content_list = [
- {"type": "text", "text": "第一段正文"},
- {
- "type": "image",
- "img_path": "assets/img1.png",
- "image_caption": ["图1 架构图"],
- "image_footnote": ["示意图"],
- },
- {
- "type": "table",
- "table_body": "<table><tr><td>A</td></tr></table>",
- "table_caption": ["表1 指标"],
- "table_footnote": ["单位:%"],
- },
- {"type": "equation", "text": "$$E=mc^2$$"},
- ]
- (raw_dir / "content_list.json").write_text(
- json.dumps(content_list, ensure_ascii=False),
- encoding="utf-8",
- )
- (raw_dir / "assets").mkdir()
- (raw_dir / "assets" / "img1.png").write_bytes(b"\x89PNGfake")
- src_size, src_hash = compute_size_and_hash(source_file_path)
- crit_size, crit_hash = compute_size_and_hash(raw_dir / "content_list.json")
- manifest = Manifest(
- source_content_hash=src_hash,
- source_size_bytes=src_size,
- source_filename_at_parse=source_file_path.name,
- critical_file=ManifestFile(
- path="content_list.json", size=crit_size, sha256=crit_hash
- ),
- files=[
- ManifestFile(
- path="assets/img1.png",
- size=(raw_dir / "assets" / "img1.png").stat().st_size,
- )
- ],
- total_size_bytes=crit_size,
- task_id="fake-task",
- api_mode="local",
- options_signature=current_mineru_options_signature(),
- )
- write_manifest(raw_dir, manifest)
- return manifest
- monkeypatch.setattr(MinerURawClient, "download_into", _fake_download)
- monkeypatch.setenv("MINERU_LOCAL_ENDPOINT", "http://fake-mineru")
- parsed = await rag.parse_mineru(
- doc_id="doc-1",
- file_path=str(src_file),
- content_data={"content": ""},
- )
- assert parsed["parse_format"] == "lightrag"
- assert parsed["blocks_path"]
- blocks_path = Path(parsed["blocks_path"])
- assert blocks_path.exists()
- lines = blocks_path.read_text(encoding="utf-8").splitlines()
- meta = json.loads(lines[0])
- assert meta["type"] == "meta"
- assert meta["format"] == "lightrag"
- assert meta["drawing_file"] is True
- assert meta["table_file"] is True
- assert meta["equation_file"] is True
- base = str(blocks_path)[: -len(".blocks.jsonl")]
- drawings = json.loads(Path(base + ".drawings.json").read_text(encoding="utf-8"))
- tables = json.loads(Path(base + ".tables.json").read_text(encoding="utf-8"))
- equations = json.loads(
- Path(base + ".equations.json").read_text(encoding="utf-8")
- )
- assert drawings["drawings"]
- assert tables["tables"]
- assert equations["equations"]
- full_doc = await rag.full_docs.get_by_id("doc-1")
- assert full_doc["parse_format"] == "lightrag"
- # Per docs/FileProcessingConfiguration-zh.md spec, ``content`` is now
- # ``{{LRdoc}}`` followed by a leading-text summary of the document.
- assert full_doc["content"].startswith("{{LRdoc}}")
- assert full_doc["sidecar_location"].startswith("file://")
- assert full_doc["sidecar_location"].endswith("/")
- assert str(blocks_path.parent.resolve()) in full_doc["sidecar_location"]
- await rag.finalize_storages()
- asyncio.run(_run())
- @pytest.mark.offline
- def test_parse_mineru_uses_hint_source_and_canonical_upload_name(tmp_path, monkeypatch):
- from lightrag.parser.external.mineru import compute_size_and_hash
- from lightrag.parser.external.mineru.cache import current_mineru_options_signature
- from lightrag.parser.external.mineru.client import MinerURawClient
- from lightrag.parser.external.mineru.manifest import (
- Manifest,
- ManifestFile,
- write_manifest,
- )
- async def _run():
- input_dir = tmp_path / "input"
- input_dir.mkdir()
- monkeypatch.setenv("INPUT_DIR", str(input_dir))
- monkeypatch.setenv("MINERU_LOCAL_ENDPOINT", "http://fake-mineru")
- rag = _new_rag(tmp_path / "work")
- await rag.initialize_storages()
- hinted_name = "LightRAG - Simple and Fast RAG.[mineru].pdf"
- canonical_name = "LightRAG - Simple and Fast RAG.pdf"
- src_file = input_dir / hinted_name
- src_file.write_bytes(b"fake-pdf")
- async def _fake_download(self, raw_dir, source_file_path, **kwargs):
- assert source_file_path == src_file
- assert kwargs.get("upload_name") == canonical_name
- raw_dir.mkdir(parents=True, exist_ok=True)
- content_list = [{"type": "text", "text": "第一段正文"}]
- content_path = raw_dir / "content_list.json"
- content_path.write_text(
- json.dumps(content_list, ensure_ascii=False),
- encoding="utf-8",
- )
- src_size, src_hash = compute_size_and_hash(source_file_path)
- crit_size, crit_hash = compute_size_and_hash(content_path)
- manifest = Manifest(
- source_content_hash=src_hash,
- source_size_bytes=src_size,
- source_filename_at_parse=kwargs.get("upload_name"),
- critical_file=ManifestFile(
- path="content_list.json",
- size=crit_size,
- sha256=crit_hash,
- ),
- files=[],
- total_size_bytes=crit_size,
- task_id="fake-task",
- api_mode="local",
- options_signature=current_mineru_options_signature(),
- )
- write_manifest(raw_dir, manifest)
- return manifest
- monkeypatch.setattr(MinerURawClient, "download_into", _fake_download)
- await rag.apipeline_enqueue_documents(
- "",
- file_paths=str(src_file),
- track_id="track-hint",
- docs_format=FULL_DOCS_FORMAT_PENDING_PARSE,
- parse_engine=PARSER_ENGINE_MINERU,
- )
- doc_id = compute_mdhash_id(canonical_name, prefix="doc-")
- status = await rag.doc_status.get_by_id(doc_id)
- assert status is not None
- assert status["file_path"] == canonical_name
- assert status["metadata"]["source_file_name"] == hinted_name
- content_data = await rag.full_docs.get_by_id(doc_id)
- assert content_data is not None
- content_data["source_file_name"] = status["metadata"]["source_file_name"]
- parsed = await rag.parse_mineru(
- doc_id=doc_id,
- file_path=status["file_path"],
- content_data=content_data,
- )
- blocks_path = Path(parsed["blocks_path"])
- expected_parsed_dir = input_dir / PARSED_DIR_NAME / f"{canonical_name}.parsed"
- expected_raw_dir = (
- input_dir
- / PARSED_DIR_NAME
- / ("LightRAG - Simple and Fast RAG.pdf.mineru_raw")
- )
- archived_source = input_dir / PARSED_DIR_NAME / hinted_name
- assert blocks_path.parent == expected_parsed_dir
- assert expected_raw_dir.is_dir()
- assert not src_file.exists()
- assert archived_source.is_file()
- await rag.finalize_storages()
- asyncio.run(_run())
- @pytest.mark.offline
- def test_mm_chunks_and_modality_relations_from_sidecars(tmp_path):
- async def _run():
- rag = _new_rag(tmp_path)
- await rag.initialize_storages()
- blocks = tmp_path / "demo.blocks.jsonl"
- blocks.write_text(
- "\n".join(
- [
- json.dumps(
- {
- "type": "meta",
- "format": "lightrag",
- "version": "1.0",
- "doc_id": "doc-1",
- },
- ensure_ascii=False,
- ),
- json.dumps(
- {
- "type": "content",
- "blockid": "b1",
- "format": "plain_text",
- "content": "正文",
- },
- ensure_ascii=False,
- ),
- ]
- )
- + "\n",
- encoding="utf-8",
- )
- drawings = tmp_path / "demo.drawings.json"
- drawings.write_text(
- json.dumps(
- {
- "version": "1.0",
- "drawings": {
- "d1": {
- "id": "d1",
- "heading": "章节A",
- "caption": "图1 架构",
- "llm_cache_list": [
- "default:analysis:abc123",
- ],
- "llm_analyze_result": {
- "name": "系统架构图",
- "type": "Chart",
- "description": "模块交互关系",
- "analyze_time": 1700000000,
- "status": "success",
- "message": "",
- },
- }
- },
- },
- ensure_ascii=False,
- ),
- encoding="utf-8",
- )
- mm_chunks = rag._build_mm_chunks_from_sidecars(
- doc_id="doc-1",
- file_path="demo.pdf",
- blocks_path=str(blocks),
- base_order_index=2,
- )
- assert len(mm_chunks) == 1
- chunk = mm_chunks[0]
- # New nested schema: heading + sidecar + llm_cache_list merge.
- assert chunk["content"].startswith("[Image Name]")
- assert "[Image Type]Chart" in chunk["content"]
- assert chunk["sidecar"] == {
- "type": "drawing",
- "id": "d1",
- "refs": [{"type": "drawing", "id": "d1"}],
- }
- assert chunk["heading"] == {
- "level": 0,
- "heading": "章节A",
- "parent_headings": [],
- }
- assert chunk["llm_cache_list"] == ["default:analysis:abc123"]
- # Multimodal entity injection now lives in
- # operate.extract_entities._process_single_content; this test only
- # covers chunk assembly. The companion regression below
- # (test_parse_mm_display_name_matches_chunk_format) pins the
- # builder/consumer format contract.
- await rag.finalize_storages()
- asyncio.run(_run())
- @pytest.mark.offline
- def test_parse_mm_display_name_matches_chunk_format():
- """Pin the builder/consumer contract: the chunk content emitted by
- ``_build_mm_chunks_from_sidecars`` must parse cleanly via
- ``operate._parse_mm_display_name`` for all three modalities.
- Regression for the case where the renderer's label format diverged
- from the consumer's regex and display names silently fell back to
- sidecar ids.
- """
- drawing_content = (
- "[Image Name]系统架构图\n[Image Type]Chart\n\n模块交互关系\n\n"
- "[Image Footnotes]脚注1; 脚注2"
- )
- assert _parse_mm_display_name(drawing_content, "d1") == "系统架构图"
- table_content = "[Table Name]性能对比表\n\n各方法的指标对比"
- assert _parse_mm_display_name(table_content, "t1") == "性能对比表"
- equation_content = "E = mc^2\n[Equation Name]质能方程\n\n爱因斯坦的质能等效公式"
- assert _parse_mm_display_name(equation_content, "e1") == "质能方程"
- # Fallbacks: missing marker, empty content, marker with blank name.
- assert _parse_mm_display_name("no marker here", "fallback-id") == "fallback-id"
- assert _parse_mm_display_name("", "fallback-id") == "fallback-id"
- assert _parse_mm_display_name("[Image Name] ", "fallback-id") == "fallback-id"
- @pytest.mark.offline
- def test_parse_mm_display_name_on_real_builder_output(tmp_path):
- """End-to-end pin: feed actual chunks from
- ``_build_mm_chunks_from_sidecars`` for all three modalities straight
- into the consumer's parser, and require the analysis ``name`` field
- to come back. This locks the bidirectional builder/consumer contract
- so that renaming the ``[Image|Table|Equation] Name`` label without
- updating the regex in ``operate._parse_mm_display_name`` immediately
- breaks here instead of silently degrading relation descriptions.
- """
- async def _run():
- rag = _new_rag(tmp_path)
- await rag.initialize_storages()
- blocks = tmp_path / "demo.pdf.blocks.jsonl"
- blocks.write_text("", encoding="utf-8")
- heading = {"level": 1, "heading": "章节A", "parent_headings": []}
- (tmp_path / "demo.pdf.drawings.json").write_text(
- json.dumps(
- {
- "drawings": {
- "d1": {
- "heading": heading,
- "footnotes": [],
- "llm_cache_list": [],
- "llm_analyze_result": {
- "name": "系统架构图",
- "type": "Chart",
- "description": "模块交互关系",
- "analyze_time": 1700000000,
- "status": "success",
- "message": "",
- },
- }
- }
- },
- ensure_ascii=False,
- ),
- encoding="utf-8",
- )
- (tmp_path / "demo.pdf.tables.json").write_text(
- json.dumps(
- {
- "tables": {
- "t1": {
- "heading": heading,
- "footnotes": [],
- "llm_cache_list": [],
- "llm_analyze_result": {
- "name": "性能对比表",
- "description": "各方法的指标对比",
- "analyze_time": 1700000001,
- "status": "success",
- "message": "",
- },
- }
- }
- },
- ensure_ascii=False,
- ),
- encoding="utf-8",
- )
- (tmp_path / "demo.pdf.equations.json").write_text(
- json.dumps(
- {
- "equations": {
- "e1": {
- "heading": heading,
- "footnotes": [],
- "llm_cache_list": [],
- "llm_analyze_result": {
- "name": "质能方程",
- "equation": "E = mc^2",
- "description": "爱因斯坦的质能等效公式",
- "analyze_time": 1700000002,
- "status": "success",
- "message": "",
- },
- }
- }
- },
- ensure_ascii=False,
- ),
- encoding="utf-8",
- )
- mm_chunks = rag._build_mm_chunks_from_sidecars(
- doc_id="doc-1",
- file_path="demo.pdf",
- blocks_path=str(blocks),
- base_order_index=0,
- process_options="ite",
- )
- # Index by sidecar type for stable assertions independent of
- # iteration order in the builder.
- by_type = {chunk["sidecar"]["type"]: chunk for chunk in mm_chunks}
- assert set(by_type.keys()) == {"drawing", "table", "equation"}
- expected = {
- "drawing": ("d1", "系统架构图"),
- "table": ("t1", "性能对比表"),
- "equation": ("e1", "质能方程"),
- }
- for kind, (sidecar_id, name) in expected.items():
- display = _parse_mm_display_name(by_type[kind]["content"], sidecar_id)
- assert display == name, (
- f"{kind}: parser failed to extract '{name}' from builder "
- f"output, got '{display}' (content: {by_type[kind]['content']!r})"
- )
- await rag.finalize_storages()
- asyncio.run(_run())
- @pytest.mark.offline
- def test_parse_mineru_empty_service_result_raises_without_fallback(
- tmp_path, monkeypatch
- ):
- """When MinerU produces no content_list.json the IR builder raises and the
- pipeline propagates the error — no silent fallback to raw text.
- With the unified pipeline, an "empty result" surfaces as a missing
- critical file inside ``*.mineru_raw/``; the IR builder's
- ``normalize_from_workdir`` raises :class:`FileNotFoundError` and the
- parse fails fast.
- """
- from lightrag.parser.external.mineru.client import MinerURawClient
- async def _run():
- rag = _new_rag(tmp_path)
- await rag.initialize_storages()
- src_file = tmp_path / "demo.pdf"
- src_file.write_bytes(b"fake-pdf")
- async def _fake_download(self, raw_dir, source_file_path, **_kwargs):
- # Simulate a "MinerU returned nothing useful" bundle: dir is
- # touched but no content_list.json is produced.
- raw_dir.mkdir(parents=True, exist_ok=True)
- monkeypatch.setattr(MinerURawClient, "download_into", _fake_download)
- monkeypatch.setenv("MINERU_LOCAL_ENDPOINT", "http://fake")
- with pytest.raises(FileNotFoundError, match="content_list.json"):
- await rag.parse_mineru(
- doc_id="doc-local-1",
- file_path=str(src_file),
- content_data={"content": "native fallback content"},
- )
- await rag.finalize_storages()
- asyncio.run(_run())
- @pytest.mark.offline
- def test_build_chunks_dict_preserves_existing_llm_cache_list():
- """Regression: build_chunks_dict_from_chunking_result must not overwrite
- a chunk's pre-existing llm_cache_list — multimodal chunks arrive with
- analysis cache ids already attached so document deletion can clean
- them up via the per-chunk llm_cache_list."""
- from lightrag.utils_pipeline import build_chunks_dict_from_chunking_result
- chunking_result = [
- {
- "chunk_order_index": 0,
- "content": "first chunk",
- "tokens": 4,
- },
- {
- "chunk_id": "doc-1-mm-drawing-000",
- "chunk_order_index": 1,
- "content": "second chunk",
- "tokens": 6,
- "llm_cache_list": [
- "default:analysis:abc",
- "default:analysis:abc", # dedup verification
- "default:analysis:def",
- ],
- },
- ]
- chunks = build_chunks_dict_from_chunking_result(
- chunking_result, doc_id="doc-1", file_path="demo.pdf"
- )
- # Order is chunking_result order; locate by chunk_id.
- mm_chunk = next(
- v for v in chunks.values() if v.get("chunk_id") == "doc-1-mm-drawing-000"
- )
- text_chunk = next(
- v for v in chunks.values() if v.get("chunk_id") != "doc-1-mm-drawing-000"
- )
- assert mm_chunk["llm_cache_list"] == [
- "default:analysis:abc",
- "default:analysis:def",
- ]
- # Plain text chunks still start with an empty list (no pre-existing ids).
- assert text_chunk["llm_cache_list"] == []
- @pytest.mark.offline
- def test_build_mm_chunks_respects_process_options_filter(tmp_path):
- """Regression: _build_mm_chunks_from_sidecars must gate sidecar reads
- by the active process_options. A document re-processed after opting
- out of i/t/e MUST NOT pick up stale success results from a prior pass.
- """
- async def _run():
- rag = _new_rag(tmp_path)
- await rag.initialize_storages()
- blocks = tmp_path / "demo.blocks.jsonl"
- blocks.write_text(
- "\n".join(
- [
- json.dumps({"type": "meta", "format_version": "1.0"}),
- json.dumps({"type": "content", "content": "body"}),
- ]
- )
- + "\n",
- encoding="utf-8",
- )
- # Both modalities carry a stale ``success`` from a prior pass.
- drawings = tmp_path / "demo.drawings.json"
- drawings.write_text(
- json.dumps(
- {
- "drawings": {
- "d1": {
- "id": "d1",
- "llm_analyze_result": {
- "name": "old",
- "type": "Chart",
- "description": "stale drawing",
- "analyze_time": 1700000000,
- "status": "success",
- "message": "",
- },
- }
- }
- }
- ),
- encoding="utf-8",
- )
- tables = tmp_path / "demo.tables.json"
- tables.write_text(
- json.dumps(
- {
- "tables": {
- "t1": {
- "id": "t1",
- "llm_analyze_result": {
- "name": "old",
- "description": "stale table",
- "analyze_time": 1700000000,
- "status": "success",
- "message": "",
- },
- }
- }
- }
- ),
- encoding="utf-8",
- )
- # process_options="t" → only tables are considered; the drawing
- # success entry must NOT generate a chunk.
- only_tables = rag._build_mm_chunks_from_sidecars(
- doc_id="doc-1",
- file_path="demo.pdf",
- blocks_path=str(blocks),
- base_order_index=0,
- process_options="t",
- )
- assert len(only_tables) == 1
- assert only_tables[0]["sidecar"]["type"] == "table"
- # Empty/None process_options → no modalities active → no chunks.
- none_active = rag._build_mm_chunks_from_sidecars(
- doc_id="doc-1",
- file_path="demo.pdf",
- blocks_path=str(blocks),
- base_order_index=0,
- process_options="",
- )
- assert none_active == []
- # Backwards-compat: callers that pass process_options=None see
- # every modality (legacy behaviour for ad-hoc unit tests).
- legacy = rag._build_mm_chunks_from_sidecars(
- doc_id="doc-1",
- file_path="demo.pdf",
- blocks_path=str(blocks),
- base_order_index=0,
- )
- assert {ch["sidecar"]["type"] for ch in legacy} == {"drawing", "table"}
- await rag.finalize_storages()
- asyncio.run(_run())
- @pytest.mark.offline
- def test_strip_internal_multimodal_markup_cleans_table_id():
- """Regression: parser-emitted ``<table id="tb-...">`` tags must have
- their internal id stripped before the entity-extraction prompt sees
- them. ``format`` / ``caption`` and the row body stay verbatim so the
- extractor still recognizes the structured element."""
- from lightrag.chunk_schema import (
- strip_internal_multimodal_markup_for_extraction,
- )
- source = (
- '<table id="tb-1-0001" format="json" caption="Indicator metrics">'
- '[["a","b"],["1","2"]]'
- "</table>"
- )
- cleaned = strip_internal_multimodal_markup_for_extraction(source)
- assert "tb-1-0001" not in cleaned
- assert 'format="json"' in cleaned
- assert 'caption="Indicator metrics"' in cleaned
- # Row body preserved.
- assert '[["a","b"],["1","2"]]' in cleaned
- @pytest.mark.offline
- def test_strip_internal_multimodal_markup_cite_default_unwraps():
- """Default (keep_cite_tag=False) is the entity-extraction path: the
- ``<cite>`` wrapper is stripped so the extractor does not surface it
- as a structural entity — only the visible label survives.
- The surrounding-context path overrides this via keep_cite_tag=True
- (verified in tests/pipeline/test_multimodal_surrounding_context.py); this
- test pins the default to prevent regressions on the extraction
- path when callers refactor the function signature.
- """
- from lightrag.chunk_schema import (
- strip_internal_multimodal_markup_for_extraction,
- )
- source = (
- 'see <cite type="table" refid="tb-1-0001">表 1</cite> and '
- '<cite type="equation" refid="eq-1-0002">公式 2</cite> for details.'
- )
- cleaned = strip_internal_multimodal_markup_for_extraction(source)
- # Wrappers and ids both gone; visible labels survive as plain text.
- assert "<cite" not in cleaned
- assert "refid=" not in cleaned
- assert "tb-1-0001" not in cleaned
- assert "eq-1-0002" not in cleaned
- assert "表 1" in cleaned
- assert "公式 2" in cleaned
- @pytest.mark.offline
- def test_strip_internal_multimodal_markup_cite_keep_tag_strips_refid_only():
- """keep_cite_tag=True (surrounding-context path): preserve the
- ``<cite type="…">label</cite>`` wrapper but drop the parser-
- internal ``refid``. Other identifier transformations
- (``<table id=…>`` / ``<drawing id=…/>`` / ``<equation id=…>``) are
- unaffected by the flag and still apply."""
- from lightrag.chunk_schema import (
- strip_internal_multimodal_markup_for_extraction,
- )
- source = (
- 'see <cite type="table" refid="tb-1-0001">表 1</cite>; '
- '<drawing id="im-1" path="a.png" src="a" caption="Fig" />'
- )
- cleaned = strip_internal_multimodal_markup_for_extraction(
- source, keep_cite_tag=True
- )
- assert '<cite type="table">表 1</cite>' in cleaned
- assert "refid=" not in cleaned
- assert "tb-1-0001" not in cleaned
- # Non-cite cleaning still applies in this mode.
- assert '<drawing caption="Fig" />' in cleaned
- assert 'id="im-1"' not in cleaned
- assert "path=" not in cleaned
- @pytest.mark.offline
- def test_reinsert_without_process_options_skips_stale_mm_chunks(tmp_path):
- """Regression for the call-site fallback in process_single_document.
- A document re-inserted without ``process_options`` is signalled by a
- missing / falsy ``content_data["process_options"]`` field. The
- pipeline must pass ``""`` (not ``None``) to
- ``_build_mm_chunks_from_sidecars`` so the builder honors the
- "no modalities" contract: stale ``status="success"`` sidecar entries
- from an earlier i/t/e pass MUST NOT be re-indexed.
- The new builder happens to handle ``None`` by enabling every
- modality for ad-hoc callers (unit tests), so this test pins the
- call-site behaviour rather than the helper's default — passing the
- same falsy value via ``or ""`` makes the intent explicit.
- """
- async def _run():
- rag = _new_rag(tmp_path)
- await rag.initialize_storages()
- blocks = tmp_path / "demo.blocks.jsonl"
- blocks.write_text(
- "\n".join(
- [
- json.dumps({"type": "meta", "format_version": "1.0"}),
- json.dumps({"type": "content", "content": "body"}),
- ]
- )
- + "\n",
- encoding="utf-8",
- )
- # Stale success from an earlier multimodal run.
- drawings = tmp_path / "demo.drawings.json"
- drawings.write_text(
- json.dumps(
- {
- "drawings": {
- "d1": {
- "id": "d1",
- "llm_analyze_result": {
- "name": "old",
- "type": "Chart",
- "description": "stale drawing",
- "analyze_time": 1700000000,
- "status": "success",
- "message": "",
- },
- }
- }
- }
- ),
- encoding="utf-8",
- )
- # Simulate the call-site contract: ``content_data`` has no
- # ``process_options`` key, so ``.get(...) or ""`` yields "".
- content_data: dict[str, str] = {}
- effective = (content_data or {}).get("process_options") or ""
- assert effective == ""
- mm_chunks = rag._build_mm_chunks_from_sidecars(
- doc_id="doc-1",
- file_path="demo.pdf",
- blocks_path=str(blocks),
- base_order_index=0,
- process_options=effective,
- )
- assert mm_chunks == []
- await rag.finalize_storages()
- asyncio.run(_run())
|