pipeline.py 214 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
73278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698369937003701370237033704370537063707370837093710371137123713371437153716371737183719372037213722372337243725372637273728372937303731373237333734373537363737373837393740374137423743374437453746374737483749375037513752375337543755375637573758375937603761376237633764376537663767376837693770377137723773377437753776377
73778377937803781378237833784378537863787378837893790379137923793379437953796379737983799380038013802380338043805380638073808380938103811381238133814381538163817381838193820382138223823382438253826382738283829383038313832383338343835383638373838383938403841384238433844384538463847384838493850385138523853385438553856385738583859386038613862386338643865386638673868386938703871387238733874387538763877387838793880388138823883388438853886388738883889389038913892389338943895389638973898389939003901390239033904390539063907390839093910391139123913391439153916391739183919392039213922392339243925392639273928392939303931393239333934393539363937393839393940394139423943394439453946394739483949395039513952395339543955395639573958395939603961396239633964396539663967396839693970397139723973397439753976397739783979398039813982398339843985398639873988398939903991399239933994399539963997399839994000400140024003400440054006400740084009401040114012401340144015401640174018401940204021402240234024402540264027402840294030403140324033403440354036403740384039404040414042404340444045404640474048404940504051405240534054405540564057405840594060406140624063406440654066406740684069407040714072407340744075407640774078407940804081408240834084408540864087408840894090409140924093409440954096409740984099410041014102410341044105410641074108410941104111411241134114411541164117411841194120412141224123412441254126412741284129413041314132413341344135413641374138413941404141414241434144414541464147414841494150415141524153415441554156415741584159416041614162416341644165416641674168416941704171417241734174417541764177417841794180418141824183418441854186418741884189419041914192419341944195419641974198419942004201420242034204420542064207420842094210421142124213421442154216421742184219422042214222422342244225422642274228422942304231423242334234423542364237423842394240424142424243424442454246424742484249425042514252425342544255425642574258425942604261426242634264426542664267426842694270427142724273427442754276427
74278427942804281428242834284428542864287428842894290429142924293429442954296429742984299430043014302430343044305430643074308430943104311431243134314431543164317431843194320432143224323432443254326432743284329433043314332433343344335433643374338433943404341434243434344434543464347434843494350435143524353435443554356435743584359436043614362436343644365436643674368436943704371437243734374437543764377437843794380438143824383438443854386438743884389439043914392439343944395439643974398439944004401440244034404440544064407440844094410441144124413441444154416441744184419442044214422442344244425442644274428442944304431443244334434443544364437443844394440444144424443444444454446444744484449445044514452445344544455445644574458445944604461446244634464446544664467446844694470447144724473447444754476447744784479448044814482448344844485448644874488448944904491449244934494449544964497449844994500450145024503450445054506450745084509451045114512451345144515451645174518451945204521452245234524452545264527452845294530453145324533453445354536453745384539454045414542454345444545454645474548454945504551455245534554455545564557455845594560456145624563456445654566456745684569457045714572457345744575457645774578457945804581458245834584458545864587458845894590459145924593459445954596459745984599460046014602460346044605460646074608460946104611461246134614461546164617461846194620462146224623462446254626462746284629463046314632463346344635463646374638463946404641464246434644464546464647464846494650465146524653465446554656465746584659466046614662466346644665466646674668466946704671467246734674467546764677467846794680468146824683468446854686468746884689469046914692469346944695469646974698469947004701470247034704470547064707470847094710471147124713471447154716471747184719472047214722472347244725472647274728472947304731473247334734473547364737473847394740474147424743474447454746474747484749475047514752475347544755475647574758475947604761476247634764476547664767476847694770477147724773477447754776477
7477847794780478147824783478447854786478747884789479047914792479347944795479647974798479948004801480248034804480548064807480848094810
  1. """Document ingestion pipeline mixin for the LightRAG class.
  2. This module isolates the document parse/enqueue/extraction pipeline so that
  3. ``lightrag.py`` stays focused on storage management, querying, and editing.
  4. The mixin is wired into :class:`lightrag.LightRAG` via multiple inheritance
  5. and relies on attributes/methods that the main class provides
  6. (``self.full_docs``, ``self.doc_status``, ``self.tokenizer``,
  7. ``self.parse_native``-related fields, ``self._insert_done``,
  8. ``self._process_extract_entities``, etc.).
  9. """
  10. from __future__ import annotations
  11. import asyncio
  12. import base64
  13. import hashlib
  14. import inspect
  15. import json
  16. import json_repair
  17. import mimetypes
  18. import os
  19. import re
  20. import shutil
  21. import time
  22. import traceback
  23. from dataclasses import dataclass
  24. from datetime import datetime, timezone
  25. from pathlib import Path
  26. from typing import Any
  27. from lightrag.base import DocProcessingStatus, DocStatus
  28. from lightrag.constants import (
  29. FULL_DOCS_FORMAT_LIGHTRAG,
  30. FULL_DOCS_FORMAT_PENDING_PARSE,
  31. FULL_DOCS_FORMAT_RAW,
  32. PARSED_DIR_NAME,
  33. PARSER_ENGINE_DOCLING,
  34. PARSER_ENGINE_MINERU,
  35. PARSER_ENGINE_NATIVE,
  36. )
  37. from lightrag.exceptions import MultimodalAnalysisError, PipelineCancelledException
  38. from lightrag.kg.shared_storage import get_namespace_data, get_namespace_lock
  39. from lightrag.operate import merge_nodes_and_edges
  40. from lightrag.parser.routing import (
  41. resolve_file_parser_directives,
  42. resolve_stored_document_parser_engine,
  43. )
  44. from lightrag.utils import (
  45. CacheData,
  46. _serialize_cache_variant,
  47. compute_args_hash,
  48. compute_mdhash_id,
  49. enforce_chunk_token_limit_before_embedding,
  50. generate_cache_key,
  51. generate_track_id,
  52. get_content_summary,
  53. get_env_value,
  54. get_llm_cache_identity,
  55. handle_cache,
  56. logger,
  57. sanitize_text_for_encoding,
  58. save_to_cache,
  59. serialize_llm_cache_identity,
  60. )
  61. from lightrag.utils_pipeline import (
  62. archive_docx_source_after_full_docs_sync,
  63. archive_source_after_full_docs_sync,
  64. build_chunks_dict_from_chunking_result,
  65. chunk_fields_from_status_doc,
  66. compute_text_content_hash,
  67. doc_status_field,
  68. doc_status_transition_metadata,
  69. get_duplicate_doc_by_content_hash,
  70. get_existing_doc_by_content_hash,
  71. get_existing_doc_by_file_basename,
  72. has_known_document_source,
  73. input_dir_path,
  74. load_lightrag_document_content,
  75. make_lightrag_doc_content,
  76. normalize_document_file_path,
  77. parsed_artifact_dir_for,
  78. resolve_doc_file_path,
  79. sidecar_blocks_path,
  80. sidecar_uri_for,
  81. strip_lightrag_doc_prefix,
  82. )
  83. # Document statuses the pipeline considers "in-flight or pending" — used by
  84. # both the initial snapshot and every refetch after a request_pending
  85. # continuation. Module-level so we don't reconstruct the list on every
  86. # pipeline entry.
  87. _INFLIGHT_DOC_STATUSES = (
  88. DocStatus.PROCESSING,
  89. DocStatus.FAILED,
  90. DocStatus.PENDING,
  91. DocStatus.PARSING,
  92. DocStatus.ANALYZING,
  93. )
  94. def _call_source_file_resolver(
  95. owner: Any,
  96. file_path: str,
  97. *,
  98. source_file_name: str | None = None,
  99. parser_engine: str | None = None,
  100. ) -> str:
  101. """Call parser source resolver while tolerating legacy test doubles."""
  102. resolver = owner._resolve_source_file_for_parser
  103. params = inspect.signature(resolver).parameters
  104. supports_context = "source_file_name" in params or any(
  105. param.kind == inspect.Parameter.VAR_KEYWORD for param in params.values()
  106. )
  107. if supports_context:
  108. return resolver(
  109. file_path,
  110. source_file_name=source_file_name,
  111. parser_engine=parser_engine,
  112. )
  113. return resolver(source_file_name or file_path)
  114. # Map ``process_options.chunking`` selector → ``extraction_meta.chunking_method``
  115. # string used by the pipeline observability layer and the resume path.
  116. _CHUNKING_METHOD_LABELS: dict[str, str] = {
  117. "F": "fixed_token",
  118. "R": "recursive_character",
  119. "V": "semantic_vector",
  120. "P": "paragraph_semantic",
  121. }
  122. _CHUNK_LOG_KEY_ALIASES: dict[str, str] = {
  123. "chunk_overlap_token_size": "overlap",
  124. "breakpoint_threshold_type": "break",
  125. "breakpoint_threshold_amount": "amount",
  126. "buffer_size": "buf",
  127. "split_by_character": "split_by",
  128. "split_by_character_only": "split_only",
  129. "separators": "seps",
  130. "sentence_split_regex": "regex",
  131. }
  132. def _format_chunking_params(
  133. chunk_size: int,
  134. params: dict[str, Any],
  135. ) -> str:
  136. """Format the ``size=..., key=value, ...`` portion shared by the chunking
  137. start log line and ``doc_status.metadata['chunk_opts']``.
  138. Drops keys with ``None``/empty values so the line stays scannable;
  139. callers pass the strategy-specific kwargs they're about to splat
  140. into the chunker so the output mirrors the actual call. Long keys are
  141. aliased to short forms via ``_CHUNK_LOG_KEY_ALIASES``.
  142. """
  143. pieces = [f"size={chunk_size}"]
  144. for key, value in params.items():
  145. if value is None:
  146. continue
  147. if isinstance(value, (list, dict, str)) and len(value) == 0:
  148. continue
  149. short = _CHUNK_LOG_KEY_ALIASES.get(key, key)
  150. pieces.append(
  151. f"{short}={value!r}" if isinstance(value, str) else f"{short}={value}"
  152. )
  153. return ", ".join(pieces)
  154. @dataclass
  155. class _BatchRunContext:
  156. """Per-batch shared state for the parse/analyze/process worker pipeline.
  157. Bundles the cross-cutting handles (pipeline_status, locks, queues,
  158. semaphore) so worker methods accept a single ``ctx`` argument instead of
  159. ~8 individually plumbed parameters. ``processed_count`` mutates inside
  160. each batch and is always read/written under ``pipeline_status_lock``.
  161. """
  162. pipeline_status: dict
  163. pipeline_status_lock: Any
  164. semaphore: asyncio.Semaphore
  165. total_files: int
  166. q_native: asyncio.Queue
  167. q_mineru: asyncio.Queue
  168. q_docling: asyncio.Queue
  169. q_analyze: asyncio.Queue
  170. q_process: asyncio.Queue
  171. processed_count: int = 0
  172. class _PipelineMixin:
  173. """Mixin providing document ingestion pipeline methods for LightRAG.
  174. Designed to be combined as a base of LightRAG only. Relies on
  175. LightRAG-provided attributes (``self.full_docs``, ``self.doc_status``,
  176. ``self.tokenizer``, ``self.parser_*``, ``self.workspace`` ...) and on the
  177. shared methods ``self._insert_done`` / ``self._process_extract_entities``
  178. which remain in the main class and are resolved through MRO.
  179. """
  180. # ============================================================
  181. # Public document ingestion API (entry points)
  182. # ============================================================
  183. async def apipeline_enqueue_documents(
  184. self,
  185. input: str | list[str],
  186. ids: list[str] | None = None,
  187. file_paths: str | list[str] | None = None,
  188. track_id: str | None = None,
  189. docs_format: str = FULL_DOCS_FORMAT_RAW,
  190. lightrag_document_paths: str | list[str] | None = None,
  191. parse_engine: str | list[str] | None = None,
  192. process_options: str | list[str] | None = None,
  193. chunk_options: dict | list[dict] | None = None,
  194. from_scan: bool = False,
  195. ) -> str:
  196. """
  197. Pipeline for Processing Documents
  198. 1. Validate ids if provided or generate MD5 hash IDs and remove duplicate contents (skip content dedup when format is lightrag)
  199. 2. Generate document initial status
  200. 3. Filter out already processed documents
  201. 4. Enqueue document in status
  202. Args:
  203. input: Single document string or list of document strings (can be empty when docs_format is lightrag)
  204. ids: list of unique document IDs, if not provided, MD5 hash IDs will be generated (from content or file_path when lightrag)
  205. file_paths: list of file paths corresponding to each document, used for citation
  206. track_id: tracking ID for monitoring processing status
  207. docs_format: "raw" (default) or "lightrag"; when "lightrag" content may be empty and content-dedup is skipped
  208. lightrag_document_paths: paths to LightRAG Document (e.g. .blocks.jsonl dir or base path), when docs_format is lightrag
  209. parse_engine: file extraction engine already used or target engine for pending_parse
  210. process_options: per-document processing options string (i/t/e/!/F/R/V/P);
  211. accepted as a single string broadcast to every input or as a list
  212. aligned with ``input``. Stored verbatim on ``full_docs`` and
  213. mirrored to ``doc_status.metadata['process_options']``.
  214. chunk_options: per-document chunker parameter snapshot.
  215. Accepted as ``dict`` (broadcast to every input) or
  216. ``list[dict]`` (aligned with ``input``). When ``None``,
  217. each doc's snapshot is built via
  218. :func:`lightrag.parser.routing.resolve_chunk_options`
  219. from ``self.addon_params['chunker']``. Persisted to
  220. ``full_docs[doc_id]['chunk_options']`` and consumed by
  221. :meth:`process_single_document` to drive the file
  222. chunkers (F / R / V / P). Callers that need to bake
  223. F-strategy runtime args (``split_by_character`` /
  224. ``split_by_character_only``) into the snapshot — e.g.
  225. :meth:`LightRAG.ainsert` — should call
  226. :func:`resolve_chunk_options` themselves and pass the
  227. result here; this function is intentionally chunker-
  228. config agnostic. See
  229. ``docs/FileProcessingConfiguration-zh.md`` for the schema.
  230. from_scan: when True, the caller is the scan-owned background task
  231. that already holds ``pipeline_status["scanning"]``. Scan
  232. does additional doc_status reads during its classification
  233. phase (PROCESSED detection, FAILED-stub deletion, etc.)
  234. so external writers are blocked via
  235. ``scanning_exclusive``. Scan's own enqueues happen in
  236. its processing phase, after classification has cleared
  237. ``scanning_exclusive``, but ``from_scan=True`` is still
  238. forwarded as a defence-in-depth bypass so an unexpected
  239. scan-owned write inside the classification window is
  240. allowed through. External callers must leave this False.
  241. Returns:
  242. str: tracking ID for monitoring processing status
  243. Raises:
  244. RuntimeError: if a scan is in progress (and ``from_scan`` is
  245. False), or if a destructive job (clear / delete) is in
  246. flight. Concurrent indexing (``busy=True`` from the
  247. processing loop) is permitted — the running loop is
  248. notified via ``request_pending`` and picks up the
  249. newly-enqueued doc after its current batch finishes.
  250. """
  251. # Concurrency contract: enqueue may proceed concurrently with the
  252. # processing loop because (a) full_docs is upserted before
  253. # doc_status, so a consistency check never sees a ghost row, and
  254. # (b) the running loop re-queries doc_status by status after each
  255. # batch and sets ``request_pending`` whenever new work arrives
  256. # while busy. Two states still block enqueue:
  257. # * ``scanning_exclusive`` — scan task is in its CLASSIFICATION
  258. # phase, reading doc_status to classify files and possibly
  259. # deleting stale stubs. Concurrent enqueue would race
  260. # against scan's reads / mutations. ``from_scan=True``
  261. # lifts this guard for the scan task's own enqueues.
  262. # ``scanning`` alone (the processing phase) does NOT block,
  263. # identical to the upload-during-busy case.
  264. # * ``destructive_busy`` — clear / delete is dropping storages
  265. # or removing input files; a concurrent write would be
  266. # silently clobbered.
  267. pipeline_status = await get_namespace_data(
  268. "pipeline_status", workspace=self.workspace
  269. )
  270. pipeline_status_lock = get_namespace_lock(
  271. "pipeline_status", workspace=self.workspace
  272. )
  273. async with pipeline_status_lock:
  274. if not from_scan and pipeline_status.get("scanning_exclusive"):
  275. raise RuntimeError(
  276. "Cannot enqueue while scan is classifying files; "
  277. "wait for the classification phase to finish "
  278. "before retrying."
  279. )
  280. if pipeline_status.get("destructive_busy"):
  281. raise RuntimeError(
  282. "Cannot enqueue while pipeline is clearing or "
  283. "deleting documents; wait for the running job to "
  284. "finish before retrying."
  285. )
  286. # Generate track_id if not provided
  287. if track_id is None or track_id.strip() == "":
  288. track_id = generate_track_id("enqueue")
  289. if isinstance(input, str):
  290. input = [input]
  291. if isinstance(ids, str):
  292. ids = [ids]
  293. if isinstance(file_paths, str):
  294. file_paths = [file_paths]
  295. if isinstance(lightrag_document_paths, str):
  296. lightrag_document_paths = (
  297. [lightrag_document_paths] if lightrag_document_paths else None
  298. )
  299. if isinstance(parse_engine, str):
  300. parse_engine = [parse_engine] * len(input)
  301. if isinstance(process_options, str):
  302. process_options = [process_options] * len(input)
  303. if isinstance(chunk_options, dict):
  304. chunk_options = [chunk_options] * len(input)
  305. # If file_paths is provided, ensure it matches the number of documents
  306. if file_paths is not None:
  307. if isinstance(file_paths, str):
  308. file_paths = [file_paths]
  309. if len(file_paths) != len(input):
  310. raise ValueError(
  311. "Number of file paths must match the number of documents"
  312. )
  313. file_paths = [
  314. path.strip() if isinstance(path, str) else "" for path in file_paths
  315. ]
  316. file_paths = [path if path else "unknown_source" for path in file_paths]
  317. else:
  318. file_paths = ["unknown_source"] * len(input)
  319. is_lightrag_format = docs_format == FULL_DOCS_FORMAT_LIGHTRAG
  320. if is_lightrag_format and lightrag_document_paths is not None:
  321. if len(lightrag_document_paths) != len(input):
  322. raise ValueError(
  323. "Number of lightrag_document_paths must match the number of documents"
  324. )
  325. if parse_engine is not None and len(parse_engine) != len(input):
  326. raise ValueError(
  327. "Number of parse engines must match the number of documents"
  328. )
  329. if process_options is not None and len(process_options) != len(input):
  330. raise ValueError(
  331. "Number of process options must match the number of documents"
  332. )
  333. if chunk_options is not None and len(chunk_options) != len(input):
  334. raise ValueError(
  335. "Number of chunk_options dicts must match the number of documents"
  336. )
  337. def _parse_engine_at(index: int) -> str | None:
  338. if parse_engine is None:
  339. return None
  340. engine = str(parse_engine[index] or "").strip().lower()
  341. return engine or None
  342. def _process_options_at(index: int) -> str:
  343. if process_options is None:
  344. return ""
  345. from lightrag.parser.routing import sanitize_process_options
  346. return sanitize_process_options(process_options[index])
  347. def _chunk_options_at(index: int) -> dict[str, Any]:
  348. """Resolve the per-doc slim chunk_options snapshot.
  349. Projects the chunker config down to the one strategy
  350. sub-dict selected by the doc's ``process_options`` (F by
  351. default) — the persisted ``full_docs[doc_id]['chunk_options']``
  352. carries only the params actually consumed at process time.
  353. When the caller supplied ``chunk_options`` we slim it
  354. against the per-doc options (deep-copying internally so two
  355. docs broadcast from a single dict cannot share mutable
  356. sub-dicts); otherwise we build a fresh snapshot from
  357. ``self.addon_params['chunker']``.
  358. F-strategy runtime args (``split_by_character`` /
  359. ``split_by_character_only`` from :meth:`LightRAG.ainsert`)
  360. are baked into the snapshot upstream — ainsert calls
  361. :func:`lightrag.parser.routing.resolve_chunk_options` itself
  362. and passes the result via ``chunk_options=``. This function
  363. is purely a persistence helper; chunker-config construction
  364. is not its concern.
  365. """
  366. from lightrag.parser.routing import (
  367. resolve_chunk_options,
  368. slim_chunk_options,
  369. )
  370. doc_options = _process_options_at(index)
  371. if chunk_options is not None:
  372. return slim_chunk_options(chunk_options[index], doc_options)
  373. return resolve_chunk_options(self.addon_params, process_options=doc_options)
  374. # 1. Validate ids and build contents (when lightrag: no content dedup, content may be empty)
  375. if ids is not None:
  376. if len(ids) != len(input):
  377. raise ValueError("Number of IDs must match the number of documents")
  378. if len(ids) != len(set(ids)):
  379. raise ValueError("IDs must be unique")
  380. # Canonicalize every input filename once: the stored ``file_path``
  381. # is hint-stripped and serves UI display, filename dedup, and the
  382. # deterministic doc_id seed in one go.
  383. file_paths_canonical = [
  384. normalize_document_file_path(path) for path in file_paths
  385. ]
  386. contents: dict[str, dict[str, Any]] = {}
  387. source_to_doc_id: dict[str, str] = {}
  388. content_hash_to_doc_id: dict[str, str] = {}
  389. duplicate_attempts: list[dict[str, Any]] = []
  390. # Per-doc I/O failures from the lightrag-format branch. Populated when
  391. # ``load_lightrag_document_content`` cannot read the user-supplied
  392. # blocks.jsonl; flushed as FAILED stubs via
  393. # ``apipeline_enqueue_error_documents`` inside the critical section so
  394. # the UI surfaces the root cause instead of a silent empty document.
  395. lightrag_load_errors: list[dict[str, Any]] = []
  396. def _add_content(
  397. index: int,
  398. content: str,
  399. doc_format: str,
  400. *,
  401. sidecar_location: str | None = None,
  402. ) -> None:
  403. file_path_canonical = file_paths_canonical[index]
  404. # Body length excludes the {{LRdoc}} marker so duplicate-attempt
  405. # bookkeeping reports the same units as raw documents.
  406. # strip_lightrag_doc_prefix is a no-op for non-lightrag formats.
  407. body_length = len(strip_lightrag_doc_prefix(content, doc_format))
  408. # Compute content hash: skip for pending_parse (content extracted later).
  409. # RAW and LIGHTRAG both hash the bare merged text so the same body
  410. # carried by different envelopes (raw text vs sidecar) dedupes
  411. # against itself across formats.
  412. content_hash: str | None = None
  413. if doc_format in (FULL_DOCS_FORMAT_RAW, FULL_DOCS_FORMAT_LIGHTRAG):
  414. content_hash = compute_text_content_hash(
  415. strip_lightrag_doc_prefix(content or "", doc_format)
  416. )
  417. known_source = has_known_document_source(file_path_canonical)
  418. if ids is not None:
  419. doc_id = ids[index]
  420. elif known_source:
  421. doc_id = compute_mdhash_id(file_path_canonical, prefix="doc-")
  422. elif doc_format == FULL_DOCS_FORMAT_RAW:
  423. doc_id = compute_mdhash_id(content or "", prefix="doc-")
  424. elif content_hash:
  425. doc_id = compute_mdhash_id(content_hash, prefix="doc-")
  426. else:
  427. doc_id = compute_mdhash_id(
  428. f"{file_path_canonical}-{track_id}-{index}", prefix="doc-"
  429. )
  430. if known_source and file_path_canonical in source_to_doc_id:
  431. duplicate_attempts.append(
  432. {
  433. "doc_id": doc_id,
  434. "original_doc_id": source_to_doc_id[file_path_canonical],
  435. "file_path": file_path_canonical,
  436. "content_length": body_length,
  437. "existing_status": "batch_duplicate",
  438. "existing_track_id": "",
  439. "duplicate_kind": "filename",
  440. }
  441. )
  442. return
  443. if content_hash and content_hash in content_hash_to_doc_id:
  444. duplicate_attempts.append(
  445. {
  446. "doc_id": doc_id,
  447. "original_doc_id": content_hash_to_doc_id[content_hash],
  448. "file_path": file_path_canonical,
  449. "content_length": body_length,
  450. "existing_status": "batch_duplicate",
  451. "existing_track_id": "",
  452. "duplicate_kind": "content_hash",
  453. }
  454. )
  455. return
  456. if known_source:
  457. source_to_doc_id[file_path_canonical] = doc_id
  458. if content_hash:
  459. content_hash_to_doc_id[content_hash] = doc_id
  460. content_data: dict[str, Any] = {
  461. "content": content,
  462. "file_path": file_path_canonical,
  463. "parse_format": doc_format,
  464. }
  465. if content_hash:
  466. content_data["content_hash"] = content_hash
  467. if sidecar_location:
  468. content_data["sidecar_location"] = sidecar_location
  469. if engine := _parse_engine_at(index):
  470. content_data["parse_engine"] = engine
  471. if doc_format == FULL_DOCS_FORMAT_PENDING_PARSE:
  472. source_file_name = Path(str(file_paths[index] or "").strip()).name
  473. if has_known_document_source(source_file_name):
  474. content_data["source_file_name"] = source_file_name
  475. options_str = _process_options_at(index)
  476. if options_str:
  477. content_data["process_options"] = options_str
  478. # Always snapshot chunk_options at enqueue time — independent
  479. # of whether process_options selected a specific strategy —
  480. # so the per-doc parameters are frozen even when ``F``
  481. # (default) is used.
  482. content_data["chunk_options"] = _chunk_options_at(index)
  483. contents[doc_id] = content_data
  484. if is_lightrag_format:
  485. # LightRAG Document: no content hash dedup; content may be empty
  486. for i in range(len(file_paths)):
  487. path = file_paths[i]
  488. raw_path = (
  489. lightrag_document_paths[i] if lightrag_document_paths else ""
  490. ) or path
  491. # Resolve to an absolute path so the sidecar URI carries
  492. # full location info; relative paths are interpreted under
  493. # input_dir.
  494. p = Path(raw_path)
  495. if not p.is_absolute():
  496. p = input_dir_path() / p
  497. # The user may point at the ``*.blocks.jsonl`` file itself
  498. # or at its containing ``*.parsed/`` directory. Sidecars
  499. # are addressed by directory, so step up when given a file.
  500. sidecar_dir = (
  501. p.parent
  502. if p.suffix == ".jsonl" and p.name.endswith(".blocks.jsonl")
  503. else p
  504. )
  505. sidecar_location = sidecar_uri_for(sidecar_dir)
  506. # Per docs/FileProcessingConfiguration-zh.md, full_docs.content
  507. # for format=lightrag must be "{{LRdoc}}" + the merged body.
  508. # If the blocks file cannot be read (permission, truncation,
  509. # invalid JSON line), recording an empty body would let an
  510. # untrue "{{LRdoc}}" record land in full_docs and desync from
  511. # the on-disk blocks.jsonl. Instead, skip this doc and flush
  512. # a FAILED stub via apipeline_enqueue_error_documents after
  513. # the critical section so /documents surfaces the cause and
  514. # /documents/scan retries cleanly once the file is fixed.
  515. try:
  516. merged_text, _ = await load_lightrag_document_content(
  517. sidecar_location
  518. )
  519. except Exception as exc:
  520. error_msg = f"load_lightrag_document_content failed: {exc}"
  521. logger.warning(f"[apipeline_enqueue] {error_msg} ({raw_path})")
  522. file_size = 0
  523. blocks_path_str = sidecar_blocks_path(sidecar_location)
  524. if blocks_path_str:
  525. try:
  526. file_size = Path(blocks_path_str).stat().st_size
  527. except OSError:
  528. file_size = 0
  529. lightrag_load_errors.append(
  530. {
  531. "file_path": path,
  532. "error_description": (
  533. "Failed to load LightRAG Document blocks"
  534. ),
  535. "original_error": error_msg,
  536. "file_size": file_size,
  537. }
  538. )
  539. continue
  540. summary_content = make_lightrag_doc_content(merged_text)
  541. _add_content(
  542. i,
  543. summary_content,
  544. FULL_DOCS_FORMAT_LIGHTRAG,
  545. sidecar_location=sidecar_location,
  546. )
  547. elif ids is not None:
  548. for i, doc in enumerate(input):
  549. cleaned_content = sanitize_text_for_encoding(doc)
  550. _add_content(
  551. i,
  552. cleaned_content,
  553. FULL_DOCS_FORMAT_RAW,
  554. )
  555. elif docs_format == FULL_DOCS_FORMAT_PENDING_PARSE:
  556. for i, doc in enumerate(input):
  557. _add_content(
  558. i,
  559. doc or "",
  560. FULL_DOCS_FORMAT_PENDING_PARSE,
  561. )
  562. else:
  563. for i, doc in enumerate(input):
  564. cleaned_content = sanitize_text_for_encoding(doc)
  565. _add_content(i, cleaned_content, FULL_DOCS_FORMAT_RAW)
  566. # 2. Generate document initial status (without content)
  567. def _initial_doc_status(content_data: dict[str, Any]) -> dict[str, Any]:
  568. # For lightrag-format full_docs the persisted content carries the
  569. # ``{{LRdoc}}`` marker; strip it so summary/length match raw
  570. # semantics (the marker is full_docs internal bookkeeping and
  571. # must not leak into doc_status). strip_lightrag_doc_prefix
  572. # internally checks parse_format, so non-lightrag formats pass
  573. # through untouched.
  574. body_text = strip_lightrag_doc_prefix(
  575. content_data.get("content", ""),
  576. content_data.get("parse_format"),
  577. )
  578. base: dict[str, Any] = {
  579. "status": DocStatus.PENDING,
  580. "content_summary": get_content_summary(body_text),
  581. "content_length": len(body_text),
  582. "created_at": datetime.now(timezone.utc).isoformat(),
  583. "updated_at": datetime.now(timezone.utc).isoformat(),
  584. "file_path": content_data["file_path"],
  585. "track_id": track_id,
  586. }
  587. if content_data.get("content_hash"):
  588. base["content_hash"] = content_data["content_hash"]
  589. metadata: dict[str, Any] = {}
  590. options_str = content_data.get("process_options") or ""
  591. if options_str:
  592. # Mirror process_options into doc_status.metadata so admin UIs
  593. # can surface the per-document strategy without a full_docs lookup.
  594. metadata["process_options"] = options_str
  595. source_file_name = content_data.get("source_file_name")
  596. if source_file_name:
  597. metadata["source_file_name"] = source_file_name
  598. if metadata:
  599. base["metadata"] = metadata
  600. return base
  601. new_docs: dict[str, Any] = {
  602. id_: _initial_doc_status(content_data)
  603. for id_, content_data in contents.items()
  604. }
  605. # Serialise the dedup-read-then-upsert critical section across
  606. # concurrent enqueue calls within the same workspace. Without
  607. # this, two enqueues for the same content (e.g. /upload during
  608. # scan's processing phase, or two uploads via /text + /upload)
  609. # can both read doc_status before either upserts, both miss the
  610. # content_hash dedup, and both end up writing PENDING rows for
  611. # the same content — bypassing the dedup that's supposed to
  612. # land one of them as ``duplicate_kind=content_hash`` FAILED.
  613. #
  614. # The lock is workspace-scoped and only spans steps 3-4 below
  615. # (filter_keys → upserts). It does NOT block concurrent
  616. # processing (``apipeline_process_enqueue_documents`` reads
  617. # doc_status independently) or scan classification
  618. # (``scanning_exclusive`` already gates concurrent enqueue).
  619. # Lock order: enqueue_serialize → pipeline_status_lock (the
  620. # request_pending nudge inside is fine; no caller holds
  621. # pipeline_status_lock first then needs enqueue_serialize).
  622. enqueue_serialize_lock = get_namespace_lock(
  623. "enqueue_serialize", workspace=self.workspace
  624. )
  625. async with enqueue_serialize_lock:
  626. # 3. Filter out already processed documents
  627. # Get docs ids
  628. all_new_doc_ids = set(new_docs.keys())
  629. # Exclude IDs of documents that are already enqueued. The previous
  630. # ``reprocess_existing_non_processed`` flag has been removed: any
  631. # same-name record (regardless of status) is treated as a duplicate
  632. # here. Recovering half-processed documents is now the job of the
  633. # pipeline's resume logic, which runs in apipeline_process_enqueue_documents
  634. # rather than this enqueue path.
  635. unique_new_doc_ids = await self.doc_status.filter_keys(all_new_doc_ids)
  636. for doc_id in list(unique_new_doc_ids):
  637. content_data = contents[doc_id]
  638. # 3a. Filename-based dedup: same basename always treated as duplicate.
  639. match = await get_existing_doc_by_file_basename(
  640. self.doc_status, content_data["file_path"]
  641. )
  642. if match:
  643. existing_doc_id, existing_doc = match
  644. unique_new_doc_ids.discard(doc_id)
  645. duplicate_attempts.append(
  646. {
  647. "doc_id": doc_id,
  648. "original_doc_id": existing_doc_id,
  649. "file_path": content_data["file_path"],
  650. "content_length": new_docs.get(doc_id, {}).get(
  651. "content_length", 0
  652. ),
  653. "existing_status": doc_status_field(
  654. existing_doc, "status", "unknown"
  655. ),
  656. "existing_track_id": doc_status_field(
  657. existing_doc, "track_id", ""
  658. ),
  659. "duplicate_kind": "filename",
  660. }
  661. )
  662. continue
  663. # 3b. Content-hash dedup: different filename but same body still dupes.
  664. content_hash = content_data.get("content_hash")
  665. if not content_hash:
  666. continue
  667. hash_match = await get_existing_doc_by_content_hash(
  668. self.doc_status, content_hash
  669. )
  670. if hash_match:
  671. existing_doc_id, existing_doc = hash_match
  672. unique_new_doc_ids.discard(doc_id)
  673. duplicate_attempts.append(
  674. {
  675. "doc_id": doc_id,
  676. "original_doc_id": existing_doc_id,
  677. "file_path": content_data["file_path"],
  678. "content_length": new_docs.get(doc_id, {}).get(
  679. "content_length", 0
  680. ),
  681. "existing_status": doc_status_field(
  682. existing_doc, "status", "unknown"
  683. ),
  684. "existing_track_id": doc_status_field(
  685. existing_doc, "track_id", ""
  686. ),
  687. "duplicate_kind": "content_hash",
  688. }
  689. )
  690. # Handle duplicate documents - create trackable records with current track_id
  691. ignored_ids = list(all_new_doc_ids - unique_new_doc_ids)
  692. for doc_id in ignored_ids:
  693. if any(
  694. attempt.get("doc_id") == doc_id for attempt in duplicate_attempts
  695. ):
  696. continue
  697. existing_doc = await self.doc_status.get_by_id(doc_id)
  698. duplicate_attempts.append(
  699. {
  700. "doc_id": doc_id,
  701. "original_doc_id": doc_id,
  702. "file_path": new_docs.get(doc_id, {}).get(
  703. "file_path", "unknown_source"
  704. ),
  705. "content_length": new_docs.get(doc_id, {}).get(
  706. "content_length", 0
  707. ),
  708. "existing_status": (
  709. existing_doc.get("status", "unknown")
  710. if existing_doc
  711. else "unknown"
  712. ),
  713. "existing_track_id": (
  714. existing_doc.get("track_id", "") if existing_doc else ""
  715. ),
  716. "duplicate_kind": "filename",
  717. }
  718. )
  719. if duplicate_attempts:
  720. duplicate_docs: dict[str, Any] = {}
  721. for index, attempt in enumerate(duplicate_attempts):
  722. doc_id = attempt["doc_id"]
  723. file_path = attempt.get("file_path") or "unknown_source"
  724. duplicate_kind = attempt.get("duplicate_kind") or "filename"
  725. logger.warning(
  726. f"Duplicate document detected ({duplicate_kind}): "
  727. f"{doc_id} ({file_path})"
  728. )
  729. # Create a new record with unique ID for this duplicate attempt
  730. dup_record_id = compute_mdhash_id(
  731. f"{doc_id}-{track_id}-{index}-{file_path}", prefix="dup-"
  732. )
  733. if duplicate_kind == "content_hash":
  734. error_prefix = (
  735. "Identical content already exists under another filename."
  736. )
  737. else:
  738. error_prefix = "File name already exists."
  739. duplicate_docs[dup_record_id] = {
  740. "status": DocStatus.FAILED,
  741. "content_summary": (
  742. f"[DUPLICATE:{duplicate_kind}] Original document: "
  743. f"{attempt.get('original_doc_id', doc_id)}"
  744. ),
  745. "content_length": attempt.get("content_length", 0),
  746. "chunks_count": 0,
  747. "chunks_list": [],
  748. "created_at": datetime.now(timezone.utc).isoformat(),
  749. "updated_at": datetime.now(timezone.utc).isoformat(),
  750. "file_path": file_path,
  751. "track_id": track_id, # Use current track_id for tracking
  752. "error_msg": (
  753. f"{error_prefix} "
  754. f"Original doc_id: {attempt.get('original_doc_id', doc_id)}, "
  755. f"Status: {attempt.get('existing_status', 'unknown')}"
  756. ),
  757. "metadata": {
  758. "is_duplicate": True,
  759. "duplicate_kind": duplicate_kind,
  760. "original_doc_id": attempt.get("original_doc_id", doc_id),
  761. "original_track_id": attempt.get("existing_track_id", ""),
  762. },
  763. }
  764. # Store duplicate records in doc_status
  765. if duplicate_docs:
  766. await self.doc_status.upsert(duplicate_docs)
  767. logger.info(
  768. f"Created {len(duplicate_docs)} duplicate document records with track_id: {track_id}"
  769. )
  770. # Flush lightrag-format I/O failures as FAILED stubs. Done
  771. # inside the critical section so concurrent enqueues either see
  772. # the failure rows in full or not at all, and so a subsequent
  773. # /documents/scan finds the stub-without-full_docs combination
  774. # that document_routes treats as "delete and re-extract".
  775. if lightrag_load_errors:
  776. await self.apipeline_enqueue_error_documents(
  777. lightrag_load_errors, track_id=track_id
  778. )
  779. # Filter new_docs to only include documents with unique IDs
  780. new_docs = {
  781. doc_id: new_docs[doc_id]
  782. for doc_id in unique_new_doc_ids
  783. if doc_id in new_docs
  784. }
  785. if not new_docs:
  786. logger.warning("No new unique documents were found.")
  787. # If FAILED stubs were just flushed (lightrag-format I/O
  788. # errors), the caller needs the track_id to query their
  789. # status; a bare ``return None`` would also be interpreted
  790. # by document_routes upload paths as "all duplicate —
  791. # archive the source", silently hiding the failure.
  792. if lightrag_load_errors:
  793. return track_id
  794. return
  795. # 4. Store document content in full_docs and status in doc_status
  796. full_docs_data = {
  797. doc_id: {
  798. "content": contents[doc_id].get("content", ""),
  799. "file_path": contents[doc_id]["file_path"],
  800. "parse_format": contents[doc_id].get(
  801. "parse_format", FULL_DOCS_FORMAT_RAW
  802. ),
  803. }
  804. for doc_id in new_docs.keys()
  805. }
  806. for doc_id in new_docs.keys():
  807. if contents[doc_id].get("content_hash"):
  808. full_docs_data[doc_id]["content_hash"] = contents[doc_id][
  809. "content_hash"
  810. ]
  811. if contents[doc_id].get("sidecar_location"):
  812. full_docs_data[doc_id]["sidecar_location"] = contents[doc_id][
  813. "sidecar_location"
  814. ]
  815. if contents[doc_id].get("parse_engine"):
  816. full_docs_data[doc_id]["parse_engine"] = contents[doc_id][
  817. "parse_engine"
  818. ]
  819. if contents[doc_id].get("process_options"):
  820. full_docs_data[doc_id]["process_options"] = contents[doc_id][
  821. "process_options"
  822. ]
  823. # ``chunk_options`` is always populated by ``_add_content``
  824. # at enqueue time so it's persisted unconditionally.
  825. if contents[doc_id].get("chunk_options") is not None:
  826. full_docs_data[doc_id]["chunk_options"] = contents[doc_id][
  827. "chunk_options"
  828. ]
  829. await self.full_docs.upsert(full_docs_data)
  830. # Persist data to disk immediately
  831. await self.full_docs.index_done_callback()
  832. # Store document status (without content)
  833. await self.doc_status.upsert(new_docs)
  834. logger.debug(f"Stored {len(new_docs)} new unique documents")
  835. # Notify any in-flight processing loop that new work has arrived.
  836. # The loop checks ``request_pending`` after each batch and will
  837. # re-query doc_status to pick up these PENDING rows. Without
  838. # this nudge a caller that does not subsequently call
  839. # ``apipeline_process_enqueue_documents`` (or whose call races
  840. # with the loop's just-finished batch) could leave the new docs
  841. # stranded until the next unrelated trigger.
  842. async with pipeline_status_lock:
  843. if pipeline_status.get("busy"):
  844. pipeline_status["request_pending"] = True
  845. return track_id
  846. async def apipeline_enqueue_error_documents(
  847. self,
  848. error_files: list[dict[str, Any]],
  849. track_id: str | None = None,
  850. ) -> None:
  851. """
  852. Record file extraction errors in doc_status storage.
  853. This function creates error document entries in the doc_status storage for files
  854. that failed during the extraction process. Each error entry contains information
  855. about the failure to help with debugging and monitoring.
  856. Args:
  857. error_files: List of dictionaries containing error information for each failed file.
  858. Each dictionary should contain:
  859. - file_path: Original file name/path
  860. - error_description: Brief error description (for content_summary)
  861. - original_error: Full error message (for error_msg)
  862. - file_size: File size in bytes (for content_length, 0 if unknown)
  863. track_id: Optional tracking ID for grouping related operations
  864. Returns:
  865. None
  866. """
  867. if not error_files:
  868. logger.debug("No error files to record")
  869. return
  870. # Generate track_id if not provided
  871. if track_id is None or track_id.strip() == "":
  872. track_id = generate_track_id("error")
  873. error_docs: dict[str, Any] = {}
  874. current_time = datetime.now(timezone.utc).isoformat()
  875. for error_file in error_files:
  876. file_path = normalize_document_file_path(
  877. error_file.get("file_path", "unknown_file")
  878. )
  879. error_description = error_file.get(
  880. "error_description", "File extraction failed"
  881. )
  882. original_error = error_file.get("original_error", "Unknown error")
  883. file_size = error_file.get("file_size", 0)
  884. # Generate unique doc_id with "error-" prefix
  885. doc_id_content = f"{file_path}-{error_description}"
  886. doc_id = compute_mdhash_id(doc_id_content, prefix="error-")
  887. error_docs[doc_id] = {
  888. "status": DocStatus.FAILED,
  889. "content_summary": error_description,
  890. "content_length": file_size,
  891. "error_msg": original_error,
  892. "chunks_count": 0, # No chunks for failed files
  893. "chunks_list": [],
  894. "created_at": current_time,
  895. "updated_at": current_time,
  896. "file_path": file_path,
  897. "track_id": track_id,
  898. "metadata": {
  899. "error_type": "file_extraction_error",
  900. },
  901. }
  902. # Store error documents in doc_status
  903. if error_docs:
  904. await self.doc_status.upsert(error_docs)
  905. # Log each error for debugging
  906. for doc_id, error_doc in error_docs.items():
  907. logger.error(
  908. f"File processing error: - ID: {doc_id} {error_doc['file_path']}"
  909. )
  910. async def apipeline_process_enqueue_documents(self) -> None:
  911. """
  912. Process pending documents by splitting them into chunks, processing
  913. each chunk for entity and relation extraction, and updating the
  914. document status.
  915. 1. Get all pending, failed, and abnormally terminated processing documents.
  916. 2. Validate document data consistency and fix any issues
  917. 3. Split document content into chunks
  918. 4. Process each chunk for entity and relation extraction
  919. 5. Update the document status
  920. """
  921. pipeline_status = await get_namespace_data(
  922. "pipeline_status", workspace=self.workspace
  923. )
  924. pipeline_status_lock = get_namespace_lock(
  925. "pipeline_status", workspace=self.workspace
  926. )
  927. async with pipeline_status_lock:
  928. # Ensure only one worker is processing documents
  929. if not pipeline_status.get("busy", False):
  930. to_process_docs: dict[
  931. str, DocProcessingStatus
  932. ] = await self.doc_status.get_docs_by_statuses(
  933. list(_INFLIGHT_DOC_STATUSES)
  934. )
  935. if not to_process_docs:
  936. logger.info("No documents to process")
  937. return
  938. pipeline_status.update(
  939. {
  940. "busy": True,
  941. "job_name": "Default Job",
  942. "job_start": datetime.now(timezone.utc).isoformat(),
  943. "docs": 0,
  944. "batchs": 0, # Total number of files to be processed
  945. "cur_batch": 0, # Number of files already processed
  946. "request_pending": False, # Clear any previous request
  947. "cancellation_requested": False, # Initialize cancellation flag
  948. "latest_message": "",
  949. }
  950. )
  951. # Cleaning history_messages without breaking it as a shared list object
  952. del pipeline_status["history_messages"][:]
  953. else:
  954. # Another process is busy, just set request flag and return
  955. pipeline_status["request_pending"] = True
  956. logger.info(
  957. "Another process is already processing the document queue. Request queued."
  958. )
  959. return
  960. # Tracks whether the loop has already released ``busy`` under
  961. # the same critical section that observed request_pending=False.
  962. # This makes the exit handoff atomic: a concurrent enqueue can
  963. # either set request_pending BEFORE we release (in which case
  964. # the loop continues with a fresh snapshot) or AFTER (in which
  965. # case it sees busy=False and starts a new loop via its own
  966. # process_enqueue call). Without this, a small window between
  967. # "loop reads request_pending=False" and "finally clears busy"
  968. # could strand newly-enqueued PENDING docs.
  969. busy_released_in_loop = False
  970. try:
  971. # Process documents until no more documents or requests
  972. while True:
  973. # Check for cancellation request at the start of main loop
  974. async with pipeline_status_lock:
  975. if pipeline_status.get("cancellation_requested", False):
  976. pipeline_status["request_pending"] = False
  977. pipeline_status["cancellation_requested"] = False
  978. log_message = "Pipeline cancelled by user"
  979. logger.info(log_message)
  980. pipeline_status["latest_message"] = log_message
  981. pipeline_status["history_messages"].append(log_message)
  982. # Exit directly, skipping request_pending check
  983. return
  984. if not to_process_docs:
  985. log_message = "All enqueued documents have been processed"
  986. logger.info(log_message)
  987. pipeline_status["latest_message"] = log_message
  988. pipeline_status["history_messages"].append(log_message)
  989. if await self._atomic_release_busy_or_consume_pending(
  990. pipeline_status, pipeline_status_lock
  991. ):
  992. busy_released_in_loop = True
  993. break
  994. to_process_docs = await self.doc_status.get_docs_by_statuses(
  995. list(_INFLIGHT_DOC_STATUSES)
  996. )
  997. continue
  998. # Validate document data consistency and fix any issues
  999. to_process_docs = await self._validate_and_fix_document_consistency(
  1000. to_process_docs, pipeline_status, pipeline_status_lock
  1001. )
  1002. if not to_process_docs:
  1003. log_message = (
  1004. "No valid documents to process after consistency check"
  1005. )
  1006. logger.info(log_message)
  1007. pipeline_status["latest_message"] = log_message
  1008. pipeline_status["history_messages"].append(log_message)
  1009. if await self._atomic_release_busy_or_consume_pending(
  1010. pipeline_status, pipeline_status_lock
  1011. ):
  1012. busy_released_in_loop = True
  1013. break
  1014. to_process_docs = await self.doc_status.get_docs_by_statuses(
  1015. list(_INFLIGHT_DOC_STATUSES)
  1016. )
  1017. continue
  1018. log_message = f"Processing {len(to_process_docs)} document(s)"
  1019. logger.info(log_message)
  1020. pipeline_status["docs"] = len(to_process_docs)
  1021. pipeline_status["batchs"] = len(to_process_docs)
  1022. pipeline_status["cur_batch"] = 0
  1023. pipeline_status["latest_message"] = log_message
  1024. pipeline_status["history_messages"].append(log_message)
  1025. await self._run_pipeline_batch(
  1026. to_process_docs,
  1027. pipeline_status=pipeline_status,
  1028. pipeline_status_lock=pipeline_status_lock,
  1029. )
  1030. # Atomic exit handoff: if request_pending was set during
  1031. # this batch (e.g. a concurrent enqueue while busy=True),
  1032. # clear it and refetch. Otherwise release ``busy`` under
  1033. # the SAME lock so a concurrent enqueue cannot squeeze a
  1034. # request_pending=True past us into a now-stranded state.
  1035. if await self._atomic_release_busy_or_consume_pending(
  1036. pipeline_status, pipeline_status_lock
  1037. ):
  1038. busy_released_in_loop = True
  1039. break
  1040. log_message = "Processing additional documents due to pending request"
  1041. logger.info(log_message)
  1042. pipeline_status["latest_message"] = log_message
  1043. pipeline_status["history_messages"].append(log_message)
  1044. # Check for pending documents again
  1045. to_process_docs = await self.doc_status.get_docs_by_statuses(
  1046. list(_INFLIGHT_DOC_STATUSES)
  1047. )
  1048. finally:
  1049. log_message = "Enqueued document processing pipeline stopped"
  1050. logger.info(log_message)
  1051. # If the loop already released ``busy`` under the atomic exit
  1052. # check, don't clobber it here — a concurrent enqueue may have
  1053. # observed busy=False and started a new processing pass that
  1054. # has set busy=True for itself. Cancellation flag and log
  1055. # bookkeeping are always safe to update.
  1056. async with pipeline_status_lock:
  1057. if not busy_released_in_loop:
  1058. pipeline_status["busy"] = False
  1059. pipeline_status["cancellation_requested"] = (
  1060. False # Always reset cancellation flag
  1061. )
  1062. pipeline_status["latest_message"] = log_message
  1063. pipeline_status["history_messages"].append(log_message)
  1064. # ============================================================
  1065. # Pipeline orchestration
  1066. # ============================================================
  async def _run_pipeline_batch(
      self,
      to_process_docs: dict[str, DocProcessingStatus],
      *,
      pipeline_status: dict,
      pipeline_status_lock,
  ) -> None:
      """Run one batch of pending documents through the parse → analyze →
      process queues.

      Three cascading layers of queues:
      - Layer 1: Content Parsing (parse_native / parse_mineru / parse_docling)
      - Layer 2: Multimodal Analyze (analyze_multimodal)
      - Layer 3: Entity / Relation Extraction (process_single_document)

      Args:
          to_process_docs: Mapping of doc_id to its status record; every
              entry is routed to one of the three parse queues.
          pipeline_status: Shared status dict; ``job_name`` is written here
              and the dict is handed to the workers through the batch context.
          pipeline_status_lock: Lock handed to the workers alongside
              ``pipeline_status``.

      The worker tasks are always cancelled and awaited before this method
      returns or raises, so a failure while enqueuing (or cancellation of the
      caller while waiting on a queue) cannot leave workers blocked on
      ``Queue.get()`` in the background.
      """
      total_files = len(to_process_docs)
      pipeline_status["job_name"] = self._format_job_name(
          to_process_docs, total_files
      )

      ctx = _BatchRunContext(
          pipeline_status=pipeline_status,
          pipeline_status_lock=pipeline_status_lock,
          semaphore=asyncio.Semaphore(self.max_parallel_insert),
          total_files=total_files,
          q_native=asyncio.Queue(maxsize=self.queue_size_default),
          q_mineru=asyncio.Queue(maxsize=self.queue_size_default),
          q_docling=asyncio.Queue(maxsize=self.queue_size_default),
          q_analyze=asyncio.Queue(maxsize=self.queue_size_default),
          q_process=asyncio.Queue(maxsize=self.queue_size_insert),
      )

      # (configured parallelism, coroutine factory) per worker kind, listed in
      # pipeline order. Each kind gets at least one worker even when the
      # configured parallelism is zero or negative.
      worker_factories = (
          (
              self.max_parallel_parse_native,
              lambda: self._parse_worker("native", ctx.q_native, ctx),
          ),
          (
              self.max_parallel_parse_mineru,
              lambda: self._parse_worker("mineru", ctx.q_mineru, ctx),
          ),
          (
              self.max_parallel_parse_docling,
              lambda: self._parse_worker("docling", ctx.q_docling, ctx),
          ),
          (self.max_parallel_analyze, lambda: self._analyze_worker(ctx)),
          (self.max_parallel_insert, lambda: self._process_worker(ctx)),
      )
      workers: list[asyncio.Task] = [
          asyncio.create_task(make_worker())
          for parallelism, make_worker in worker_factories
          for _ in range(max(1, parallelism))
      ]

      try:
          # Add pending files to the correct parsing queue
          for doc_id, status_doc in to_process_docs.items():
              content_data = await self.full_docs.get_by_id(doc_id) or {}
              engine = resolve_stored_document_parser_engine(
                  file_path=getattr(status_doc, "file_path", "unknown_source"),
                  content_data=content_data,
              )
              if engine == "mineru":
                  await ctx.q_mineru.put((doc_id, status_doc))
              elif engine == "docling":
                  await ctx.q_docling.put((doc_id, status_doc))
              else:
                  await ctx.q_native.put((doc_id, status_doc))

          # Drain layer by layer: a downstream queue can only be considered
          # empty once every upstream producer has finished.
          await asyncio.gather(
              ctx.q_native.join(), ctx.q_mineru.join(), ctx.q_docling.join()
          )
          await ctx.q_analyze.join()
          await ctx.q_process.join()
      finally:
          # Workers loop forever on ``Queue.get()``; cancellation is the only
          # way to stop them, so it must also happen on the error path.
          for worker in workers:
              worker.cancel()
          await asyncio.gather(*workers, return_exceptions=True)
  async def _validate_and_fix_document_consistency(
      self,
      to_process_docs: dict[str, DocProcessingStatus],
      pipeline_status: dict,
      pipeline_status_lock: asyncio.Lock,
  ) -> dict[str, DocProcessingStatus]:
      """Validate and fix document data consistency by deleting inconsistent
      entries, but preserve failed documents.

      A document is *consistent* when ``full_docs`` holds content for its id.
      Documents without content are handled as follows:

      - status FAILED: kept in ``doc_status`` for manual review, but removed
        from ``to_process_docs``.
      - any other status: the ``doc_status`` entry is deleted and the
        document is removed from ``to_process_docs``. If the deletion
        raises, the error is logged and the document stays in the mapping.

      Consistent documents whose status is PROCESSING / FAILED / PARSING /
      ANALYZING are reset to PENDING, both in ``doc_status`` storage and on
      the in-memory status object.

      Args:
          to_process_docs: Mapping of doc_id to status record. Mutated in
              place and also returned.
          pipeline_status: Shared status dict that receives progress messages.
          pipeline_status_lock: Lock held while ``pipeline_status`` is updated.

      Returns:
          The same ``to_process_docs`` mapping after removals and resets.
      """
      inconsistent_docs = []
      failed_docs_to_preserve = []
      # Content fetched during the consistency pass, reused by the reset pass
      # so each document costs a single ``full_docs`` lookup.
      content_by_id = {}

      # Check each document's data consistency
      for doc_id, status_doc in to_process_docs.items():
          content_data = await self.full_docs.get_by_id(doc_id)
          if content_data:
              content_by_id[doc_id] = content_data
              continue
          # No content: failed documents are preserved, the rest are deleted.
          if getattr(status_doc, "status", None) == DocStatus.FAILED:
              failed_docs_to_preserve.append(doc_id)
          else:
              inconsistent_docs.append(doc_id)

      # Log information about failed documents that will be preserved
      if failed_docs_to_preserve:
          async with pipeline_status_lock:
              preserve_message = f"Preserving {len(failed_docs_to_preserve)} failed document entries for manual review"
              logger.info(preserve_message)
              pipeline_status["latest_message"] = preserve_message
              pipeline_status["history_messages"].append(preserve_message)
          # Remove failed documents from processing list but keep them in doc_status
          for doc_id in failed_docs_to_preserve:
              to_process_docs.pop(doc_id, None)

      # Delete inconsistent document entries (excluding failed documents)
      if inconsistent_docs:
          async with pipeline_status_lock:
              summary_message = (
                  f"Inconsistent document entries found: {len(inconsistent_docs)}"
              )
              logger.info(summary_message)
              pipeline_status["latest_message"] = summary_message
              pipeline_status["history_messages"].append(summary_message)

          for doc_id in inconsistent_docs:
              try:
                  status_doc = to_process_docs[doc_id]
                  file_path = resolve_doc_file_path(status_doc=status_doc)
                  # Delete doc_status entry
                  await self.doc_status.delete([doc_id])
                  async with pipeline_status_lock:
                      log_message = (
                          f"Deleted inconsistent entry: {doc_id} ({file_path})"
                      )
                      logger.info(log_message)
                      pipeline_status["latest_message"] = log_message
                      pipeline_status["history_messages"].append(log_message)
                  # Remove from processing list only after a successful delete
                  to_process_docs.pop(doc_id, None)
              except Exception as e:
                  async with pipeline_status_lock:
                      error_message = f"Failed to delete entry: {doc_id} - {e}"
                      logger.error(error_message)
                      pipeline_status["latest_message"] = error_message
                      pipeline_status["history_messages"].append(error_message)

      # Reset interrupted documents that pass consistency checks to PENDING status
      interrupted_statuses = (
          DocStatus.PROCESSING,
          DocStatus.FAILED,
          DocStatus.PARSING,
          DocStatus.ANALYZING,
      )
      docs_to_reset = {}
      for doc_id, status_doc in to_process_docs.items():
          content_data = content_by_id.get(doc_id)
          if not content_data:
              # Only reachable for entries whose deletion failed above.
              continue
          if getattr(status_doc, "status", None) not in interrupted_statuses:
              continue

          preserved_chunks_list, preserved_chunks_count = (
              chunk_fields_from_status_doc(status_doc)
          )
          resolved_file_path = resolve_doc_file_path(
              status_doc=status_doc,
              content_data=content_data,
          )
          # Prepare document for status reset to PENDING
          docs_to_reset[doc_id] = {
              "status": DocStatus.PENDING,
              "content_summary": status_doc.content_summary,
              "content_length": status_doc.content_length,
              "chunks_count": preserved_chunks_count,
              "chunks_list": preserved_chunks_list,
              "created_at": status_doc.created_at,
              "updated_at": datetime.now(timezone.utc).isoformat(),
              "file_path": resolved_file_path,
              "track_id": getattr(status_doc, "track_id", ""),
              "content_hash": getattr(status_doc, "content_hash", None),
              # Clear transient error / processing fields but preserve
              # long-lived per-doc metadata (process_options) seeded
              # at enqueue time.
              "error_msg": "",
              "metadata": doc_status_transition_metadata(status_doc),
          }
          # Update the status in to_process_docs as well
          status_doc.status = DocStatus.PENDING
          status_doc.file_path = resolved_file_path

      # Update doc_status storage if there are documents to reset
      if docs_to_reset:
          await self.doc_status.upsert(docs_to_reset)
          async with pipeline_status_lock:
              reset_message = (
                  f"Reset {len(docs_to_reset)} documents from "
                  "PARSING/ANALYZING/PROCESSING/FAILED to PENDING status"
              )
              logger.info(reset_message)
              pipeline_status["latest_message"] = reset_message
              pipeline_status["history_messages"].append(reset_message)

      return to_process_docs
  1262. async def _atomic_release_busy_or_consume_pending(
  1263. self,
  1264. pipeline_status: dict,
  1265. pipeline_status_lock,
  1266. ) -> bool:
  1267. """Atomically decide whether to release ``busy`` or consume a
  1268. pending request.
  1269. Closes the loop-exit handoff race: a concurrent enqueue that
  1270. sets ``request_pending`` while the processing loop is on its
  1271. way out will be observed in the same critical section that
  1272. releases ``busy``, so the loop sees it and refetches instead
  1273. of stranding the new doc in PENDING.
  1274. Returns:
  1275. True when ``busy`` has been cleared under the same lock
  1276. that observed ``request_pending=False`` — caller must
  1277. break out of the loop and skip clearing ``busy`` in its
  1278. finally block.
  1279. False when ``request_pending`` was set: the flag is
  1280. cleared and the caller must refetch ``doc_status`` and
  1281. continue the loop.
  1282. """
  1283. async with pipeline_status_lock:
  1284. if pipeline_status.get("request_pending", False):
  1285. pipeline_status["request_pending"] = False
  1286. return False
  1287. pipeline_status["busy"] = False
  1288. return True
  1289. @staticmethod
  1290. def _format_job_name(
  1291. to_process_docs: dict[str, DocProcessingStatus],
  1292. total_files: int,
  1293. ) -> str:
  1294. """Build the ``job_name`` shown in pipeline_status for one batch."""
  1295. first_doc = next(iter(to_process_docs.values()))
  1296. first_doc_path = first_doc.file_path
  1297. if first_doc_path:
  1298. path_prefix = first_doc_path[:20] + (
  1299. "..." if len(first_doc_path) > 20 else ""
  1300. )
  1301. else:
  1302. path_prefix = "unknown_source"
  1303. return f"{path_prefix}[{total_files} files]"
  1304. # ============================================================
  1305. # Cascading queue workers (Layer 1 -> 2 -> 3)
  1306. # ============================================================
  async def _parse_worker(
      self,
      engine: str,
      in_q: asyncio.Queue,
      ctx: _BatchRunContext,
  ) -> None:
      """Layer 1 worker: consume (doc_id, status_doc) and emit parsed data.

      Marks PARSING, runs the engine-specific parser (mineru / docling /
      native), refreshes ``content_hash`` if the parser patched it, and
      either short-circuits via ``_mark_duplicate_after_parse`` or hands
      off to ``q_analyze``. Writes FAILED on exception.

      Args:
          engine: Parser selector; ``"mineru"`` and ``"docling"`` pick their
              dedicated parsers, anything else uses ``parse_native``.
          in_q: Queue of ``(doc_id, status_doc)`` tuples for this engine.
          ctx: Batch context providing the shared pipeline status, its lock
              and the downstream ``q_analyze`` queue.

      The loop never exits on its own; the task is expected to be cancelled
      by its creator. ``in_q.task_done()`` is called for every item, whatever
      the outcome, so ``in_q.join()`` can return.
      """
      while True:
          item = await in_q.get()
          try:
              doc_id_w, status_doc_w = item
              file_path_w = getattr(status_doc_w, "file_path", "unknown_source")

              # Boundary cancellation check: skip parsing the next queued doc
              # without invoking the engine, mark it FAILED with a friendly
              # "User cancelled" message, and let the finally task_done()
              # drain the queue so q.join() in _run_pipeline_batch returns.
              if await self._cancellation_requested(
                  ctx.pipeline_status, ctx.pipeline_status_lock
              ):
                  await self._mark_doc_cancelled_in_stage(
                      doc_id=doc_id_w,
                      status_doc=status_doc_w,
                      file_path=file_path_w,
                      stage_label="parse",
                      pipeline_status=ctx.pipeline_status,
                      pipeline_status_lock=ctx.pipeline_status_lock,
                  )
                  continue

              content_data_w = await self.full_docs.get_by_id(doc_id_w)
              if not content_data_w:
                  raise Exception(
                      f"Document content not found in full_docs for doc_id: {doc_id_w}"
                  )

              # Backfill source_file_name on the content record from the
              # status metadata when the content record lacks it.
              if isinstance(status_doc_w.metadata, dict):
                  source_file_name_w = status_doc_w.metadata.get("source_file_name")
                  if source_file_name_w and not content_data_w.get(
                      "source_file_name"
                  ):
                      content_data_w["source_file_name"] = source_file_name_w

              # Stamp parsing_start_time on the in-memory status_doc so
              # carry-over (_DOC_STATUS_METADATA_CARRY_OVER_KEYS) writes it
              # into doc_status here and preserves it across every
              # subsequent state transition for stage-duration analysis.
              if not isinstance(status_doc_w.metadata, dict):
                  status_doc_w.metadata = {}

              # Drop stale per-attempt fields from any prior failed/retried
              # attempt before stamping the new parsing_start_time. All of
              # these are written by either this worker (cache-hit /
              # cache-miss branches below) or the downstream analyze worker,
              # and would otherwise be carried forward via carry-over,
              # skewing stage-duration metrics and the raw-cache-hit /
              # skipped signals for the new attempt.
              for _stale_key in (
                  "parsing_end_time",
                  "parse_stage_skipped",
                  "analyzing_start_time",
                  "analyzing_end_time",
                  "analyzing_stage_skipped",
              ):
                  status_doc_w.metadata.pop(_stale_key, None)
              status_doc_w.metadata["parsing_start_time"] = int(time.time())

              await self._upsert_doc_status_transition(
                  doc_id=doc_id_w,
                  status=DocStatus.PARSING,
                  status_doc=status_doc_w,
                  file_path=file_path_w,
              )

              async with ctx.pipeline_status_lock:
                  log_message = f"Parsing ({engine}): {doc_id_w}"
                  logger.info(log_message)
                  ctx.pipeline_status["latest_message"] = log_message
                  ctx.pipeline_status["history_messages"].append(log_message)

              if engine == "mineru":
                  parsed_data_w = await self.parse_mineru(
                      doc_id_w, file_path_w, content_data_w
                  )
              elif engine == "docling":
                  parsed_data_w = await self.parse_docling(
                      doc_id_w, file_path_w, content_data_w
                  )
              else:
                  parsed_data_w = await self.parse_native(
                      doc_id_w, file_path_w, content_data_w
                  )

              # Mirror non-fatal parser warnings (e.g. legacy docx tables
              # missing w14:paraId) onto the in-memory status_doc so the
              # ANALYZING / PROCESSING / PROCESSED / FAILED upserts carry
              # the field through ``doc_status_transition_metadata``.
              parse_warnings_payload_w = parsed_data_w.get("parse_warnings")
              if parse_warnings_payload_w:
                  if not isinstance(status_doc_w.metadata, dict):
                      status_doc_w.metadata = {}
                  status_doc_w.metadata["parse_warnings"] = parse_warnings_payload_w

              # Mirror raw-bundle cache-hit flag from mineru/docling so the
              # next upsert (ANALYZING) carries it into doc_status; cache-
              # miss runs (including parse_native, which has no cache
              # concept) stamp ``parsing_end_time`` instead so post-mortem
              # can derive the parse-stage duration. The two fields are
              # mutually exclusive per attempt.
              if not isinstance(status_doc_w.metadata, dict):
                  status_doc_w.metadata = {}
              if parsed_data_w.get("parse_stage_skipped"):
                  status_doc_w.metadata["parse_stage_skipped"] = True
              else:
                  status_doc_w.metadata["parsing_end_time"] = int(time.time())

              # parse_* may have patched content_hash for
              # pending_parse → raw transitions.
              refreshed = await self.doc_status.get_by_id(doc_id_w)
              if refreshed:
                  refreshed_hash = (
                      refreshed.get("content_hash")
                      if isinstance(refreshed, dict)
                      else getattr(refreshed, "content_hash", None)
                  )
                  if refreshed_hash:
                      status_doc_w.content_hash = refreshed_hash

              # ``or ""`` keeps a parser result of ``content=None`` from
              # raising TypeError in len(); _analyze_worker applies the same
              # guard when it reads the parsed content.
              parsed_content_w = parsed_data_w.get("content", "") or ""
              if await self._mark_duplicate_after_parse(
                  doc_id=doc_id_w,
                  status_doc=status_doc_w,
                  file_path=file_path_w,
                  content_hash=status_doc_w.content_hash,
                  content_length=len(parsed_content_w),
                  content_data=content_data_w,
                  pipeline_status=ctx.pipeline_status,
                  pipeline_status_lock=ctx.pipeline_status_lock,
              ):
                  continue

              await ctx.q_analyze.put((doc_id_w, status_doc_w, parsed_data_w))
          except PipelineCancelledException:
              # Cancellation raised from inside the parse engine (future-
              # proofing — engines don't currently call _raise_if_cancelled,
              # but if they do, route through the same friendly message
              # path as the boundary check above instead of the generic
              # except block below).
              await self._mark_doc_cancelled_in_stage(
                  doc_id=doc_id_w,
                  status_doc=status_doc_w,
                  file_path=getattr(status_doc_w, "file_path", "unknown_source"),
                  stage_label="parse",
                  pipeline_status=ctx.pipeline_status,
                  pipeline_status_lock=ctx.pipeline_status_lock,
              )
          except Exception as e:
              logger.error(f"Parse worker failed ({engine}): {e}")
              try:
                  await self._upsert_doc_status_transition(
                      doc_id=doc_id_w,
                      status=DocStatus.FAILED,
                      status_doc=status_doc_w,
                      file_path=getattr(status_doc_w, "file_path", "unknown_source"),
                      extra_fields={"error_msg": str(e)},
                  )
              except Exception as status_error:
                  # Best-effort: the worker must keep draining the queue, but
                  # a lost FAILED write should at least be visible in logs.
                  logger.error(
                      f"Parse worker ({engine}) could not record FAILED status: "
                      f"{status_error}"
                  )
          finally:
              in_q.task_done()
  async def _analyze_worker(self, ctx: _BatchRunContext) -> None:
      """Layer 2 worker: run multimodal analysis (VLM) and feed q_process.

      Refreshes ``content_summary`` / ``content_length`` from the parsed
      body (pending_parse → lightrag / raw documents start with empty
      summary / zero length at enqueue) so PROCESSING / PROCESSED upserts
      end up with real values.

      Args:
          ctx: Batch context providing ``q_analyze`` (input), ``q_process``
              (output), the shared pipeline status and its lock.

      The loop never exits on its own; the task is expected to be cancelled
      by its creator. ``ctx.q_analyze.task_done()`` is called for every item,
      whatever the outcome.
      """
      while True:
          item = await ctx.q_analyze.get()
          try:
              doc_id_w, status_doc_w, parsed_data_w = item
              file_path_w = getattr(status_doc_w, "file_path", "unknown_source")

              # Boundary cancellation check: same pattern as _parse_worker.
              # Items already past PARSING that are still queued for analyze
              # are short-circuited to FAILED here so the multimodal VLM
              # path is not entered after the user clicked cancel.
              if await self._cancellation_requested(
                  ctx.pipeline_status, ctx.pipeline_status_lock
              ):
                  await self._mark_doc_cancelled_in_stage(
                      doc_id=doc_id_w,
                      status_doc=status_doc_w,
                      file_path=file_path_w,
                      stage_label="analyze",
                      pipeline_status=ctx.pipeline_status,
                      pipeline_status_lock=ctx.pipeline_status_lock,
                  )
                  continue

              refreshed_content_w = parsed_data_w.get("content", "") or ""
              status_doc_w.content_summary = get_content_summary(refreshed_content_w)
              status_doc_w.content_length = len(refreshed_content_w)

              # Stamp analyzing_start_time so per-stage durations stay
              # derivable from doc_status even after PROCESSED / FAILED;
              # carry-over preserves it across later upserts.
              if not isinstance(status_doc_w.metadata, dict):
                  status_doc_w.metadata = {}
              status_doc_w.metadata["analyzing_start_time"] = int(time.time())

              await self._upsert_doc_status_transition(
                  doc_id=doc_id_w,
                  status=DocStatus.ANALYZING,
                  status_doc=status_doc_w,
                  file_path=file_path_w,
              )

              analyzed = await self.analyze_multimodal(
                  doc_id=doc_id_w,
                  file_path=file_path_w,
                  parsed_data=parsed_data_w,
                  pipeline_status=ctx.pipeline_status,
                  pipeline_status_lock=ctx.pipeline_status_lock,
              )

              # Mirror analyze-stage outcome as a 3-way decision so the
              # ``analyzing_end_time`` stamp only ever lands on attempts
              # that genuinely completed:
              # - ``analyzing_stage_skipped`` present and truthy → stamp the
              #   skipped flag (the key is removed from ``analyzed``).
              # - ``multimodal_processed`` truthy → genuine completion;
              #   stamp ``analyzing_end_time``.
              # - Neither flag → not a skip AND not a completion, so write
              #   neither field.
              # The skipped/end_time pair is mutually exclusive.
              if not isinstance(status_doc_w.metadata, dict):
                  status_doc_w.metadata = {}
              if analyzed.pop("analyzing_stage_skipped", False):
                  status_doc_w.metadata["analyzing_stage_skipped"] = True
              elif analyzed.get("multimodal_processed"):
                  status_doc_w.metadata["analyzing_end_time"] = int(time.time())

              await ctx.q_process.put((doc_id_w, status_doc_w, analyzed))
          except PipelineCancelledException:
              # In-flight cancellation surfaced from analyze_multimodal.
              # Route through the friendly message path so error_msg and
              # history_messages match the boundary-check branch.
              await self._mark_doc_cancelled_in_stage(
                  doc_id=doc_id_w,
                  status_doc=status_doc_w,
                  file_path=getattr(status_doc_w, "file_path", "unknown_source"),
                  stage_label="analyze",
                  pipeline_status=ctx.pipeline_status,
                  pipeline_status_lock=ctx.pipeline_status_lock,
              )
          except Exception as e:
              # Mirror _parse_worker: failures here must transition the
              # document to FAILED with a diagnostic ``error_msg``, otherwise
              # the doc would be left in ANALYZING.
              logger.error(f"Analyze worker failed: {e}")
              try:
                  await self._upsert_doc_status_transition(
                      doc_id=doc_id_w,
                      status=DocStatus.FAILED,
                      status_doc=status_doc_w,
                      file_path=getattr(status_doc_w, "file_path", "unknown_source"),
                      extra_fields={"error_msg": str(e)},
                  )
              except Exception as status_error:
                  # Best-effort: the worker must keep draining the queue, but
                  # a lost FAILED write should at least be visible in logs.
                  logger.error(
                      f"Analyze worker could not record FAILED status: {status_error}"
                  )
          finally:
              ctx.q_analyze.task_done()
  1578. async def _process_worker(self, ctx: _BatchRunContext) -> None:
  1579. """Layer 3 worker: dispatch each ready document to single-doc processing."""
  1580. while True:
  1581. item = await ctx.q_process.get()
  1582. try:
  1583. doc_id_w, status_doc_w, parsed_data_w = item
  1584. await self.process_single_document(
  1585. doc_id=doc_id_w,
  1586. status_doc=status_doc_w,
  1587. parsed_data=parsed_data_w,
  1588. ctx=ctx,
  1589. )
  1590. finally:
  1591. ctx.q_process.task_done()
  1592. # ============================================================
  1593. # Single-document state machine
  1594. # ============================================================
  1595. async def process_single_document(
  1596. self,
  1597. *,
  1598. doc_id: str,
  1599. status_doc: DocProcessingStatus,
  1600. parsed_data: dict[str, Any],
  1601. ctx: _BatchRunContext,
  1602. ) -> None:
  1603. """Single-document state machine: chunking → KG extraction → merge.
  1604. Always invoked from ``_process_worker`` with ``parsed_data`` already
  1605. populated by ``_parse_worker`` + ``_analyze_worker``. Drives the
  1606. PROCESSING → PROCESSED state machine, with FAILED fallbacks at both
  1607. the extract and merge stage boundaries.
  1608. """
  1609. from lightrag.parser.routing import parse_process_options
  1610. file_path = resolve_doc_file_path(status_doc=status_doc)
  1611. current_file_number = 0
  1612. file_extraction_stage_ok = False
  1613. processing_start_time = int(time.time())
  1614. first_stage_tasks: list[asyncio.Task] = []
  1615. entity_relation_task: asyncio.Task | None = None
  1616. chunks: dict[str, Any] = {}
  1617. content_data: dict[str, Any] | None = None
  1618. extraction_meta: dict[str, Any] = {}
  1619. chunk_results: list = []
  1620. doc_process_opts = parse_process_options("")
  1621. def get_failed_chunk_snapshot() -> tuple[list[str], int]:
  1622. if chunks:
  1623. chunk_ids = list(chunks.keys())
  1624. return chunk_ids, len(chunk_ids)
  1625. return chunk_fields_from_status_doc(status_doc)
  1626. async with ctx.semaphore:
  1627. try:
  1628. # Resolve file_path from full_docs before honoring a queued
  1629. # cancellation so corrupted doc_status placeholders do not
  1630. # get written back again during retry/cancel flows.
  1631. content_data = await self.full_docs.get_by_id(doc_id)
  1632. if content_data:
  1633. file_path = resolve_doc_file_path(
  1634. status_doc=status_doc,
  1635. content_data=content_data,
  1636. )
  1637. status_doc.file_path = file_path
  1638. # Check for cancellation before starting document processing.
  1639. # file_path is resolved before this check so queued documents
  1640. # do not lose their source path on early cancellation.
  1641. await self._raise_if_cancelled(
  1642. ctx.pipeline_status, ctx.pipeline_status_lock
  1643. )
  1644. async with ctx.pipeline_status_lock:
  1645. ctx.processed_count += 1
  1646. current_file_number = ctx.processed_count
  1647. ctx.pipeline_status["cur_batch"] = ctx.processed_count
  1648. log_message = (
  1649. f"Extracting stage {current_file_number}/"
  1650. f"{ctx.total_files}: {file_path}"
  1651. )
  1652. logger.info(log_message)
  1653. ctx.pipeline_status["history_messages"].append(log_message)
  1654. log_message = f"Processing d-id: {doc_id}"
  1655. logger.info(log_message)
  1656. ctx.pipeline_status["latest_message"] = log_message
  1657. ctx.pipeline_status["history_messages"].append(log_message)
  1658. # Prevent memory growth: keep only latest 5000 messages
  1659. # when exceeding 10000. Trim in place so Manager.list-
  1660. # backed shared state remains appendable and visible
  1661. # across processes.
  1662. if len(ctx.pipeline_status["history_messages"]) > 10000:
  1663. logger.info(
  1664. f"Trimming pipeline history from {len(ctx.pipeline_status['history_messages'])} to 5000 messages"
  1665. )
  1666. del ctx.pipeline_status["history_messages"][:-5000]
  1667. content = parsed_data.get("content", "")
  1668. # Decode per-document processing options once; later stages
  1669. # (multimodal hook / KG extraction) re-read them from
  1670. # full_docs as well.
  1671. doc_process_opts = parse_process_options(
  1672. (content_data or {}).get("process_options", "")
  1673. )
  1674. # Resume guard: if content was already extracted under
  1675. # earlier process_options, purge stale chunks + KG before
  1676. # rebuilding.
  1677. await self._purge_stale_extraction_if_resuming(
  1678. doc_id=doc_id,
  1679. status_doc=status_doc,
  1680. file_path=file_path,
  1681. content_data=content_data,
  1682. pipeline_status=ctx.pipeline_status,
  1683. pipeline_status_lock=ctx.pipeline_status_lock,
  1684. )
  1685. # Chunker dispatch is driven by whether ``process_options``
  1686. # explicitly named a chunking strategy:
  1687. # - Explicit selector (F/R/V/P present in the raw
  1688. # options string): dispatch to a chunker that
  1689. # follows the standardized file-chunker contract
  1690. # ``(tokenizer, content, chunk_token_size, *,
  1691. # <strategy kwargs>)``, with kwargs supplied from
  1692. # the per-doc ``chunk_options`` snapshot persisted
  1693. # at enqueue time.
  1694. # - No selector supplied: honor the
  1695. # externally-customizable ``self.chunking_func``
  1696. # with its legacy 6-arg signature so existing
  1697. # callers (typically :meth:`ainsert` for raw text)
  1698. # keep working unchanged. Legacy callers still
  1699. # read parameters from ``chunk_options`` first
  1700. # (per-doc snapshot), with ctx values as fallback
  1701. # for already-enqueued docs predating chunk_options.
  1702. chunk_opts = (content_data or {}).get("chunk_options")
  1703. if not isinstance(chunk_opts, dict) or not chunk_opts:
  1704. # Backwards compatibility: rows enqueued before the
  1705. # chunk_options snapshot was added fall back to a
  1706. # fresh build from current addon_params['chunker'],
  1707. # scoped to the per-doc strategy decoded above so
  1708. # the slim shape stays consistent with newly
  1709. # enqueued rows. F-strategy split args fall back
  1710. # to whatever lives in
  1711. # ``addon_params['chunker']['fixed_token']``;
  1712. # runtime overrides are an ainsert-time concern and
  1713. # don't apply at process time for legacy rows.
  1714. from lightrag.parser.routing import resolve_chunk_options
  1715. chunk_opts = resolve_chunk_options(
  1716. self.addon_params, process_options=doc_process_opts
  1717. )
  1718. resolved_chunk_size = int(
  1719. chunk_opts.get("chunk_token_size") or self.chunk_token_size
  1720. )
  1721. # Captured per-strategy below; persisted to
  1722. # ``doc_status.metadata['chunk_opts']`` via ``extraction_meta``
  1723. # so admin/list APIs can see the actual chunker params used.
  1724. chunk_opts_str: str = ""
  1725. if doc_process_opts.chunking_explicit:
  1726. from lightrag.chunker import (
  1727. chunking_by_fixed_token,
  1728. chunking_by_paragraph_semantic,
  1729. chunking_by_recursive_character,
  1730. chunking_by_semantic_vector,
  1731. )
  1732. strategy = doc_process_opts.chunking
  1733. if strategy == "P":
  1734. # P carries its own ``chunk_token_size`` (CHUNK_P_SIZE
  1735. # env or ``addon_params['chunker']['paragraph_semantic']``);
  1736. # pop it out of the kwargs so we don't pass it
  1737. # both positionally and via ``**`` splat (which
  1738. # would TypeError). Unlike R/V, ``default_chunker_config``
  1739. # always populates this slot — falling back to
  1740. # ``resolved_chunk_size`` (global CHUNK_SIZE) here is
  1741. # only a safety net for snapshots predating that
  1742. # change; new docs always carry ``DEFAULT_CHUNK_P_SIZE``.
  1743. p_opts = dict(chunk_opts.get("paragraph_semantic") or {})
  1744. p_chunk_size = int(
  1745. p_opts.pop("chunk_token_size", resolved_chunk_size)
  1746. )
  1747. p_blocks_path = (
  1748. str(parsed_data.get("blocks_path") or "").strip() or None
  1749. )
  1750. chunk_opts_str = _format_chunking_params(p_chunk_size, p_opts)
  1751. logger.info(f"Chunking P: {chunk_opts_str}, doc_id: {doc_id}")
  1752. chunking_result = chunking_by_paragraph_semantic(
  1753. self.tokenizer,
  1754. content,
  1755. p_chunk_size,
  1756. blocks_path=p_blocks_path,
  1757. **p_opts,
  1758. )
  1759. elif strategy == "R":
  1760. # R carries its own optional ``chunk_token_size``
  1761. # override (CHUNK_R_SIZE env or
  1762. # ``addon_params['chunker']['recursive_character']``);
  1763. # pop it out of the kwargs so we don't pass it
  1764. # both positionally and via ``**`` splat (which
  1765. # would TypeError). Fall back to the shared
  1766. # top-level resolved size when unset.
  1767. r_opts = dict(chunk_opts.get("recursive_character") or {})
  1768. r_chunk_size = int(
  1769. r_opts.pop("chunk_token_size", resolved_chunk_size)
  1770. )
  1771. chunk_opts_str = _format_chunking_params(r_chunk_size, r_opts)
  1772. logger.info(f"Chunking R: {chunk_opts_str}, doc_id: {doc_id}")
  1773. chunking_result = chunking_by_recursive_character(
  1774. self.tokenizer,
  1775. content,
  1776. r_chunk_size,
  1777. **r_opts,
  1778. )
  1779. elif strategy == "V":
  1780. # V carries its own optional ``chunk_token_size``
  1781. # advisory ceiling override (CHUNK_V_SIZE env or
  1782. # ``addon_params['chunker']['semantic_vector']``);
  1783. # same pop-then-splat pattern as P/R.
  1784. v_opts = dict(chunk_opts.get("semantic_vector") or {})
  1785. v_chunk_size = int(
  1786. v_opts.pop("chunk_token_size", resolved_chunk_size)
  1787. )
  1788. chunk_opts_str = _format_chunking_params(v_chunk_size, v_opts)
  1789. logger.info(f"Chunking V: {chunk_opts_str}, doc_id: {doc_id}")
  1790. chunking_result = await chunking_by_semantic_vector(
  1791. self.tokenizer,
  1792. content,
  1793. v_chunk_size,
  1794. embedding_func=self.embedding_func,
  1795. **v_opts,
  1796. )
  1797. else: # "F"
  1798. # F honors its own ``chunk_token_size`` override
  1799. # (``addon_params['chunker']['fixed_token']`` or a
  1800. # caller-supplied ``chunk_options``) exactly like
  1801. # R/V/P: pop it out of the kwargs so we don't pass it
  1802. # both positionally and via ``**`` splat (which would
  1803. # TypeError), falling back to the shared top-level
  1804. # resolved size when unset.
  1805. f_opts = dict(chunk_opts.get("fixed_token") or {})
  1806. f_chunk_size = int(
  1807. f_opts.pop("chunk_token_size", resolved_chunk_size)
  1808. )
  1809. chunk_opts_str = _format_chunking_params(f_chunk_size, f_opts)
  1810. logger.info(f"Chunking F: {chunk_opts_str}, doc_id: {doc_id}")
  1811. chunking_result = chunking_by_fixed_token(
  1812. self.tokenizer,
  1813. content,
  1814. f_chunk_size,
  1815. **f_opts,
  1816. )
  1817. else:
  1818. f_opts = chunk_opts.get("fixed_token") or {}
  1819. # Honor the F-strategy ``chunk_token_size`` override (from
  1820. # ``CHUNK_F_SIZE`` env or an explicit
  1821. # ``addon_params['chunker']['fixed_token']`` / per-doc
  1822. # ``chunk_options``) on this legacy path too, falling back
  1823. # to the shared top-level resolved size when unset. This
  1824. # keeps ``LightRAG.ainsert`` — which intentionally does NOT
  1825. # pass a ``process_options`` selector (so the user's
  1826. # ``chunking_func`` still runs) — consistent with the
  1827. # explicit-F branch instead of silently ignoring
  1828. # ``fixed_token.chunk_token_size``. ``f_opts`` is read
  1829. # field-by-field here (not splatted), so there is no
  1830. # positional/kwarg collision.
  1831. legacy_chunk_size = int(
  1832. f_opts.get("chunk_token_size", resolved_chunk_size)
  1833. )
  1834. chunk_opts_str = _format_chunking_params(
  1835. legacy_chunk_size,
  1836. {
  1837. "split_by_character": f_opts.get("split_by_character"),
  1838. "split_by_character_only": f_opts.get(
  1839. "split_by_character_only", False
  1840. ),
  1841. "overlap": f_opts.get(
  1842. "chunk_overlap_token_size",
  1843. self.chunk_overlap_token_size,
  1844. ),
  1845. },
  1846. )
  1847. logger.info(
  1848. f"Chunking F(legacy): {chunk_opts_str}, doc_id: {doc_id}"
  1849. )
  1850. chunking_result = self.chunking_func(
  1851. self.tokenizer,
  1852. content,
  1853. f_opts.get("split_by_character"),
  1854. f_opts.get("split_by_character_only", False),
  1855. f_opts.get(
  1856. "chunk_overlap_token_size",
  1857. self.chunk_overlap_token_size,
  1858. ),
  1859. legacy_chunk_size,
  1860. )
  1861. if inspect.isawaitable(chunking_result):
  1862. chunking_result = await chunking_result
  1863. if not isinstance(chunking_result, (list, tuple)):
  1864. raise TypeError(
  1865. f"chunking_func must return a list or tuple of dicts, "
  1866. f"got {type(chunking_result)}"
  1867. )
  1868. # Reflect the format actually persisted in full_docs.
  1869. # Previously a structured-parse fallback always tagged
  1870. # parse_format=raw, which silently mislabelled lightrag docs;
  1871. # _build_mm_chunks_from_sidecars below gates on the persisted
  1872. # format via the sidecar presence check, so the tag must
  1873. # reflect what was actually stored.
  1874. persisted_format = (
  1875. content_data.get("parse_format")
  1876. if isinstance(content_data, dict)
  1877. else FULL_DOCS_FORMAT_RAW
  1878. ) or FULL_DOCS_FORMAT_RAW
  1879. persisted_engine = (
  1880. content_data.get("parse_engine")
  1881. if isinstance(content_data, dict)
  1882. else None
  1883. )
  1884. extraction_meta = {
  1885. "parse_format": persisted_format,
  1886. "parse_engine": persisted_engine
  1887. or (
  1888. "native"
  1889. if persisted_format == FULL_DOCS_FORMAT_LIGHTRAG
  1890. else "legacy"
  1891. ),
  1892. "chunking_method": (
  1893. # Explicit selector in process_options: reflect
  1894. # the dispatched strategy. ``fixed_token_fallback``
  1895. # is preserved as a defensive label in case a
  1896. # future selector char slips past the validator.
  1897. _CHUNKING_METHOD_LABELS.get(
  1898. doc_process_opts.chunking, "fixed_token_fallback"
  1899. )
  1900. if doc_process_opts.chunking_explicit
  1901. # No selector: chunking_func was invoked, which
  1902. # defaults to chunking_by_token_size but may be
  1903. # customized by the caller.
  1904. else "legacy_chunking_func"
  1905. ),
  1906. # Mirrors the chunking start log line (params portion only,
  1907. # without the strategy prefix or file path) so admins can
  1908. # see the actual chunker params used. Carried across
  1909. # transitions via ``_DOC_STATUS_METADATA_CARRY_OVER_KEYS``.
  1910. "chunk_opts": chunk_opts_str,
  1911. }
  1912. blocks_path = str(parsed_data.get("blocks_path") or "").strip()
  1913. if blocks_path:
  1914. max_order = -1
  1915. for ch in chunking_result:
  1916. if isinstance(ch, dict) and isinstance(
  1917. ch.get("chunk_order_index"), int
  1918. ):
  1919. max_order = max(max_order, int(ch["chunk_order_index"]))
  1920. # Default to "" (no modalities) when full_docs has no
  1921. # ``process_options`` key for this doc: a reinsert that
  1922. # omits i/t/e must NOT re-index stale successful sidecars
  1923. # left over from an earlier multimodal run. The builder's
  1924. # None branch is reserved for ad-hoc callers (unit tests)
  1925. # that intentionally want every modality considered.
  1926. mm_chunks = self._build_mm_chunks_from_sidecars(
  1927. doc_id=doc_id,
  1928. file_path=file_path,
  1929. blocks_path=blocks_path,
  1930. base_order_index=max_order + 1,
  1931. process_options=(content_data or {}).get("process_options")
  1932. or "",
  1933. )
  1934. if mm_chunks:
  1935. chunking_result = list(chunking_result) + mm_chunks
  1936. extraction_meta["mm_chunks"] = len(mm_chunks)
  1937. # Final hard guard before embedding: split any oversize
  1938. # chunk while preserving heading hierarchy metadata.
  1939. if (
  1940. self.embedding_token_limit is not None
  1941. and self.embedding_token_limit > 0
  1942. ):
  1943. original_chunk_count = len(chunking_result)
  1944. chunking_result = enforce_chunk_token_limit_before_embedding(
  1945. chunking_result=chunking_result,
  1946. tokenizer=self.tokenizer,
  1947. max_tokens=self.embedding_token_limit,
  1948. )
  1949. if len(chunking_result) != original_chunk_count:
  1950. logger.info(
  1951. "Applied hard fallback split before embedding for "
  1952. f"d-id: {doc_id}, chunks {original_chunk_count} -> {len(chunking_result)} "
  1953. f"(limit={self.embedding_token_limit})"
  1954. )
  1955. # Compact "pre -> post" summary mirrors the log
  1956. # middle segment. Field is only present when a
  1957. # hard split actually occurred, so its presence
  1958. # alone signals the trigger.
  1959. extraction_meta["hard_fallback_split"] = (
  1960. f"{original_chunk_count} -> {len(chunking_result)}"
  1961. )
  1962. chunks = build_chunks_dict_from_chunking_result(
  1963. chunking_result, doc_id=doc_id, file_path=file_path
  1964. )
  1965. if not chunks:
  1966. logger.warning("No document chunks to process")
  1967. processing_start_time = int(time.time())
  1968. await self._raise_if_cancelled(
  1969. ctx.pipeline_status, ctx.pipeline_status_lock
  1970. )
  1971. # Stage 1: persist doc_status PROCESSING + chunks in parallel.
  1972. doc_status_task = asyncio.create_task(
  1973. self._upsert_doc_status_transition(
  1974. doc_id=doc_id,
  1975. status=DocStatus.PROCESSING,
  1976. status_doc=status_doc,
  1977. file_path=file_path,
  1978. extra_fields={
  1979. "chunks_count": len(chunks),
  1980. "chunks_list": list(chunks.keys()),
  1981. },
  1982. metadata_extra={
  1983. "processing_start_time": processing_start_time,
  1984. **extraction_meta,
  1985. },
  1986. )
  1987. )
  1988. chunks_vdb_task = asyncio.create_task(self.chunks_vdb.upsert(chunks))
  1989. text_chunks_task = asyncio.create_task(self.text_chunks.upsert(chunks))
  1990. first_stage_tasks = [
  1991. doc_status_task,
  1992. chunks_vdb_task,
  1993. text_chunks_task,
  1994. ]
  1995. entity_relation_task = None
  1996. await asyncio.gather(*first_stage_tasks)
  1997. # Stage 2: entity/relation extraction (after text_chunks are
  1998. # saved). When the user opted out via process_options '!',
  1999. # skip extraction entirely; chunks remain in the vector
  2000. # store so naive / mix retrieval still works.
  2001. if doc_process_opts.skip_kg:
  2002. logger.info(
  2003. f"[skip_kg] process_options '!' set for d-id: {doc_id}; "
  2004. f"skipping entity/relation extraction"
  2005. )
  2006. chunk_results = []
  2007. extraction_meta["skip_kg"] = True
  2008. else:
  2009. entity_relation_task = asyncio.create_task(
  2010. self._process_extract_entities(
  2011. chunks,
  2012. ctx.pipeline_status,
  2013. ctx.pipeline_status_lock,
  2014. )
  2015. )
  2016. chunk_results = await entity_relation_task
  2017. file_extraction_stage_ok = True
  2018. except Exception as e:
  2019. pending_tasks = first_stage_tasks + (
  2020. [entity_relation_task] if entity_relation_task else []
  2021. )
  2022. await self._finalize_doc_failure(
  2023. doc_id=doc_id,
  2024. status_doc=status_doc,
  2025. file_path=file_path,
  2026. error=e,
  2027. stage_label="extract",
  2028. current_file_number=current_file_number,
  2029. total_files=ctx.total_files,
  2030. failed_chunks_snapshot=get_failed_chunk_snapshot(),
  2031. pending_tasks=pending_tasks,
  2032. metadata_extra={
  2033. "processing_start_time": processing_start_time,
  2034. "processing_end_time": int(time.time()),
  2035. },
  2036. pipeline_status=ctx.pipeline_status,
  2037. pipeline_status_lock=ctx.pipeline_status_lock,
  2038. )
  2039. # Concurrency is controlled by keyed lock for individual
  2040. # entities and relationships.
  2041. if file_extraction_stage_ok:
  2042. try:
  2043. await self._raise_if_cancelled(
  2044. ctx.pipeline_status, ctx.pipeline_status_lock
  2045. )
  2046. # Use chunk_results from entity_relation_task. When
  2047. # skip_kg is set, chunk_results is empty so there are no
  2048. # nodes/edges to merge — but we still need to flush the
  2049. # chunks_vdb / text_chunks writes (already done above)
  2050. # and reach PROCESSED.
  2051. if not doc_process_opts.skip_kg:
  2052. await merge_nodes_and_edges(
  2053. chunk_results=chunk_results,
  2054. knowledge_graph_inst=self.chunk_entity_relation_graph,
  2055. entity_vdb=self.entities_vdb,
  2056. relationships_vdb=self.relationships_vdb,
  2057. global_config=self._build_global_config(),
  2058. full_entities_storage=self.full_entities,
  2059. full_relations_storage=self.full_relations,
  2060. doc_id=doc_id,
  2061. pipeline_status=ctx.pipeline_status,
  2062. pipeline_status_lock=ctx.pipeline_status_lock,
  2063. llm_response_cache=self.llm_response_cache,
  2064. entity_chunks_storage=self.entity_chunks,
  2065. relation_chunks_storage=self.relation_chunks,
  2066. current_file_number=current_file_number,
  2067. total_files=ctx.total_files,
  2068. file_path=file_path,
  2069. )
  2070. processing_end_time = int(time.time())
  2071. await self._upsert_doc_status_transition(
  2072. doc_id=doc_id,
  2073. status=DocStatus.PROCESSED,
  2074. status_doc=status_doc,
  2075. file_path=file_path,
  2076. extra_fields={
  2077. "chunks_count": len(chunks),
  2078. "chunks_list": list(chunks.keys()),
  2079. },
  2080. metadata_extra={
  2081. "processing_start_time": processing_start_time,
  2082. "processing_end_time": processing_end_time,
  2083. **extraction_meta,
  2084. },
  2085. )
  2086. await self._insert_done()
  2087. async with ctx.pipeline_status_lock:
  2088. log_message = (
  2089. f"Completed processing file "
  2090. f"{current_file_number}/{ctx.total_files}: "
  2091. f"{file_path}"
  2092. )
  2093. logger.info(log_message)
  2094. ctx.pipeline_status["latest_message"] = log_message
  2095. ctx.pipeline_status["history_messages"].append(log_message)
  2096. except Exception as e:
  2097. await self._finalize_doc_failure(
  2098. doc_id=doc_id,
  2099. status_doc=status_doc,
  2100. file_path=file_path,
  2101. error=e,
  2102. stage_label="merge",
  2103. current_file_number=current_file_number,
  2104. total_files=ctx.total_files,
  2105. failed_chunks_snapshot=get_failed_chunk_snapshot(),
  2106. pending_tasks=[],
  2107. metadata_extra={
  2108. "processing_start_time": processing_start_time,
  2109. "processing_end_time": int(time.time()),
  2110. **extraction_meta,
  2111. },
  2112. pipeline_status=ctx.pipeline_status,
  2113. pipeline_status_lock=ctx.pipeline_status_lock,
  2114. )
  async def _purge_stale_extraction_if_resuming(
      self,
      *,
      doc_id: str,
      status_doc: DocProcessingStatus,
      file_path: str,
      content_data: dict[str, Any] | None,
      pipeline_status: dict,
      pipeline_status_lock,
  ) -> None:
      """Drop chunks / KG entries left by an earlier run of this document.

      Nothing happens unless ``content_data`` is a dict that already holds
      extracted content: either the lightrag format with a
      ``sidecar_location``, or the raw format with non-blank ``content``.

      When the engine resolved from ``file_path`` differs from the stored
      ``parse_engine`` a warning is logged and published to
      ``pipeline_status``; the existing extraction is kept regardless.

      If ``status_doc.chunks_list`` contains non-empty string IDs they are
      handed to ``_purge_doc_chunks_and_kg`` and afterwards
      ``status_doc.chunks_list`` / ``status_doc.chunks_count`` are reset so
      later doc_status upserts do not write the stale IDs back.
      """

      async def _publish(message: str) -> None:
          # Mirror a message into the shared pipeline status under its lock.
          async with pipeline_status_lock:
              pipeline_status["latest_message"] = message
              pipeline_status["history_messages"].append(message)

      if not isinstance(content_data, dict):
          return

      lightrag_ready = content_data.get(
          "parse_format"
      ) == FULL_DOCS_FORMAT_LIGHTRAG and content_data.get("sidecar_location")
      raw_ready = (
          content_data.get("parse_format") == FULL_DOCS_FORMAT_RAW
          and (content_data.get("content") or "").strip()
      )
      if not (lightrag_ready or raw_ready):
          return

      # The stored extraction is the source of truth; a differing engine
      # hint only produces a warning.
      intended_engine, _ = resolve_file_parser_directives(file_path)
      stored_engine = (content_data.get("parse_engine") or "").lower()
      if intended_engine and stored_engine and intended_engine != stored_engine:
          mismatch_note = (
              f"[resume] {doc_id}: filename hint / LIGHTRAG_PARSER implies "
              f"engine={intended_engine!r} but full_docs already has "
              f"parse_engine={stored_engine!r}; keeping the existing "
              f"extraction. Delete + re-upload to switch engines."
          )
          logger.warning(mismatch_note)
          await _publish(mismatch_note)

      # Keep only non-empty string IDs from the previous run.
      stale_ids = set()
      for candidate in status_doc.chunks_list or []:
          if isinstance(candidate, str) and candidate:
              stale_ids.add(candidate)
      if not stale_ids:
          return

      purge_note = (
          f"[resume] {doc_id}: purging {len(stale_ids)} chunk(s) and "
          f"associated KG entries from a previous run before rebuilding "
          f"under current process_options"
      )
      logger.info(purge_note)
      await _publish(purge_note)

      await self._purge_doc_chunks_and_kg(
          doc_id,
          stale_ids,
          pipeline_status=pipeline_status,
          pipeline_status_lock=pipeline_status_lock,
      )

      # status_doc still carries the prior run's bookkeeping; reset it so
      # subsequent state-machine upserts don't resurrect the purged IDs.
      status_doc.chunks_list = []
      status_doc.chunks_count = 0
  2191. # ============================================================
  2192. # doc_status state-machine helpers (shared by all layers)
  2193. # ============================================================
  async def _upsert_doc_status_transition(
      self,
      doc_id: str,
      status: DocStatus,
      status_doc: DocProcessingStatus,
      file_path: str,
      *,
      extra_fields: dict[str, Any] | None = None,
      metadata_extra: dict[str, Any] | None = None,
  ) -> None:
      """Write one doc_status row for a state transition.

      The row copies the stable fields from ``status_doc`` (summary, length,
      creation time, track id, content hash), stamps ``updated_at`` with the
      current UTC time, and takes its ``metadata`` from
      ``doc_status_transition_metadata(status_doc, extra=metadata_extra)``.

      Args:
          doc_id: Key of the row to upsert.
          status: The status being transitioned to.
          status_doc: Previous status record the stable fields are read from.
          file_path: Path stored on the row.
          extra_fields: Optional top-level fields (e.g. ``chunks_count`` /
              ``chunks_list`` / ``error_msg``) merged over the base row.
          metadata_extra: Optional extra metadata forwarded to
              ``doc_status_transition_metadata``.
      """
      record: dict[str, Any] = {"status": status}
      for attr in ("content_summary", "content_length", "created_at"):
          record[attr] = getattr(status_doc, attr)
      record["updated_at"] = datetime.now(timezone.utc).isoformat()
      record["file_path"] = file_path
      for attr in ("track_id", "content_hash"):
          record[attr] = getattr(status_doc, attr)
      record["metadata"] = doc_status_transition_metadata(
          status_doc, extra=metadata_extra
      )

      # Caller-supplied fields win over the base row.
      if extra_fields:
          record = {**record, **extra_fields}

      await self.doc_status.upsert({doc_id: record})
  2227. async def _raise_if_cancelled(
  2228. self,
  2229. pipeline_status: dict,
  2230. pipeline_status_lock,
  2231. ) -> None:
  2232. """Raise ``PipelineCancelledException`` if the user has requested cancel."""
  2233. async with pipeline_status_lock:
  2234. if pipeline_status.get("cancellation_requested", False):
  2235. raise PipelineCancelledException("User cancelled")
  2236. async def _cancellation_requested(
  2237. self,
  2238. pipeline_status: dict,
  2239. pipeline_status_lock,
  2240. ) -> bool:
  2241. """Read-only cancellation check.
  2242. Use this when a worker wants to branch on the flag (e.g. drain a queue
  2243. item) instead of raising. Callers that prefer the exception style
  2244. should use :meth:`_raise_if_cancelled` instead.
  2245. """
  2246. async with pipeline_status_lock:
  2247. return bool(pipeline_status.get("cancellation_requested", False))
  async def _mark_doc_cancelled_in_stage(
      self,
      *,
      doc_id: str,
      status_doc: DocProcessingStatus,
      file_path: str,
      stage_label: str,
      pipeline_status: dict,
      pipeline_status_lock,
  ) -> None:
      """Record a queued document as FAILED because the user cancelled.

      Lighter-weight sibling of :meth:`_finalize_doc_failure` for the PARSE
      and ANALYZE workers, which carry no chunk snapshot or pending tasks.
      Steps, in order:

      1. Log the cancellation and publish it to ``pipeline_status``.
      2. Flush the LLM response cache (when one is configured) so entries
         written by already-finished sibling work are persisted.
      3. Upsert a FAILED doc_status row carrying the message.

      Failures in steps 2 and 3 are logged and swallowed; this method does
      not propagate them.
      """
      error_msg = f"User cancelled during {stage_label}: {file_path}"
      logger.warning(error_msg)
      async with pipeline_status_lock:
          pipeline_status["latest_message"] = error_msg
          pipeline_status["history_messages"].append(error_msg)

      # Best-effort cache flush: a persistence error must not block the
      # status update below.
      if self.llm_response_cache:
          try:
              await self.llm_response_cache.index_done_callback()
          except Exception as persist_error:
              logger.error(f"Failed to persist LLM cache: {persist_error}")

      try:
          await self._upsert_doc_status_transition(
              doc_id=doc_id,
              status=DocStatus.FAILED,
              status_doc=status_doc,
              file_path=file_path,
              extra_fields={"error_msg": error_msg},
          )
      except Exception as exc:
          logger.error(f"Failed to mark cancelled doc {doc_id} as FAILED: {exc}")
  async def _finalize_doc_failure(
      self,
      *,
      doc_id: str,
      status_doc: DocProcessingStatus,
      file_path: str,
      error: BaseException,
      stage_label: str,
      current_file_number: int,
      total_files: int,
      failed_chunks_snapshot: tuple[list[str], int],
      pending_tasks: list[asyncio.Task],
      metadata_extra: dict[str, Any],
      pipeline_status: dict,
      pipeline_status_lock,
  ) -> None:
      """Shared wrap-up after the extract or merge stage of a document fails.

      In order: report the failure (a warning for a user cancellation,
      otherwise the current traceback plus an error line) to the logger and
      to ``pipeline_status``; cancel any ``pending_tasks`` that have not
      finished; flush the LLM response cache on a best-effort basis; and
      upsert a FAILED doc_status row.

      Args:
          error: The exception that ended the stage. A
              ``PipelineCancelledException`` selects the cancellation
              wording; its ``str()`` is stored as ``error_msg`` on the row.
          stage_label: ``"merge"`` selects the merge wording; any other
              value uses the extract wording.
          failed_chunks_snapshot: ``(chunks_list, chunks_count)`` written to
              the FAILED row.
          pending_tasks: Stage tasks to cancel when still running.
          metadata_extra: Extra metadata forwarded to the status upsert.

      Note:
          The traceback text comes from ``traceback.format_exc()``, i.e. the
          exception currently being handled by the caller.
      """
      in_merge = stage_label == "merge"

      if isinstance(error, PipelineCancelledException):
          error_msg = (
              f"User cancelled during merge {current_file_number}/"
              f"{total_files}: {file_path}"
              if in_merge
              else f"User cancelled {current_file_number}/{total_files}: {file_path}"
          )
          logger.warning(error_msg)
          history_entries = [error_msg]
      else:
          trace_text = traceback.format_exc()
          logger.error(trace_text)
          error_msg = (
              f"Merging stage failed in document "
              f"{current_file_number}/{total_files}: {file_path}"
              if in_merge
              else f"Failed to extract document "
              f"{current_file_number}/{total_files}: {file_path}"
          )
          logger.error(error_msg)
          # The traceback goes into the history ahead of the summary line.
          history_entries = [trace_text, error_msg]

      async with pipeline_status_lock:
          pipeline_status["latest_message"] = error_msg
          for entry in history_entries:
              pipeline_status["history_messages"].append(entry)

      # Stop whatever stage work is still in flight.
      for task in pending_tasks:
          if not task or task.done():
              continue
          task.cancel()

      # Best-effort cache flush; a persistence error is only logged.
      if self.llm_response_cache:
          try:
              await self.llm_response_cache.index_done_callback()
          except Exception as persist_error:
              logger.error(f"Failed to persist LLM cache: {persist_error}")

      failed_chunks_list, failed_chunks_count = failed_chunks_snapshot
      await self._upsert_doc_status_transition(
          doc_id=doc_id,
          status=DocStatus.FAILED,
          status_doc=status_doc,
          file_path=file_path,
          extra_fields={
              "error_msg": str(error),
              "chunks_count": failed_chunks_count,
              "chunks_list": failed_chunks_list,
          },
          metadata_extra=metadata_extra,
      )
  2359. # ============================================================
  2360. # Parser engines (also called by tests directly)
  2361. # ============================================================
  async def parse_native(
      self, doc_id: str, file_path: str, content_data: dict[str, Any]
  ) -> dict[str, Any]:
      """Produce chunk-ready text for a document, parsing a DOCX when needed.

      Dispatches on ``content_data["parse_format"]`` (raw when absent):

      * lightrag format: reuse the stored text with the lightrag doc prefix
        stripped and resolve the sidecar blocks path. No parsing happens, so
        the result is flagged ``parse_stage_skipped``.
      * pending-parse format: run the native DOCX parser, write the sidecar,
        persist the parsed row to full_docs and archive the source file.
      * anything else: pass the stored content through as raw text, also
        flagged ``parse_stage_skipped``.

      Returns:
          A dict with ``doc_id``, ``file_path``, ``parse_format``,
          ``content`` and ``blocks_path``; plus ``parse_stage_skipped`` on
          the two pass-through paths, or ``parse_warnings`` when the DOCX
          parser reported paragraphs without a paraId.

      Raises:
          ValueError: The pending source is not an existing ``.docx`` file,
              or the DOCX parser returned no blocks.
      """
      doc_format = content_data.get("parse_format", FULL_DOCS_FORMAT_RAW)

      if doc_format == FULL_DOCS_FORMAT_LIGHTRAG:
          # Content and sidecar come from a prior parse: strip the doc
          # marker so chunking sees the same text shape as raw input, and
          # keep the blocks path for downstream multimodal sidecar reads.
          stored_text = strip_lightrag_doc_prefix(
              content_data.get("content"), doc_format
          )
          stored_blocks_path = (
              sidecar_blocks_path(content_data.get("sidecar_location")) or ""
          )
          return {
              "doc_id": doc_id,
              "file_path": file_path,
              "parse_format": doc_format,
              "content": stored_text,
              "blocks_path": stored_blocks_path,
              "parse_stage_skipped": True,
          }

      if doc_format != FULL_DOCS_FORMAT_PENDING_PARSE:
          # Raw path: the content was supplied at insert time and no parser
          # runs, so the stage is reported as skipped.
          return {
              "doc_id": doc_id,
              "file_path": file_path,
              "parse_format": FULL_DOCS_FORMAT_RAW,
              "content": content_data.get("content", ""),
              "blocks_path": "",
              "parse_stage_skipped": True,
          }

      # ---- pending_parse: parse the DOCX source now ----
      resolved_source = _call_source_file_resolver(
          self,
          file_path,
          source_file_name=content_data.get("source_file_name"),
          parser_engine=PARSER_ENGINE_NATIVE,
      )
      docx_file = Path(resolved_source)
      if (
          not docx_file.exists()
          or not docx_file.is_file()
          or docx_file.suffix.lower() != ".docx"
      ):
          raise ValueError(
              f"Native parser does not support pending file: {file_path}"
          )

      # Imported lazily so the docx parser is only loaded by call paths
      # that actually reach the native engine.
      from lightrag.parser.docx.drawing_image_extractor import (
          DrawingExtractionContext,
          load_relationships,
      )
      from lightrag.parser.docx.parse_document import (
          extract_docx_blocks,
      )
      from lightrag.parser.docx.ir_builder import NativeDocxIRBuilder
      from lightrag.sidecar import write_sidecar

      # Canonicalize defensively: direct callers (tests, CLI) may pass
      # absolute paths or hint-bearing names.
      document_name = normalize_document_file_path(file_path)
      if document_name == "unknown_source":
          document_name = docx_file.name or f"{doc_id}.bin"
      base_name = Path(document_name).stem or document_name
      parsed_dir = parsed_artifact_dir_for(
          document_name, parent_hint=docx_file.parent
      )
      asset_dir = parsed_dir / f"{base_name}.blocks.assets"

      def _discard_parsed_dir() -> None:
          # Remove the (possibly partially populated) output directory.
          if parsed_dir.exists():
              shutil.rmtree(parsed_dir, ignore_errors=True)

      def _collect_docx_blocks() -> (
          tuple[list[dict[str, Any]], dict[str, Any], dict[str, Any]]
      ):
          # Start from a clean parsed_dir and create the asset dir up front
          # so the drawing extractor can write image bytes before
          # write_sidecar runs (it is later called with
          # clean_parsed_dir=False to keep those bytes).
          if parsed_dir.exists():
              shutil.rmtree(parsed_dir)
          for directory in (parsed_dir, asset_dir):
              directory.mkdir(parents=True, exist_ok=True)
          drawing_ctx = DrawingExtractionContext(
              docx_path=docx_file,
              blocks_output_path=parsed_dir / f"{base_name}.blocks.jsonl",
              export_dir_name=asset_dir.name,
              export_dir_path=asset_dir,
          )
          load_relationships(drawing_ctx)
          collected_warnings: dict[str, Any] = {}
          collected_metadata: dict[str, Any] = {}
          docx_blocks = extract_docx_blocks(
              str(docx_file),
              debug=False,
              fixlevel=0,
              drawing_context=drawing_ctx,
              parse_warnings=collected_warnings,
              parse_metadata=collected_metadata,
          )
          return docx_blocks, collected_warnings, collected_metadata

      try:
          blocks, parse_warnings, parse_metadata = await asyncio.to_thread(
              _collect_docx_blocks
          )
      except BaseException:
          # The worker pre-creates the output dirs; roll them back on any
          # failure so the next attempt starts clean.
          _discard_parsed_dir()
          raise

      if not blocks:
          # write_sidecar never runs in this case, so the pre-created dirs
          # would otherwise be left behind.
          _discard_parsed_dir()
          raise ValueError(f"DOCX parser returned empty content for {file_path}")

      paragraphs_without_id = int(
          parse_warnings.get("missing_paraid_count", 0) or 0
      )
      if paragraphs_without_id > 0:
          # One warning per document carrying the total count, rather than
          # one per affected paragraph.
          logger.warning(
              "[parse_native] %s: %d paragraphs lack paraId; "
              "Re-saving file in Word 2013+ to regenerate ids.",
              docx_file.name,
              paragraphs_without_id,
          )

      ir = NativeDocxIRBuilder().normalize(
          blocks,
          document_name=document_name,
          asset_dir_name=asset_dir.name,
          parse_metadata=parse_metadata,
      )
      parsed_data = write_sidecar(
          ir,
          parsed_dir=parsed_dir,
          doc_id=doc_id,
          engine=PARSER_ENGINE_NATIVE,
          clean_parsed_dir=False,  # the asset dir was pre-populated above
          block_drawing_path_style="basename_only",  # legacy native shape
      )
      await self._persist_parsed_full_docs(
          doc_id,
          {
              "content": make_lightrag_doc_content(parsed_data["content"]),
              "file_path": file_path,
              "parse_format": FULL_DOCS_FORMAT_LIGHTRAG,
              "sidecar_location": sidecar_uri_for(parsed_dir),
              "parse_engine": PARSER_ENGINE_NATIVE,
              "update_time": int(time.time()),
          },
      )
      await archive_docx_source_after_full_docs_sync(str(docx_file))
      logger.info(
          f"[parse_native] pending_parse completed for {file_path} "
          f"via parser/docx"
      )

      outcome: dict[str, Any] = {
          "doc_id": doc_id,
          "file_path": file_path,
          "parse_format": FULL_DOCS_FORMAT_LIGHTRAG,
          "content": parsed_data["content"],
          "blocks_path": parsed_data["blocks_path"],
      }
      if paragraphs_without_id > 0:
          # Returned alongside the parse result so the caller can record
          # the warning with the document.
          outcome["parse_warnings"] = {
              "missing_paraid_count": paragraphs_without_id
          }
      return outcome
  async def parse_mineru(
      self, doc_id: str, file_path: str, content_data: dict[str, Any]
  ) -> dict[str, Any]:
      """Parse a document with MinerU and write a spec-compliant sidecar.

      Two sibling directories are produced under ``inputs/<space>/__parsed__/``:

      - ``<base>.parsed/`` — the sidecar (blocks.jsonl, per-modality JSON
        files and assets)
      - ``<base>.mineru_raw/`` — the preserved MinerU bundle
        (content_list.json, full.md, middle.json, images/, ...) together
        with ``_manifest.json``

      The raw bundle stays on disk so that re-parsing the same source
      content can skip the upload, poll and download round trip. It is only
      removed when the user explicitly deletes the document with the "also
      delete original file" option; see
      :func:`lightrag.api.routers.document_routes.delete_file_variants_by_file_path`.

      Args:
          doc_id: Identifier of the document being parsed.
          file_path: Stored file path of the document.
          content_data: Pending-parse record; only ``source_file_name`` is
              read from it here.

      Returns:
          A dict with ``doc_id``, ``file_path``, ``parse_format``,
          ``content``, ``blocks_path`` and ``parse_stage_skipped`` (``True``
          when the cached raw bundle was reused instead of calling MinerU).

      Raises:
          FileNotFoundError: If the resolved source file is not a file.
      """
      # Imported lazily so this module stays cheap to import and httpx is
      # not pulled into call paths that never use the MinerU engine.
      from lightrag.parser.external.mineru import (
          MinerUIRBuilder,
          MinerURawClient,
          clear_dir_contents,
          is_bundle_valid,
          raw_dir_for_parsed_dir,
      )
      from lightrag.sidecar import write_sidecar

      resolved_source = _call_source_file_resolver(
          self,
          file_path,
          source_file_name=content_data.get("source_file_name"),
          parser_engine=PARSER_ENGINE_MINERU,
      )
      source_file_path = Path(resolved_source)
      if not source_file_path.is_file():
          raise FileNotFoundError(f"MinerU source file not found: {source_file_path}")

      # Direct callers (tests, CLI) may hand in absolute paths or names
      # that still carry parser hints, so canonicalize defensively.
      document_name = normalize_document_file_path(file_path)
      if document_name == "unknown_source":
          document_name = source_file_path.name or f"{doc_id}.bin"

      parsed_dir = parsed_artifact_dir_for(
          document_name, parent_hint=source_file_path.parent
      )
      raw_dir = raw_dir_for_parsed_dir(parsed_dir)

      env_value = os.getenv("LIGHTRAG_FORCE_REPARSE_MINERU", "").lower()
      force_reparse = env_value in {"1", "true", "yes", "on"}

      parse_stage_skipped = True
      if force_reparse or not is_bundle_valid(raw_dir, source_file_path):
          # Cache miss (or forced re-parse): this is the only path that
          # needs MinerU network configuration.
          parse_stage_skipped = False
          if force_reparse and raw_dir.exists():
              logger.info(
                  "[parse_mineru] LIGHTRAG_FORCE_REPARSE_MINERU set; "
                  "discarding bundle at %s",
                  raw_dir,
              )
          raw_dir.mkdir(parents=True, exist_ok=True)
          clear_dir_contents(raw_dir)
          mineru_client = MinerURawClient()
          logger.info(
              "[MinerU] Parsing %s %s (may take a few minutes)",
              doc_id,
              source_file_path.name,
          )
          await mineru_client.download_into(
              raw_dir,
              source_file_path,
              upload_name=document_name,
          )
      else:
          # Cache hit: stay purely local so a re-parse still succeeds while
          # MinerU credentials or the endpoint are temporarily unavailable
          # (key rotation, debugging, etc.).
          logger.info("[parse_mineru] raw cache hit doc_id=%s", doc_id)

      ir = MinerUIRBuilder().normalize_from_workdir(
          raw_dir, document_name=document_name
      )
      parsed_data = write_sidecar(
          ir,
          parsed_dir=parsed_dir,
          doc_id=doc_id,
          engine=PARSER_ENGINE_MINERU,
      )

      # Keep full_docs in sync so restart/reprocess can use the sidecar
      # directly (same as the native_docx and content_list paths).
      full_docs_record = {
          "content": make_lightrag_doc_content(parsed_data["content"]),
          "file_path": file_path,
          "parse_format": FULL_DOCS_FORMAT_LIGHTRAG,
          "sidecar_location": sidecar_uri_for(parsed_dir),
          "parse_engine": PARSER_ENGINE_MINERU,
          "update_time": int(time.time()),
      }
      await self._persist_parsed_full_docs(doc_id, full_docs_record)
      await archive_docx_source_after_full_docs_sync(str(source_file_path))

      return {
          "doc_id": doc_id,
          "file_path": file_path,
          "parse_format": FULL_DOCS_FORMAT_LIGHTRAG,
          "content": parsed_data["content"],
          "blocks_path": parsed_data["blocks_path"],
          "parse_stage_skipped": parse_stage_skipped,
      }
  async def parse_docling(
      self, doc_id: str, file_path: str, content_data: dict[str, Any]
  ) -> dict[str, Any]:
      """Parse a document with Docling Serve and write a spec-compliant sidecar.

      The on-disk layout mirrors ``parse_mineru``:

      - ``<base>.parsed/`` — the sidecar (blocks.jsonl, per-modality JSON
        files and assets)
      - ``<base>.docling_raw/`` — the preserved Docling bundle
        (``<stem>.json``, ``<stem>.md``, ``artifacts/``) together with
        ``_manifest.json``

      The raw bundle is kept so that re-parsing the same source bytes can
      skip the upload, poll and download round trip.

      Args:
          doc_id: Identifier of the document being parsed.
          file_path: Stored file path of the document.
          content_data: Pending-parse record; only ``source_file_name`` is
              read from it here.

      Returns:
          A dict with ``doc_id``, ``file_path``, ``parse_format``,
          ``content``, ``blocks_path`` and ``parse_stage_skipped`` (``True``
          when the cached raw bundle was reused instead of calling Docling).

      Raises:
          FileNotFoundError: If the resolved source file is not a file.
          ValueError: If the IR built from the bundle contains no blocks.
      """
      # Imported lazily so this module stays cheap to import and httpx is
      # not pulled into call paths that never use the Docling engine.
      from lightrag.parser.external.docling import (
          DoclingIRBuilder,
          DoclingRawClient,
          clear_dir_contents,
          is_bundle_valid,
          raw_dir_for_parsed_dir,
      )
      from lightrag.sidecar import write_sidecar

      resolved_source = _call_source_file_resolver(
          self,
          file_path,
          source_file_name=content_data.get("source_file_name"),
          parser_engine=PARSER_ENGINE_DOCLING,
      )
      source_file_path = Path(resolved_source)
      if not source_file_path.is_file():
          raise FileNotFoundError(
              f"Docling source file not found: {source_file_path}"
          )

      document_name = normalize_document_file_path(file_path)
      if document_name == "unknown_source":
          document_name = source_file_path.name or f"{doc_id}.bin"

      parsed_dir = parsed_artifact_dir_for(
          document_name, parent_hint=source_file_path.parent
      )
      raw_dir = raw_dir_for_parsed_dir(parsed_dir)

      env_value = os.getenv("LIGHTRAG_FORCE_REPARSE_DOCLING", "").lower()
      force_reparse = env_value in {"1", "true", "yes", "on"}

      parse_stage_skipped = True
      if force_reparse or not is_bundle_valid(raw_dir, source_file_path):
          parse_stage_skipped = False
          if force_reparse and raw_dir.exists():
              logger.info(
                  "[parse_docling] LIGHTRAG_FORCE_REPARSE_DOCLING set; "
                  "discarding bundle at %s",
                  raw_dir,
              )
          # ``download_into`` creates raw_dir itself; only the existing
          # contents (manifest + stale bundle files) need wiping here.
          clear_dir_contents(raw_dir)
          docling_client = DoclingRawClient()
          logger.info(
              "[Docling] Parsing %s %s (may take a few minutes)",
              doc_id,
              source_file_path.name,
          )
          # Upload under the canonical (hint-stripped) name so docling-serve
          # names the bundle's main JSON ``<canonical_stem>.json`` rather
          # than ``<hinted_stem>.json``. The IR builder only knows the
          # canonical ``document_name`` and would otherwise miss the bundle
          # JSON in its preferred-path lookup.
          await docling_client.download_into(
              raw_dir, source_file_path, upload_filename=document_name
          )
      else:
          # Cache hit: stay purely local so re-parses keep working while
          # the docling-serve endpoint is temporarily unavailable.
          logger.info("[parse_docling] raw cache hit doc_id=%s", doc_id)

      ir = DoclingIRBuilder().normalize_from_workdir(
          raw_dir, document_name=document_name
      )
      if not ir.blocks:
          raise ValueError(
              f"Docling IR builder produced zero blocks for {file_path} "
              f"(raw_dir={raw_dir})"
          )

      parsed_data = write_sidecar(
          ir,
          parsed_dir=parsed_dir,
          doc_id=doc_id,
          engine=PARSER_ENGINE_DOCLING,
      )

      full_docs_record = {
          "content": make_lightrag_doc_content(parsed_data["content"]),
          "file_path": file_path,
          "parse_format": FULL_DOCS_FORMAT_LIGHTRAG,
          "sidecar_location": sidecar_uri_for(parsed_dir),
          "parse_engine": PARSER_ENGINE_DOCLING,
          "update_time": int(time.time()),
      }
      await self._persist_parsed_full_docs(doc_id, full_docs_record)
      await archive_docx_source_after_full_docs_sync(str(source_file_path))

      return {
          "doc_id": doc_id,
          "file_path": file_path,
          "parse_format": FULL_DOCS_FORMAT_LIGHTRAG,
          "content": parsed_data["content"],
          "blocks_path": parsed_data["blocks_path"],
          "parse_stage_skipped": parse_stage_skipped,
      }
  2762. # ============================================================
  2763. # Parser internals
  2764. # ============================================================
  async def _persist_parsed_full_docs(
      self,
      doc_id: str,
      record: dict[str, Any],
  ) -> str | None:
      """Store a parse-result record in ``full_docs`` and sync ``content_hash``.

      ``content_hash`` is computed from the extracted body so that later
      ``get_doc_by_content_hash`` lookups can dedupe pending_parse records
      that had no hash at enqueue time. The existing ``doc_status`` row, if
      any, is patched with the same hash so both storages agree.

      ``full_docs`` upserts replace the whole row, while the original
      ``pending_parse`` record carries enqueue-time metadata
      (``process_options`` etc.) that later stages still need. The stored
      row is therefore merged with ``record`` before upserting: fields in
      ``record`` (``content`` / ``parse_format`` / ``sidecar_location`` /
      ``parse_engine`` / ``update_time``) win, everything else is preserved.

      Args:
          doc_id: Key of the row in ``full_docs`` and ``doc_status``.
          record: Fresh parse-result fields to write.

      Returns:
          The computed content hash, or ``None`` when ``parse_format`` is
          neither the raw nor the lightrag format.
      """
      parse_format = record.get("parse_format")

      # Hash the bare merged text (with the ``{{LRdoc}}`` marker stripped for
      # lightrag-format) so cross-filename dedup fires whether the same body
      # arrived as raw text or via a sidecar. ``strip_lightrag_doc_prefix``
      # is a no-op for non-lightrag formats.
      content_hash: str | None = (
          compute_text_content_hash(
              strip_lightrag_doc_prefix(record.get("content") or "", parse_format)
          )
          if parse_format in (FULL_DOCS_FORMAT_RAW, FULL_DOCS_FORMAT_LIGHTRAG)
          else None
      )

      stored_row = await self.full_docs.get_by_id(doc_id)
      merged_row: dict[str, Any] = (
          dict(stored_row) if isinstance(stored_row, dict) else {}
      )
      merged_row.update(record)
      if content_hash:
          merged_row["content_hash"] = content_hash

      await self.full_docs.upsert({doc_id: merged_row})
      await self.full_docs.index_done_callback()

      if not content_hash:
          return content_hash

      status_row = await self.doc_status.get_by_id(doc_id)
      if status_row:
          refreshed_status = dict(status_row)
          refreshed_status["content_hash"] = content_hash
          refreshed_status["updated_at"] = datetime.now(timezone.utc).isoformat()
          await self.doc_status.upsert({doc_id: refreshed_status})
      return content_hash
  async def _mark_duplicate_after_parse(
      self,
      doc_id: str,
      status_doc: DocProcessingStatus,
      file_path: str,
      content_hash: str | None,
      content_length: int,
      content_data: dict[str, Any] | None = None,
      pipeline_status: dict | None = None,
      pipeline_status_lock: asyncio.Lock | None = None,
  ) -> bool:
      """Flag a document as a post-parse content duplicate and stop processing.

      When another document with the same ``content_hash`` exists, this
      document's ``doc_status`` row is rewritten as ``FAILED`` with duplicate
      metadata, its ``full_docs`` entry is deleted (best effort), the source
      file is archived, and a warning is logged and pushed to
      ``pipeline_status``.

      Args:
          doc_id: Identifier of the freshly parsed document.
          status_doc: Current status object; ``created_at`` and ``track_id``
              are carried over into the rewritten row.
          file_path: Stored file path of the document.
          content_hash: Hash of the parsed content; falsy disables the check.
          content_length: Length recorded in the rewritten status row.
          content_data: Optional pending-parse record, used only for its
              ``source_file_name``.
          pipeline_status: Optional shared status dict to update.
          pipeline_status_lock: Lock guarding ``pipeline_status``; the dict
              is only touched when both it and the lock are provided.

      Returns:
          ``True`` if the document was marked as a duplicate, else ``False``.
      """
      if not content_hash:
          return False

      duplicate = await get_duplicate_doc_by_content_hash(
          self.doc_status, content_hash, doc_id
      )
      if not duplicate:
          return False

      original_doc_id, original_doc = duplicate
      original_track_id = doc_status_field(original_doc, "track_id", "")
      original_status = doc_status_field(original_doc, "status", "unknown")
      timestamp = datetime.now(timezone.utc).isoformat()
      error_message = (
          "Identical content already exists under another filename. "
          f"Original doc_id: {original_doc_id}, Status: {original_status}"
      )

      duplicate_metadata = doc_status_transition_metadata(
          status_doc,
          extra={
              "is_duplicate": True,
              "duplicate_kind": "content_hash",
              "original_doc_id": original_doc_id,
              "original_track_id": original_track_id,
          },
      )
      failed_row = {
          "status": DocStatus.FAILED,
          "content_summary": (
              f"[DUPLICATE:content_hash] Original document: {original_doc_id}"
          ),
          "content_length": content_length,
          "chunks_count": 0,
          "chunks_list": [],
          "created_at": status_doc.created_at,
          "updated_at": timestamp,
          "file_path": file_path,
          "track_id": status_doc.track_id,
          "content_hash": content_hash,
          "error_msg": error_message,
          "metadata": duplicate_metadata,
      }
      await self.doc_status.upsert({doc_id: failed_row})

      # Removing the full_docs entry is best effort: a failure is logged
      # and the duplicate marking still completes.
      try:
          await self.full_docs.delete([doc_id])
          await self.full_docs.index_done_callback()
      except Exception as e:
          logger.warning(f"Failed to remove duplicate full_docs entry {doc_id}: {e}")

      source_file_name = (
          content_data.get("source_file_name") if content_data else None
      )
      source_path = _call_source_file_resolver(
          self,
          file_path,
          source_file_name=source_file_name,
      )
      archived = await archive_source_after_full_docs_sync(source_path)

      archive_note = f"; archived to {archived}" if archived else ""
      warning = f"Duplicate content skipped after parsing: {file_path}{archive_note}"
      logger.warning(warning)

      if pipeline_status is not None and pipeline_status_lock is not None:
          async with pipeline_status_lock:
              pipeline_status["latest_message"] = warning
              pipeline_status["history_messages"].append(warning)
      return True
  def _resolve_source_file_for_parser(
      self,
      file_path: str,
      *,
      source_file_name: str | None = None,
      parser_engine: str | None = None,
  ) -> str:
      """Find a readable source file on disk for a parser upload.

      ``file_path`` is the canonical stored basename. A pending-parse record
      may additionally carry ``source_file_name``: the real uploaded/scanned
      basename, parser hints included.

      Lookup happens in two stages. First an ordered list of exact candidate
      paths is probed (``source_file_name`` candidates before ``file_path``
      candidates). If none exists and the canonical name is a known document
      source, the search directories are scanned for files whose normalized
      name equals the canonical name; among those, an exact
      ``source_file_name`` match wins, then a file whose filename hint names
      ``parser_engine``, then the first match.

      Args:
          file_path: Stored file path or basename of the document.
          source_file_name: Optional original basename of the source file.
          parser_engine: Optional engine name used to prefer hinted files.

      Returns:
          The resolved path as a string, or ``file_path`` unchanged when
          nothing was found.
      """
      candidates: list[Path] = []
      roots: list[Path] = []

      # The path exactly as given, plus its ``PARSED_DIR_NAME`` sibling when
      # it carries a directory component.
      raw_path = str(file_path or "").strip()
      if raw_path:
          given = Path(raw_path)
          candidates.append(given)
          if given.parent != Path("."):
              parsed_sibling = given.parent / PARSED_DIR_NAME
              roots.extend((given.parent, parsed_sibling))
              candidates.append(parsed_sibling / given.name)

      name = Path(file_path).name
      source_name = Path(str(source_file_name or "").strip()).name
      input_path = input_dir_path()

      # API ``DocumentManager`` scopes its input dir to
      # ``<base_input_dir>/<workspace>/`` (see DocumentManager.__init__), so
      # the workspace directory is searched before the base directory.
      # ``self.workspace`` is empty when no workspace is configured; the
      # workspace entries are then simply omitted.
      workspace = getattr(self, "workspace", "") or ""
      cwd = Path.cwd()

      base_dirs: list[Path] = []
      if workspace:
          base_dirs.append(input_path / workspace)
      base_dirs.append(input_path)
      # Common local defaults used by the API server.
      if workspace:
          base_dirs.append(cwd / "inputs" / workspace)
      base_dirs.append(cwd / "inputs")

      for base_dir in base_dirs:
          parsed_subdir = base_dir / PARSED_DIR_NAME
          candidates.extend((base_dir / name, parsed_subdir / name))
          roots.extend((base_dir, parsed_subdir))
      candidates.append(cwd / PARSED_DIR_NAME / name)
      roots.append(cwd / PARSED_DIR_NAME)

      if source_name:
          # The real uploaded name is tried in every root before any
          # canonical-name candidate.
          candidates = [root / source_name for root in roots] + candidates

      # ``dict.fromkeys`` drops repeated paths while keeping first-seen order.
      for candidate in dict.fromkeys(candidates):
          if candidate.exists() and candidate.is_file():
              return str(candidate)

      canonical_name = normalize_document_file_path(file_path)
      if not has_known_document_source(canonical_name):
          return file_path

      # Fallback: scan each distinct root for files that normalize to the
      # canonical name (e.g. hint-bearing variants of the same document).
      matches: list[Path] = []
      for root in dict.fromkeys(roots):
          if not (root.exists() and root.is_dir()):
              continue
          for entry in sorted(root.iterdir(), key=lambda item: item.name):
              if not entry.is_file():
                  continue
              if normalize_document_file_path(entry.name) == canonical_name:
                  matches.append(entry)

      if source_name:
          for match in matches:
              if match.name == source_name:
                  return str(match)

      if parser_engine:
          from lightrag.parser.routing import filename_parser_directives

          for match in matches:
              hinted_engine, _ = filename_parser_directives(match.name)
              if hinted_engine == parser_engine:
                  return str(match)

      return str(matches[0]) if matches else file_path
  async def _write_lightrag_document_from_content_list(
      self,
      doc_id: str,
      file_path: str,
      content_list: list[dict[str, Any]],
      engine: str,
  ) -> dict[str, Any]:
      """Convert a parser content list to LightRAG Document files.

      Every supported item becomes one line in ``<base>.blocks.jsonl``
      (preceded by a meta line). Tables, drawings and equations are also
      collected into ``<base>.tables.json`` / ``<base>.drawings.json`` /
      ``<base>.equations.json``, each written only when non-empty. The
      merged text is then persisted to ``full_docs`` and the source file is
      handed to the docx archiver.

      Args:
          doc_id: Identifier of the document; also used to derive block and
              modality ids.
          file_path: Stored file path of the document.
          content_list: Parser output items. Non-dict items are skipped; the
              item kind is read from ``type`` (or ``label``).
          engine: Parser engine name recorded in the meta line and full_docs.

      Returns:
          A dict with ``doc_id``, ``file_path``, ``parse_format``,
          ``content`` (the merged plain text) and ``blocks_path``.
      """
      document_name = normalize_document_file_path(file_path)
      if document_name == "unknown_source":
          document_name = f"{doc_id}.bin"

      parsed_dir = parsed_artifact_dir_for(document_name)
      # Rebuild the artifact directory from scratch on every run.
      if parsed_dir.exists():
          shutil.rmtree(parsed_dir)
      parsed_dir.mkdir(parents=True, exist_ok=True)

      base_name = Path(document_name).stem or document_name
      blocks_path = parsed_dir / f"{base_name}.blocks.jsonl"
      tables_path = parsed_dir / f"{base_name}.tables.json"
      drawings_path = parsed_dir / f"{base_name}.drawings.json"
      equations_path = parsed_dir / f"{base_name}.equations.json"

      blocks_lines: list[str] = []
      merged_parts: list[str] = []
      block_idx = 0
      table_idx = 0
      drawing_idx = 0
      equation_idx = 0
      tables: dict[str, Any] = {}
      drawings: dict[str, Any] = {}
      equations: dict[str, Any] = {}
      # Suffix shared by all generated table/drawing/equation ids.
      id_stem = doc_id.removeprefix("doc-")

      def _to_list_str(value: Any) -> list[str]:
          # Normalize a scalar-or-list field to a list of non-blank strings.
          if value is None:
              return []
          if isinstance(value, list):
              return [str(x) for x in value if str(x).strip()]
          text_val = str(value).strip()
          return [text_val] if text_val else []

      def _parse_int(value: Any, default: int = 0) -> int:
          # Tolerant int conversion: parser output is not trusted to be
          # numeric, so fall back to ``default`` instead of raising.
          try:
              return int(value)
          except (TypeError, ValueError, OverflowError):
              return default

      def _normalize_grid_rows(grid: Any) -> list[list[str]]:
          # Reduce a grid of cells (dicts with "text", or scalars) to rows
          # of stripped strings; non-list rows are dropped.
          normalized_rows: list[list[str]] = []
          if not isinstance(grid, list):
              return normalized_rows
          for row in grid:
              if not isinstance(row, list):
                  continue
              normalized_rows.append(
                  [
                      str(cell.get("text", "")).strip()
                      if isinstance(cell, dict)
                      else str(cell).strip()
                      for cell in row
                  ]
              )
          return normalized_rows

      def _coerce_table_rows(
          value: Any,
      ) -> tuple[str, Any, list[list[str]], int, int]:
          # Returns (format, content, rows, num_rows, num_cols). Anything
          # that cannot be read as a row grid is passed through as "html".
          raw_value = value
          if isinstance(raw_value, str):
              stripped = raw_value.strip()
              if not stripped:
                  return "html", "", [], 0, 0
              parsed_value = None
              # Best effort: try JSON first, then a Python literal. Any
              # failure just means the string is treated as HTML/plain text.
              try:
                  parsed_value = json.loads(stripped)
              except Exception:
                  try:
                      import ast

                      parsed_value = ast.literal_eval(stripped)
                  except Exception:
                      parsed_value = None
              if parsed_value is None:
                  return "html", raw_value, [], 0, 0
              raw_value = parsed_value

          if isinstance(raw_value, list):
              rows = _normalize_grid_rows(raw_value)
              return (
                  "json",
                  json.dumps(rows, ensure_ascii=False),
                  rows,
                  len(rows),
                  max((len(r) for r in rows), default=0),
              )

          if isinstance(raw_value, dict):
              rows = _normalize_grid_rows(raw_value.get("grid"))
              if not rows and isinstance(raw_value.get("rows"), list):
                  rows = _normalize_grid_rows(raw_value.get("rows"))
              num_rows = _parse_int(
                  raw_value.get("num_rows"), len(rows) if rows else 0
              )
              num_cols = _parse_int(
                  raw_value.get("num_cols"),
                  max((len(r) for r in rows), default=0),
              )
              if rows:
                  return (
                      "json",
                      json.dumps(rows, ensure_ascii=False),
                      rows,
                      num_rows,
                      num_cols,
                  )
              return (
                  "html",
                  json.dumps(raw_value, ensure_ascii=False),
                  [],
                  num_rows,
                  num_cols,
              )

          text_value = str(raw_value or "").strip()
          return "html", text_value, [], 0, 0

      # Open headings as (level, text) pairs, outermost first. Keeping the
      # level next to the text keeps ancestry correct when a document skips
      # levels (H1 -> H3 -> H3) or starts below level 1; a stack sliced by
      # position would report the earlier sibling as a parent in those cases.
      heading_stack: list[tuple[int, str]] = []

      def _update_heading_context(
          heading_text: str, level: int
      ) -> tuple[str, int, list[str]]:
          clean_heading = str(heading_text or "").strip()
          clean_level = max(_parse_int(level, 1), 1)
          # Close every heading at the same or a deeper level.
          while heading_stack and heading_stack[-1][0] >= clean_level:
              heading_stack.pop()
          parent_chain = [text for _, text in heading_stack if text]
          heading_stack.append((clean_level, clean_heading))
          return clean_heading, clean_level, parent_chain

      def _append_block(
          content_text: str,
          heading: str = "",
          level: int = 0,
          parent_headings: list[str] | None = None,
      ) -> str:
          # Append one content block and return its id ("" if it was blank).
          nonlocal block_idx
          content_text = str(content_text or "").strip()
          if not content_text:
              return ""
          # MD5 is only a stable identifier here, not a security primitive;
          # saying so keeps it usable on FIPS-restricted Python builds.
          blockid = hashlib.md5(
              f"{doc_id}:{block_idx}:{heading}:{content_text}".encode("utf-8"),
              usedforsecurity=False,
          ).hexdigest()
          blocks_lines.append(
              json.dumps(
                  {
                      "type": "content",
                      "blockid": blockid,
                      "format": "plain_text",
                      "content": content_text,
                      "heading": heading,
                      "parent_headings": list(parent_headings or []),
                      "level": level,
                      "session_type": "body",
                      "table_slice": "none",
                      "positions": [],
                  },
                  ensure_ascii=False,
              )
          )
          merged_parts.append(content_text)
          block_idx += 1
          return blockid

      current_heading = ""
      current_level = 0
      current_parent_headings: list[str] = []

      for item in content_list:
          if not isinstance(item, dict):
              continue
          item_type = str(item.get("type") or item.get("label") or "").lower()

          if item_type in {"text", "title", "section_header", "list", "code"}:
              list_items = item.get("list_items")
              # ``str`` each entry so a non-string list item cannot make
              # ``join`` raise.
              joined_items = (
                  "\n".join(str(entry) for entry in list_items)
                  if isinstance(list_items, list)
                  else ""
              )
              text = (
                  item.get("text")
                  or item.get("content")
                  or joined_items
                  or item.get("code_body")
                  or ""
              )
              if not str(text).strip():
                  continue
              # Levels come from parser output; a non-numeric value must not
              # abort the whole document, so go through ``_parse_int``.
              inferred_level = _parse_int(item.get("text_level") or 0, 0)
              if item_type in {"title", "section_header"} and inferred_level <= 0:
                  inferred_level = _parse_int(item.get("level") or 1, 1)
              if inferred_level > 0:
                  (
                      current_heading,
                      current_level,
                      current_parent_headings,
                  ) = _update_heading_context(str(text), inferred_level)
              _append_block(
                  str(text),
                  heading=current_heading,
                  level=current_level,
                  parent_headings=current_parent_headings,
              )
              continue

          if item_type == "equation":
              equation_idx += 1
              eq_id = str(item.get("id") or f"eq-{id_stem}-{equation_idx:04d}")
              caption = str(item.get("caption") or f"公式{equation_idx}")
              footnotes = _to_list_str(
                  item.get("equation_footnote") or item.get("footnotes")
              )
              eq_text = str(item.get("text") or item.get("content") or "").strip()
              # Without a body only a citation placeholder is emitted.
              wrapped = (
                  f'<equation id="{eq_id}" format="latex" caption="{caption}">{eq_text}</equation>'
                  if eq_text
                  else f'<cite type="equation" refid="{eq_id}">公式{equation_idx}</cite>'
              )
              blockid = _append_block(
                  wrapped,
                  heading=current_heading,
                  level=current_level,
                  parent_headings=current_parent_headings,
              )
              equations[eq_id] = {
                  "id": eq_id,
                  "blockid": blockid,
                  "heading": current_heading,
                  "format": "latex",
                  "content": eq_text,
                  "caption": caption,
                  "footnotes": footnotes,
              }
              continue

          if item_type == "table":
              table_idx += 1
              table_id = str(item.get("id") or f"tb-{id_stem}-{table_idx:04d}")
              caption = str(item.get("caption") or f"表格{table_idx}")
              table_caption = _to_list_str(item.get("table_caption"))
              if table_caption and not item.get("caption"):
                  caption = table_caption[0]
              footnotes = _to_list_str(
                  item.get("table_footnote") or item.get("footnotes")
              )
              table_body = item.get("table_body") or item.get("content") or ""
              rows = item.get("rows") if isinstance(item.get("rows"), list) else None
              # An explicit ``rows`` list takes precedence over the body.
              (
                  fmt,
                  table_content,
                  _normalized_rows,
                  inferred_num_rows,
                  inferred_num_cols,
              ) = _coerce_table_rows(rows if rows is not None else table_body)
              # The block itself only carries a citation; the table content
              # lives in the tables sidecar.
              cite_text = (
                  f'<cite type="table" refid="{table_id}">表{table_idx}</cite>'
              )
              blockid = _append_block(
                  cite_text,
                  heading=current_heading,
                  level=current_level,
                  parent_headings=current_parent_headings,
              )
              tables[table_id] = {
                  "id": table_id,
                  "blockid": blockid,
                  "heading": current_heading,
                  "dimension": [
                      _parse_int(item.get("num_rows"), inferred_num_rows),
                      _parse_int(item.get("num_cols"), inferred_num_cols),
                  ],
                  "format": fmt,
                  "content": table_content,
                  "caption": caption,
                  "footnotes": footnotes,
                  "image": item.get("img_path") or item.get("image"),
              }
              continue

          if item_type in {"image", "picture", "drawing"}:
              drawing_idx += 1
              drawing_id = str(item.get("id") or f"im-{id_stem}-{drawing_idx:04d}")
              image_caption = _to_list_str(
                  item.get("image_caption") or item.get("captions")
              )
              caption = str(
                  item.get("caption")
                  or (image_caption[0] if image_caption else f"图{drawing_idx}")
              )
              footnotes = _to_list_str(
                  item.get("image_footnote") or item.get("footnotes")
              )
              path_val = str(item.get("img_path") or item.get("path") or "")
              src_val = str(item.get("src") or "")
              # Prefer the file extension; fall back to a declared format.
              fmt = (
                  Path(path_val).suffix.lower().lstrip(".")
                  if path_val
                  else str(item.get("format") or "")
              )
              drawing_tag = (
                  f'<drawing id="{drawing_id}" format="{fmt}" caption="{caption}" '
                  f'path="{path_val}" src="{src_val}" />'
              )
              blockid = _append_block(
                  drawing_tag,
                  heading=current_heading,
                  level=current_level,
                  parent_headings=current_parent_headings,
              )
              drawings[drawing_id] = {
                  "id": drawing_id,
                  "blockid": blockid,
                  "heading": current_heading,
                  "format": fmt,
                  "path": path_val,
                  "src": src_val,
                  "caption": caption,
                  "footnotes": footnotes,
              }
              continue

          # Fallback: serialize unknown item to text for robustness.
          fallback_text = str(item.get("text") or item.get("content") or "").strip()
          if fallback_text:
              _append_block(
                  fallback_text,
                  heading=current_heading,
                  level=current_level,
                  parent_headings=current_parent_headings,
              )

      merged_text = "\n\n".join([x for x in merged_parts if x.strip()])
      doc_hash = hashlib.sha256(merged_text.encode("utf-8")).hexdigest()
      parse_time = datetime.now(timezone.utc).isoformat()
      meta = {
          "type": "meta",
          "format": "lightrag",
          "version": "1.0",
          "document_name": document_name,
          "document_format": Path(document_name).suffix.lower().lstrip("."),
          "document_hash": f"sha256:{doc_hash}",
          "table_file": bool(tables),
          "equation_file": bool(equations),
          "drawing_file": bool(drawings),
          "asset_dir": False,
          "split_option": {},
          "blocks": len(blocks_lines),
          "doc_id": doc_id,
          "parse_engine": engine,
          "parse_time": parse_time,
          "doc_title": Path(document_name).stem or document_name,
      }

      # The meta record is always the first line of the blocks file.
      blocks_path.write_text(
          "\n".join([json.dumps(meta, ensure_ascii=False)] + blocks_lines) + "\n",
          encoding="utf-8",
      )
      if tables:
          tables_path.write_text(
              json.dumps(
                  {"version": "1.0", "tables": tables}, ensure_ascii=False, indent=2
              ),
              encoding="utf-8",
          )
      if drawings:
          drawings_path.write_text(
              json.dumps(
                  {"version": "1.0", "drawings": drawings},
                  ensure_ascii=False,
                  indent=2,
              ),
              encoding="utf-8",
          )
      if equations:
          equations_path.write_text(
              json.dumps(
                  {"version": "1.0", "equations": equations},
                  ensure_ascii=False,
                  indent=2,
              ),
              encoding="utf-8",
          )

      # Keep full_docs in sync so restart/reprocess can directly use the
      # LightRAG Document.
      await self._persist_parsed_full_docs(
          doc_id,
          {
              "content": make_lightrag_doc_content(merged_text),
              "file_path": file_path,
              "parse_format": FULL_DOCS_FORMAT_LIGHTRAG,
              "sidecar_location": sidecar_uri_for(parsed_dir),
              "parse_engine": engine,
              "update_time": int(time.time()),
          },
      )
      await archive_docx_source_after_full_docs_sync(
          self._resolve_source_file_for_parser(file_path)
      )
      return {
          "doc_id": doc_id,
          "file_path": file_path,
          "parse_format": FULL_DOCS_FORMAT_LIGHTRAG,
          "content": merged_text,
          "blocks_path": str(blocks_path),
      }
  3391. # ============================================================
  3392. # Multimodal / VLM
  3393. # ============================================================
  async def analyze_multimodal(
      self,
      doc_id: str,
      file_path: str,
      parsed_data: dict[str, Any],
      *,
      process_options: str | None = None,
      pipeline_status: dict | None = None,
      pipeline_status_lock: Any | None = None,
  ) -> dict[str, Any]:
      """Phase 2: Multimodal analysis (VLM). Writes llm_analyze_result to LightRAG Document.

      Per-document ``i`` / ``t`` / ``e`` flags from
      ``full_docs.process_options`` decide which modalities are sent to the
      VLM. Sidecars are always written by the parser regardless of these
      flags so toggling options later does not require re-parsing — only
      the ``llm_analyze_result`` payload is gated here.

      Per-item ``llm_analyze_result`` is recomputed and overwritten on each
      run for enabled modalities. This lets operators fix VLM/EXTRACT
      configuration or prompt limits and retry without manually clearing
      prior failure markers from the sidecar.

      Args:
          doc_id: Document id; used for the ``full_docs`` lookup, for log
              messages, and in the image payload handed to the VLM.
          file_path: Source file path; interpolated into the analysis
              prompts and into log messages.
          parsed_data: Parse-stage result. Must carry ``blocks_path``
              pointing at an existing blocks file, otherwise the stage is
              skipped. The dict is mutated in place and returned.
          process_options: Optional override that bypasses the
              ``full_docs.process_options`` lookup; primarily used by unit
              tests that exercise the VLM analysis path without going
              through the enqueue pipeline.
          pipeline_status: Optional shared status dict. Progress messages
              are written to ``latest_message`` / ``history_messages``.
          pipeline_status_lock: Lock guarding ``pipeline_status``. Progress
              reporting and cancellation polling only happen when both
              ``pipeline_status`` and this lock are provided.

      Returns:
          The same ``parsed_data`` dict. ``analyzing_stage_skipped`` is set
          to ``True`` when there is no usable blocks file or no modality is
          enabled; ``multimodal_processed`` is set to ``True`` once every
          enabled sidecar has been handled. Neither flag is set when the
          blocks file is empty, lacks a ``meta`` header line, or an
          unexpected error was logged and swallowed.

      Raises:
          PipelineCancelledException: Cancellation was requested before or
              while the per-item tasks were running.
          MultimodalAnalysisError: A sidecar could not be read, or an item
              analysis failed (the first failure aborts the sidecar).
      """
      from lightrag.parser.routing import parse_process_options

      # Without a blocks file on disk there is nothing to analyze.
      blocks_path = parsed_data.get("blocks_path")
      if not blocks_path:
          parsed_data["analyzing_stage_skipped"] = True
          return parsed_data
      block_file = Path(blocks_path)
      if not block_file.exists():
          parsed_data["analyzing_stage_skipped"] = True
          return parsed_data

      # Resolve which modalities the user opted into for this document.
      if process_options is None:
          try:
              content_data = await self.full_docs.get_by_id(doc_id) or {}
          except Exception:
              # A failed lookup is treated the same as "no options stored".
              content_data = {}
          options_str = (
              content_data.get("process_options")
              if isinstance(content_data, dict)
              else ""
          ) or ""
      else:
          options_str = process_options
      process_opts = parse_process_options(options_str)
      if not (process_opts.images or process_opts.tables or process_opts.equations):
          logger.debug(
              f"[analyze_multimodal] no i/t/e options set for d-id: {doc_id}; "
              f"skipping VLM analysis"
          )
          parsed_data["analyzing_stage_skipped"] = True
          return parsed_data

      # Diagnose opt-in vs sidecar mismatch up-front so users investigating
      # "why did VLM not run on my images" see a one-line INFO per document
      # instead of silent skips. Empty sidecars are a normal outcome
      # (some documents simply have no images/tables/equations), so this is
      # informational rather than a warning.
      sidecar_base = str(block_file)
      if sidecar_base.endswith(".blocks.jsonl"):
          sidecar_base = sidecar_base[: -len(".blocks.jsonl")]
      opt_in_missing: list[str] = []
      for opt_char, modality, suffix in (
          ("i", "drawings", ".drawings.json"),
          ("t", "tables", ".tables.json"),
          ("e", "equations", ".equations.json"),
      ):
          # Map the option letter to the matching parsed flag.
          enabled = {
              "i": process_opts.images,
              "t": process_opts.tables,
              "e": process_opts.equations,
          }[opt_char]
          if enabled and not Path(sidecar_base + suffix).exists():
              opt_in_missing.append(f"{opt_char}:{modality}")
      if opt_in_missing:
          logger.info(
              f"[analyze_multimodal] {','.join(opt_in_missing)} sidecar empty: {doc_id}"
          )

      # Backfill sidecar `surrounding` for the enabled modalities just
      # before VLM consumption. Universal coverage: native, MinerU,
      # Docling, and pre-existing LightRAG documents reused from disk
      # all go through this single entrypoint. Idempotent: re-runs
      # overwrite with stable output given unchanged block content.
      enabled_modalities = {
          mod
          for mod, on in (
              ("drawings", process_opts.images),
              ("tables", process_opts.tables),
              ("equations", process_opts.equations),
          )
          if on
      }
      tokenizer = getattr(self, "tokenizer", None)
      if enabled_modalities and tokenizer is not None:
          try:
              from lightrag.multimodal_context import (
                  enrich_sidecars_with_surrounding,
              )

              enrich_counts = enrich_sidecars_with_surrounding(
                  blocks_path=str(block_file),
                  enabled_modalities=enabled_modalities,
                  tokenizer=tokenizer,
              )
              if any(enrich_counts.values()):
                  logger.info(
                      "[analyze_multimodal] "
                      + ", ".join(f"{k}={v}" for k, v in enrich_counts.items() if v)
                      + f" surrounding backfilled: {doc_id}"
                  )
          except Exception as enrich_err:
              # Enrichment is best-effort: analysis continues without it.
              logger.warning(
                  f"[analyze_multimodal] surrounding enrichment failed for "
                  f"d-id: {doc_id}, file: {file_path}: {enrich_err}"
              )

      try:
          # The first line of the blocks file must be the "meta" header;
          # otherwise return untouched (no skip/processed flag is set).
          lines = block_file.read_text(encoding="utf-8").splitlines()
          if not lines:
              return parsed_data
          meta = json.loads(lines[0])
          if not isinstance(meta, dict) or meta.get("type") != "meta":
              return parsed_data

          from lightrag.llm._vision_utils import (
              image_audit_metadata,
              image_cache_metadata,
              normalize_image_inputs,
              read_image_dimensions,
          )
          from lightrag.prompt_multimodal import (
              IMAGE_TYPE_ENUM,
              IMAGE_TYPE_FALLBACK,
              MULTIMODAL_PROMPTS,
          )
          from lightrag.constants import (
              DEFAULT_MM_ANALYSIS_PRIORITY,
              DEFAULT_MM_IMAGE_MIN_PIXEL,
              DEFAULT_SUMMARY_LANGUAGE,
          )

          global_config = self._build_global_config()
          addon_params = global_config.get("addon_params") or {}
          # Prompt language: resolved summary language first, then the
          # addon_params setting, then the package default.
          language = (
              global_config.get("_resolved_summary_language")
              or addon_params.get("language")
              or DEFAULT_SUMMARY_LANGUAGE
          )
          vlm_process_enable = bool(global_config.get("vlm_process_enable", False))
          # Image size cap in bytes: env override (default 5 MiB), never
          # lower than 256 KiB.
          max_image_bytes = max(
              256 * 1024,
              int(os.getenv("VLM_MAX_IMAGE_BYTES", str(5 * 1024 * 1024))),
          )
          # Minimum edge length in pixels: env override, never below 1.
          min_image_pixel = max(
              1,
              int(os.getenv("VLM_MIN_IMAGE_PIXEL", str(DEFAULT_MM_IMAGE_MIN_PIXEL))),
          )
          # Multimodal analysis shares the entity-extraction cache flag
          # (both run with mode="default" — see handle_cache short-circuit
          # in lightrag.utils). When the flag is off we must NOT save the
          # response either, otherwise stale cache entries would still
          # accumulate while reads are blocked. cache_id attachment to
          # the sidecar item.llm_cache_list is likewise gated so a
          # disabled cache does not seed cache-cleanup metadata that
          # corresponds to entries that were never persisted.
          analysis_cache_enabled = bool(
              global_config.get("enable_llm_cache_for_entity_extract")
          )
          # Drawings go to the "vlm" role; tables/equations to "extract".
          use_vlm_func = self.role_llm_funcs.get("vlm")
          use_extract_func = self.role_llm_funcs.get("extract")
          vlm_cache_identity = get_llm_cache_identity(global_config, role="vlm")
          extract_cache_identity = get_llm_cache_identity(
              global_config, role="extract"
          )
          _IMAGE_TYPE_VALUES = set(IMAGE_TYPE_ENUM)
          # Only these file extensions are submitted to the VLM.
          _VLM_RASTER_EXTS = {".png", ".jpg", ".jpeg", ".gif", ".webp"}

          def _json_extract(text: str) -> dict[str, Any]:
              """Tolerant JSON object recovery.

              Mirrors :func:`lightrag.operate._process_json_extraction_result`
              so weaker models that emit ```json ... ``` fenced output,
              trailing commas, or unquoted keys are still salvageable.

              The order of attempts is:

              1. Strip a leading ```json fence if present.
              2. Hand the cleaned string to ``json_repair.loads`` (handles
                 minor structural slips like trailing commas).
              3. Fall back to a greedy ``{...}`` regex slice for outputs
                 that wrap the JSON object in prose, then re-run
                 ``json_repair.loads`` on the slice.

              Returns an empty dict when no attempt yields a dict.
              """
              if not text:
                  return {}
              candidate = text.strip()
              fence_match = re.match(
                  r"^```(?:json)?\s*\n(.*?)\n```$",
                  candidate,
                  re.DOTALL | re.IGNORECASE,
              )
              if fence_match:
                  candidate = fence_match.group(1).strip()
              try:
                  obj = json_repair.loads(candidate)
                  if isinstance(obj, dict):
                      return obj
              except Exception:
                  # Fall through to the regex-slice attempt below.
                  pass
              m = re.search(r"\{[\s\S]*\}", candidate)
              if m:
                  try:
                      obj = json_repair.loads(m.group(0))
                      if isinstance(obj, dict):
                          return obj
                  except Exception:
                      # Nothing recoverable; report "no object" to the caller.
                      pass
              return {}

          def _normalize_text(value: Any) -> str:
              """Coerce ``value`` to stripped text.

              ``None`` becomes ``""``; lists/tuples become their non-blank
              stripped items joined by newlines; anything else goes
              through ``str()``.
              """
              if value is None:
                  return ""
              if isinstance(value, str):
                  return value.strip()
              if isinstance(value, (list, tuple)):
                  return "\n".join(str(v).strip() for v in value if str(v).strip())
              return str(value).strip()

          def _captions_value(item_obj: dict[str, Any]) -> str:
              """Return the item's normalized ``caption``, or ``"n/a"``."""
              return _normalize_text(item_obj.get("caption")) or "n/a"

          def _footnotes_value(item_obj: dict[str, Any]) -> str:
              """Return the item's ``footnotes`` as text, or ``"n/a"``.

              Lists/tuples are joined with ``"; "`` (blank entries dropped).
              """
              raw = item_obj.get("footnotes")
              if isinstance(raw, (list, tuple)):
                  joined = "; ".join(str(v).strip() for v in raw if str(v).strip())
                  return joined or "n/a"
              text = _normalize_text(raw)
              return text or "n/a"

          def _surrounding_value(item_obj: dict[str, Any], key: str) -> str:
              """Return ``item_obj["surrounding"][key]`` as text, or ``"n/a"``."""
              surrounding = item_obj.get("surrounding") or {}
              if not isinstance(surrounding, dict):
                  return "n/a"
              value = _normalize_text(surrounding.get(key))
              return value or "n/a"

          def _resolve_image_path(
              path_str: str | None, sidecar_dir: Path
          ) -> Path | None:
              """Resolve an item's image path to an existing file.

              A relative path is tried against ``sidecar_dir`` first and
              then as given. Returns ``None`` when no regular file is found.
              """
              if not path_str:
                  return None
              candidate = Path(path_str)
              if not candidate.is_absolute():
                  sidecar_candidate = sidecar_dir / path_str
                  if sidecar_candidate.exists() and sidecar_candidate.is_file():
                      candidate = sidecar_candidate
              if candidate.exists() and candidate.is_file():
                  return candidate
              return None

          def _failure_result(message: str) -> dict[str, Any]:
              """Build an ``llm_analyze_result`` payload with status "failure"."""
              return {
                  "analyze_time": int(time.time()),
                  "status": "failure",
                  "message": message,
              }

          def _skipped_result(message: str) -> dict[str, Any]:
              """Build an ``llm_analyze_result`` payload with status "skipped"."""
              return {
                  "analyze_time": int(time.time()),
                  "status": "skipped",
                  "message": message,
              }

          async def _analyze_drawing(
              item_id: str, item: dict[str, Any], sidecar_dir: Path
          ) -> tuple[dict[str, Any], str | None]:
              """Analyze one drawing item with the VLM role.

              Returns ``(llm_analyze_result, cache_id_or_None)``. A
              "skipped" result is returned when the image file is missing,
              has an unsupported extension, is smaller than
              ``min_image_pixel`` on either side, or exceeds
              ``max_image_bytes``.

              Raises:
                  MultimodalAnalysisError: The VLM role is unavailable, the
                      file cannot be read or is empty, the VLM call fails,
                      or the response lacks ``name`` / ``description`` /
                      ``type``.
              """
              # The image path may be stored under any of these three keys.
              path_str = (
                  item.get("path") or item.get("img_path") or item.get("image_path")
              )
              candidate = _resolve_image_path(path_str, sidecar_dir)
              if candidate is None:
                  return (
                      _skipped_result(f"image file not found: {path_str or 'n/a'}"),
                      None,
                  )
              ext = candidate.suffix.lower()
              if ext not in _VLM_RASTER_EXTS:
                  return (
                      _skipped_result(f"unsupported image format: {ext}"),
                      None,
                  )
              # When dimensions cannot be determined (``None``) the size
              # filter is not applied. dims[0] / dims[1] are presumably
              # width / height — confirm against read_image_dimensions.
              dims = read_image_dimensions(candidate)
              if dims is not None and (
                  dims[0] < min_image_pixel or dims[1] < min_image_pixel
              ):
                  return (
                      _skipped_result(
                          f"image width or height is smaller than "
                          f"{min_image_pixel}px"
                      ),
                      None,
                  )
              # Benign skips are decided above; from here on a usable image
              # exists, so a missing VLM is a hard failure for the item.
              if not vlm_process_enable or use_vlm_func is None:
                  raise MultimodalAnalysisError(
                      f"drawings/{item_id}: VLM analysis required but "
                      "VLM role is not available "
                      "(VLM_PROCESS_ENABLE or vlm role config)"
                  )
              try:
                  raw = candidate.read_bytes()
              except OSError as exc:
                  raise MultimodalAnalysisError(
                      f"drawings/{item_id}: cannot read image {candidate}: {exc}"
                  ) from exc
              if not raw:
                  raise MultimodalAnalysisError(
                      f"drawings/{item_id}: image file is empty"
                  )
              if len(raw) > max_image_bytes:
                  return (
                      _skipped_result(
                          f"image too large: {len(raw)} bytes "
                          f"(limit {max_image_bytes})"
                      ),
                      None,
                  )
              # Fall back to PNG when the MIME type cannot be guessed.
              mime, _ = mimetypes.guess_type(str(candidate))
              mime = mime or "image/png"
              img_payload = {
                  "base64": base64.b64encode(raw).decode("ascii"),
                  "mime_type": mime,
                  "source_id": item_id,
                  "source_file": str(candidate),
                  "modality": "image",
                  "doc_id": doc_id,
              }
              # The normalized form feeds the cache key and the audit blob;
              # the VLM call below receives the raw payload.
              normalized_images = normalize_image_inputs([img_payload])
              prompt = MULTIMODAL_PROMPTS["image_analysis"].format(
                  language=language,
                  content="",
                  captions=_captions_value(item),
                  footnotes=_footnotes_value(item),
                  leading=_surrounding_value(item, "leading"),
                  trailing=_surrounding_value(item, "trailing"),
                  item_id=item_id,
                  file_path=file_path,
              )
              # Cache key inputs: prompt, VLM cache identity, a json_object
              # response-format variant, image cache metadata, and the
              # "drawing" tag. The two empty strings fill positional slots
              # whose meaning is not visible here.
              # NOTE(review): the key includes a json_object variant but no
              # ``response_format`` is passed to the VLM call below —
              # confirm this is intentional.
              args_hash = compute_args_hash(
                  prompt,
                  "",
                  "",
                  serialize_llm_cache_identity(vlm_cache_identity),
                  _serialize_cache_variant({"type": "json_object"}),
                  _serialize_cache_variant(image_cache_metadata(normalized_images)),
                  "drawing",
              )
              cache_id = generate_cache_key("default", "analysis", args_hash)
              cached = await handle_cache(
                  self.llm_response_cache,
                  args_hash,
                  prompt,
                  mode="default",
                  cache_type="analysis",
              )
              if cached is not None:
                  # Cache hit: reuse the stored response text.
                  result_text = cached[0]
                  fresh = False
              else:
                  try:
                      result_text = await use_vlm_func(
                          prompt,
                          stream=False,
                          image_inputs=[img_payload],
                          _priority=DEFAULT_MM_ANALYSIS_PRIORITY,
                      )
                  except Exception as exc:
                      raise MultimodalAnalysisError(
                          f"drawings/{item_id}: VLM call failed: {exc}"
                      ) from exc
                  fresh = True
              # Validate the response before anything is written to the
              # cache, so an unusable response is never persisted.
              parsed = _json_extract(str(result_text))
              name = parsed.get("name")
              type_value = parsed.get("type")
              description = parsed.get("description")
              if not isinstance(name, str) or not name.strip():
                  raise MultimodalAnalysisError(
                      f"drawings/{item_id}: missing or invalid field 'name'"
                  )
              if not isinstance(description, str) or not description.strip():
                  raise MultimodalAnalysisError(
                      f"drawings/{item_id}: missing or invalid field 'description'"
                  )
              if not isinstance(type_value, str) or not type_value.strip():
                  raise MultimodalAnalysisError(
                      f"drawings/{item_id}: missing or invalid field 'type'"
                  )
              # Unknown type labels are coerced to the fallback, not rejected.
              if type_value not in _IMAGE_TYPE_VALUES:
                  type_value = IMAGE_TYPE_FALLBACK
              cache_id_to_attach: str | None = None
              if fresh and analysis_cache_enabled:
                  # Store the prompt together with an audit blob describing
                  # the images (appended only when the blob is non-empty).
                  audit_blob = image_audit_metadata(normalized_images)
                  original_prompt = prompt + (
                      f"\n<vlm_images>"
                      f"{json.dumps(audit_blob, ensure_ascii=False)}"
                      "</vlm_images>"
                      if audit_blob
                      else ""
                  )
                  await save_to_cache(
                      self.llm_response_cache,
                      CacheData(
                          args_hash=args_hash,
                          content=str(result_text),
                          prompt=original_prompt,
                          mode="default",
                          cache_type="analysis",
                          chunk_id=None,
                      ),
                  )
                  cache_id_to_attach = cache_id
              elif not fresh:
                  # Cache hit: the entry exists, so attaching its id is
                  # safe (and necessary for document-delete cleanup).
                  cache_id_to_attach = cache_id
              return (
                  {
                      "name": name.strip(),
                      "type": type_value,
                      "description": description.strip(),
                      "analyze_time": int(time.time()),
                      "status": "success",
                      "message": "",
                  },
                  cache_id_to_attach,
              )

          async def _analyze_text_modality(
              kind: str, item_id: str, item: dict[str, Any]
          ) -> tuple[dict[str, Any], str | None]:
              """Analyze one table or equation item with the EXTRACT role.

              ``kind`` selects the prompt template
              (``MULTIMODAL_PROMPTS[f"{kind}_analysis"]``). Returns
              ``(llm_analyze_result, cache_id_or_None)``. A table without
              content yields a "skipped" result.

              Raises:
                  MultimodalAnalysisError: The EXTRACT role is missing, a
                      non-table item has no content, the prompt cannot be
                      brought under ``MAX_EXTRACT_INPUT_TOKENS``, the LLM
                      call fails, or required response fields are missing.
              """
              if use_extract_func is None:
                  raise MultimodalAnalysisError(
                      f"{kind}/{item_id}: EXTRACT role is required but not configured"
                  )
              content_text = _normalize_text(item.get("content"))
              if not content_text:
                  if kind == "table":
                      # Defensive fallback for sidecars that still carry
                      # empty-bodied table items (e.g. produced by an older
                      # parser run, or by a parser that doesn't filter
                      # MinerU-style misidentified blanks). Don't abort the
                      # whole worker — record the skip and move on.
                      logger.warning(
                          f"[analyze_multimodal] table/{item_id}: missing "
                          f"table content; skipping analysis ({file_path})"
                      )
                      return (
                          _skipped_result("missing table content"),
                          None,
                      )
                  raise MultimodalAnalysisError(
                      f"{kind}/{item_id}: missing {kind} content"
                  )
              template = MULTIMODAL_PROMPTS[f"{kind}_analysis"]

              def _render(content_value: str) -> str:
                  """Fill the prompt template, varying only the content slot."""
                  return template.format(
                      language=language,
                      content=content_value,
                      captions=_captions_value(item),
                      footnotes=_footnotes_value(item),
                      leading=_surrounding_value(item, "leading"),
                      trailing=_surrounding_value(item, "trailing"),
                      item_id=item_id,
                      file_path=file_path,
                  )

              prompt = _render(content_text)
              # Cap the EXTRACT prompt at MAX_EXTRACT_INPUT_TOKENS by
              # trimming the (typically huge) sidecar `content` field — the
              # other slots (surrounding/captions/footnotes) already have
              # their own per-field caps upstream. The cap is resolved
              # from the env var (falling back to
              # DEFAULT_MAX_EXTRACT_INPUT_TOKENS) so deployments can tune
              # it for their model's context window.
              tokenizer = getattr(self, "tokenizer", None)
              if tokenizer is not None:
                  from lightrag.constants import DEFAULT_MAX_EXTRACT_INPUT_TOKENS
                  from lightrag.multimodal_context import trim_content_to_budget

                  # Tokens held back from the content budget as headroom.
                  SAFETY_BUFFER = 256
                  max_extract_tokens = get_env_value(
                      "MAX_EXTRACT_INPUT_TOKENS",
                      DEFAULT_MAX_EXTRACT_INPUT_TOKENS,
                      int,
                  )
                  total_tokens = len(tokenizer.encode(prompt))
                  # A cap of zero or less disables the check entirely.
                  if max_extract_tokens > 0 and total_tokens > max_extract_tokens:
                      # Frame = the prompt rendered with empty content.
                      frame_tokens = len(tokenizer.encode(_render("")))
                      content_budget = (
                          max_extract_tokens - frame_tokens - SAFETY_BUFFER
                      )
                      if content_budget <= 0:
                          # The prompt template alone (with empty content)
                          # already exceeds the cap — no content trim can
                          # bring the request under the limit. Fail this
                          # item rather than handing the LLM a payload we
                          # know will trigger ``context_length_exceeded``.
                          # Operators must raise MAX_EXTRACT_INPUT_TOKENS
                          # above the template frame for analysis to
                          # succeed; the document is reprocessable
                          # idempotently once the cap is widened.
                          raise MultimodalAnalysisError(
                              f"{kind}/{item_id}: prompt frame "
                              f"({frame_tokens} tokens) exceeds "
                              f"MAX_EXTRACT_INPUT_TOKENS "
                              f"({max_extract_tokens}); raise the cap"
                          )
                      trimmed, was_trimmed = trim_content_to_budget(
                          content_text,
                          kind=f"{kind}s",
                          max_tokens=content_budget,
                          tokenizer=tokenizer,
                      )
                      if was_trimmed:
                          prompt = _render(trimmed)
                          logger.warning(
                              f"[analyze_multimodal] {kind}/{item_id} "
                              f"content trimmed (prompt {total_tokens} "
                              f"→ fit {max_extract_tokens}, "
                              f"content_budget={content_budget})"
                          )
                      # Post-trim hard guard: ``trim_content_to_budget``
                      # is constrained by ``content_budget`` so the final
                      # prompt should fit within ``max_extract_tokens``;
                      # defend against tokenizer rounding / future template
                      # changes that could push it over. Refuse the call
                      # rather than send an over-cap prompt to the LLM.
                      final_tokens = len(tokenizer.encode(prompt))
                      if final_tokens > max_extract_tokens:
                          raise MultimodalAnalysisError(
                              f"{kind}/{item_id}: trimmed prompt "
                              f"({final_tokens} tokens) still exceeds "
                              f"MAX_EXTRACT_INPUT_TOKENS "
                              f"({max_extract_tokens})"
                          )
              # Cache key inputs: the (possibly trimmed) prompt, EXTRACT
              # cache identity, json_object response format, an empty image
              # list, and the modality kind.
              args_hash = compute_args_hash(
                  prompt,
                  "",
                  "",
                  serialize_llm_cache_identity(extract_cache_identity),
                  _serialize_cache_variant({"type": "json_object"}),
                  _serialize_cache_variant([]),
                  kind,
              )
              cache_id = generate_cache_key("default", "analysis", args_hash)
              cached = await handle_cache(
                  self.llm_response_cache,
                  args_hash,
                  prompt,
                  mode="default",
                  cache_type="analysis",
              )
              if cached is not None:
                  # Cache hit: reuse the stored response text.
                  result_text = cached[0]
                  fresh = False
              else:
                  try:
                      result_text = await use_extract_func(
                          prompt,
                          stream=False,
                          response_format={"type": "json_object"},
                          _priority=DEFAULT_MM_ANALYSIS_PRIORITY,
                      )
                  except Exception as exc:
                      raise MultimodalAnalysisError(
                          f"{kind}/{item_id}: EXTRACT call failed: {exc}"
                      ) from exc
                  fresh = True
              # Validate before saving so unusable responses are not cached.
              parsed = _json_extract(str(result_text))
              name = parsed.get("name")
              description = parsed.get("description")
              if not isinstance(name, str) or not name.strip():
                  raise MultimodalAnalysisError(
                      f"{kind}/{item_id}: missing or invalid field 'name'"
                  )
              if not isinstance(description, str) or not description.strip():
                  raise MultimodalAnalysisError(
                      f"{kind}/{item_id}: missing or invalid field 'description'"
                  )
              result_obj: dict[str, Any] = {
                  "name": name.strip(),
                  "description": description.strip(),
                  "analyze_time": int(time.time()),
                  "status": "success",
                  "message": "",
              }
              # Equations additionally require an ``equation`` field.
              if kind == "equation":
                  equation_value = parsed.get("equation")
                  if (
                      not isinstance(equation_value, str)
                      or not equation_value.strip()
                  ):
                      raise MultimodalAnalysisError(
                          f"equation/{item_id}: missing or invalid field 'equation'"
                      )
                  result_obj["equation"] = equation_value.strip()
              cache_id_to_attach: str | None = None
              if fresh and analysis_cache_enabled:
                  await save_to_cache(
                      self.llm_response_cache,
                      CacheData(
                          args_hash=args_hash,
                          content=str(result_text),
                          prompt=prompt,
                          mode="default",
                          cache_type="analysis",
                          chunk_id=None,
                      ),
                  )
                  cache_id_to_attach = cache_id
              elif not fresh:
                  # Cache hit path (handle_cache already gated by flag):
                  # safe to surface the existing cache_id for cleanup.
                  cache_id_to_attach = cache_id
              return (result_obj, cache_id_to_attach)

          def _attach_cache_id(
              item_obj: dict[str, Any], cache_id: str | None
          ) -> None:
              """Append ``cache_id`` to ``item_obj["llm_cache_list"]`` once.

              No-op for a falsy ``cache_id``. A missing or non-list
              ``llm_cache_list`` is replaced by a fresh list.
              """
              if not cache_id:
                  return
              existing = item_obj.get("llm_cache_list")
              if not isinstance(existing, list):
                  existing = []
              if cache_id not in existing:
                  existing.append(cache_id)
              item_obj["llm_cache_list"] = existing

          async def _run_with_progress_log(coro, kind: str, item_id: str):
              """Append per-item completion log to pipeline_status the moment
              this single ``_analyze_*`` task finishes — not after the whole
              ``asyncio.gather`` batch returns — so the UI sees each
              drawing/table/equation result land in real time.

              Skipped items are demoted to debug-only logs and do NOT write
              pipeline_status — benign skips (image too small / wrong format
              / missing table body) otherwise flood the UI history for docs
              with many items. The per-item ``llm_analyze_result.message``
              still records why the item was skipped."""
              try:
                  result = await coro
              except Exception:
                  # Log the failure, then re-raise the original exception
                  # unchanged for the fail-fast loop.
                  log_message = f"Analyzing {kind}/{item_id}: failed"
                  logger.warning(log_message)
                  if pipeline_status is not None and pipeline_status_lock is not None:
                      async with pipeline_status_lock:
                          pipeline_status["latest_message"] = log_message
                          pipeline_status["history_messages"].append(log_message)
                  raise
              # Analyzer results are ``(result_obj, cache_id)`` tuples.
              result_obj = result[0] if isinstance(result, tuple) else {}
              is_success = (
                  isinstance(result_obj, dict)
                  and result_obj.get("status") == "success"
              )
              if is_success:
                  log_message = f"Analyzing {kind}/{item_id}: ok"
                  logger.info(log_message)
                  if pipeline_status is not None and pipeline_status_lock is not None:
                      async with pipeline_status_lock:
                          pipeline_status["latest_message"] = log_message
                          pipeline_status["history_messages"].append(log_message)
              else:
                  logger.debug(f"Analyzing {kind}/{item_id}: skipped")
              return result

          # Sidecar files sit next to the blocks file and share its base
          # name. Tuple layout: (path, root key in the JSON payload,
          # singular kind used for prompts/logs, enabled flag).
          base_name = str(block_file)
          if base_name.endswith(".blocks.jsonl"):
              base_name = base_name[: -len(".blocks.jsonl")]
          sidecars = [
              (
                  Path(base_name + ".drawings.json"),
                  "drawings",
                  "drawing",
                  process_opts.images,
              ),
              (
                  Path(base_name + ".tables.json"),
                  "tables",
                  "table",
                  process_opts.tables,
              ),
              (
                  Path(base_name + ".equations.json"),
                  "equations",
                  "equation",
                  process_opts.equations,
              ),
          ]
          # The "Analyzing multimodal" banner is emitted at most once.
          start_logged = False
          for sidecar_path, root_key, kind, enabled in sidecars:
              if not enabled or not sidecar_path.exists():
                  continue
              try:
                  payload = json.loads(sidecar_path.read_text(encoding="utf-8"))
              except Exception as exc:
                  raise MultimodalAnalysisError(
                      f"failed to read sidecar {sidecar_path}: {exc}"
                  ) from exc
              # Items are expected as a mapping of item_id -> item dict;
              # any other shape is ignored.
              items = payload.get(root_key, {})
              if not isinstance(items, dict):
                  continue
              if (
                  items
                  and not start_logged
                  and pipeline_status is not None
                  and pipeline_status_lock is not None
              ):
                  async with pipeline_status_lock:
                      log_message = f"Analyzing multimodal: {doc_id}"
                      logger.info(log_message)
                      pipeline_status["latest_message"] = log_message
                      pipeline_status["history_messages"].append(log_message)
                  start_logged = True
              # Pre-schedule cancellation check: if the user cancelled
              # between _analyze_worker's boundary check and the moment
              # we are about to spawn VLM tasks for this sidecar, raise
              # here so no item task ever runs. Without this we'd briefly
              # create tasks and then cancel them on the very first poll
              # iteration — wasteful and harder to reason about.
              if pipeline_status is not None and pipeline_status_lock is not None:
                  await self._raise_if_cancelled(
                      pipeline_status, pipeline_status_lock
                  )
              # One task per item; the mapping lets results be written back
              # onto the item dicts inside ``payload`` afterwards.
              task_meta: dict[asyncio.Task, tuple[str, dict]] = {}
              for item_id, item in items.items():
                  if not isinstance(item, dict):
                      continue
                  if kind == "drawing":
                      inner_coro = _analyze_drawing(
                          item_id, item, sidecar_path.parent
                      )
                  else:
                      inner_coro = _analyze_text_modality(kind, item_id, item)
                  task = asyncio.create_task(
                      _run_with_progress_log(inner_coro, kind, item_id)
                  )
                  task_meta[task] = (item_id, item)
              if not task_meta:
                  # No valid items in this sidecar — asyncio.wait([]) would
                  # ValueError, so skip the wait loop entirely.
                  continue
              # Fail-fast polling loop. Three trigger paths:
              # 1. an item task raises (e.g. MultimodalAnalysisError) →
              #    asyncio.wait returns early via FIRST_EXCEPTION;
              # 2. an item task raises PipelineCancelledException →
              #    same path, preserving the exception type;
              # 3. user clicks /cancel_pipeline mid-VLM → the
              #    cancellation_requested check at the top of the next
              #    poll iteration (≤ POLL_INTERVAL_SECONDS) fabricates
              #    a PipelineCancelledException.
              #
              # Do NOT add a watcher coroutine to the wait set: it would be
              # an infinite loop that stays pending when all items succeed,
              # preventing FIRST_EXCEPTION from ever returning.
              pending: set[asyncio.Task] = set(task_meta.keys())
              fail_fast_exc: BaseException | None = None
              POLL_INTERVAL_SECONDS = 0.5
              while pending:
                  if (
                      pipeline_status is not None
                      and pipeline_status_lock is not None
                      and await self._cancellation_requested(
                          pipeline_status, pipeline_status_lock
                      )
                  ):
                      fail_fast_exc = PipelineCancelledException(
                          "User cancelled during analyze"
                      )
                      break
                  done_now, pending = await asyncio.wait(
                      pending,
                      timeout=POLL_INTERVAL_SECONDS,
                      return_when=asyncio.FIRST_EXCEPTION,
                  )
                  for t in done_now:
                      if t.cancelled():
                          continue
                      texc = t.exception()
                      if texc is not None:
                          # Preserve original exception type so the
                          # _analyze_worker except dispatch can distinguish
                          # PipelineCancelledException from
                          # MultimodalAnalysisError.
                          fail_fast_exc = texc
                          break
                  if fail_fast_exc is not None:
                      break
              # If we broke early, cancel the still-running tasks.
              for t in pending:
                  t.cancel()
              if pending:
                  # Wait for the cancelled tasks to settle so every task in
                  # task_meta is done before results are collected.
                  await asyncio.gather(*pending, return_exceptions=True)
              # Collect results — preserve completed successes so reprocess
              # can hit the LLM cache instead of re-running the VLM.
              for t, (item_id, item) in task_meta.items():
                  if t.cancelled():
                      item["llm_analyze_result"] = _failure_result("cancelled")
                      continue
                  texc = t.exception()
                  if texc is None:
                      result_obj, cache_id = t.result()
                      item["llm_analyze_result"] = result_obj
                      _attach_cache_id(item, cache_id)
                  elif isinstance(texc, PipelineCancelledException):
                      item["llm_analyze_result"] = _failure_result("cancelled")
                  elif isinstance(texc, MultimodalAnalysisError):
                      item["llm_analyze_result"] = _failure_result(str(texc))
                  else:
                      item["llm_analyze_result"] = _failure_result(
                          f"unexpected error: {texc}"
                      )
              # Persist per-item results even on the failure path; a write
              # error is logged but does not abort the stage.
              try:
                  sidecar_path.write_text(
                      json.dumps(payload, ensure_ascii=False, indent=2),
                      encoding="utf-8",
                  )
              except OSError as exc:
                  logger.warning(
                      f"[analyze_multimodal] failed to write sidecar "
                      f"{sidecar_path}: {exc}"
                  )
              if fail_fast_exc is not None:
                  # Best-effort cache flush so any cache_ids written by
                  # already-completed sibling tasks survive a restart —
                  # otherwise the sidecar references cache rows that
                  # haven't been persisted yet. Mirrors
                  # _finalize_doc_failure's PROCESS-stage behaviour.
                  if self.llm_response_cache:
                      try:
                          await self.llm_response_cache.index_done_callback()
                      except Exception as persist_error:
                          logger.error(
                              f"Failed to persist LLM cache after analyze "
                              f"fail-fast: {persist_error}"
                          )
                  # Remaining sidecars are not processed after a failure.
                  raise fail_fast_exc
          parsed_data["multimodal_processed"] = True
          logger.info(f"[analyze_multimodal] completed for d-id: {doc_id}")
      except PipelineCancelledException:
          # Must re-raise BEFORE the generic Exception handler below,
          # otherwise the doc would be returned as if analyze succeeded
          # and would advance to PROCESS instead of being marked FAILED.
          raise
      except MultimodalAnalysisError:
          # Analysis failures propagate to the caller unchanged.
          raise
      except Exception as e:
          # Anything else is logged and swallowed; parsed_data is returned
          # without the ``multimodal_processed`` flag.
          logger.warning(f"[analyze_multimodal] failed for d-id: {doc_id}: {e}")
      return parsed_data
  def _build_mm_chunks_from_sidecars(
      self,
      doc_id: str,
      file_path: str,
      blocks_path: str,
      base_order_index: int,
      process_options: str | None = None,
  ) -> list[dict[str, Any]]:
      """Build multimodal chunks from sidecars carrying analysis results.

      Only items whose ``llm_analyze_result.status == "success"`` produce
      chunks. ``"skipped"`` items are silently ignored; ``"failure"``
      items raise :class:`MultimodalAnalysisError` so the document is
      marked failed (a failure should already have aborted the analyze
      phase — this is a defensive recheck).

      Each chunk follows the new schema: nested ``heading`` and
      ``sidecar`` dicts, no flat ``parent_headings`` / ``level`` /
      ``content_type`` fields. ``llm_cache_list`` is merged from the
      underlying sidecar item so document deletion can clean up the
      ``cache_type="analysis"`` entries it created.

      ``process_options`` gates which modality sidecars are read: a
      document re-processed after opting out of ``i`` / ``t`` / ``e``
      must NOT pick up stale success results from a prior pass. When
      ``None`` (e.g. ad-hoc unit tests), every modality is considered.

      Sidecar files are looked up next to ``blocks_path``: the
      ``.blocks.jsonl`` suffix (when present) is stripped and
      ``.drawings.json`` / ``.tables.json`` / ``.equations.json`` is
      appended. A sidecar that is missing, cannot be read or parsed, or
      whose top-level JSON value is not an object is skipped; the latter
      two cases are logged as warnings.

      Args:
          doc_id: Document id, used as the prefix of every ``chunk_id``.
          file_path: Not used by this method.
          blocks_path: Path of the blocks file the sidecars sit next to.
              When it does not exist an empty list is returned.
          base_order_index: ``chunk_order_index`` assigned to the first
              emitted chunk; each further chunk increments it by one.
          process_options: Options string handed to
              ``parse_process_options``; ``None`` enables all modalities.

      Returns:
          The chunk dicts, in drawings → tables → equations order and,
          within a sidecar, in the sidecar's own item order.

      Raises:
          MultimodalAnalysisError: when an item carries ``status="failure"``,
              when a success result lacks a required field, or when the
              multimodal chunk cannot be fit under the extraction token
              budget even after truncating description
              to :data:`DEFAULT_MM_CHUNK_DESCRIPTION_MIN_TOKENS`.
      """
      from lightrag.constants import (
          DEFAULT_MAX_EXTRACT_INPUT_TOKENS,
          DEFAULT_MM_CHUNK_DESCRIPTION_MIN_TOKENS,
      )
      from lightrag.parser.routing import parse_process_options

      block_file = Path(blocks_path)
      if not block_file.exists():
          return []

      # Sidecars share the blocks file's path minus its ".blocks.jsonl" suffix.
      base = str(block_file)
      if base.endswith(".blocks.jsonl"):
          base = base[: -len(".blocks.jsonl")]

      if process_options is None:
          allowed = {"drawing", "table", "equation"}
      else:
          opts = parse_process_options(process_options)
          allowed = set()
          if opts.images:
              allowed.add("drawing")
          if opts.tables:
              allowed.add("table")
          if opts.equations:
              allowed.add("equation")

      sidecar_defs = [
          (root, Path(base + suffix), kind)
          for root, suffix, kind in (
              ("drawings", ".drawings.json", "drawing"),
              ("tables", ".tables.json", "table"),
              ("equations", ".equations.json", "equation"),
          )
          if kind in allowed
      ]

      mm_chunks: list[dict[str, Any]] = []
      order = base_order_index

      def _norm_str_list(v: Any) -> list[str]:
          # Accept None, a scalar, or a list; return stripped non-empty strings.
          if v is None:
              return []
          if isinstance(v, list):
              return [str(x).strip() for x in v if str(x).strip()]
          s = str(v).strip()
          return [s] if s else []

      def _norm_parent_headings(value: Any) -> list[str]:
          # Anything that is not a list yields no parents; falsy entries drop.
          if not isinstance(value, list):
              return []
          return [str(p).strip() for p in value if str(p or "").strip()]

      def _build_heading_dict(item: dict[str, Any]) -> dict[str, Any] | None:
          # Heading info is either nested under item["heading"] (dict form)
          # or spread flat across the item ("heading", "parent_headings",
          # "level"). Returns None when no heading information is present.
          heading_raw = item.get("heading")
          if isinstance(heading_raw, dict):
              heading_text = str(heading_raw.get("heading") or "").strip()
              parents = _norm_parent_headings(heading_raw.get("parent_headings"))
              try:
                  level = int(heading_raw.get("level") or 0)
              except (TypeError, ValueError):
                  level = 0
          else:
              heading_text = str(heading_raw or "").strip()
              parents = _norm_parent_headings(item.get("parent_headings"))
              try:
                  level = int(item.get("level") or 0)
              except (TypeError, ValueError):
                  level = 0
          if not heading_text and not parents and level == 0:
              return None
          return {
              "level": level,
              "heading": heading_text,
              "parent_headings": parents,
          }

      def _render(
          kind: str,
          name: str,
          image_type: str,
          description: str,
          footnotes_joined: str,
          equation_body: str,
      ) -> str:
          # NOTE: the `[Image Name]` / `[Table Name]` / `[Equation Name]`
          # leading labels below are a contract consumed by
          # ``lightrag.operate._parse_mm_display_name`` (regex
          # ``_MM_DISPLAY_NAME_PATTERN``). If you rename or restructure
          # these labels, update that regex too, or relation descriptions
          # will silently fall back to sidecar ids. The
          # ``test_parse_mm_display_name_on_real_builder_output``
          # regression pins this contract end-to-end.
          if kind == "drawing":
              head = f"[Image Name]{name}\n[Image Type]{image_type}"
              footnote_label = "Image Footnotes"
          elif kind == "table":
              head = f"[Table Name]{name}"
              footnote_label = "Table Footnotes"
          else:  # equation
              head = f"{equation_body}\n[Equation Name]{name}"
              footnote_label = "Equation Footnotes"
          sections = [head, description]
          if footnotes_joined:
              sections.append(f"[{footnote_label}]{footnotes_joined}")
          return "\n\n".join(s for s in sections if s).strip()

      max_tokens = DEFAULT_MAX_EXTRACT_INPUT_TOKENS
      min_desc_tokens = DEFAULT_MM_CHUNK_DESCRIPTION_MIN_TOKENS

      for root_key, sidecar_path, kind in sidecar_defs:
          if not sidecar_path.exists():
              continue
          try:
              payload = json.loads(sidecar_path.read_text(encoding="utf-8"))
          except Exception as exc:
              # Best-effort: an unreadable sidecar contributes no chunks,
              # but leave a trace so the missing modality can be diagnosed.
              logger.warning(
                  f"[build_mm_chunks] skipping unreadable sidecar "
                  f"{sidecar_path}: {exc}"
              )
              continue
          if not isinstance(payload, dict):
              # Valid JSON whose top level is not an object (list, string,
              # number, ...) has no ``.get``; skip it like any other
              # malformed sidecar instead of raising AttributeError.
              logger.warning(
                  f"[build_mm_chunks] skipping sidecar {sidecar_path}: "
                  f"top-level JSON value is not an object"
              )
              continue
          items = payload.get(root_key, {})
          if not isinstance(items, dict):
              continue

          # local_idx counts every entry (including ones that yield no
          # chunk) so chunk ids stay tied to the item's sidecar position.
          for local_idx, (item_id, item) in enumerate(items.items()):
              if not isinstance(item, dict):
                  continue
              analysis = item.get("llm_analyze_result")
              if not isinstance(analysis, dict):
                  continue
              status = analysis.get("status")
              if status == "skipped":
                  continue
              if status == "failure":
                  raise MultimodalAnalysisError(
                      f"{root_key}/{item_id}: llm_analyze_result.status='failure' "
                      f"({analysis.get('message') or 'no message'})"
                  )
              if status != "success":
                  # Treat unknown / legacy status as missing — no chunk.
                  continue

              name = str(analysis.get("name") or "").strip()
              description = str(analysis.get("description") or "").strip()
              equation_body = str(analysis.get("equation") or "").strip()
              image_type = str(analysis.get("type") or "").strip()
              if not name:
                  raise MultimodalAnalysisError(
                      f"{root_key}/{item_id}: success result missing 'name'"
                  )
              if not description:
                  raise MultimodalAnalysisError(
                      f"{root_key}/{item_id}: success result missing 'description'"
                  )
              if kind == "drawing" and not image_type:
                  raise MultimodalAnalysisError(
                      f"drawings/{item_id}: success result missing 'type'"
                  )
              if kind == "equation" and not equation_body:
                  raise MultimodalAnalysisError(
                      f"equations/{item_id}: success result missing 'equation'"
                  )

              footnotes_list = _norm_str_list(item.get("footnotes"))
              footnotes_joined = "; ".join(footnotes_list)

              def _compose(desc: str) -> str:
                  # Re-render the chunk with only the description varying;
                  # called within this iteration only, so the closure sees
                  # this item's values.
                  return _render(
                      kind=kind,
                      name=name,
                      image_type=image_type,
                      description=desc,
                      footnotes_joined=footnotes_joined,
                      equation_body=equation_body,
                  )

              chunk_content = _compose(description)
              tokens = len(self.tokenizer.encode(chunk_content))
              if tokens > max_tokens:
                  # Truncate only the description, never name/type/equation.
                  desc_tokens = self.tokenizer.encode(description)
                  overflow = tokens - max_tokens
                  keep = max(min_desc_tokens, len(desc_tokens) - overflow)
                  # Re-measure after each cut and shrink by the remaining
                  # overflow until the chunk fits or the description floor
                  # is reached.
                  while True:
                      truncated_desc = self.tokenizer.decode(desc_tokens[:keep])
                      chunk_content = _compose(truncated_desc)
                      tokens = len(self.tokenizer.encode(chunk_content))
                      if tokens <= max_tokens or keep <= min_desc_tokens:
                          break
                      keep = max(min_desc_tokens, keep - (tokens - max_tokens))
                  if tokens > max_tokens:
                      raise MultimodalAnalysisError(
                          f"{root_key}/{item_id}: multimodal chunk exceeds "
                          f"{max_tokens} tokens even after truncating description "
                          f"to {min_desc_tokens} tokens"
                      )
              if not chunk_content:
                  continue

              heading_dict = _build_heading_dict(item)
              sidecar_block = {
                  "type": kind,
                  "id": str(item_id),
                  "refs": [{"type": kind, "id": str(item_id)}],
              }
              cache_list = item.get("llm_cache_list")
              cache_list = (
                  [str(c) for c in cache_list if str(c).strip()]
                  if isinstance(cache_list, list)
                  else []
              )
              chunk_dict: dict[str, Any] = {
                  "chunk_id": f"{doc_id}-mm-{kind}-{local_idx:03d}",
                  "chunk_order_index": order,
                  "content": chunk_content,
                  "tokens": tokens,
                  "sidecar": sidecar_block,
                  "llm_cache_list": cache_list,
              }
              if heading_dict is not None:
                  chunk_dict["heading"] = heading_dict
              mm_chunks.append(chunk_dict)
              order += 1

      return mm_chunks