| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762
27722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720272127222723272427252726272727282729273027312732273327342735273627372738273927402741274227432744274527462747274827492750275127522753275427552756275727582759276027612762276327642765276627672768276927702771277227732774277527762
77727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839284028412842284328442845284628472848284928502851285228532854285528562857285828592860286128622863286428652866286728682869287028712872287328742875287628772878287928802881288228832884288528862887288828892890289128922893289428952896289728982899290029012902290329042905290629072908290929102911291229132914291529162917291829192920292129222923292429252926292729282929293029312932293329342935293629372938293929402941294229432944294529462947294829492950295129522953295429552956295729582959296029612962296329642965296629672968296929702971297229732974297529762977297829792980298129822983298429852986298729882989299029912992299329942995299629972998299930003001300230033004300530063007300830093010301130123013301430153016301730183019302030213022302330243025302630273028302930303031303230333034303530363037303830393040304130423043304430453046304730483049305030513052305330543055305630573058305930603061306230633064306530663067306830693070307130723073307430753076307730783079308030813082308330843085308630873088308930903091309230933094309530963097309830993100310131023103310431053106310731083109311031113112311331143115311631173118311931203121312231233124312531263127312831293130313131323133313431353136313731383139314031413142314331443145314631473148314931503151315231533154315531563157315831593160316131623163316431653166316731683169317031713172317331743175317631773178317931803181318231833184318531863187318831893190319131923193319431953196319731983199320032013202320332043205320632073208320932103211321232133214321532163217321832193220322132223223322432253226322732283229323032313232323332343235323632373238323932403241324232433244324532463247324832493250325132523253325432553256325732583259326032613262326332643265326632673268326932703271327232733274327532763
27732783279328032813282328332843285328632873288328932903291329232933294329532963297329832993300330133023303330433053306330733083309331033113312331333143315331633173318331933203321332233233324332533263327332833293330333133323333333433353336333733383339334033413342334333443345334633473348334933503351335233533354335533563357335833593360336133623363336433653366336733683369337033713372337333743375337633773378337933803381338233833384338533863387338833893390339133923393339433953396339733983399340034013402340334043405340634073408340934103411341234133414341534163417341834193420342134223423342434253426342734283429343034313432343334343435343634373438343934403441344234433444344534463447344834493450345134523453345434553456345734583459346034613462346334643465346634673468346934703471347234733474347534763477347834793480348134823483348434853486348734883489349034913492349334943495349634973498349935003501350235033504350535063507350835093510351135123513351435153516351735183519352035213522352335243525352635273528352935303531353235333534353535363537353835393540354135423543354435453546354735483549355035513552355335543555355635573558355935603561356235633564356535663567356835693570357135723573357435753576357735783579358035813582358335843585358635873588358935903591359235933594359535963597359835993600360136023603360436053606360736083609361036113612361336143615361636173618361936203621362236233624362536263627362836293630363136323633363436353636363736383639364036413642364336443645364636473648364936503651365236533654365536563657365836593660366136623663366436653666366736683669367036713672367336743675367636773678367936803681368236833684368536863687368836893690369136923693369436953696369736983699370037013702370337043705370637073708370937103711371237133714371537163717371837193720372137223723372437253726372737283729373037313732373337343735373637373738373937403741374237433744374537463747374837493750375137523753375437553756375737583759376037613762376337643765376637673768376937703771377237733774377537763
77737783779378037813782378337843785378637873788378937903791379237933794379537963797379837993800380138023803380438053806380738083809381038113812381338143815381638173818381938203821382238233824382538263827382838293830383138323833383438353836383738383839384038413842384338443845384638473848384938503851385238533854385538563857385838593860386138623863386438653866386738683869387038713872387338743875387638773878387938803881388238833884388538863887388838893890389138923893389438953896389738983899390039013902390339043905390639073908390939103911391239133914391539163917391839193920392139223923392439253926392739283929393039313932393339343935393639373938393939403941394239433944394539463947394839493950395139523953395439553956395739583959396039613962396339643965396639673968396939703971397239733974397539763977397839793980398139823983398439853986398739883989399039913992399339943995399639973998399940004001400240034004400540064007400840094010401140124013401440154016401740184019402040214022402340244025402640274028402940304031403240334034403540364037403840394040404140424043404440454046404740484049405040514052405340544055405640574058405940604061406240634064406540664067406840694070407140724073407440754076407740784079408040814082408340844085408640874088408940904091409240934094409540964097409840994100410141024103410441054106410741084109411041114112411341144115411641174118411941204121412241234124412541264127412841294130413141324133413441354136413741384139414041414142414341444145414641474148414941504151415241534154415541564157415841594160416141624163416441654166416741684169417041714172417341744175417641774178417941804181418241834184418541864187418841894190419141924193419441954196419741984199420042014202420342044205420642074208420942104211421242134214421542164217421842194220422142224223422442254226422742284229423042314232423342344235423642374238423942404241424242434244424542464247424842494250425142524253425442554256425742584259426042614262426342644265426642674268426942704271427242734274427542764
27742784279428042814282428342844285428642874288428942904291429242934294429542964297429842994300430143024303430443054306430743084309431043114312431343144315431643174318431943204321432243234324432543264327432843294330433143324333433443354336433743384339434043414342434343444345434643474348434943504351435243534354435543564357435843594360436143624363436443654366436743684369437043714372437343744375437643774378437943804381438243834384438543864387438843894390439143924393439443954396439743984399440044014402440344044405440644074408440944104411441244134414441544164417441844194420442144224423442444254426442744284429443044314432443344344435443644374438443944404441444244434444444544464447444844494450445144524453445444554456445744584459446044614462446344644465446644674468446944704471447244734474447544764477447844794480448144824483448444854486448744884489449044914492449344944495449644974498449945004501450245034504450545064507450845094510451145124513451445154516451745184519452045214522452345244525452645274528452945304531453245334534453545364537453845394540454145424543454445454546454745484549455045514552455345544555455645574558455945604561456245634564456545664567456845694570457145724573457445754576457745784579458045814582458345844585458645874588458945904591459245934594459545964597459845994600460146024603460446054606460746084609461046114612461346144615461646174618461946204621462246234624462546264627462846294630463146324633463446354636463746384639464046414642464346444645464646474648464946504651465246534654465546564657465846594660466146624663466446654666466746684669467046714672467346744675467646774678467946804681468246834684468546864687468846894690469146924693469446954696469746984699470047014702470347044705470647074708470947104711471247134714471547164717471847194720472147224723472447254726472747284729473047314732473347344735473647374738473947404741474247434744474547464747474847494750475147524753475447554756475747584759476047614762476347644765476647674768476947704771477247734774477547764
777477847794780478147824783478447854786478747884789479047914792479347944795479647974798479948004801480248034804480548064807480848094810 |
- """Document ingestion pipeline mixin for the LightRAG class.
- This module isolates the document parse/enqueue/extraction pipeline so that
- ``lightrag.py`` stays focused on storage management, querying, and editing.
- The mixin is wired into :class:`lightrag.LightRAG` via multiple inheritance
- and relies on attributes/methods that the main class provides
- (``self.full_docs``, ``self.doc_status``, ``self.tokenizer``,
- ``self.parse_native``-related fields, ``self._insert_done``,
- ``self._process_extract_entities``, etc.).
- """
- from __future__ import annotations
- import asyncio
- import base64
- import hashlib
- import inspect
- import json
- import json_repair
- import mimetypes
- import os
- import re
- import shutil
- import time
- import traceback
- from dataclasses import dataclass
- from datetime import datetime, timezone
- from pathlib import Path
- from typing import Any
- from lightrag.base import DocProcessingStatus, DocStatus
- from lightrag.constants import (
- FULL_DOCS_FORMAT_LIGHTRAG,
- FULL_DOCS_FORMAT_PENDING_PARSE,
- FULL_DOCS_FORMAT_RAW,
- PARSED_DIR_NAME,
- PARSER_ENGINE_DOCLING,
- PARSER_ENGINE_MINERU,
- PARSER_ENGINE_NATIVE,
- )
- from lightrag.exceptions import MultimodalAnalysisError, PipelineCancelledException
- from lightrag.kg.shared_storage import get_namespace_data, get_namespace_lock
- from lightrag.operate import merge_nodes_and_edges
- from lightrag.parser.routing import (
- resolve_file_parser_directives,
- resolve_stored_document_parser_engine,
- )
- from lightrag.utils import (
- CacheData,
- _serialize_cache_variant,
- compute_args_hash,
- compute_mdhash_id,
- enforce_chunk_token_limit_before_embedding,
- generate_cache_key,
- generate_track_id,
- get_content_summary,
- get_env_value,
- get_llm_cache_identity,
- handle_cache,
- logger,
- sanitize_text_for_encoding,
- save_to_cache,
- serialize_llm_cache_identity,
- )
- from lightrag.utils_pipeline import (
- archive_docx_source_after_full_docs_sync,
- archive_source_after_full_docs_sync,
- build_chunks_dict_from_chunking_result,
- chunk_fields_from_status_doc,
- compute_text_content_hash,
- doc_status_field,
- doc_status_transition_metadata,
- get_duplicate_doc_by_content_hash,
- get_existing_doc_by_content_hash,
- get_existing_doc_by_file_basename,
- has_known_document_source,
- input_dir_path,
- load_lightrag_document_content,
- make_lightrag_doc_content,
- normalize_document_file_path,
- parsed_artifact_dir_for,
- resolve_doc_file_path,
- sidecar_blocks_path,
- sidecar_uri_for,
- strip_lightrag_doc_prefix,
- )
# Statuses that mark a document as "in-flight or pending" from the
# pipeline's point of view. The same tuple serves the initial snapshot and
# every refetch that follows a ``request_pending`` continuation; it lives at
# module scope so it is built once instead of on every pipeline entry.
_INFLIGHT_DOC_STATUSES = (
    DocStatus.PROCESSING,
    DocStatus.FAILED,
    DocStatus.PENDING,
    DocStatus.PARSING,
    DocStatus.ANALYZING,
)
def _call_source_file_resolver(
    owner: Any,
    file_path: str,
    *,
    source_file_name: str | None = None,
    parser_engine: str | None = None,
) -> str:
    """Call ``owner._resolve_source_file_for_parser`` with the context it accepts.

    The resolver's signature is inspected so that both context-aware
    resolvers and legacy one-argument test doubles keep working:

    * If the resolver accepts ``source_file_name`` as a keyword (or takes
      ``**kwargs``), it is called as ``resolver(file_path, ...)`` with the
      context keywords it can receive. ``parser_engine`` is forwarded only
      when the resolver accepts it, so a resolver that declares
      ``source_file_name`` alone does not fail with ``TypeError``.
    * Otherwise it is called the legacy way with a single positional
      argument: ``source_file_name`` when truthy, else ``file_path``.

    Args:
        owner: Object exposing ``_resolve_source_file_for_parser``.
        file_path: Path passed positionally to context-aware resolvers.
        source_file_name: Optional source file name forwarded as context;
            preferred over ``file_path`` in the legacy call.
        parser_engine: Optional parser engine name forwarded as context.

    Returns:
        Whatever the resolver returns.
    """
    resolver = owner._resolve_source_file_for_parser

    try:
        params = inspect.signature(resolver).parameters
    except (TypeError, ValueError):
        # Signature cannot be introspected: use the call shape every
        # resolver generation supports (one positional argument).
        return resolver(source_file_name or file_path)

    takes_var_keyword = any(
        param.kind is inspect.Parameter.VAR_KEYWORD for param in params.values()
    )
    keyword_kinds = (
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
        inspect.Parameter.KEYWORD_ONLY,
    )

    def _accepts_keyword(name: str) -> bool:
        """Return True when ``name`` can be passed to the resolver by keyword."""
        if takes_var_keyword:
            return True
        param = params.get(name)
        return param is not None and param.kind in keyword_kinds

    if not _accepts_keyword("source_file_name"):
        # Legacy resolver / test double: single positional path argument.
        return resolver(source_file_name or file_path)

    context: dict[str, Any] = {"source_file_name": source_file_name}
    if _accepts_keyword("parser_engine"):
        context["parser_engine"] = parser_engine
    return resolver(file_path, **context)
# Translates the ``process_options.chunking`` selector letter into the
# ``extraction_meta.chunking_method`` string consumed by the pipeline
# observability layer and by the resume path.
_CHUNKING_METHOD_LABELS: dict[str, str] = {
    "F": "fixed_token",
    "R": "recursive_character",
    "V": "semantic_vector",
    "P": "paragraph_semantic",
}
# Short display names substituted for verbose chunker kwarg names by
# ``_format_chunking_params``; keys not listed here are shown unchanged.
_CHUNK_LOG_KEY_ALIASES: dict[str, str] = {
    "chunk_overlap_token_size": "overlap",
    "breakpoint_threshold_type": "break",
    "breakpoint_threshold_amount": "amount",
    "buffer_size": "buf",
    "split_by_character": "split_by",
    "split_by_character_only": "split_only",
    "separators": "seps",
    "sentence_split_regex": "regex",
}


def _format_chunking_params(
    chunk_size: int,
    params: dict[str, Any],
) -> str:
    """Format the ``size=..., key=value, ...`` portion shared by the chunking
    start log line and ``doc_status.metadata['chunk_opts']``.

    Keys whose value is ``None`` or an empty string/container (``str``,
    ``bytes``, ``list``, ``tuple``, ``set``, ``frozenset``, ``dict``) are
    dropped so the line stays scannable. Falsy scalars such as ``0`` or
    ``False`` are kept because they are meaningful settings. Callers pass
    the strategy-specific kwargs they are about to splat into the chunker,
    so the output mirrors the actual call. Long keys are aliased to short
    forms via ``_CHUNK_LOG_KEY_ALIASES``.

    Args:
        chunk_size: Value rendered first as ``size=<chunk_size>``.
        params: Chunker keyword arguments, rendered in iteration order.

    Returns:
        The comma-separated ``key=value`` string; string values are shown
        with ``repr`` quoting, everything else with its plain formatting.
    """
    pieces = [f"size={chunk_size}"]
    for key, value in params.items():
        if value is None:
            continue
        # Only sized string/container types count as "empty"; a bare
        # truthiness test would wrongly drop 0 and False.
        if (
            isinstance(value, (str, bytes, list, tuple, set, frozenset, dict))
            and not value
        ):
            continue
        label = _CHUNK_LOG_KEY_ALIASES.get(key, key)
        if isinstance(value, str):
            # Quote strings so whitespace-only separators remain visible.
            pieces.append(f"{label}={value!r}")
        else:
            pieces.append(f"{label}={value}")
    return ", ".join(pieces)
@dataclass
class _BatchRunContext:
    """Shared, per-batch state handed to the parse/analyze/process workers.

    Grouping the pipeline status mapping, its lock, the semaphore and the
    work queues into one object lets each worker method take a single
    ``ctx`` argument rather than roughly eight separately plumbed
    parameters.

    ``processed_count`` is the field that changes while a batch runs; it is
    always read and written while ``pipeline_status_lock`` is held.
    """

    # Pipeline status mapping and the lock that guards it.
    pipeline_status: dict
    pipeline_status_lock: Any
    # Semaphore shared by the batch's workers.
    semaphore: asyncio.Semaphore
    # NOTE(review): presumably the number of files in this batch - confirm
    # against the code that builds the context.
    total_files: int
    # Work queues. Judging by the names, three feed the per-engine parse
    # stage (native / mineru / docling) and two feed the analyze and
    # process stages - confirm against the worker methods.
    q_native: asyncio.Queue
    q_mineru: asyncio.Queue
    q_docling: asyncio.Queue
    q_analyze: asyncio.Queue
    q_process: asyncio.Queue
    # Mutated during the batch; access only under ``pipeline_status_lock``.
    processed_count: int = 0
- class _PipelineMixin:
- """Mixin providing document ingestion pipeline methods for LightRAG.
- Designed to be combined as a base of LightRAG only. Relies on
- LightRAG-provided attributes (``self.full_docs``, ``self.doc_status``,
- ``self.tokenizer``, ``self.parser_*``, ``self.workspace`` ...) and on the
- shared methods ``self._insert_done`` / ``self._process_extract_entities``
- which remain in the main class and are resolved through MRO.
- """
- # ============================================================
- # Public document ingestion API (entry points)
- # ============================================================
- async def apipeline_enqueue_documents(
- self,
- input: str | list[str],
- ids: list[str] | None = None,
- file_paths: str | list[str] | None = None,
- track_id: str | None = None,
- docs_format: str = FULL_DOCS_FORMAT_RAW,
- lightrag_document_paths: str | list[str] | None = None,
- parse_engine: str | list[str] | None = None,
- process_options: str | list[str] | None = None,
- chunk_options: dict | list[dict] | None = None,
- from_scan: bool = False,
- ) -> str:
- """
- Pipeline for Processing Documents
- 1. Validate ids if provided or generate MD5 hash IDs and remove duplicate contents (skip content dedup when format is lightrag)
- 2. Generate document initial status
- 3. Filter out already processed documents
- 4. Enqueue document in status
- Args:
- input: Single document string or list of document strings (can be empty when docs_format is lightrag)
- ids: list of unique document IDs, if not provided, MD5 hash IDs will be generated (from content or file_path when lightrag)
- file_paths: list of file paths corresponding to each document, used for citation
- track_id: tracking ID for monitoring processing status
- docs_format: "raw" (default) or "lightrag"; when "lightrag" content may be empty and content-dedup is skipped
- lightrag_document_paths: paths to LightRAG Document (e.g. .blocks.jsonl dir or base path), when docs_format is lightrag
- parse_engine: file extraction engine already used or target engine for pending_parse
- process_options: per-document processing options string (i/t/e/!/F/R/V/P);
- accepted as a single string broadcast to every input or as a list
- aligned with ``input``. Stored verbatim on ``full_docs`` and
- mirrored to ``doc_status.metadata['process_options']``.
- chunk_options: per-document chunker parameter snapshot.
- Accepted as ``dict`` (broadcast to every input) or
- ``list[dict]`` (aligned with ``input``). When ``None``,
- each doc's snapshot is built via
- :func:`lightrag.parser.routing.resolve_chunk_options`
- from ``self.addon_params['chunker']``. Persisted to
- ``full_docs[doc_id]['chunk_options']`` and consumed by
- :meth:`process_single_document` to drive the file
- chunkers (F / R / V / P). Callers that need to bake
- F-strategy runtime args (``split_by_character`` /
- ``split_by_character_only``) into the snapshot — e.g.
- :meth:`LightRAG.ainsert` — should call
- :func:`resolve_chunk_options` themselves and pass the
- result here; this function is intentionally chunker-
- config agnostic. See
- ``docs/FileProcessingConfiguration-zh.md`` for the schema.
- from_scan: when True, the caller is the scan-owned background task
- that already holds ``pipeline_status["scanning"]``. Scan
- does additional doc_status reads during its classification
- phase (PROCESSED detection, FAILED-stub deletion, etc.)
- so external writers are blocked via
- ``scanning_exclusive``. Scan's own enqueues happen in
- its processing phase, after classification has cleared
- ``scanning_exclusive``, but ``from_scan=True`` is still
- forwarded as a defence-in-depth bypass so an unexpected
- scan-owned write inside the classification window is
- allowed through. External callers must leave this False.
- Returns:
- str: tracking ID for monitoring processing status
- Raises:
- RuntimeError: if a scan is in progress (and ``from_scan`` is
- False), or if a destructive job (clear / delete) is in
- flight. Concurrent indexing (``busy=True`` from the
- processing loop) is permitted — the running loop is
- notified via ``request_pending`` and picks up the
- newly-enqueued doc after its current batch finishes.
- """
- # Concurrency contract: enqueue may proceed concurrently with the
- # processing loop because (a) full_docs is upserted before
- # doc_status, so a consistency check never sees a ghost row, and
- # (b) the running loop re-queries doc_status by status after each
- # batch and sets ``request_pending`` whenever new work arrives
- # while busy. Two states still block enqueue:
- # * ``scanning_exclusive`` — scan task is in its CLASSIFICATION
- # phase, reading doc_status to classify files and possibly
- # deleting stale stubs. Concurrent enqueue would race
- # against scan's reads / mutations. ``from_scan=True``
- # lifts this guard for the scan task's own enqueues.
- # ``scanning`` alone (the processing phase) does NOT block,
- # identical to the upload-during-busy case.
- # * ``destructive_busy`` — clear / delete is dropping storages
- # or removing input files; a concurrent write would be
- # silently clobbered.
- pipeline_status = await get_namespace_data(
- "pipeline_status", workspace=self.workspace
- )
- pipeline_status_lock = get_namespace_lock(
- "pipeline_status", workspace=self.workspace
- )
- async with pipeline_status_lock:
- if not from_scan and pipeline_status.get("scanning_exclusive"):
- raise RuntimeError(
- "Cannot enqueue while scan is classifying files; "
- "wait for the classification phase to finish "
- "before retrying."
- )
- if pipeline_status.get("destructive_busy"):
- raise RuntimeError(
- "Cannot enqueue while pipeline is clearing or "
- "deleting documents; wait for the running job to "
- "finish before retrying."
- )
- # Generate track_id if not provided
- if track_id is None or track_id.strip() == "":
- track_id = generate_track_id("enqueue")
- if isinstance(input, str):
- input = [input]
- if isinstance(ids, str):
- ids = [ids]
- if isinstance(file_paths, str):
- file_paths = [file_paths]
- if isinstance(lightrag_document_paths, str):
- lightrag_document_paths = (
- [lightrag_document_paths] if lightrag_document_paths else None
- )
- if isinstance(parse_engine, str):
- parse_engine = [parse_engine] * len(input)
- if isinstance(process_options, str):
- process_options = [process_options] * len(input)
- if isinstance(chunk_options, dict):
- chunk_options = [chunk_options] * len(input)
- # If file_paths is provided, ensure it matches the number of documents
- if file_paths is not None:
- if isinstance(file_paths, str):
- file_paths = [file_paths]
- if len(file_paths) != len(input):
- raise ValueError(
- "Number of file paths must match the number of documents"
- )
- file_paths = [
- path.strip() if isinstance(path, str) else "" for path in file_paths
- ]
- file_paths = [path if path else "unknown_source" for path in file_paths]
- else:
- file_paths = ["unknown_source"] * len(input)
- is_lightrag_format = docs_format == FULL_DOCS_FORMAT_LIGHTRAG
- if is_lightrag_format and lightrag_document_paths is not None:
- if len(lightrag_document_paths) != len(input):
- raise ValueError(
- "Number of lightrag_document_paths must match the number of documents"
- )
- if parse_engine is not None and len(parse_engine) != len(input):
- raise ValueError(
- "Number of parse engines must match the number of documents"
- )
- if process_options is not None and len(process_options) != len(input):
- raise ValueError(
- "Number of process options must match the number of documents"
- )
- if chunk_options is not None and len(chunk_options) != len(input):
- raise ValueError(
- "Number of chunk_options dicts must match the number of documents"
- )
- def _parse_engine_at(index: int) -> str | None:
- if parse_engine is None:
- return None
- engine = str(parse_engine[index] or "").strip().lower()
- return engine or None
- def _process_options_at(index: int) -> str:
- if process_options is None:
- return ""
- from lightrag.parser.routing import sanitize_process_options
- return sanitize_process_options(process_options[index])
- def _chunk_options_at(index: int) -> dict[str, Any]:
- """Resolve the per-doc slim chunk_options snapshot.
- Projects the chunker config down to the one strategy
- sub-dict selected by the doc's ``process_options`` (F by
- default) — the persisted ``full_docs[doc_id]['chunk_options']``
- carries only the params actually consumed at process time.
- When the caller supplied ``chunk_options`` we slim it
- against the per-doc options (deep-copying internally so two
- docs broadcast from a single dict cannot share mutable
- sub-dicts); otherwise we build a fresh snapshot from
- ``self.addon_params['chunker']``.
- F-strategy runtime args (``split_by_character`` /
- ``split_by_character_only`` from :meth:`LightRAG.ainsert`)
- are baked into the snapshot upstream — ainsert calls
- :func:`lightrag.parser.routing.resolve_chunk_options` itself
- and passes the result via ``chunk_options=``. This function
- is purely a persistence helper; chunker-config construction
- is not its concern.
- """
- from lightrag.parser.routing import (
- resolve_chunk_options,
- slim_chunk_options,
- )
- doc_options = _process_options_at(index)
- if chunk_options is not None:
- return slim_chunk_options(chunk_options[index], doc_options)
- return resolve_chunk_options(self.addon_params, process_options=doc_options)
- # 1. Validate ids and build contents (when lightrag: no content dedup, content may be empty)
- if ids is not None:
- if len(ids) != len(input):
- raise ValueError("Number of IDs must match the number of documents")
- if len(ids) != len(set(ids)):
- raise ValueError("IDs must be unique")
- # Canonicalize every input filename once: the stored ``file_path``
- # is hint-stripped and serves UI display, filename dedup, and the
- # deterministic doc_id seed in one go.
- file_paths_canonical = [
- normalize_document_file_path(path) for path in file_paths
- ]
- contents: dict[str, dict[str, Any]] = {}
- source_to_doc_id: dict[str, str] = {}
- content_hash_to_doc_id: dict[str, str] = {}
- duplicate_attempts: list[dict[str, Any]] = []
- # Per-doc I/O failures from the lightrag-format branch. Populated when
- # ``load_lightrag_document_content`` cannot read the user-supplied
- # blocks.jsonl; flushed as FAILED stubs via
- # ``apipeline_enqueue_error_documents`` inside the critical section so
- # the UI surfaces the root cause instead of a silent empty document.
- lightrag_load_errors: list[dict[str, Any]] = []
- def _add_content(
- index: int,
- content: str,
- doc_format: str,
- *,
- sidecar_location: str | None = None,
- ) -> None:
- file_path_canonical = file_paths_canonical[index]
- # Body length excludes the {{LRdoc}} marker so duplicate-attempt
- # bookkeeping reports the same units as raw documents.
- # strip_lightrag_doc_prefix is a no-op for non-lightrag formats.
- body_length = len(strip_lightrag_doc_prefix(content, doc_format))
- # Compute content hash: skip for pending_parse (content extracted later).
- # RAW and LIGHTRAG both hash the bare merged text so the same body
- # carried by different envelopes (raw text vs sidecar) dedupes
- # against itself across formats.
- content_hash: str | None = None
- if doc_format in (FULL_DOCS_FORMAT_RAW, FULL_DOCS_FORMAT_LIGHTRAG):
- content_hash = compute_text_content_hash(
- strip_lightrag_doc_prefix(content or "", doc_format)
- )
- known_source = has_known_document_source(file_path_canonical)
- if ids is not None:
- doc_id = ids[index]
- elif known_source:
- doc_id = compute_mdhash_id(file_path_canonical, prefix="doc-")
- elif doc_format == FULL_DOCS_FORMAT_RAW:
- doc_id = compute_mdhash_id(content or "", prefix="doc-")
- elif content_hash:
- doc_id = compute_mdhash_id(content_hash, prefix="doc-")
- else:
- doc_id = compute_mdhash_id(
- f"{file_path_canonical}-{track_id}-{index}", prefix="doc-"
- )
- if known_source and file_path_canonical in source_to_doc_id:
- duplicate_attempts.append(
- {
- "doc_id": doc_id,
- "original_doc_id": source_to_doc_id[file_path_canonical],
- "file_path": file_path_canonical,
- "content_length": body_length,
- "existing_status": "batch_duplicate",
- "existing_track_id": "",
- "duplicate_kind": "filename",
- }
- )
- return
- if content_hash and content_hash in content_hash_to_doc_id:
- duplicate_attempts.append(
- {
- "doc_id": doc_id,
- "original_doc_id": content_hash_to_doc_id[content_hash],
- "file_path": file_path_canonical,
- "content_length": body_length,
- "existing_status": "batch_duplicate",
- "existing_track_id": "",
- "duplicate_kind": "content_hash",
- }
- )
- return
- if known_source:
- source_to_doc_id[file_path_canonical] = doc_id
- if content_hash:
- content_hash_to_doc_id[content_hash] = doc_id
- content_data: dict[str, Any] = {
- "content": content,
- "file_path": file_path_canonical,
- "parse_format": doc_format,
- }
- if content_hash:
- content_data["content_hash"] = content_hash
- if sidecar_location:
- content_data["sidecar_location"] = sidecar_location
- if engine := _parse_engine_at(index):
- content_data["parse_engine"] = engine
- if doc_format == FULL_DOCS_FORMAT_PENDING_PARSE:
- source_file_name = Path(str(file_paths[index] or "").strip()).name
- if has_known_document_source(source_file_name):
- content_data["source_file_name"] = source_file_name
- options_str = _process_options_at(index)
- if options_str:
- content_data["process_options"] = options_str
- # Always snapshot chunk_options at enqueue time — independent
- # of whether process_options selected a specific strategy —
- # so the per-doc parameters are frozen even when ``F``
- # (default) is used.
- content_data["chunk_options"] = _chunk_options_at(index)
- contents[doc_id] = content_data
- if is_lightrag_format:
- # LightRAG Document: content may be empty; _add_content still hashes the merged body for content-hash dedup
- for i in range(len(file_paths)):
- path = file_paths[i]
- raw_path = (
- lightrag_document_paths[i] if lightrag_document_paths else ""
- ) or path
- # Resolve to an absolute path so the sidecar URI carries
- # full location info; relative paths are interpreted under
- # input_dir.
- p = Path(raw_path)
- if not p.is_absolute():
- p = input_dir_path() / p
- # The user may point at the ``*.blocks.jsonl`` file itself
- # or at its containing ``*.parsed/`` directory. Sidecars
- # are addressed by directory, so step up when given a file.
- sidecar_dir = (
- p.parent
- if p.suffix == ".jsonl" and p.name.endswith(".blocks.jsonl")
- else p
- )
- sidecar_location = sidecar_uri_for(sidecar_dir)
- # Per docs/FileProcessingConfiguration-zh.md, full_docs.content
- # for format=lightrag must be "{{LRdoc}}" + the merged body.
- # If the blocks file cannot be read (permission, truncation,
- # invalid JSON line), recording an empty body would let an
- # untrue "{{LRdoc}}" record land in full_docs and desync from
- # the on-disk blocks.jsonl. Instead, skip this doc and flush
- # a FAILED stub via apipeline_enqueue_error_documents inside
- # the critical section so /documents surfaces the cause and
- # /documents/scan retries cleanly once the file is fixed.
- try:
- merged_text, _ = await load_lightrag_document_content(
- sidecar_location
- )
- except Exception as exc:
- error_msg = f"load_lightrag_document_content failed: {exc}"
- logger.warning(f"[apipeline_enqueue] {error_msg} ({raw_path})")
- file_size = 0
- blocks_path_str = sidecar_blocks_path(sidecar_location)
- if blocks_path_str:
- try:
- file_size = Path(blocks_path_str).stat().st_size
- except OSError:
- file_size = 0
- lightrag_load_errors.append(
- {
- "file_path": path,
- "error_description": (
- "Failed to load LightRAG Document blocks"
- ),
- "original_error": error_msg,
- "file_size": file_size,
- }
- )
- continue
- summary_content = make_lightrag_doc_content(merged_text)
- _add_content(
- i,
- summary_content,
- FULL_DOCS_FORMAT_LIGHTRAG,
- sidecar_location=sidecar_location,
- )
- elif ids is not None:
- for i, doc in enumerate(input):
- cleaned_content = sanitize_text_for_encoding(doc)
- _add_content(
- i,
- cleaned_content,
- FULL_DOCS_FORMAT_RAW,
- )
- elif docs_format == FULL_DOCS_FORMAT_PENDING_PARSE:
- for i, doc in enumerate(input):
- _add_content(
- i,
- doc or "",
- FULL_DOCS_FORMAT_PENDING_PARSE,
- )
- else:
- for i, doc in enumerate(input):
- cleaned_content = sanitize_text_for_encoding(doc)
- _add_content(i, cleaned_content, FULL_DOCS_FORMAT_RAW)
- # 2. Generate document initial status (without content)
- def _initial_doc_status(content_data: dict[str, Any]) -> dict[str, Any]:
- # For lightrag-format full_docs the persisted content carries the
- # ``{{LRdoc}}`` marker; strip it so summary/length match raw
- # semantics (the marker is full_docs internal bookkeeping and
- # must not leak into doc_status). strip_lightrag_doc_prefix
- # internally checks parse_format, so non-lightrag formats pass
- # through untouched.
- body_text = strip_lightrag_doc_prefix(
- content_data.get("content", ""),
- content_data.get("parse_format"),
- )
- base: dict[str, Any] = {
- "status": DocStatus.PENDING,
- "content_summary": get_content_summary(body_text),
- "content_length": len(body_text),
- "created_at": datetime.now(timezone.utc).isoformat(),
- "updated_at": datetime.now(timezone.utc).isoformat(),
- "file_path": content_data["file_path"],
- "track_id": track_id,
- }
- if content_data.get("content_hash"):
- base["content_hash"] = content_data["content_hash"]
- metadata: dict[str, Any] = {}
- options_str = content_data.get("process_options") or ""
- if options_str:
- # Mirror process_options into doc_status.metadata so admin UIs
- # can surface the per-document strategy without a full_docs lookup.
- metadata["process_options"] = options_str
- source_file_name = content_data.get("source_file_name")
- if source_file_name:
- metadata["source_file_name"] = source_file_name
- if metadata:
- base["metadata"] = metadata
- return base
- new_docs: dict[str, Any] = {
- id_: _initial_doc_status(content_data)
- for id_, content_data in contents.items()
- }
- # Serialise the dedup-read-then-upsert critical section across
- # concurrent enqueue calls within the same workspace. Without
- # this, two enqueues for the same content (e.g. /upload during
- # scan's processing phase, or two uploads via /text + /upload)
- # can both read doc_status before either upserts, both miss the
- # content_hash dedup, and both end up writing PENDING rows for
- # the same content — bypassing the dedup that's supposed to
- # land one of them as ``duplicate_kind=content_hash`` FAILED.
- #
- # The lock is workspace-scoped and only spans steps 3-4 below
- # (filter_keys → upserts). It does NOT block concurrent
- # processing (``apipeline_process_enqueue_documents`` reads
- # doc_status independently) or scan classification
- # (``scanning_exclusive`` already gates concurrent enqueue).
- # Lock order: enqueue_serialize → pipeline_status_lock (the
- # request_pending nudge inside is fine; no caller holds
- # pipeline_status_lock first then needs enqueue_serialize).
- enqueue_serialize_lock = get_namespace_lock(
- "enqueue_serialize", workspace=self.workspace
- )
- async with enqueue_serialize_lock:
- # 3. Filter out already processed documents
- # Get docs ids
- all_new_doc_ids = set(new_docs.keys())
- # Exclude IDs of documents that are already enqueued. The previous
- # ``reprocess_existing_non_processed`` flag has been removed: any
- # same-name record (regardless of status) is treated as a duplicate
- # here. Recovering half-processed documents is now the job of the
- # pipeline's resume logic, which runs in apipeline_process_enqueue_documents
- # rather than this enqueue path.
- unique_new_doc_ids = await self.doc_status.filter_keys(all_new_doc_ids)
- for doc_id in list(unique_new_doc_ids):
- content_data = contents[doc_id]
- # 3a. Filename-based dedup: same basename always treated as duplicate.
- match = await get_existing_doc_by_file_basename(
- self.doc_status, content_data["file_path"]
- )
- if match:
- existing_doc_id, existing_doc = match
- unique_new_doc_ids.discard(doc_id)
- duplicate_attempts.append(
- {
- "doc_id": doc_id,
- "original_doc_id": existing_doc_id,
- "file_path": content_data["file_path"],
- "content_length": new_docs.get(doc_id, {}).get(
- "content_length", 0
- ),
- "existing_status": doc_status_field(
- existing_doc, "status", "unknown"
- ),
- "existing_track_id": doc_status_field(
- existing_doc, "track_id", ""
- ),
- "duplicate_kind": "filename",
- }
- )
- continue
- # 3b. Content-hash dedup: different filename but same body still dupes.
- content_hash = content_data.get("content_hash")
- if not content_hash:
- continue
- hash_match = await get_existing_doc_by_content_hash(
- self.doc_status, content_hash
- )
- if hash_match:
- existing_doc_id, existing_doc = hash_match
- unique_new_doc_ids.discard(doc_id)
- duplicate_attempts.append(
- {
- "doc_id": doc_id,
- "original_doc_id": existing_doc_id,
- "file_path": content_data["file_path"],
- "content_length": new_docs.get(doc_id, {}).get(
- "content_length", 0
- ),
- "existing_status": doc_status_field(
- existing_doc, "status", "unknown"
- ),
- "existing_track_id": doc_status_field(
- existing_doc, "track_id", ""
- ),
- "duplicate_kind": "content_hash",
- }
- )
- # Handle duplicate documents - create trackable records with current track_id
- ignored_ids = list(all_new_doc_ids - unique_new_doc_ids)
- for doc_id in ignored_ids:
- if any(
- attempt.get("doc_id") == doc_id for attempt in duplicate_attempts
- ):
- continue
- existing_doc = await self.doc_status.get_by_id(doc_id)
- duplicate_attempts.append(
- {
- "doc_id": doc_id,
- "original_doc_id": doc_id,
- "file_path": new_docs.get(doc_id, {}).get(
- "file_path", "unknown_source"
- ),
- "content_length": new_docs.get(doc_id, {}).get(
- "content_length", 0
- ),
- "existing_status": (
- existing_doc.get("status", "unknown")
- if existing_doc
- else "unknown"
- ),
- "existing_track_id": (
- existing_doc.get("track_id", "") if existing_doc else ""
- ),
- "duplicate_kind": "filename",
- }
- )
- if duplicate_attempts:
- duplicate_docs: dict[str, Any] = {}
- for index, attempt in enumerate(duplicate_attempts):
- doc_id = attempt["doc_id"]
- file_path = attempt.get("file_path") or "unknown_source"
- duplicate_kind = attempt.get("duplicate_kind") or "filename"
- logger.warning(
- f"Duplicate document detected ({duplicate_kind}): "
- f"{doc_id} ({file_path})"
- )
- # Create a new record with unique ID for this duplicate attempt
- dup_record_id = compute_mdhash_id(
- f"{doc_id}-{track_id}-{index}-{file_path}", prefix="dup-"
- )
- if duplicate_kind == "content_hash":
- error_prefix = (
- "Identical content already exists under another filename."
- )
- else:
- error_prefix = "File name already exists."
- duplicate_docs[dup_record_id] = {
- "status": DocStatus.FAILED,
- "content_summary": (
- f"[DUPLICATE:{duplicate_kind}] Original document: "
- f"{attempt.get('original_doc_id', doc_id)}"
- ),
- "content_length": attempt.get("content_length", 0),
- "chunks_count": 0,
- "chunks_list": [],
- "created_at": datetime.now(timezone.utc).isoformat(),
- "updated_at": datetime.now(timezone.utc).isoformat(),
- "file_path": file_path,
- "track_id": track_id, # Use current track_id for tracking
- "error_msg": (
- f"{error_prefix} "
- f"Original doc_id: {attempt.get('original_doc_id', doc_id)}, "
- f"Status: {attempt.get('existing_status', 'unknown')}"
- ),
- "metadata": {
- "is_duplicate": True,
- "duplicate_kind": duplicate_kind,
- "original_doc_id": attempt.get("original_doc_id", doc_id),
- "original_track_id": attempt.get("existing_track_id", ""),
- },
- }
- # Store duplicate records in doc_status
- if duplicate_docs:
- await self.doc_status.upsert(duplicate_docs)
- logger.info(
- f"Created {len(duplicate_docs)} duplicate document records with track_id: {track_id}"
- )
- # Flush lightrag-format I/O failures as FAILED stubs. Done
- # inside the critical section so concurrent enqueues either see
- # the failure rows in full or not at all, and so a subsequent
- # /documents/scan finds the stub-without-full_docs combination
- # that document_routes treats as "delete and re-extract".
- if lightrag_load_errors:
- await self.apipeline_enqueue_error_documents(
- lightrag_load_errors, track_id=track_id
- )
- # Filter new_docs to only include documents with unique IDs
- new_docs = {
- doc_id: new_docs[doc_id]
- for doc_id in unique_new_doc_ids
- if doc_id in new_docs
- }
- if not new_docs:
- logger.warning("No new unique documents were found.")
- # If FAILED stubs were just flushed (lightrag-format I/O
- # errors), the caller needs the track_id to query their
- # status; a bare ``return None`` would also be interpreted
- # by document_routes upload paths as "all duplicate —
- # archive the source", silently hiding the failure.
- if lightrag_load_errors:
- return track_id
- return
- # 4. Store document content in full_docs and status in doc_status
- full_docs_data = {
- doc_id: {
- "content": contents[doc_id].get("content", ""),
- "file_path": contents[doc_id]["file_path"],
- "parse_format": contents[doc_id].get(
- "parse_format", FULL_DOCS_FORMAT_RAW
- ),
- }
- for doc_id in new_docs.keys()
- }
- for doc_id in new_docs.keys():
- if contents[doc_id].get("content_hash"):
- full_docs_data[doc_id]["content_hash"] = contents[doc_id][
- "content_hash"
- ]
- if contents[doc_id].get("sidecar_location"):
- full_docs_data[doc_id]["sidecar_location"] = contents[doc_id][
- "sidecar_location"
- ]
- if contents[doc_id].get("parse_engine"):
- full_docs_data[doc_id]["parse_engine"] = contents[doc_id][
- "parse_engine"
- ]
- if contents[doc_id].get("process_options"):
- full_docs_data[doc_id]["process_options"] = contents[doc_id][
- "process_options"
- ]
- # ``chunk_options`` is always populated by ``_add_content``
- # at enqueue time so it's persisted unconditionally.
- if contents[doc_id].get("chunk_options") is not None:
- full_docs_data[doc_id]["chunk_options"] = contents[doc_id][
- "chunk_options"
- ]
- await self.full_docs.upsert(full_docs_data)
- # Persist data to disk immediately
- await self.full_docs.index_done_callback()
- # Store document status (without content)
- await self.doc_status.upsert(new_docs)
- logger.debug(f"Stored {len(new_docs)} new unique documents")
- # Notify any in-flight processing loop that new work has arrived.
- # The loop checks ``request_pending`` after each batch and will
- # re-query doc_status to pick up these PENDING rows. Without
- # this nudge a caller that does not subsequently call
- # ``apipeline_process_enqueue_documents`` (or whose call races
- # with the loop's just-finished batch) could leave the new docs
- # stranded until the next unrelated trigger.
- async with pipeline_status_lock:
- if pipeline_status.get("busy"):
- pipeline_status["request_pending"] = True
- return track_id
async def apipeline_enqueue_error_documents(
    self,
    error_files: list[dict[str, Any]],
    track_id: str | None = None,
) -> None:
    """Write one FAILED ``doc_status`` record per file that could not be extracted.

    Only ``doc_status`` is touched by this method; nothing is written to
    ``full_docs``. The record id is derived from the normalized file path
    plus the error description, so re-reporting the same failure for the
    same file overwrites the earlier record rather than adding a new one.

    Args:
        error_files: One dict per failed file. Recognised keys (all optional):
            - file_path: Original file name/path (default ``"unknown_file"``)
            - error_description: Short text stored as ``content_summary``
              (default ``"File extraction failed"``)
            - original_error: Full message stored as ``error_msg``
              (default ``"Unknown error"``)
            - file_size: Size in bytes stored as ``content_length``
              (default ``0``)
        track_id: Tracking id shared by every record of this call. A new one
            is generated when it is ``None`` or blank.

    Returns:
        None
    """
    if not error_files:
        logger.debug("No error files to record")
        return

    # A missing or whitespace-only track_id is replaced by a generated one.
    if track_id is None or not track_id.strip():
        track_id = generate_track_id("error")

    # One timestamp for the whole call so every record agrees on it.
    timestamp = datetime.now(timezone.utc).isoformat()

    failed_records: dict[str, Any] = {}
    for entry in error_files:
        source_path = normalize_document_file_path(
            entry.get("file_path", "unknown_file")
        )
        summary = entry.get("error_description", "File extraction failed")
        record = {
            "status": DocStatus.FAILED,
            "content_summary": summary,
            "content_length": entry.get("file_size", 0),
            "error_msg": entry.get("original_error", "Unknown error"),
            "chunks_count": 0,  # No chunks for failed files
            "chunks_list": [],
            "created_at": timestamp,
            "updated_at": timestamp,
            "file_path": source_path,
            "track_id": track_id,
            "metadata": {
                "error_type": "file_extraction_error",
            },
        }
        # Record id carries the "error-" prefix and is seeded by path + description.
        record_id = compute_mdhash_id(f"{source_path}-{summary}", prefix="error-")
        failed_records[record_id] = record

    if not failed_records:
        return

    await self.doc_status.upsert(failed_records)

    # Emit one log line per stored record for debugging.
    for record_id, record in failed_records.items():
        logger.error(
            f"File processing error: - ID: {record_id} {record['file_path']}"
        )
async def apipeline_process_enqueue_documents(self) -> None:
    """
    Process pending documents by splitting them into chunks, processing
    each chunk for entity and relation extraction, and updating the
    document status.

    1. Get all pending, failed, and abnormally terminated processing documents.
    2. Validate document data consistency and fix any issues
    3. Split document content into chunks
    4. Process each chunk for entity and relation extraction
    5. Update the document status

    Coordination with other callers goes through the workspace-scoped
    ``pipeline_status`` dict and its lock:

    - If ``busy`` is already set, this call only sets ``request_pending``
      and returns; the running loop is expected to pick the work up.
    - Otherwise it fetches documents whose status is in
      ``_INFLIGHT_DOC_STATUSES``, returns immediately when there are none,
      and else marks the pipeline ``busy`` and resets the progress fields.
    - The main loop checks ``cancellation_requested`` first on every
      iteration and returns as soon as it is set.
    - After each batch (or when nothing is left to do) the loop calls
      ``_atomic_release_busy_or_consume_pending``; a truthy result ends the
      loop, a falsy result triggers a re-query of ``doc_status``.
    - The ``finally`` block always records the "stopped" message and resets
      ``cancellation_requested``; it clears ``busy`` only when the loop has
      not already released it.

    Returns:
        None
    """
    pipeline_status = await get_namespace_data(
        "pipeline_status", workspace=self.workspace
    )
    pipeline_status_lock = get_namespace_lock(
        "pipeline_status", workspace=self.workspace
    )

    async with pipeline_status_lock:
        # Ensure only one worker is processing documents
        if not pipeline_status.get("busy", False):
            to_process_docs: dict[
                str, DocProcessingStatus
            ] = await self.doc_status.get_docs_by_statuses(
                list(_INFLIGHT_DOC_STATUSES)
            )
            if not to_process_docs:
                # Nothing queued: return without ever setting ``busy``.
                logger.info("No documents to process")
                return
            pipeline_status.update(
                {
                    "busy": True,
                    "job_name": "Default Job",
                    "job_start": datetime.now(timezone.utc).isoformat(),
                    "docs": 0,
                    "batchs": 0,  # Total number of files to be processed
                    "cur_batch": 0,  # Number of files already processed
                    "request_pending": False,  # Clear any previous request
                    "cancellation_requested": False,  # Initialize cancellation flag
                    "latest_message": "",
                }
            )
            # Cleaning history_messages without breaking it as a shared list object
            del pipeline_status["history_messages"][:]
        else:
            # Another process is busy, just set request flag and return
            pipeline_status["request_pending"] = True
            logger.info(
                "Another process is already processing the document queue. Request queued."
            )
            return

    # Tracks whether the loop has already released ``busy`` under
    # the same critical section that observed request_pending=False.
    # This makes the exit handoff atomic: a concurrent enqueue can
    # either set request_pending BEFORE we release (in which case
    # the loop continues with a fresh snapshot) or AFTER (in which
    # case it sees busy=False and starts a new loop via its own
    # process_enqueue call). Without this, a small window between
    # "loop reads request_pending=False" and "finally clears busy"
    # could strand newly-enqueued PENDING docs.
    busy_released_in_loop = False
    try:
        # Process documents until no more documents or requests
        while True:
            # Check for cancellation request at the start of main loop
            async with pipeline_status_lock:
                if pipeline_status.get("cancellation_requested", False):
                    pipeline_status["request_pending"] = False
                    pipeline_status["cancellation_requested"] = False
                    log_message = "Pipeline cancelled by user"
                    logger.info(log_message)
                    pipeline_status["latest_message"] = log_message
                    pipeline_status["history_messages"].append(log_message)
                    # Exit directly, skipping request_pending check
                    # (``busy`` is still cleared by the ``finally`` block below).
                    return

            if not to_process_docs:
                log_message = "All enqueued documents have been processed"
                logger.info(log_message)
                pipeline_status["latest_message"] = log_message
                pipeline_status["history_messages"].append(log_message)
                # Truthy result: ``busy`` was released by the helper, so stop.
                # Falsy result: re-query doc_status and go around again.
                if await self._atomic_release_busy_or_consume_pending(
                    pipeline_status, pipeline_status_lock
                ):
                    busy_released_in_loop = True
                    break
                to_process_docs = await self.doc_status.get_docs_by_statuses(
                    list(_INFLIGHT_DOC_STATUSES)
                )
                continue

            # Validate document data consistency and fix any issues
            to_process_docs = await self._validate_and_fix_document_consistency(
                to_process_docs, pipeline_status, pipeline_status_lock
            )

            if not to_process_docs:
                # The consistency check removed every candidate; same exit
                # handoff as the "nothing to process" branch above.
                log_message = (
                    "No valid documents to process after consistency check"
                )
                logger.info(log_message)
                pipeline_status["latest_message"] = log_message
                pipeline_status["history_messages"].append(log_message)
                if await self._atomic_release_busy_or_consume_pending(
                    pipeline_status, pipeline_status_lock
                ):
                    busy_released_in_loop = True
                    break
                to_process_docs = await self.doc_status.get_docs_by_statuses(
                    list(_INFLIGHT_DOC_STATUSES)
                )
                continue

            log_message = f"Processing {len(to_process_docs)} document(s)"
            logger.info(log_message)
            # Publish batch size and reset the progress counter for this batch.
            pipeline_status["docs"] = len(to_process_docs)
            pipeline_status["batchs"] = len(to_process_docs)
            pipeline_status["cur_batch"] = 0
            pipeline_status["latest_message"] = log_message
            pipeline_status["history_messages"].append(log_message)

            await self._run_pipeline_batch(
                to_process_docs,
                pipeline_status=pipeline_status,
                pipeline_status_lock=pipeline_status_lock,
            )

            # Atomic exit handoff: if request_pending was set during
            # this batch (e.g. a concurrent enqueue while busy=True),
            # clear it and refetch. Otherwise release ``busy`` under
            # the SAME lock so a concurrent enqueue cannot squeeze a
            # request_pending=True past us into a now-stranded state.
            if await self._atomic_release_busy_or_consume_pending(
                pipeline_status, pipeline_status_lock
            ):
                busy_released_in_loop = True
                break

            log_message = "Processing additional documents due to pending request"
            logger.info(log_message)
            pipeline_status["latest_message"] = log_message
            pipeline_status["history_messages"].append(log_message)

            # Check for pending documents again
            to_process_docs = await self.doc_status.get_docs_by_statuses(
                list(_INFLIGHT_DOC_STATUSES)
            )

    finally:
        # Runs on normal exit, on the cancellation ``return``, and on any
        # exception raised inside the loop.
        log_message = "Enqueued document processing pipeline stopped"
        logger.info(log_message)
        # If the loop already released ``busy`` under the atomic exit
        # check, don't clobber it here — a concurrent enqueue may have
        # observed busy=False and started a new processing pass that
        # has set busy=True for itself. Cancellation flag and log
        # bookkeeping are always safe to update.
        async with pipeline_status_lock:
            if not busy_released_in_loop:
                pipeline_status["busy"] = False
            pipeline_status["cancellation_requested"] = (
                False  # Always reset cancellation flag
            )
            pipeline_status["latest_message"] = log_message
            pipeline_status["history_messages"].append(log_message)
- # ============================================================
- # Pipeline orchestration
- # ============================================================
async def _run_pipeline_batch(
    self,
    to_process_docs: dict[str, DocProcessingStatus],
    *,
    pipeline_status: dict,
    pipeline_status_lock,
) -> None:
    """Run one batch of pending documents through the parse → analyze →
    process queues.

    Three cascading layers of queues:
    - Layer 1: Content Parsing (parse_native / parse_mineru / parse_docling)
    - Layer 2: Multimodal Analyze (analyze_multimodal)
    - Layer 3: Entity / Relation Extraction (process_single_document)

    Worker tasks are started for every layer before any document is queued,
    so the bounded queues always have consumers. Each document is routed to
    the parse queue matching the engine returned by
    ``resolve_stored_document_parser_engine``; anything other than
    ``"mineru"`` / ``"docling"`` goes to the native queue. The method then
    waits for the parse queues, the analyze queue and the process queue to
    drain, in that order.

    The workers are cancelled and awaited in a ``finally`` block, so they
    are torn down even when routing or a ``join()`` raises or this
    coroutine is cancelled. Previously that teardown only ran on the
    success path, leaving the worker tasks running after a failure.

    Args:
        to_process_docs: Mapping of doc_id to its status record for the batch.
        pipeline_status: Shared status dict; ``job_name`` is updated here and
            the dict is handed to the workers through the batch context.
        pipeline_status_lock: Lock guarding ``pipeline_status``; only passed
            on to the workers through the batch context.

    Returns:
        None
    """
    total_files = len(to_process_docs)
    pipeline_status["job_name"] = self._format_job_name(
        to_process_docs, total_files
    )

    ctx = _BatchRunContext(
        pipeline_status=pipeline_status,
        pipeline_status_lock=pipeline_status_lock,
        semaphore=asyncio.Semaphore(self.max_parallel_insert),
        total_files=total_files,
        q_native=asyncio.Queue(maxsize=self.queue_size_default),
        q_mineru=asyncio.Queue(maxsize=self.queue_size_default),
        q_docling=asyncio.Queue(maxsize=self.queue_size_default),
        q_analyze=asyncio.Queue(maxsize=self.queue_size_default),
        q_process=asyncio.Queue(maxsize=self.queue_size_insert),
    )

    workers: list[asyncio.Task] = []
    try:
        # Layer 1: at least one parse worker per engine, each bound to its queue.
        parse_lanes = (
            ("native", ctx.q_native, self.max_parallel_parse_native),
            ("mineru", ctx.q_mineru, self.max_parallel_parse_mineru),
            ("docling", ctx.q_docling, self.max_parallel_parse_docling),
        )
        for engine_name, parse_queue, parallelism in parse_lanes:
            for _ in range(max(1, parallelism)):
                workers.append(
                    asyncio.create_task(
                        self._parse_worker(engine_name, parse_queue, ctx)
                    )
                )

        # Layer 2 and layer 3 workers.
        for _ in range(max(1, self.max_parallel_analyze)):
            workers.append(asyncio.create_task(self._analyze_worker(ctx)))
        for _ in range(max(1, self.max_parallel_insert)):
            workers.append(asyncio.create_task(self._process_worker(ctx)))

        # Add pending files to the correct parsing queue
        for doc_id, status_doc in to_process_docs.items():
            content_data = await self.full_docs.get_by_id(doc_id) or {}
            engine = resolve_stored_document_parser_engine(
                file_path=getattr(status_doc, "file_path", "unknown_source"),
                content_data=content_data,
            )
            if engine == "mineru":
                await ctx.q_mineru.put((doc_id, status_doc))
            elif engine == "docling":
                await ctx.q_docling.put((doc_id, status_doc))
            else:
                await ctx.q_native.put((doc_id, status_doc))

        # Drain layer by layer: parsing first, then analyze, then process.
        await asyncio.gather(
            ctx.q_native.join(), ctx.q_mineru.join(), ctx.q_docling.join()
        )
        await ctx.q_analyze.join()
        await ctx.q_process.join()
    finally:
        # Always tear the workers down, including on error or cancellation,
        # so no task outlives the batch. ``return_exceptions=True`` keeps the
        # workers' CancelledError from masking an in-flight exception.
        for w in workers:
            w.cancel()
        await asyncio.gather(*workers, return_exceptions=True)
- async def _validate_and_fix_document_consistency(
- self,
- to_process_docs: dict[str, DocProcessingStatus],
- pipeline_status: dict,
- pipeline_status_lock: asyncio.Lock,
- ) -> dict[str, DocProcessingStatus]:
- """Validate and fix document data consistency by deleting inconsistent entries, but preserve failed documents"""
- inconsistent_docs = []
- failed_docs_to_preserve = []
- successful_deletions = 0
- # Check each document's data consistency
- for doc_id, status_doc in to_process_docs.items():
- # Check if corresponding content exists in full_docs
- content_data = await self.full_docs.get_by_id(doc_id)
- if not content_data:
- # Check if this is a failed document that should be preserved
- if (
- hasattr(status_doc, "status")
- and status_doc.status == DocStatus.FAILED
- ):
- failed_docs_to_preserve.append(doc_id)
- else:
- inconsistent_docs.append(doc_id)
- # Log information about failed documents that will be preserved
- if failed_docs_to_preserve:
- async with pipeline_status_lock:
- preserve_message = f"Preserving {len(failed_docs_to_preserve)} failed document entries for manual review"
- logger.info(preserve_message)
- pipeline_status["latest_message"] = preserve_message
- pipeline_status["history_messages"].append(preserve_message)
- # Remove failed documents from processing list but keep them in doc_status
- for doc_id in failed_docs_to_preserve:
- to_process_docs.pop(doc_id, None)
- # Delete inconsistent document entries(excluding failed documents)
- if inconsistent_docs:
- async with pipeline_status_lock:
- summary_message = (
- f"Inconsistent document entries found: {len(inconsistent_docs)}"
- )
- logger.info(summary_message)
- pipeline_status["latest_message"] = summary_message
- pipeline_status["history_messages"].append(summary_message)
- successful_deletions = 0
- for doc_id in inconsistent_docs:
- try:
- status_doc = to_process_docs[doc_id]
- file_path = resolve_doc_file_path(status_doc=status_doc)
- # Delete doc_status entry
- await self.doc_status.delete([doc_id])
- successful_deletions += 1
- # Log successful deletion
- async with pipeline_status_lock:
- log_message = (
- f"Deleted inconsistent entry: {doc_id} ({file_path})"
- )
- logger.info(log_message)
- pipeline_status["latest_message"] = log_message
- pipeline_status["history_messages"].append(log_message)
- # Remove from processing list
- to_process_docs.pop(doc_id, None)
- except Exception as e:
- # Log deletion failure
- async with pipeline_status_lock:
- error_message = f"Failed to delete entry: {doc_id} - {str(e)}"
- logger.error(error_message)
- pipeline_status["latest_message"] = error_message
- pipeline_status["history_messages"].append(error_message)
- # Final summary log
- # async with pipeline_status_lock:
- # final_message = f"Successfully deleted {successful_deletions} inconsistent entries, preserved {len(failed_docs_to_preserve)} failed documents"
- # logger.info(final_message)
- # pipeline_status["latest_message"] = final_message
- # pipeline_status["history_messages"].append(final_message)
- # Reset interrupted documents that pass consistency checks to PENDING status
- docs_to_reset = {}
- reset_count = 0
- for doc_id, status_doc in to_process_docs.items():
- # Check if document has corresponding content in full_docs (consistency check)
- content_data = await self.full_docs.get_by_id(doc_id)
- if content_data: # Document passes consistency check
- # Check if document is in interrupted status
- if hasattr(status_doc, "status") and status_doc.status in [
- DocStatus.PROCESSING,
- DocStatus.FAILED,
- DocStatus.PARSING,
- DocStatus.ANALYZING,
- ]:
- preserved_chunks_list, preserved_chunks_count = (
- chunk_fields_from_status_doc(status_doc)
- )
- resolved_file_path = resolve_doc_file_path(
- status_doc=status_doc,
- content_data=content_data,
- )
- # Prepare document for status reset to PENDING
- docs_to_reset[doc_id] = {
- "status": DocStatus.PENDING,
- "content_summary": status_doc.content_summary,
- "content_length": status_doc.content_length,
- "chunks_count": preserved_chunks_count,
- "chunks_list": preserved_chunks_list,
- "created_at": status_doc.created_at,
- "updated_at": datetime.now(timezone.utc).isoformat(),
- "file_path": resolved_file_path,
- "track_id": getattr(status_doc, "track_id", ""),
- "content_hash": getattr(status_doc, "content_hash", None),
- # Clear transient error / processing fields but preserve
- # long-lived per-doc metadata (process_options) seeded
- # at enqueue time.
- "error_msg": "",
- "metadata": doc_status_transition_metadata(status_doc),
- }
- # Update the status in to_process_docs as well
- status_doc.status = DocStatus.PENDING
- status_doc.file_path = resolved_file_path
- reset_count += 1
- # Update doc_status storage if there are documents to reset
- if docs_to_reset:
- await self.doc_status.upsert(docs_to_reset)
- async with pipeline_status_lock:
- reset_message = (
- f"Reset {reset_count} documents from "
- "PARSING/ANALYZING/PROCESSING/FAILED to PENDING status"
- )
- logger.info(reset_message)
- pipeline_status["latest_message"] = reset_message
- pipeline_status["history_messages"].append(reset_message)
- return to_process_docs
async def _atomic_release_busy_or_consume_pending(
    self,
    pipeline_status: dict,
    pipeline_status_lock,
) -> bool:
    """Release ``busy`` or consume ``request_pending`` in one critical section.

    Both the check of ``request_pending`` and the write to ``busy`` happen
    while ``pipeline_status_lock`` is held, so a request flagged by a
    concurrent enqueue just before the processing loop exits is observed
    here instead of being left behind with nobody running.

    Args:
        pipeline_status: Shared status mapping; ``request_pending`` is read
            and ``request_pending`` / ``busy`` are written.
        pipeline_status_lock: Async context manager guarding
            ``pipeline_status``.

    Returns:
        True when no request was pending: ``busy`` has been set to False
        under the lock, so the caller should leave its loop and must not
        clear ``busy`` again.
        False when a request was pending: the flag has been reset to
        False, ``busy`` is left untouched, and the caller should refetch
        and keep looping.
    """
    async with pipeline_status_lock:
        has_pending_request = pipeline_status.get("request_pending", False)
        if not has_pending_request:
            # Nothing queued behind us: hand the pipeline back.
            pipeline_status["busy"] = False
            return True
        # Consume the request; the caller stays busy and loops again.
        pipeline_status["request_pending"] = False
        return False
@staticmethod
def _format_job_name(
    to_process_docs: dict[str, DocProcessingStatus],
    total_files: int,
) -> str:
    """Build the ``job_name`` shown in pipeline_status for one batch.

    The name is the first document's ``file_path`` cut to 20 characters
    (with ``...`` appended when it was longer) followed by
    ``[<total_files> files]``.

    Args:
        to_process_docs: Documents of the batch; only the first value (in
            mapping iteration order) is inspected.
        total_files: Number rendered inside the ``[... files]`` suffix.

    Returns:
        The formatted job name. ``unknown_source`` replaces the path prefix
        when the mapping is empty or the first document has no usable
        ``file_path``.
    """
    # A default for next() keeps an empty batch from raising StopIteration,
    # which would surface as an opaque RuntimeError if it ever escaped
    # through a coroutine or generator frame.
    first_doc = next(iter(to_process_docs.values()), None)
    first_doc_path = getattr(first_doc, "file_path", None)
    if not first_doc_path:
        return f"unknown_source[{total_files} files]"

    ellipsis = "..." if len(first_doc_path) > 20 else ""
    return f"{first_doc_path[:20]}{ellipsis}[{total_files} files]"
- # ============================================================
- # Cascading queue workers (Layer 1 -> 2 -> 3)
- # ============================================================
async def _parse_worker(
    self,
    engine: str,
    in_q: asyncio.Queue,
    ctx: _BatchRunContext,
) -> None:
    """Layer 1 worker: consume ``(doc_id, status_doc)`` items and emit parsed data.

    For every queued item the worker marks the document PARSING, runs the
    engine-specific parser (``parse_mineru`` / ``parse_docling`` /
    ``parse_native``), refreshes ``content_hash`` from ``doc_status`` in
    case the parser patched it, and then either short-circuits via
    ``_mark_duplicate_after_parse`` or hands the result to
    ``ctx.q_analyze``. A ``PipelineCancelledException`` is routed to
    ``_mark_doc_cancelled_in_stage``; any other exception moves the
    document to FAILED with ``error_msg`` set to the exception text.

    Args:
        engine: Parser selector. ``"mineru"`` and ``"docling"`` pick the
            matching parser; every other value falls back to
            ``parse_native``.
        in_q: Queue of ``(doc_id, status_doc)`` tuples to parse.
        ctx: Batch context providing ``pipeline_status``,
            ``pipeline_status_lock`` and the downstream ``q_analyze``.

    The loop has no exit condition of its own. ``in_q.task_done()`` runs
    in a ``finally`` for every dequeued item, including cancelled,
    duplicate and failed ones.
    """
    while True:
        item = await in_q.get()
        try:
            doc_id_w, status_doc_w = item
            file_path_w = getattr(status_doc_w, "file_path", "unknown_source")

            # Boundary cancellation check: do not invoke the engine for the
            # next queued doc; mark it cancelled instead and let the
            # ``finally`` below account for the queue item.
            if await self._cancellation_requested(
                ctx.pipeline_status, ctx.pipeline_status_lock
            ):
                await self._mark_doc_cancelled_in_stage(
                    doc_id=doc_id_w,
                    status_doc=status_doc_w,
                    file_path=file_path_w,
                    stage_label="parse",
                    pipeline_status=ctx.pipeline_status,
                    pipeline_status_lock=ctx.pipeline_status_lock,
                )
                continue

            content_data_w = await self.full_docs.get_by_id(doc_id_w)
            if not content_data_w:
                raise Exception(
                    f"Document content not found in full_docs for doc_id: {doc_id_w}"
                )

            # Backfill source_file_name from the status metadata when the
            # full_docs row does not carry one.
            if isinstance(status_doc_w.metadata, dict):
                source_file_name_w = status_doc_w.metadata.get("source_file_name")
                if source_file_name_w and not content_data_w.get(
                    "source_file_name"
                ):
                    content_data_w["source_file_name"] = source_file_name_w

            # parsing_start_time is stamped on the in-memory status_doc so
            # the PARSING upsert below writes it into doc_status.
            if not isinstance(status_doc_w.metadata, dict):
                status_doc_w.metadata = {}

            # Drop per-attempt fields left over from a prior failed/retried
            # attempt before stamping the new start time; otherwise they
            # would skew stage-duration metrics and the cache-hit / skipped
            # signals for this attempt. They are re-written further down
            # (parse pair) or by the analyze worker (analyze triple).
            for _stale_key in (
                "parsing_end_time",
                "parse_stage_skipped",
                "analyzing_start_time",
                "analyzing_end_time",
                "analyzing_stage_skipped",
            ):
                status_doc_w.metadata.pop(_stale_key, None)
            status_doc_w.metadata["parsing_start_time"] = int(time.time())

            await self._upsert_doc_status_transition(
                doc_id=doc_id_w,
                status=DocStatus.PARSING,
                status_doc=status_doc_w,
                file_path=file_path_w,
            )

            async with ctx.pipeline_status_lock:
                log_message = f"Parsing ({engine}): {doc_id_w}"
                logger.info(log_message)
                ctx.pipeline_status["latest_message"] = log_message
                ctx.pipeline_status["history_messages"].append(log_message)

            if engine == "mineru":
                parsed_data_w = await self.parse_mineru(
                    doc_id_w, file_path_w, content_data_w
                )
            elif engine == "docling":
                parsed_data_w = await self.parse_docling(
                    doc_id_w, file_path_w, content_data_w
                )
            else:
                parsed_data_w = await self.parse_native(
                    doc_id_w, file_path_w, content_data_w
                )

            # Mirror non-fatal parser warnings onto the in-memory status_doc
            # so later status upserts can carry them along.
            parse_warnings_payload_w = parsed_data_w.get("parse_warnings")
            if parse_warnings_payload_w:
                if not isinstance(status_doc_w.metadata, dict):
                    status_doc_w.metadata = {}
                status_doc_w.metadata["parse_warnings"] = parse_warnings_payload_w

            # Either the parser reported a skipped parse stage (flag is
            # mirrored) or parsing really ran (end time is stamped). The
            # two fields are mutually exclusive per attempt.
            if not isinstance(status_doc_w.metadata, dict):
                status_doc_w.metadata = {}
            if parsed_data_w.get("parse_stage_skipped"):
                status_doc_w.metadata["parse_stage_skipped"] = True
            else:
                status_doc_w.metadata["parsing_end_time"] = int(time.time())

            # parse_* may have patched content_hash in doc_status; pick up
            # the stored value, which may be a dict or an object.
            refreshed = await self.doc_status.get_by_id(doc_id_w)
            if refreshed:
                refreshed_hash = (
                    refreshed.get("content_hash")
                    if isinstance(refreshed, dict)
                    else getattr(refreshed, "content_hash", None)
                )
                if refreshed_hash:
                    status_doc_w.content_hash = refreshed_hash

            if await self._mark_duplicate_after_parse(
                doc_id=doc_id_w,
                status_doc=status_doc_w,
                file_path=file_path_w,
                content_hash=status_doc_w.content_hash,
                # ``or ""`` tolerates an explicit ``content=None`` the same
                # way the analyze worker does, instead of raising TypeError.
                content_length=len(parsed_data_w.get("content", "") or ""),
                content_data=content_data_w,
                pipeline_status=ctx.pipeline_status,
                pipeline_status_lock=ctx.pipeline_status_lock,
            ):
                continue

            await ctx.q_analyze.put((doc_id_w, status_doc_w, parsed_data_w))
        except PipelineCancelledException:
            # Cancellation raised from inside the parse engine: route it
            # through the same friendly-message path as the boundary check
            # above instead of the generic handler below.
            await self._mark_doc_cancelled_in_stage(
                doc_id=doc_id_w,
                status_doc=status_doc_w,
                file_path=getattr(status_doc_w, "file_path", "unknown_source"),
                stage_label="parse",
                pipeline_status=ctx.pipeline_status,
                pipeline_status_lock=ctx.pipeline_status_lock,
            )
        except Exception as e:
            logger.error(f"Parse worker failed ({engine}): {e}")
            try:
                await self._upsert_doc_status_transition(
                    doc_id=doc_id_w,
                    status=DocStatus.FAILED,
                    status_doc=status_doc_w,
                    file_path=getattr(status_doc_w, "file_path", "unknown_source"),
                    extra_fields={"error_msg": str(e)},
                )
            except Exception as status_err:
                # Still best-effort (the worker must keep draining the
                # queue), but leave a trace: the document did not reach
                # FAILED and may stay in its previous status.
                logger.error(
                    f"Parse worker could not persist FAILED status ({engine}): "
                    f"{status_err}"
                )
        finally:
            in_q.task_done()
async def _analyze_worker(self, ctx: _BatchRunContext) -> None:
    """Layer 2 worker: run multimodal analysis and feed ``ctx.q_process``.

    Consumes ``(doc_id, status_doc, parsed_data)`` items from
    ``ctx.q_analyze``. Before analysis, ``content_summary`` and
    ``content_length`` on the status document are recomputed from the
    parsed ``content`` so later status upserts carry values derived from
    the parsed body. The document is then marked ANALYZING, passed through
    ``analyze_multimodal`` and forwarded to ``ctx.q_process``.

    A ``PipelineCancelledException`` is routed to
    ``_mark_doc_cancelled_in_stage``; any other exception moves the
    document to FAILED with ``error_msg`` set to the exception text.

    Args:
        ctx: Batch context providing ``q_analyze``, ``q_process``,
            ``pipeline_status`` and ``pipeline_status_lock``.

    The loop has no exit condition of its own. ``ctx.q_analyze.task_done()``
    runs in a ``finally`` for every dequeued item.
    """
    while True:
        item = await ctx.q_analyze.get()
        try:
            doc_id_w, status_doc_w, parsed_data_w = item
            file_path_w = getattr(status_doc_w, "file_path", "unknown_source")

            # Boundary cancellation check, same pattern as the parse
            # worker: items still queued for analysis are marked cancelled
            # here so analyze_multimodal is not entered after a cancel.
            if await self._cancellation_requested(
                ctx.pipeline_status, ctx.pipeline_status_lock
            ):
                await self._mark_doc_cancelled_in_stage(
                    doc_id=doc_id_w,
                    status_doc=status_doc_w,
                    file_path=file_path_w,
                    stage_label="analyze",
                    pipeline_status=ctx.pipeline_status,
                    pipeline_status_lock=ctx.pipeline_status_lock,
                )
                continue

            # Missing or None content is treated as an empty body.
            refreshed_content_w = parsed_data_w.get("content", "") or ""
            refreshed_summary_w = get_content_summary(refreshed_content_w)
            refreshed_length_w = len(refreshed_content_w)
            status_doc_w.content_summary = refreshed_summary_w
            status_doc_w.content_length = refreshed_length_w

            # Stamp analyzing_start_time on the in-memory status_doc so the
            # ANALYZING upsert below writes it into doc_status.
            if not isinstance(status_doc_w.metadata, dict):
                status_doc_w.metadata = {}
            status_doc_w.metadata["analyzing_start_time"] = int(time.time())

            await self._upsert_doc_status_transition(
                doc_id=doc_id_w,
                status=DocStatus.ANALYZING,
                status_doc=status_doc_w,
                file_path=file_path_w,
            )

            analyzed = await self.analyze_multimodal(
                doc_id=doc_id_w,
                file_path=file_path_w,
                parsed_data=parsed_data_w,
                pipeline_status=ctx.pipeline_status,
                pipeline_status_lock=ctx.pipeline_status_lock,
            )

            # Three-way outcome, so ``analyzing_end_time`` only lands on
            # attempts that genuinely completed:
            #   - ``analyzing_stage_skipped`` in the result -> the stage was
            #     skipped; the key is removed from the result and mirrored
            #     as a flag on the status metadata.
            #   - ``multimodal_processed`` in the result -> genuine
            #     completion; stamp ``analyzing_end_time``.
            #   - neither -> not a skip and not a completion; write neither
            #     field.
            # The skipped / end_time pair is mutually exclusive.
            if not isinstance(status_doc_w.metadata, dict):
                status_doc_w.metadata = {}
            if analyzed.pop("analyzing_stage_skipped", False):
                status_doc_w.metadata["analyzing_stage_skipped"] = True
            elif analyzed.get("multimodal_processed"):
                status_doc_w.metadata["analyzing_end_time"] = int(time.time())

            await ctx.q_process.put((doc_id_w, status_doc_w, analyzed))
        except PipelineCancelledException:
            # Cancellation surfaced while the item was in this stage: use
            # the friendly-message path so error_msg and history_messages
            # match the boundary-check branch.
            await self._mark_doc_cancelled_in_stage(
                doc_id=doc_id_w,
                status_doc=status_doc_w,
                file_path=getattr(status_doc_w, "file_path", "unknown_source"),
                stage_label="analyze",
                pipeline_status=ctx.pipeline_status,
                pipeline_status_lock=ctx.pipeline_status_lock,
            )
        except Exception as e:
            # Mirror the parse worker: a failure here must transition the
            # document to FAILED with a diagnostic ``error_msg``, otherwise
            # it would stay in ANALYZING.
            logger.error(f"Analyze worker failed: {e}")
            try:
                await self._upsert_doc_status_transition(
                    doc_id=doc_id_w,
                    status=DocStatus.FAILED,
                    status_doc=status_doc_w,
                    file_path=getattr(status_doc_w, "file_path", "unknown_source"),
                    extra_fields={"error_msg": str(e)},
                )
            except Exception as status_err:
                # Still best-effort (the worker must keep draining the
                # queue), but leave a trace: the document did not reach
                # FAILED and may stay in its previous status.
                logger.error(
                    f"Analyze worker could not persist FAILED status: {status_err}"
                )
        finally:
            ctx.q_analyze.task_done()
async def _process_worker(self, ctx: _BatchRunContext) -> None:
    """Layer 3 worker: feed queued documents to ``process_single_document``.

    Pulls ``(doc_id, status_doc, parsed_data)`` items from
    ``ctx.q_process`` one at a time and awaits
    ``process_single_document`` for each of them.

    Args:
        ctx: Batch context owning ``q_process``; it is also passed through
            to ``process_single_document``.

    The loop has no exit condition of its own. An exception raised while
    handling an item is not caught here; ``task_done()`` is still called
    for that item before the exception propagates.
    """
    while True:
        queued_entry = await ctx.q_process.get()
        try:
            queued_doc_id, queued_status_doc, queued_parsed_data = queued_entry
            await self.process_single_document(
                doc_id=queued_doc_id,
                status_doc=queued_status_doc,
                parsed_data=queued_parsed_data,
                ctx=ctx,
            )
        finally:
            # Always balance the get() above so q_process.join() can return.
            ctx.q_process.task_done()
- # ============================================================
- # Single-document state machine
- # ============================================================
- async def process_single_document(
- self,
- *,
- doc_id: str,
- status_doc: DocProcessingStatus,
- parsed_data: dict[str, Any],
- ctx: _BatchRunContext,
- ) -> None:
- """Single-document state machine: chunking → KG extraction → merge.
- Always invoked from ``_process_worker`` with ``parsed_data`` already
- populated by ``_parse_worker`` + ``_analyze_worker``. Drives the
- PROCESSING → PROCESSED state machine, with FAILED fallbacks at both
- the extract and merge stage boundaries.
- """
- from lightrag.parser.routing import parse_process_options
- file_path = resolve_doc_file_path(status_doc=status_doc)
- current_file_number = 0
- file_extraction_stage_ok = False
- processing_start_time = int(time.time())
- first_stage_tasks: list[asyncio.Task] = []
- entity_relation_task: asyncio.Task | None = None
- chunks: dict[str, Any] = {}
- content_data: dict[str, Any] | None = None
- extraction_meta: dict[str, Any] = {}
- chunk_results: list = []
- doc_process_opts = parse_process_options("")
- def get_failed_chunk_snapshot() -> tuple[list[str], int]:
- if chunks:
- chunk_ids = list(chunks.keys())
- return chunk_ids, len(chunk_ids)
- return chunk_fields_from_status_doc(status_doc)
- async with ctx.semaphore:
- try:
- # Resolve file_path from full_docs before honoring a queued
- # cancellation so corrupted doc_status placeholders do not
- # get written back again during retry/cancel flows.
- content_data = await self.full_docs.get_by_id(doc_id)
- if content_data:
- file_path = resolve_doc_file_path(
- status_doc=status_doc,
- content_data=content_data,
- )
- status_doc.file_path = file_path
- # Check for cancellation before starting document processing.
- # file_path is resolved before this check so queued documents
- # do not lose their source path on early cancellation.
- await self._raise_if_cancelled(
- ctx.pipeline_status, ctx.pipeline_status_lock
- )
- async with ctx.pipeline_status_lock:
- ctx.processed_count += 1
- current_file_number = ctx.processed_count
- ctx.pipeline_status["cur_batch"] = ctx.processed_count
- log_message = (
- f"Extracting stage {current_file_number}/"
- f"{ctx.total_files}: {file_path}"
- )
- logger.info(log_message)
- ctx.pipeline_status["history_messages"].append(log_message)
- log_message = f"Processing d-id: {doc_id}"
- logger.info(log_message)
- ctx.pipeline_status["latest_message"] = log_message
- ctx.pipeline_status["history_messages"].append(log_message)
- # Prevent memory growth: keep only latest 5000 messages
- # when exceeding 10000. Trim in place so Manager.list-
- # backed shared state remains appendable and visible
- # across processes.
- if len(ctx.pipeline_status["history_messages"]) > 10000:
- logger.info(
- f"Trimming pipeline history from {len(ctx.pipeline_status['history_messages'])} to 5000 messages"
- )
- del ctx.pipeline_status["history_messages"][:-5000]
- content = parsed_data.get("content", "")
- # Decode per-document processing options once; later stages
- # (multimodal hook / KG extraction) re-read them from
- # full_docs as well.
- doc_process_opts = parse_process_options(
- (content_data or {}).get("process_options", "")
- )
- # Resume guard: if content was already extracted under
- # earlier process_options, purge stale chunks + KG before
- # rebuilding.
- await self._purge_stale_extraction_if_resuming(
- doc_id=doc_id,
- status_doc=status_doc,
- file_path=file_path,
- content_data=content_data,
- pipeline_status=ctx.pipeline_status,
- pipeline_status_lock=ctx.pipeline_status_lock,
- )
- # Chunker dispatch is driven by whether ``process_options``
- # explicitly named a chunking strategy:
- # - Explicit selector (F/R/V/P present in the raw
- # options string): dispatch to a chunker that
- # follows the standardized file-chunker contract
- # ``(tokenizer, content, chunk_token_size, *,
- # <strategy kwargs>)``, with kwargs supplied from
- # the per-doc ``chunk_options`` snapshot persisted
- # at enqueue time.
- # - No selector supplied: honor the
- # externally-customizable ``self.chunking_func``
- # with its legacy 6-arg signature so existing
- # callers (typically :meth:`ainsert` for raw text)
- # keep working unchanged. Legacy callers still
- # read parameters from ``chunk_options`` first
- # (per-doc snapshot), with ctx values as fallback
- # for already-enqueued docs predating chunk_options.
- chunk_opts = (content_data or {}).get("chunk_options")
- if not isinstance(chunk_opts, dict) or not chunk_opts:
- # Backwards compatibility: rows enqueued before the
- # chunk_options snapshot was added fall back to a
- # fresh build from current addon_params['chunker'],
- # scoped to the per-doc strategy decoded above so
- # the slim shape stays consistent with newly
- # enqueued rows. F-strategy split args fall back
- # to whatever lives in
- # ``addon_params['chunker']['fixed_token']``;
- # runtime overrides are an ainsert-time concern and
- # don't apply at process time for legacy rows.
- from lightrag.parser.routing import resolve_chunk_options
- chunk_opts = resolve_chunk_options(
- self.addon_params, process_options=doc_process_opts
- )
- resolved_chunk_size = int(
- chunk_opts.get("chunk_token_size") or self.chunk_token_size
- )
- # Captured per-strategy below; persisted to
- # ``doc_status.metadata['chunk_opts']`` via ``extraction_meta``
- # so admin/list APIs can see the actual chunker params used.
- chunk_opts_str: str = ""
- if doc_process_opts.chunking_explicit:
- from lightrag.chunker import (
- chunking_by_fixed_token,
- chunking_by_paragraph_semantic,
- chunking_by_recursive_character,
- chunking_by_semantic_vector,
- )
- strategy = doc_process_opts.chunking
- if strategy == "P":
- # P carries its own ``chunk_token_size`` (CHUNK_P_SIZE
- # env or ``addon_params['chunker']['paragraph_semantic']``);
- # pop it out of the kwargs so we don't pass it
- # both positionally and via ``**`` splat (which
- # would TypeError). Unlike R/V, ``default_chunker_config``
- # always populates this slot — falling back to
- # ``resolved_chunk_size`` (global CHUNK_SIZE) here is
- # only a safety net for snapshots predating that
- # change; new docs always carry ``DEFAULT_CHUNK_P_SIZE``.
- p_opts = dict(chunk_opts.get("paragraph_semantic") or {})
- p_chunk_size = int(
- p_opts.pop("chunk_token_size", resolved_chunk_size)
- )
- p_blocks_path = (
- str(parsed_data.get("blocks_path") or "").strip() or None
- )
- chunk_opts_str = _format_chunking_params(p_chunk_size, p_opts)
- logger.info(f"Chunking P: {chunk_opts_str}, doc_id: {doc_id}")
- chunking_result = chunking_by_paragraph_semantic(
- self.tokenizer,
- content,
- p_chunk_size,
- blocks_path=p_blocks_path,
- **p_opts,
- )
- elif strategy == "R":
- # R carries its own optional ``chunk_token_size``
- # override (CHUNK_R_SIZE env or
- # ``addon_params['chunker']['recursive_character']``);
- # pop it out of the kwargs so we don't pass it
- # both positionally and via ``**`` splat (which
- # would TypeError). Fall back to the shared
- # top-level resolved size when unset.
- r_opts = dict(chunk_opts.get("recursive_character") or {})
- r_chunk_size = int(
- r_opts.pop("chunk_token_size", resolved_chunk_size)
- )
- chunk_opts_str = _format_chunking_params(r_chunk_size, r_opts)
- logger.info(f"Chunking R: {chunk_opts_str}, doc_id: {doc_id}")
- chunking_result = chunking_by_recursive_character(
- self.tokenizer,
- content,
- r_chunk_size,
- **r_opts,
- )
- elif strategy == "V":
- # V carries its own optional ``chunk_token_size``
- # advisory ceiling override (CHUNK_V_SIZE env or
- # ``addon_params['chunker']['semantic_vector']``);
- # same pop-then-splat pattern as P/R.
- v_opts = dict(chunk_opts.get("semantic_vector") or {})
- v_chunk_size = int(
- v_opts.pop("chunk_token_size", resolved_chunk_size)
- )
- chunk_opts_str = _format_chunking_params(v_chunk_size, v_opts)
- logger.info(f"Chunking V: {chunk_opts_str}, doc_id: {doc_id}")
- chunking_result = await chunking_by_semantic_vector(
- self.tokenizer,
- content,
- v_chunk_size,
- embedding_func=self.embedding_func,
- **v_opts,
- )
- else: # "F"
- # F honors its own ``chunk_token_size`` override
- # (``addon_params['chunker']['fixed_token']`` or a
- # caller-supplied ``chunk_options``) exactly like
- # R/V/P: pop it out of the kwargs so we don't pass it
- # both positionally and via ``**`` splat (which would
- # TypeError), falling back to the shared top-level
- # resolved size when unset.
- f_opts = dict(chunk_opts.get("fixed_token") or {})
- f_chunk_size = int(
- f_opts.pop("chunk_token_size", resolved_chunk_size)
- )
- chunk_opts_str = _format_chunking_params(f_chunk_size, f_opts)
- logger.info(f"Chunking F: {chunk_opts_str}, doc_id: {doc_id}")
- chunking_result = chunking_by_fixed_token(
- self.tokenizer,
- content,
- f_chunk_size,
- **f_opts,
- )
- else:
- f_opts = chunk_opts.get("fixed_token") or {}
- # Honor the F-strategy ``chunk_token_size`` override (from
- # ``CHUNK_F_SIZE`` env or an explicit
- # ``addon_params['chunker']['fixed_token']`` / per-doc
- # ``chunk_options``) on this legacy path too, falling back
- # to the shared top-level resolved size when unset. This
- # keeps ``LightRAG.ainsert`` — which intentionally does NOT
- # pass a ``process_options`` selector (so the user's
- # ``chunking_func`` still runs) — consistent with the
- # explicit-F branch instead of silently ignoring
- # ``fixed_token.chunk_token_size``. ``f_opts`` is read
- # field-by-field here (not splatted), so there is no
- # positional/kwarg collision.
- legacy_chunk_size = int(
- f_opts.get("chunk_token_size", resolved_chunk_size)
- )
- chunk_opts_str = _format_chunking_params(
- legacy_chunk_size,
- {
- "split_by_character": f_opts.get("split_by_character"),
- "split_by_character_only": f_opts.get(
- "split_by_character_only", False
- ),
- "overlap": f_opts.get(
- "chunk_overlap_token_size",
- self.chunk_overlap_token_size,
- ),
- },
- )
- logger.info(
- f"Chunking F(legacy): {chunk_opts_str}, doc_id: {doc_id}"
- )
- chunking_result = self.chunking_func(
- self.tokenizer,
- content,
- f_opts.get("split_by_character"),
- f_opts.get("split_by_character_only", False),
- f_opts.get(
- "chunk_overlap_token_size",
- self.chunk_overlap_token_size,
- ),
- legacy_chunk_size,
- )
- if inspect.isawaitable(chunking_result):
- chunking_result = await chunking_result
- if not isinstance(chunking_result, (list, tuple)):
- raise TypeError(
- f"chunking_func must return a list or tuple of dicts, "
- f"got {type(chunking_result)}"
- )
- # Reflect the format actually persisted in full_docs.
- # Previously a structured-parse fallback always tagged
- # parse_format=raw, which silently mislabelled lightrag docs;
- # _build_mm_chunks_from_sidecars below gates on the persisted
- # format via the sidecar presence check, so the tag must
- # reflect what was actually stored.
- persisted_format = (
- content_data.get("parse_format")
- if isinstance(content_data, dict)
- else FULL_DOCS_FORMAT_RAW
- ) or FULL_DOCS_FORMAT_RAW
- persisted_engine = (
- content_data.get("parse_engine")
- if isinstance(content_data, dict)
- else None
- )
- extraction_meta = {
- "parse_format": persisted_format,
- "parse_engine": persisted_engine
- or (
- "native"
- if persisted_format == FULL_DOCS_FORMAT_LIGHTRAG
- else "legacy"
- ),
- "chunking_method": (
- # Explicit selector in process_options: reflect
- # the dispatched strategy. ``fixed_token_fallback``
- # is preserved as a defensive label in case a
- # future selector char slips past the validator.
- _CHUNKING_METHOD_LABELS.get(
- doc_process_opts.chunking, "fixed_token_fallback"
- )
- if doc_process_opts.chunking_explicit
- # No selector: chunking_func was invoked, which
- # defaults to chunking_by_token_size but may be
- # customized by the caller.
- else "legacy_chunking_func"
- ),
- # Mirrors the chunking start log line (params portion only,
- # without the strategy prefix or file path) so admins can
- # see the actual chunker params used. Carried across
- # transitions via ``_DOC_STATUS_METADATA_CARRY_OVER_KEYS``.
- "chunk_opts": chunk_opts_str,
- }
- blocks_path = str(parsed_data.get("blocks_path") or "").strip()
- if blocks_path:
- max_order = -1
- for ch in chunking_result:
- if isinstance(ch, dict) and isinstance(
- ch.get("chunk_order_index"), int
- ):
- max_order = max(max_order, int(ch["chunk_order_index"]))
- # Default to "" (no modalities) when full_docs has no
- # ``process_options`` key for this doc: a reinsert that
- # omits i/t/e must NOT re-index stale successful sidecars
- # left over from an earlier multimodal run. The builder's
- # None branch is reserved for ad-hoc callers (unit tests)
- # that intentionally want every modality considered.
- mm_chunks = self._build_mm_chunks_from_sidecars(
- doc_id=doc_id,
- file_path=file_path,
- blocks_path=blocks_path,
- base_order_index=max_order + 1,
- process_options=(content_data or {}).get("process_options")
- or "",
- )
- if mm_chunks:
- chunking_result = list(chunking_result) + mm_chunks
- extraction_meta["mm_chunks"] = len(mm_chunks)
- # Final hard guard before embedding: split any oversize
- # chunk while preserving heading hierarchy metadata.
- if (
- self.embedding_token_limit is not None
- and self.embedding_token_limit > 0
- ):
- original_chunk_count = len(chunking_result)
- chunking_result = enforce_chunk_token_limit_before_embedding(
- chunking_result=chunking_result,
- tokenizer=self.tokenizer,
- max_tokens=self.embedding_token_limit,
- )
- if len(chunking_result) != original_chunk_count:
- logger.info(
- "Applied hard fallback split before embedding for "
- f"d-id: {doc_id}, chunks {original_chunk_count} -> {len(chunking_result)} "
- f"(limit={self.embedding_token_limit})"
- )
- # Compact "pre -> post" summary mirrors the log
- # middle segment. Field is only present when a
- # hard split actually occurred, so its presence
- # alone signals the trigger.
- extraction_meta["hard_fallback_split"] = (
- f"{original_chunk_count} -> {len(chunking_result)}"
- )
- chunks = build_chunks_dict_from_chunking_result(
- chunking_result, doc_id=doc_id, file_path=file_path
- )
- if not chunks:
- logger.warning("No document chunks to process")
- processing_start_time = int(time.time())
- await self._raise_if_cancelled(
- ctx.pipeline_status, ctx.pipeline_status_lock
- )
- # Stage 1: persist doc_status PROCESSING + chunks in parallel.
- doc_status_task = asyncio.create_task(
- self._upsert_doc_status_transition(
- doc_id=doc_id,
- status=DocStatus.PROCESSING,
- status_doc=status_doc,
- file_path=file_path,
- extra_fields={
- "chunks_count": len(chunks),
- "chunks_list": list(chunks.keys()),
- },
- metadata_extra={
- "processing_start_time": processing_start_time,
- **extraction_meta,
- },
- )
- )
- chunks_vdb_task = asyncio.create_task(self.chunks_vdb.upsert(chunks))
- text_chunks_task = asyncio.create_task(self.text_chunks.upsert(chunks))
- first_stage_tasks = [
- doc_status_task,
- chunks_vdb_task,
- text_chunks_task,
- ]
- entity_relation_task = None
- await asyncio.gather(*first_stage_tasks)
- # Stage 2: entity/relation extraction (after text_chunks are
- # saved). When the user opted out via process_options '!',
- # skip extraction entirely; chunks remain in the vector
- # store so naive / mix retrieval still works.
- if doc_process_opts.skip_kg:
- logger.info(
- f"[skip_kg] process_options '!' set for d-id: {doc_id}; "
- f"skipping entity/relation extraction"
- )
- chunk_results = []
- extraction_meta["skip_kg"] = True
- else:
- entity_relation_task = asyncio.create_task(
- self._process_extract_entities(
- chunks,
- ctx.pipeline_status,
- ctx.pipeline_status_lock,
- )
- )
- chunk_results = await entity_relation_task
- file_extraction_stage_ok = True
- except Exception as e:
- pending_tasks = first_stage_tasks + (
- [entity_relation_task] if entity_relation_task else []
- )
- await self._finalize_doc_failure(
- doc_id=doc_id,
- status_doc=status_doc,
- file_path=file_path,
- error=e,
- stage_label="extract",
- current_file_number=current_file_number,
- total_files=ctx.total_files,
- failed_chunks_snapshot=get_failed_chunk_snapshot(),
- pending_tasks=pending_tasks,
- metadata_extra={
- "processing_start_time": processing_start_time,
- "processing_end_time": int(time.time()),
- },
- pipeline_status=ctx.pipeline_status,
- pipeline_status_lock=ctx.pipeline_status_lock,
- )
- # Concurrency is controlled by keyed lock for individual
- # entities and relationships.
- if file_extraction_stage_ok:
- try:
- await self._raise_if_cancelled(
- ctx.pipeline_status, ctx.pipeline_status_lock
- )
- # Use chunk_results from entity_relation_task. When
- # skip_kg is set, chunk_results is empty so there are no
- # nodes/edges to merge — but we still need to flush the
- # chunks_vdb / text_chunks writes (already done above)
- # and reach PROCESSED.
- if not doc_process_opts.skip_kg:
- await merge_nodes_and_edges(
- chunk_results=chunk_results,
- knowledge_graph_inst=self.chunk_entity_relation_graph,
- entity_vdb=self.entities_vdb,
- relationships_vdb=self.relationships_vdb,
- global_config=self._build_global_config(),
- full_entities_storage=self.full_entities,
- full_relations_storage=self.full_relations,
- doc_id=doc_id,
- pipeline_status=ctx.pipeline_status,
- pipeline_status_lock=ctx.pipeline_status_lock,
- llm_response_cache=self.llm_response_cache,
- entity_chunks_storage=self.entity_chunks,
- relation_chunks_storage=self.relation_chunks,
- current_file_number=current_file_number,
- total_files=ctx.total_files,
- file_path=file_path,
- )
- processing_end_time = int(time.time())
- await self._upsert_doc_status_transition(
- doc_id=doc_id,
- status=DocStatus.PROCESSED,
- status_doc=status_doc,
- file_path=file_path,
- extra_fields={
- "chunks_count": len(chunks),
- "chunks_list": list(chunks.keys()),
- },
- metadata_extra={
- "processing_start_time": processing_start_time,
- "processing_end_time": processing_end_time,
- **extraction_meta,
- },
- )
- await self._insert_done()
- async with ctx.pipeline_status_lock:
- log_message = (
- f"Completed processing file "
- f"{current_file_number}/{ctx.total_files}: "
- f"{file_path}"
- )
- logger.info(log_message)
- ctx.pipeline_status["latest_message"] = log_message
- ctx.pipeline_status["history_messages"].append(log_message)
- except Exception as e:
- await self._finalize_doc_failure(
- doc_id=doc_id,
- status_doc=status_doc,
- file_path=file_path,
- error=e,
- stage_label="merge",
- current_file_number=current_file_number,
- total_files=ctx.total_files,
- failed_chunks_snapshot=get_failed_chunk_snapshot(),
- pending_tasks=[],
- metadata_extra={
- "processing_start_time": processing_start_time,
- "processing_end_time": int(time.time()),
- **extraction_meta,
- },
- pipeline_status=ctx.pipeline_status,
- pipeline_status_lock=ctx.pipeline_status_lock,
- )
async def _purge_stale_extraction_if_resuming(
    self,
    *,
    doc_id: str,
    status_doc: DocProcessingStatus,
    file_path: str,
    content_data: dict[str, Any] | None,
    pipeline_status: dict,
    pipeline_status_lock,
) -> None:
    """Purge chunks / KG entries of a previous run before a resumed rebuild.

    Nothing happens unless ``content_data`` already holds extracted content:
    either a lightrag-format record with a ``sidecar_location`` or a
    raw-format record with non-blank ``content``.

    When the engine resolved from ``file_path`` differs from the stored
    ``parse_engine`` a warning is logged and published to the pipeline
    status; the existing extraction is kept either way.

    If ``status_doc.chunks_list`` names any chunks they are purged through
    ``_purge_doc_chunks_and_kg`` and ``status_doc.chunks_list`` /
    ``status_doc.chunks_count`` are reset in place so later status upserts
    do not write the stale IDs back.
    """

    async def _announce(message: str) -> None:
        # Publish a log line to the shared pipeline status under its lock.
        async with pipeline_status_lock:
            pipeline_status["latest_message"] = message
            pipeline_status["history_messages"].append(message)

    if not isinstance(content_data, dict):
        return

    parse_format = content_data.get("parse_format")
    already_extracted = bool(
        parse_format == FULL_DOCS_FORMAT_LIGHTRAG
        and content_data.get("sidecar_location")
    ) or bool(
        parse_format == FULL_DOCS_FORMAT_RAW
        and (content_data.get("content") or "").strip()
    )
    if not already_extracted:
        return

    # Engine mismatch is only reported; the stored extraction stays the
    # source of truth.
    intended_engine, _ = resolve_file_parser_directives(file_path)
    stored_engine = (content_data.get("parse_engine") or "").lower()
    if intended_engine and stored_engine and intended_engine != stored_engine:
        mismatch_note = (
            f"[resume] {doc_id}: filename hint / LIGHTRAG_PARSER implies "
            f"engine={intended_engine!r} but full_docs already has "
            f"parse_engine={stored_engine!r}; keeping the existing extraction. "
            f"Delete + re-upload to switch engines."
        )
        logger.warning(mismatch_note)
        await _announce(mismatch_note)

    stale_chunk_ids = set()
    for candidate_id in status_doc.chunks_list or []:
        if isinstance(candidate_id, str) and candidate_id:
            stale_chunk_ids.add(candidate_id)
    if not stale_chunk_ids:
        return

    purge_note = (
        f"[resume] {doc_id}: purging {len(stale_chunk_ids)} chunk(s) and "
        f"associated KG entries from a previous run before rebuilding under "
        f"current process_options"
    )
    logger.info(purge_note)
    await _announce(purge_note)

    await self._purge_doc_chunks_and_kg(
        doc_id,
        stale_chunk_ids,
        pipeline_status=pipeline_status,
        pipeline_status_lock=pipeline_status_lock,
    )

    # Forget the prior run's chunk bookkeeping on the in-memory status object.
    status_doc.chunks_list = []
    status_doc.chunks_count = 0
- # ============================================================
- # doc_status state-machine helpers (shared by all layers)
- # ============================================================
async def _upsert_doc_status_transition(
    self,
    doc_id: str,
    status: DocStatus,
    status_doc: DocProcessingStatus,
    file_path: str,
    *,
    extra_fields: dict[str, Any] | None = None,
    metadata_extra: dict[str, Any] | None = None,
) -> None:
    """Write one doc_status row for a state transition.

    The row copies the summary, length, creation time, track id and content
    hash from ``status_doc``, stamps ``updated_at`` with the current UTC
    time, and builds ``metadata`` through ``doc_status_transition_metadata``
    with ``metadata_extra`` forwarded as its ``extra`` argument.

    Args:
        doc_id: Key of the row to upsert.
        status: Target status stored under ``"status"``.
        status_doc: Current status record supplying the carried-over fields.
        file_path: Value stored under ``"file_path"``.
        extra_fields: Optional top-level fields (for example ``chunks_count``,
            ``chunks_list`` or ``error_msg``) merged over the base row.
        metadata_extra: Optional additions passed to the metadata builder.
    """
    base_row: dict[str, Any] = {
        "status": status,
        "content_summary": status_doc.content_summary,
        "content_length": status_doc.content_length,
        "created_at": status_doc.created_at,
        "updated_at": datetime.now(timezone.utc).isoformat(),
        "file_path": file_path,
        "track_id": status_doc.track_id,
        "content_hash": status_doc.content_hash,
        "metadata": doc_status_transition_metadata(status_doc, extra=metadata_extra),
    }
    # Caller-supplied fields win over the base row on key collisions.
    row = {**base_row, **extra_fields} if extra_fields else base_row
    await self.doc_status.upsert({doc_id: row})
async def _raise_if_cancelled(
    self,
    pipeline_status: dict,
    pipeline_status_lock,
) -> None:
    """Abort with ``PipelineCancelledException`` when cancellation is flagged.

    The ``cancellation_requested`` entry of ``pipeline_status`` is read while
    holding ``pipeline_status_lock``; a missing entry counts as "not
    cancelled".
    """
    async with pipeline_status_lock:
        cancel_flag = pipeline_status.get("cancellation_requested", False)
        if not cancel_flag:
            return
        raise PipelineCancelledException("User cancelled")
async def _cancellation_requested(
    self,
    pipeline_status: dict,
    pipeline_status_lock,
) -> bool:
    """Report, without raising, whether cancellation has been requested.

    Intended for workers that want to branch on the flag (for example to
    drain a queue item); callers that prefer an exception should use
    :meth:`_raise_if_cancelled`.

    Returns:
        The truthiness of ``pipeline_status["cancellation_requested"]``,
        read under ``pipeline_status_lock``; ``False`` when the key is absent.
    """
    async with pipeline_status_lock:
        cancel_flag = pipeline_status.get("cancellation_requested", False)
        return True if cancel_flag else False
async def _mark_doc_cancelled_in_stage(
    self,
    *,
    doc_id: str,
    status_doc: DocProcessingStatus,
    file_path: str,
    stage_label: str,
    pipeline_status: dict,
    pipeline_status_lock,
) -> None:
    """Record a user cancellation for a document and mark it FAILED.

    Lighter-weight sibling of :meth:`_finalize_doc_failure` for stages that
    carry no chunk snapshot or pending-task bookkeeping. Steps, in order:

    1. Log the cancellation and publish it to ``pipeline_status``.
    2. Flush the LLM response cache, if one is configured, so entries
       written by already-finished work are persisted.
    3. Upsert a FAILED doc_status row carrying the message as ``error_msg``.

    Failures in steps 2 and 3 are logged and swallowed; this method is
    best-effort and does not raise for them.
    """
    cancel_note = f"User cancelled during {stage_label}: {file_path}"
    logger.warning(cancel_note)
    async with pipeline_status_lock:
        pipeline_status["latest_message"] = cancel_note
        pipeline_status["history_messages"].append(cancel_note)

    cache = self.llm_response_cache
    if cache:
        try:
            await cache.index_done_callback()
        except Exception as persist_error:
            logger.error(f"Failed to persist LLM cache: {persist_error}")

    try:
        await self._upsert_doc_status_transition(
            doc_id=doc_id,
            status=DocStatus.FAILED,
            status_doc=status_doc,
            file_path=file_path,
            extra_fields={"error_msg": cancel_note},
        )
    except Exception as exc:
        logger.error(f"Failed to mark cancelled doc {doc_id} as FAILED: {exc}")
async def _finalize_doc_failure(
    self,
    *,
    doc_id: str,
    status_doc: DocProcessingStatus,
    file_path: str,
    error: BaseException,
    stage_label: str,
    current_file_number: int,
    total_files: int,
    failed_chunks_snapshot: tuple[list[str], int],
    pending_tasks: list[asyncio.Task],
    metadata_extra: dict[str, Any],
    pipeline_status: dict,
    pipeline_status_lock,
) -> None:
    """Common epilogue for an extract / merge stage failure.

    Steps, in order:

    1. Log the failure (or user cancellation) and publish it to
       ``pipeline_status``. For real errors the traceback is rendered from
       ``error`` itself, so the helper does not depend on being called from
       inside the caller's ``except`` block.
    2. Cancel every unfinished task in ``pending_tasks`` and wait for all of
       them to settle. Waiting ensures a cancelled stage task (for example
       the PROCESSING status upsert) cannot land after the FAILED row
       written below, and retrieves any exception the tasks raised.
    3. Flush the LLM response cache, if configured (errors are logged only).
    4. Upsert a FAILED doc_status row carrying ``str(error)``, the failed
       chunk snapshot and ``metadata_extra``.

    Args:
        error: The exception that ended the stage.
        stage_label: ``"merge"`` selects the merge-stage wording; any other
            value uses the extract-stage wording.
        failed_chunks_snapshot: ``(chunks_list, chunks_count)`` stored on the
            FAILED row.
        pending_tasks: Stage tasks to cancel; falsy entries are ignored.
        metadata_extra: Extra metadata forwarded to the status upsert.
    """
    progress = f"{current_file_number}/{total_files}: {file_path}"
    is_merge = stage_label == "merge"

    if isinstance(error, PipelineCancelledException):
        if is_merge:
            error_msg = f"User cancelled during merge {progress}"
        else:
            error_msg = f"User cancelled {progress}"
        logger.warning(error_msg)
        history_entries = [error_msg]
    else:
        # Render once from the exception object instead of the ambient
        # exception context.
        trace_text = "".join(
            traceback.format_exception(type(error), error, error.__traceback__)
        )
        logger.error(trace_text)
        if is_merge:
            error_msg = f"Merging stage failed in document {progress}"
        else:
            error_msg = f"Failed to extract document {progress}"
        logger.error(error_msg)
        history_entries = [trace_text, error_msg]

    async with pipeline_status_lock:
        pipeline_status["latest_message"] = error_msg
        for entry in history_entries:
            pipeline_status["history_messages"].append(entry)

    live_tasks = [task for task in pending_tasks if task]
    for task in live_tasks:
        if not task.done():
            task.cancel()
    if live_tasks:
        # return_exceptions=True: cancellation / failure of a stage task is
        # expected here and must not mask the original error.
        await asyncio.gather(*live_tasks, return_exceptions=True)

    if self.llm_response_cache:
        try:
            await self.llm_response_cache.index_done_callback()
        except Exception as persist_error:
            logger.error(f"Failed to persist LLM cache: {persist_error}")

    failed_chunks_list, failed_chunks_count = failed_chunks_snapshot
    await self._upsert_doc_status_transition(
        doc_id=doc_id,
        status=DocStatus.FAILED,
        status_doc=status_doc,
        file_path=file_path,
        extra_fields={
            "error_msg": str(error),
            "chunks_count": failed_chunks_count,
            "chunks_list": failed_chunks_list,
        },
        metadata_extra=metadata_extra,
    )
- # ============================================================
- # Parser engines (also called by tests directly)
- # ============================================================
async def parse_native(
    self, doc_id: str, file_path: str, content_data: dict[str, Any]
) -> dict[str, Any]:
    """Phase 1 parse for raw, lightrag and pending_parse full_docs records.

    Behaviour by ``content_data["parse_format"]`` (raw when absent):

    * lightrag: nothing is re-parsed. The stored content is returned with
      the lightrag doc prefix stripped, ``blocks_path`` is resolved from the
      stored ``sidecar_location``, and ``parse_stage_skipped`` is set.
    * pending_parse: the source file is resolved and must be an existing
      ``.docx``. Its blocks are extracted in a worker thread, normalised,
      written as a sidecar, persisted to ``full_docs`` and the source is
      archived. ``parse_warnings`` is attached to the result when
      paragraphs without a paraId were seen.
    * anything else: treated as raw. The stored content is passed through
      verbatim with an empty ``blocks_path`` and ``parse_stage_skipped`` set.

    Raises:
        ValueError: The pending source is not a readable ``.docx`` file, or
            the DOCX extractor returned no blocks.
    """
    doc_format = content_data.get("parse_format", FULL_DOCS_FORMAT_RAW)

    if doc_format == FULL_DOCS_FORMAT_LIGHTRAG:
        # Content + sidecar are reused from a prior parse, so this is a
        # cache hit: strip the marker so chunking matches the raw path.
        stripped_text = strip_lightrag_doc_prefix(
            content_data.get("content"), doc_format
        )
        sidecar_blocks = sidecar_blocks_path(content_data.get("sidecar_location"))
        return {
            "doc_id": doc_id,
            "file_path": file_path,
            "parse_format": doc_format,
            "content": stripped_text,
            "blocks_path": sidecar_blocks or "",
            "parse_stage_skipped": True,
        }

    if doc_format != FULL_DOCS_FORMAT_PENDING_PARSE:
        # Raw: no parser ran, so the stage is reported as skipped and no
        # parse duration is attributed to it.
        return {
            "doc_id": doc_id,
            "file_path": file_path,
            "parse_format": FULL_DOCS_FORMAT_RAW,
            "content": content_data.get("content", ""),
            "blocks_path": "",
            "parse_stage_skipped": True,
        }

    # ---- pending_parse: run the native DOCX parser ----
    resolved_source = _call_source_file_resolver(
        self,
        file_path,
        source_file_name=content_data.get("source_file_name"),
        parser_engine=PARSER_ENGINE_NATIVE,
    )
    docx_path = Path(resolved_source)
    if (
        not docx_path.exists()
        or not docx_path.is_file()
        or docx_path.suffix.lower() != ".docx"
    ):
        raise ValueError(f"Native parser does not support pending file: {file_path}")

    # Imported lazily so the docx parser is only loaded when this engine
    # is actually used.
    from lightrag.parser.docx.drawing_image_extractor import (
        DrawingExtractionContext,
        load_relationships,
    )
    from lightrag.parser.docx.parse_document import (
        extract_docx_blocks,
    )
    from lightrag.parser.docx.ir_builder import NativeDocxIRBuilder
    from lightrag.sidecar import write_sidecar

    # Canonicalise again so direct callers may pass absolute paths or
    # hint-bearing names.
    document_name = normalize_document_file_path(file_path)
    if document_name == "unknown_source":
        document_name = docx_path.name or f"{doc_id}.bin"
    base_name = Path(document_name).stem or document_name
    parsed_dir = parsed_artifact_dir_for(document_name, parent_hint=docx_path.parent)
    asset_dir = parsed_dir / f"{base_name}.blocks.assets"

    def _rollback_parsed_dir() -> None:
        # Remove the pre-created output directory so a retry starts clean.
        if parsed_dir.exists():
            shutil.rmtree(parsed_dir, ignore_errors=True)

    def _extract_blocks_sync() -> (
        tuple[list[dict[str, Any]], dict[str, Any], dict[str, Any]]
    ):
        # The output directories are wiped and recreated up front because
        # the drawing extractor writes image bytes before write_sidecar
        # runs (write_sidecar is then told not to clean the directory).
        if parsed_dir.exists():
            shutil.rmtree(parsed_dir)
        parsed_dir.mkdir(parents=True, exist_ok=True)
        asset_dir.mkdir(parents=True, exist_ok=True)

        drawing_ctx = DrawingExtractionContext(
            docx_path=docx_path,
            blocks_output_path=parsed_dir / f"{base_name}.blocks.jsonl",
            export_dir_name=asset_dir.name,
            export_dir_path=asset_dir,
        )
        load_relationships(drawing_ctx)
        collected_warnings: dict[str, Any] = {}
        collected_metadata: dict[str, Any] = {}
        extracted_blocks = extract_docx_blocks(
            str(docx_path),
            debug=False,
            fixlevel=0,
            drawing_context=drawing_ctx,
            parse_warnings=collected_warnings,
            parse_metadata=collected_metadata,
        )
        return extracted_blocks, collected_warnings, collected_metadata

    try:
        blocks, parse_warnings, parse_metadata = await asyncio.to_thread(
            _extract_blocks_sync
        )
    except BaseException:
        # Extraction failed after the directories were created; do not
        # leave partially populated output behind.
        _rollback_parsed_dir()
        raise

    if not blocks:
        # write_sidecar will never run, so the empty directories would
        # otherwise persist.
        _rollback_parsed_dir()
        raise ValueError(f"DOCX parser returned empty content for {file_path}")

    missing_paraid_count = int(parse_warnings.get("missing_paraid_count", 0) or 0)
    has_missing_paraids = missing_paraid_count > 0
    if has_missing_paraids:
        # One warning per document with the total count, rather than one
        # per affected paragraph.
        logger.warning(
            "[parse_native] %s: %d paragraphs lack paraId; "
            "Re-saving file in Word 2013+ to regenerate ids.",
            docx_path.name,
            missing_paraid_count,
        )

    ir = NativeDocxIRBuilder().normalize(
        blocks,
        document_name=document_name,
        asset_dir_name=asset_dir.name,
        parse_metadata=parse_metadata,
    )
    parsed_data = write_sidecar(
        ir,
        parsed_dir=parsed_dir,
        doc_id=doc_id,
        engine=PARSER_ENGINE_NATIVE,
        clean_parsed_dir=False,  # the asset dir was pre-populated above
        block_drawing_path_style="basename_only",  # legacy native shape
    )

    full_docs_record = {
        "content": make_lightrag_doc_content(parsed_data["content"]),
        "file_path": file_path,
        "parse_format": FULL_DOCS_FORMAT_LIGHTRAG,
        "sidecar_location": sidecar_uri_for(parsed_dir),
        "parse_engine": PARSER_ENGINE_NATIVE,
        "update_time": int(time.time()),
    }
    await self._persist_parsed_full_docs(doc_id, full_docs_record)
    await archive_docx_source_after_full_docs_sync(str(docx_path))
    logger.info(
        f"[parse_native] pending_parse completed for {file_path} "
        f"via parser/docx"
    )

    result: dict[str, Any] = {
        "doc_id": doc_id,
        "file_path": file_path,
        "parse_format": FULL_DOCS_FORMAT_LIGHTRAG,
        "content": parsed_data["content"],
        "blocks_path": parsed_data["blocks_path"],
    }
    if has_missing_paraids:
        # Returned so the caller can record the issue alongside the
        # document's status.
        result["parse_warnings"] = {"missing_paraid_count": missing_paraid_count}
    return result
async def parse_mineru(
    self, doc_id: str, file_path: str, content_data: dict[str, Any]
) -> dict[str, Any]:
    """Parse a document through MinerU and emit a spec-compliant sidecar.

    Layout produced under ``inputs/<space>/__parsed__/``:

    - ``<base>.parsed/``: sidecar (blocks.jsonl + per-modality JSONs + assets)
    - ``<base>.mineru_raw/``: preserved MinerU bundle (content_list.json,
      full.md, middle.json, images/, ...) plus ``_manifest.json``

    The raw bundle stays on disk so a later re-parse of the same source can
    skip the upload + poll + download round trip. When the bundle is valid
    and ``LIGHTRAG_FORCE_REPARSE_MINERU`` is not set to a truthy value
    (``1`` / ``true`` / ``yes`` / ``on``), no MinerU client is created and
    the result reports ``parse_stage_skipped=True``. The bundle is removed
    only when the user deletes the document together with its original
    file; see
    :func:`lightrag.api.routers.document_routes.delete_file_variants_by_file_path`.

    Raises:
        FileNotFoundError: The resolved source path is not a file.
    """
    # Lazy imports: keep module import cheap and keep httpx out of code
    # paths that never use the MinerU engine.
    from lightrag.parser.external.mineru import (
        MinerUIRBuilder,
        MinerURawClient,
        clear_dir_contents,
        is_bundle_valid,
        raw_dir_for_parsed_dir,
    )
    from lightrag.sidecar import write_sidecar

    resolved_source = _call_source_file_resolver(
        self,
        file_path,
        source_file_name=content_data.get("source_file_name"),
        parser_engine=PARSER_ENGINE_MINERU,
    )
    source_file_path = Path(resolved_source)
    if not source_file_path.is_file():
        raise FileNotFoundError(f"MinerU source file not found: {source_file_path}")

    # Canonicalise defensively so direct callers may pass absolute paths or
    # hint-bearing names.
    document_name = normalize_document_file_path(file_path)
    if document_name == "unknown_source":
        document_name = source_file_path.name or f"{doc_id}.bin"

    parsed_dir = parsed_artifact_dir_for(
        document_name, parent_hint=source_file_path.parent
    )
    raw_dir = raw_dir_for_parsed_dir(parsed_dir)

    truthy_values = {"1", "true", "yes", "on"}
    force_reparse = (
        os.getenv("LIGHTRAG_FORCE_REPARSE_MINERU", "").lower() in truthy_values
    )

    if not force_reparse and is_bundle_valid(raw_dir, source_file_path):
        # Cache hit: stays purely local, so a re-parse still works while
        # MinerU credentials or endpoint are unavailable.
        parse_stage_skipped = True
        logger.info("[parse_mineru] raw cache hit doc_id=%s", doc_id)
    else:
        parse_stage_skipped = False
        if force_reparse and raw_dir.exists():
            logger.info(
                "[parse_mineru] LIGHTRAG_FORCE_REPARSE_MINERU set; "
                "discarding bundle at %s",
                raw_dir,
            )
        raw_dir.mkdir(parents=True, exist_ok=True)
        clear_dir_contents(raw_dir)
        mineru_client = MinerURawClient()
        logger.info(
            "[MinerU] Parsing %s %s (may take a few minutes)",
            doc_id,
            source_file_path.name,
        )
        await mineru_client.download_into(
            raw_dir,
            source_file_path,
            upload_name=document_name,
        )

    ir = MinerUIRBuilder().normalize_from_workdir(raw_dir, document_name=document_name)
    parsed_data = write_sidecar(
        ir,
        parsed_dir=parsed_dir,
        doc_id=doc_id,
        engine=PARSER_ENGINE_MINERU,
    )

    # Keep full_docs in sync so a restart / reprocess can use the sidecar
    # directly.
    full_docs_record = {
        "content": make_lightrag_doc_content(parsed_data["content"]),
        "file_path": file_path,
        "parse_format": FULL_DOCS_FORMAT_LIGHTRAG,
        "sidecar_location": sidecar_uri_for(parsed_dir),
        "parse_engine": PARSER_ENGINE_MINERU,
        "update_time": int(time.time()),
    }
    await self._persist_parsed_full_docs(doc_id, full_docs_record)
    await archive_docx_source_after_full_docs_sync(str(source_file_path))

    return {
        "doc_id": doc_id,
        "file_path": file_path,
        "parse_format": FULL_DOCS_FORMAT_LIGHTRAG,
        "content": parsed_data["content"],
        "blocks_path": parsed_data["blocks_path"],
        "parse_stage_skipped": parse_stage_skipped,
    }
async def parse_docling(
    self, doc_id: str, file_path: str, content_data: dict[str, Any]
) -> dict[str, Any]:
    """Parse a document through Docling Serve and emit a spec-compliant sidecar.

    Produces the same dual-directory layout as ``parse_mineru``:

    - ``<base>.parsed/``: sidecar (blocks.jsonl + per-modality JSONs + assets)
    - ``<base>.docling_raw/``: preserved Docling bundle (``<stem>.json``,
      ``<stem>.md``, ``artifacts/``) plus ``_manifest.json``

    The raw bundle is kept so a later re-parse of the same source bytes can
    skip the upload + poll + download round trip. When the bundle is valid
    and ``LIGHTRAG_FORCE_REPARSE_DOCLING`` is not set to a truthy value
    (``1`` / ``true`` / ``yes`` / ``on``), no Docling client is created and
    the result reports ``parse_stage_skipped=True``.

    Raises:
        FileNotFoundError: The resolved source path is not a file.
        ValueError: The IR built from the raw bundle contains no blocks.
    """
    # Lazy imports: keep module import cheap and keep httpx out of code
    # paths that never use the Docling engine.
    from lightrag.parser.external.docling import (
        DoclingIRBuilder,
        DoclingRawClient,
        clear_dir_contents,
        is_bundle_valid,
        raw_dir_for_parsed_dir,
    )
    from lightrag.sidecar import write_sidecar

    resolved_source = _call_source_file_resolver(
        self,
        file_path,
        source_file_name=content_data.get("source_file_name"),
        parser_engine=PARSER_ENGINE_DOCLING,
    )
    source_file_path = Path(resolved_source)
    if not source_file_path.is_file():
        raise FileNotFoundError(f"Docling source file not found: {source_file_path}")

    document_name = normalize_document_file_path(file_path)
    if document_name == "unknown_source":
        document_name = source_file_path.name or f"{doc_id}.bin"

    parsed_dir = parsed_artifact_dir_for(
        document_name, parent_hint=source_file_path.parent
    )
    raw_dir = raw_dir_for_parsed_dir(parsed_dir)

    truthy_values = {"1", "true", "yes", "on"}
    force_reparse = (
        os.getenv("LIGHTRAG_FORCE_REPARSE_DOCLING", "").lower() in truthy_values
    )

    if not force_reparse and is_bundle_valid(raw_dir, source_file_path):
        # Cache hit: stays purely local so re-parses still work while the
        # docling-serve endpoint is unavailable.
        parse_stage_skipped = True
        logger.info("[parse_docling] raw cache hit doc_id=%s", doc_id)
    else:
        parse_stage_skipped = False
        if force_reparse and raw_dir.exists():
            logger.info(
                "[parse_docling] LIGHTRAG_FORCE_REPARSE_DOCLING set; "
                "discarding bundle at %s",
                raw_dir,
            )
        # Only the existing contents (manifest + stale bundle files) are
        # wiped here; per the original note, ``download_into`` creates
        # raw_dir itself.
        clear_dir_contents(raw_dir)
        docling_client = DoclingRawClient()
        logger.info(
            "[Docling] Parsing %s %s (may take a few minutes)",
            doc_id,
            source_file_path.name,
        )
        # Upload under the canonical (hint-stripped) name so the bundle's
        # main JSON is named after it; the IR builder below is only given
        # the canonical ``document_name`` to locate that JSON.
        await docling_client.download_into(
            raw_dir, source_file_path, upload_filename=document_name
        )

    ir = DoclingIRBuilder().normalize_from_workdir(raw_dir, document_name=document_name)
    if not ir.blocks:
        raise ValueError(
            f"Docling IR builder produced zero blocks for {file_path} "
            f"(raw_dir={raw_dir})"
        )

    parsed_data = write_sidecar(
        ir,
        parsed_dir=parsed_dir,
        doc_id=doc_id,
        engine=PARSER_ENGINE_DOCLING,
    )

    full_docs_record = {
        "content": make_lightrag_doc_content(parsed_data["content"]),
        "file_path": file_path,
        "parse_format": FULL_DOCS_FORMAT_LIGHTRAG,
        "sidecar_location": sidecar_uri_for(parsed_dir),
        "parse_engine": PARSER_ENGINE_DOCLING,
        "update_time": int(time.time()),
    }
    await self._persist_parsed_full_docs(doc_id, full_docs_record)
    await archive_docx_source_after_full_docs_sync(str(source_file_path))

    return {
        "doc_id": doc_id,
        "file_path": file_path,
        "parse_format": FULL_DOCS_FORMAT_LIGHTRAG,
        "content": parsed_data["content"],
        "blocks_path": parsed_data["blocks_path"],
        "parse_stage_skipped": parse_stage_skipped,
    }
- # ============================================================
- # Parser internals
- # ============================================================
async def _persist_parsed_full_docs(
    self,
    doc_id: str,
    record: dict[str, Any],
) -> str | None:
    """Store a parse result in ``full_docs`` and align ``content_hash``.

    ``full_docs`` upserts replace the whole row, so ``record`` is merged
    over the existing row (when one exists as a dict): fields in ``record``
    win, while fields seeded earlier (such as ``process_options``) survive.

    For raw and lightrag formats a ``content_hash`` is computed from the
    body with the lightrag doc prefix stripped, stored on the merged row,
    and patched, together with a fresh ``updated_at``, onto the existing
    ``doc_status`` row if there is one.

    Returns:
        The computed content hash, or ``None`` when the record's
        ``parse_format`` is neither raw nor lightrag.
    """
    parse_format = record.get("parse_format")

    # Hash the bare text so the same body hashes identically whether it
    # arrived as raw text or via a sidecar.
    content_hash: str | None = None
    if parse_format in (FULL_DOCS_FORMAT_RAW, FULL_DOCS_FORMAT_LIGHTRAG):
        bare_text = strip_lightrag_doc_prefix(
            record.get("content") or "", parse_format
        )
        content_hash = compute_text_content_hash(bare_text)

    previous_row = await self.full_docs.get_by_id(doc_id)
    merged_row = dict(previous_row) if isinstance(previous_row, dict) else {}
    merged_row.update(record)
    if content_hash:
        merged_row["content_hash"] = content_hash

    await self.full_docs.upsert({doc_id: merged_row})
    await self.full_docs.index_done_callback()

    if not content_hash:
        return content_hash

    status_row = await self.doc_status.get_by_id(doc_id)
    if status_row:
        patched_status = dict(status_row)
        patched_status["content_hash"] = content_hash
        patched_status["updated_at"] = datetime.now(timezone.utc).isoformat()
        await self.doc_status.upsert({doc_id: patched_status})
    return content_hash
async def _mark_duplicate_after_parse(
    self,
    doc_id: str,
    status_doc: DocProcessingStatus,
    file_path: str,
    content_hash: str | None,
    content_length: int,
    content_data: dict[str, Any] | None = None,
    pipeline_status: dict | None = None,
    pipeline_status_lock: asyncio.Lock | None = None,
) -> bool:
    """Mark a freshly parsed document FAILED when its content is a duplicate.

    A duplicate is another doc_status entry with the same ``content_hash``.
    When one is found this method:

    1. Upserts a FAILED doc_status row that references the original
       document and carries duplicate metadata.
    2. Deletes this document's ``full_docs`` entry (failure is logged only).
    3. Resolves and archives the source file.
    4. Logs a warning and, when both ``pipeline_status`` and its lock are
       supplied, publishes it to the pipeline status.

    Returns:
        ``True`` when the document was marked as a duplicate and processing
        should stop; ``False`` when there is no hash or no match.
    """
    if not content_hash:
        return False

    duplicate = await get_duplicate_doc_by_content_hash(
        self.doc_status, content_hash, doc_id
    )
    if not duplicate:
        return False

    original_doc_id, original_doc = duplicate
    original_track_id = doc_status_field(original_doc, "track_id", "")
    original_status = doc_status_field(original_doc, "status", "unknown")
    timestamp = datetime.now(timezone.utc).isoformat()
    error_text = (
        "Identical content already exists under another filename. "
        f"Original doc_id: {original_doc_id}, Status: {original_status}"
    )

    duplicate_metadata = doc_status_transition_metadata(
        status_doc,
        extra={
            "is_duplicate": True,
            "duplicate_kind": "content_hash",
            "original_doc_id": original_doc_id,
            "original_track_id": original_track_id,
        },
    )
    failed_row = {
        "status": DocStatus.FAILED,
        "content_summary": (
            f"[DUPLICATE:content_hash] Original document: {original_doc_id}"
        ),
        "content_length": content_length,
        "chunks_count": 0,
        "chunks_list": [],
        "created_at": status_doc.created_at,
        "updated_at": timestamp,
        "file_path": file_path,
        "track_id": status_doc.track_id,
        "content_hash": content_hash,
        "error_msg": error_text,
        "metadata": duplicate_metadata,
    }
    await self.doc_status.upsert({doc_id: failed_row})

    # Best effort: a leftover full_docs entry must not block the status
    # update that already happened.
    try:
        await self.full_docs.delete([doc_id])
        await self.full_docs.index_done_callback()
    except Exception as e:
        logger.warning(f"Failed to remove duplicate full_docs entry {doc_id}: {e}")

    if content_data:
        source_file_name = content_data.get("source_file_name")
    else:
        source_file_name = None
    source_path = _call_source_file_resolver(
        self,
        file_path,
        source_file_name=source_file_name,
    )
    archived = await archive_source_after_full_docs_sync(source_path)

    archive_suffix = f"; archived to {archived}" if archived else ""
    warning = f"Duplicate content skipped after parsing: {file_path}{archive_suffix}"
    logger.warning(warning)

    if pipeline_status is None or pipeline_status_lock is None:
        return True
    async with pipeline_status_lock:
        pipeline_status["latest_message"] = warning
        pipeline_status["history_messages"].append(warning)
    return True
def _resolve_source_file_for_parser(
    self,
    file_path: str,
    *,
    source_file_name: str | None = None,
    parser_engine: str | None = None,
) -> str:
    """Resolve a readable source file path for parser upload.

    ``file_path`` is the canonical stored basename; ``source_file_name``
    may carry the real uploaded / scanned basename, including parser hints.

    Resolution happens in two passes:

    1. Exact paths, first existing file wins. Directories probed, in order:
       the parent of ``file_path`` (if it has one), the workspace-scoped
       input dir, the base input dir, then ``<cwd>/inputs`` (workspace
       first) and ``<cwd>/<parsed dir>``; each directory is paired with
       its parsed-artifacts subdirectory. When ``source_file_name`` is
       given, that name is tried in every directory before the canonical
       name.
    2. If nothing matched and the canonical name is a known document
       source, the same directories are scanned for files whose name
       canonicalises to it. Preference order: exact ``source_file_name``
       match, then a file whose filename hint selects ``parser_engine``,
       then the first match.

    Returns:
        The resolved path as a string, or ``file_path`` unchanged when
        nothing could be found.
    """
    exact_paths: list[Path] = []
    search_dirs: list[Path] = []

    # The path as given comes first; a bare basename contributes no
    # directory of its own.
    given_text = str(file_path or "").strip()
    if given_text:
        given_path = Path(given_text)
        exact_paths.append(given_path)
        given_dir = given_path.parent
        if given_dir != Path("."):
            search_dirs.extend((given_dir, given_dir / PARSED_DIR_NAME))
            exact_paths.append(given_dir / PARSED_DIR_NAME / given_path.name)

    name = Path(file_path).name
    source_name = Path(str(source_file_name or "").strip()).name
    input_path = input_dir_path()

    def _register_dir(directory: Path) -> None:
        # Probe ``directory`` and its parsed-artifacts subdirectory.
        parsed_subdir = directory / PARSED_DIR_NAME
        exact_paths.extend((directory / name, parsed_subdir / name))
        search_dirs.extend((directory, parsed_subdir))

    # The workspace-scoped input directory is checked before the base one
    # so files uploaded into a workspace subdirectory resolve correctly.
    workspace = getattr(self, "workspace", "") or ""
    if workspace:
        _register_dir(input_path / workspace)
    _register_dir(input_path)

    # Common local defaults used by the API server.
    cwd = Path.cwd()
    local_inputs = cwd / "inputs"
    if workspace:
        _register_dir(local_inputs / workspace)
    _register_dir(local_inputs)
    exact_paths.append(cwd / PARSED_DIR_NAME / name)
    search_dirs.append(cwd / PARSED_DIR_NAME)

    if source_name:
        # The real uploaded name takes priority over the canonical one.
        exact_paths = [folder / source_name for folder in search_dirs] + exact_paths

    # Pass 1: first existing file among the de-duplicated exact paths.
    for probe in dict.fromkeys(exact_paths):
        if probe.exists() and probe.is_file():
            return str(probe)

    canonical_name = normalize_document_file_path(file_path)
    if not has_known_document_source(canonical_name):
        return file_path

    # Pass 2: scan each distinct directory for canonical-name equivalents.
    matches: list[Path] = []
    for folder in dict.fromkeys(search_dirs):
        if not (folder.exists() and folder.is_dir()):
            continue
        for entry in sorted(folder.iterdir(), key=lambda item: item.name):
            if not entry.is_file():
                continue
            if normalize_document_file_path(entry.name) == canonical_name:
                matches.append(entry)

    if source_name:
        same_name = next(
            (entry for entry in matches if entry.name == source_name), None
        )
        if same_name is not None:
            return str(same_name)

    if parser_engine:
        from lightrag.parser.routing import filename_parser_directives

        for entry in matches:
            hinted_engine, _ = filename_parser_directives(entry.name)
            if hinted_engine == parser_engine:
                return str(entry)

    return str(matches[0]) if matches else file_path
async def _write_lightrag_document_from_content_list(
    self,
    doc_id: str,
    file_path: str,
    content_list: list[dict[str, Any]],
    engine: str,
) -> dict[str, Any]:
    """Convert parser content list to LightRAG Document files and return parsed_data.

    Walks ``content_list`` in order and turns every item into a block of
    the ``<stem>.blocks.jsonl`` file. Tables, drawings and equations are
    additionally collected into ``<stem>.tables.json``,
    ``<stem>.drawings.json`` and ``<stem>.equations.json``; each of those
    sidecars is written only when it has at least one entry. Any existing
    parsed-artifact directory for the document is removed first, so the
    output always reflects this run only.

    After the files are written the merged text is persisted through
    ``_persist_parsed_full_docs`` and the resolved source file is passed to
    ``archive_docx_source_after_full_docs_sync``.

    Args:
        doc_id: Document id; used in block hashes and generated item ids.
        file_path: Stored path/basename of the source document.
        content_list: Parser output. Non-dict entries are ignored. The
            item kind is read from ``type`` (or ``label``).
        engine: Name of the parser engine, recorded in the meta line and
            in the persisted record.

    Returns:
        A dict with ``doc_id``, ``file_path``, ``parse_format``, the
        merged plain-text ``content`` and the ``blocks_path`` written.
    """
    document_name = normalize_document_file_path(file_path)
    if document_name == "unknown_source":
        document_name = f"{doc_id}.bin"

    # Start from a clean artifact directory so stale sidecars from an
    # earlier parse cannot survive.
    parsed_dir = parsed_artifact_dir_for(document_name)
    if parsed_dir.exists():
        shutil.rmtree(parsed_dir)
    parsed_dir.mkdir(parents=True, exist_ok=True)

    base_name = Path(document_name).stem or document_name
    blocks_path = parsed_dir / f"{base_name}.blocks.jsonl"
    tables_path = parsed_dir / f"{base_name}.tables.json"
    drawings_path = parsed_dir / f"{base_name}.drawings.json"
    equations_path = parsed_dir / f"{base_name}.equations.json"

    blocks_lines: list[str] = []
    merged_parts: list[str] = []
    block_idx = 0
    table_idx = 0
    drawing_idx = 0
    equation_idx = 0
    tables: dict[str, Any] = {}
    drawings: dict[str, Any] = {}
    equations: dict[str, Any] = {}

    def _to_list_str(value: Any) -> list[str]:
        """Coerce a scalar or list into a list of non-blank strings."""
        if value is None:
            return []
        if isinstance(value, list):
            return [str(x) for x in value if str(x).strip()]
        text_val = str(value).strip()
        return [text_val] if text_val else []

    def _parse_int(value: Any, default: int = 0) -> int:
        """Return ``int(value)``, or ``default`` when conversion fails."""
        try:
            return int(value)
        except Exception:
            return default

    def _normalize_grid_rows(grid: Any) -> list[list[str]]:
        """Flatten a list-of-lists grid into stripped cell strings.

        Dict cells contribute their ``text`` value; non-list rows are
        dropped; a non-list ``grid`` yields an empty result.
        """
        normalized_rows: list[list[str]] = []
        if not isinstance(grid, list):
            return normalized_rows
        for row in grid:
            if not isinstance(row, list):
                continue
            normalized_rows.append(
                [
                    str(cell.get("text", "")).strip()
                    if isinstance(cell, dict)
                    else str(cell).strip()
                    for cell in row
                ]
            )
        return normalized_rows

    def _coerce_table_rows(
        value: Any,
    ) -> tuple[str, Any, list[list[str]], int, int]:
        """Normalize a table payload.

        Returns ``(format, content, rows, num_rows, num_cols)`` where
        ``format`` is ``"json"`` when a row grid could be recovered and
        ``"html"`` otherwise.
        """
        raw_value = value
        if isinstance(raw_value, str):
            stripped = raw_value.strip()
            if not stripped:
                return "html", "", [], 0, 0
            # A string body may be a serialized grid: try JSON first, then
            # a Python literal. Anything else is kept verbatim.
            parsed_value = None
            try:
                parsed_value = json.loads(stripped)
            except Exception:
                try:
                    import ast

                    parsed_value = ast.literal_eval(stripped)
                except Exception:
                    parsed_value = None
            if parsed_value is None:
                return "html", raw_value, [], 0, 0
            raw_value = parsed_value

        if isinstance(raw_value, list):
            rows = _normalize_grid_rows(raw_value)
            return (
                "json",
                json.dumps(rows, ensure_ascii=False),
                rows,
                len(rows),
                max((len(r) for r in rows), default=0),
            )

        if isinstance(raw_value, dict):
            rows = _normalize_grid_rows(raw_value.get("grid"))
            if not rows and isinstance(raw_value.get("rows"), list):
                rows = _normalize_grid_rows(raw_value.get("rows"))
            num_rows = _parse_int(
                raw_value.get("num_rows"), len(rows) if rows else 0
            )
            num_cols = _parse_int(
                raw_value.get("num_cols"),
                max((len(r) for r in rows), default=0),
            )
            if rows:
                return (
                    "json",
                    json.dumps(rows, ensure_ascii=False),
                    rows,
                    num_rows,
                    num_cols,
                )
            return (
                "html",
                json.dumps(raw_value, ensure_ascii=False),
                [],
                num_rows,
                num_cols,
            )

        text_value = str(raw_value or "").strip()
        return "html", text_value, [], 0, 0

    # heading_stack[i] holds the active heading of level i + 1, or "" when
    # that level was skipped.
    heading_stack: list[str] = []

    def _update_heading_context(
        heading_text: str, level: int
    ) -> tuple[str, int, list[str]]:
        """Register a heading and return ``(heading, level, parents)``."""
        clean_heading = str(heading_text or "").strip()
        clean_level = max(_parse_int(level, 1), 1)
        depth = clean_level - 1
        # Drop headings at this level and deeper, then pad skipped levels
        # with placeholders. Without the padding a jump such as H1 -> H3
        # leaves the H3 at index 1, and the next H3 would wrongly list its
        # sibling as a parent.
        del heading_stack[depth:]
        heading_stack.extend([""] * (depth - len(heading_stack)))
        parent_chain = [entry for entry in heading_stack if entry]
        heading_stack.append(clean_heading)
        return clean_heading, clean_level, parent_chain

    def _append_block(
        content_text: str,
        heading: str = "",
        level: int = 0,
        parent_headings: list[str] | None = None,
    ) -> str:
        """Append one content block; return its id ("" if text is blank)."""
        nonlocal block_idx
        content_text = str(content_text or "").strip()
        if not content_text:
            return ""
        blockid = hashlib.md5(
            f"{doc_id}:{block_idx}:{heading}:{content_text}".encode("utf-8")
        ).hexdigest()
        blocks_lines.append(
            json.dumps(
                {
                    "type": "content",
                    "blockid": blockid,
                    "format": "plain_text",
                    "content": content_text,
                    "heading": heading,
                    "parent_headings": list(parent_headings or []),
                    "level": level,
                    "session_type": "body",
                    "table_slice": "none",
                    "positions": [],
                },
                ensure_ascii=False,
            )
        )
        merged_parts.append(content_text)
        block_idx += 1
        return blockid

    current_heading = ""
    current_level = 0
    current_parent_headings: list[str] = []

    for item in content_list:
        if not isinstance(item, dict):
            continue
        item_type = str(item.get("type") or item.get("label") or "").lower()

        if item_type in {"text", "title", "section_header", "list", "code"}:
            list_items = item.get("list_items")
            # Stringify entries so a non-string list item cannot make
            # str.join raise.
            joined_items = (
                "\n".join(str(entry) for entry in list_items)
                if isinstance(list_items, list)
                else ""
            )
            text = (
                item.get("text")
                or item.get("content")
                or joined_items
                or item.get("code_body")
                or ""
            )
            if not str(text).strip():
                continue
            # Use the tolerant parser: a malformed level falls back to the
            # default instead of aborting the whole document.
            inferred_level = _parse_int(item.get("text_level") or 0, 0)
            if item_type in {"title", "section_header"} and inferred_level <= 0:
                inferred_level = _parse_int(item.get("level") or 1, 1)
            if inferred_level > 0:
                (
                    current_heading,
                    current_level,
                    current_parent_headings,
                ) = _update_heading_context(str(text), inferred_level)
            _append_block(
                str(text),
                heading=current_heading,
                level=current_level,
                parent_headings=current_parent_headings,
            )
            continue

        # NOTE(review): ids, captions and paths below are interpolated into
        # tag attributes without escaping; a value containing a double
        # quote would yield a malformed tag. Left as-is because consumers
        # of these tags are not visible here.
        if item_type == "equation":
            equation_idx += 1
            eq_id = str(
                item.get("id")
                or f"eq-{doc_id.removeprefix('doc-')}-{equation_idx:04d}"
            )
            caption = str(item.get("caption") or f"公式{equation_idx}")
            footnotes = _to_list_str(
                item.get("equation_footnote") or item.get("footnotes")
            )
            eq_text = str(item.get("text") or item.get("content") or "").strip()
            # With no equation body, fall back to a citation placeholder.
            wrapped = (
                f'<equation id="{eq_id}" format="latex" caption="{caption}">{eq_text}</equation>'
                if eq_text
                else f'<cite type="equation" refid="{eq_id}">公式{equation_idx}</cite>'
            )
            blockid = _append_block(
                wrapped,
                heading=current_heading,
                level=current_level,
                parent_headings=current_parent_headings,
            )
            equations[eq_id] = {
                "id": eq_id,
                "blockid": blockid,
                "heading": current_heading,
                "format": "latex",
                "content": eq_text,
                "caption": caption,
                "footnotes": footnotes,
            }
            continue

        if item_type == "table":
            table_idx += 1
            table_id = str(
                item.get("id")
                or f"tb-{doc_id.removeprefix('doc-')}-{table_idx:04d}"
            )
            caption = str(item.get("caption") or f"表格{table_idx}")
            table_caption = _to_list_str(item.get("table_caption"))
            if table_caption and not item.get("caption"):
                caption = table_caption[0]
            footnotes = _to_list_str(
                item.get("table_footnote") or item.get("footnotes")
            )
            table_body = item.get("table_body") or item.get("content") or ""
            rows = item.get("rows") if isinstance(item.get("rows"), list) else None
            # An explicit ``rows`` list wins over the serialized body.
            (
                fmt,
                table_content,
                _normalized_rows,
                inferred_num_rows,
                inferred_num_cols,
            ) = _coerce_table_rows(rows if rows is not None else table_body)
            # The block stream carries only a citation; the table itself
            # goes into the tables sidecar.
            cite_text = (
                f'<cite type="table" refid="{table_id}">表{table_idx}</cite>'
            )
            blockid = _append_block(
                cite_text,
                heading=current_heading,
                level=current_level,
                parent_headings=current_parent_headings,
            )
            tables[table_id] = {
                "id": table_id,
                "blockid": blockid,
                "heading": current_heading,
                "dimension": [
                    _parse_int(item.get("num_rows"), inferred_num_rows),
                    _parse_int(item.get("num_cols"), inferred_num_cols),
                ],
                "format": fmt,
                "content": table_content,
                "caption": caption,
                "footnotes": footnotes,
                "image": item.get("img_path") or item.get("image"),
            }
            continue

        if item_type in {"image", "picture", "drawing"}:
            drawing_idx += 1
            drawing_id = str(
                item.get("id")
                or f"im-{doc_id.removeprefix('doc-')}-{drawing_idx:04d}"
            )
            image_caption = _to_list_str(
                item.get("image_caption") or item.get("captions")
            )
            caption = str(
                item.get("caption")
                or (image_caption[0] if image_caption else f"图{drawing_idx}")
            )
            footnotes = _to_list_str(
                item.get("image_footnote") or item.get("footnotes")
            )
            path_val = str(item.get("img_path") or item.get("path") or "")
            src_val = str(item.get("src") or "")
            # Prefer the file extension; fall back to a declared format.
            fmt = (
                Path(path_val).suffix.lower().lstrip(".")
                if path_val
                else str(item.get("format") or "")
            )
            drawing_tag = (
                f'<drawing id="{drawing_id}" format="{fmt}" caption="{caption}" '
                f'path="{path_val}" src="{src_val}" />'
            )
            blockid = _append_block(
                drawing_tag,
                heading=current_heading,
                level=current_level,
                parent_headings=current_parent_headings,
            )
            drawings[drawing_id] = {
                "id": drawing_id,
                "blockid": blockid,
                "heading": current_heading,
                "format": fmt,
                "path": path_val,
                "src": src_val,
                "caption": caption,
                "footnotes": footnotes,
            }
            continue

        # Fallback: serialize unknown item to text for robustness.
        fallback_text = str(item.get("text") or item.get("content") or "").strip()
        if fallback_text:
            _append_block(
                fallback_text,
                heading=current_heading,
                level=current_level,
                parent_headings=current_parent_headings,
            )

    merged_text = "\n\n".join([x for x in merged_parts if x.strip()])
    doc_hash = hashlib.sha256(merged_text.encode("utf-8")).hexdigest()
    parse_time = datetime.now(timezone.utc).isoformat()
    meta = {
        "type": "meta",
        "format": "lightrag",
        "version": "1.0",
        "document_name": document_name,
        "document_format": Path(document_name).suffix.lower().lstrip("."),
        "document_hash": f"sha256:{doc_hash}",
        "table_file": bool(tables),
        "equation_file": bool(equations),
        "drawing_file": bool(drawings),
        "asset_dir": False,
        "split_option": {},
        "blocks": len(blocks_lines),
        "doc_id": doc_id,
        "parse_engine": engine,
        "parse_time": parse_time,
        "doc_title": Path(document_name).stem or document_name,
    }

    # The meta record is always the first line of the blocks file.
    blocks_path.write_text(
        "\n".join([json.dumps(meta, ensure_ascii=False)] + blocks_lines) + "\n",
        encoding="utf-8",
    )
    if tables:
        tables_path.write_text(
            json.dumps(
                {"version": "1.0", "tables": tables}, ensure_ascii=False, indent=2
            ),
            encoding="utf-8",
        )
    if drawings:
        drawings_path.write_text(
            json.dumps(
                {"version": "1.0", "drawings": drawings},
                ensure_ascii=False,
                indent=2,
            ),
            encoding="utf-8",
        )
    if equations:
        equations_path.write_text(
            json.dumps(
                {"version": "1.0", "equations": equations},
                ensure_ascii=False,
                indent=2,
            ),
            encoding="utf-8",
        )

    # Keep full_docs in sync so restart/reprocess can directly use LightRAG Document.
    await self._persist_parsed_full_docs(
        doc_id,
        {
            "content": make_lightrag_doc_content(merged_text),
            "file_path": file_path,
            "parse_format": FULL_DOCS_FORMAT_LIGHTRAG,
            "sidecar_location": sidecar_uri_for(parsed_dir),
            "parse_engine": engine,
            "update_time": int(time.time()),
        },
    )
    await archive_docx_source_after_full_docs_sync(
        self._resolve_source_file_for_parser(file_path)
    )
    return {
        "doc_id": doc_id,
        "file_path": file_path,
        "parse_format": FULL_DOCS_FORMAT_LIGHTRAG,
        "content": merged_text,
        "blocks_path": str(blocks_path),
    }
- # ============================================================
- # Multimodal / VLM
- # ============================================================
- async def analyze_multimodal(
- self,
- doc_id: str,
- file_path: str,
- parsed_data: dict[str, Any],
- *,
- process_options: str | None = None,
- pipeline_status: dict | None = None,
- pipeline_status_lock: Any | None = None,
- ) -> dict[str, Any]:
- """Phase 2: Multimodal analysis (VLM). Writes llm_analyze_result to LightRAG Document.
- Per-document ``i`` / ``t`` / ``e`` flags from
- ``full_docs.process_options`` decide which modalities are sent to the
- VLM. Sidecars are always written by the parser regardless of these
- flags so toggling options later does not require re-parsing — only
- the ``llm_analyze_result`` payload is gated here.
- Per-item ``llm_analyze_result`` is recomputed and overwritten on each
- run for enabled modalities. This lets operators fix VLM/EXTRACT
- configuration or prompt limits and retry without manually clearing
- prior failure markers from the sidecar.
- Args:
- process_options: Optional override that bypasses the
- ``full_docs.process_options`` lookup; primarily used by unit
- tests that exercise the VLM analysis path without going
- through the enqueue pipeline.
- """
- from lightrag.parser.routing import parse_process_options
- blocks_path = parsed_data.get("blocks_path")
- if not blocks_path:
- parsed_data["analyzing_stage_skipped"] = True
- return parsed_data
- block_file = Path(blocks_path)
- if not block_file.exists():
- parsed_data["analyzing_stage_skipped"] = True
- return parsed_data
- # Resolve which modalities the user opted into for this document.
- if process_options is None:
- try:
- content_data = await self.full_docs.get_by_id(doc_id) or {}
- except Exception:
- content_data = {}
- options_str = (
- content_data.get("process_options")
- if isinstance(content_data, dict)
- else ""
- ) or ""
- else:
- options_str = process_options
- process_opts = parse_process_options(options_str)
- if not (process_opts.images or process_opts.tables or process_opts.equations):
- logger.debug(
- f"[analyze_multimodal] no i/t/e options set for d-id: {doc_id}; "
- f"skipping VLM analysis"
- )
- parsed_data["analyzing_stage_skipped"] = True
- return parsed_data
- # Diagnose opt-in vs sidecar mismatch up-front so users investigating
- # "why did VLM not run on my images" see a one-line INFO per document
- # instead of silent skips. Empty sidecars are a normal outcome
- # (some documents simply have no images/tables/equations), so this is
- # informational rather than a warning.
- sidecar_base = str(block_file)
- if sidecar_base.endswith(".blocks.jsonl"):
- sidecar_base = sidecar_base[: -len(".blocks.jsonl")]
- opt_in_missing: list[str] = []
- for opt_char, modality, suffix in (
- ("i", "drawings", ".drawings.json"),
- ("t", "tables", ".tables.json"),
- ("e", "equations", ".equations.json"),
- ):
- enabled = {
- "i": process_opts.images,
- "t": process_opts.tables,
- "e": process_opts.equations,
- }[opt_char]
- if enabled and not Path(sidecar_base + suffix).exists():
- opt_in_missing.append(f"{opt_char}:{modality}")
- if opt_in_missing:
- logger.info(
- f"[analyze_multimodal] {','.join(opt_in_missing)} sidecar empty: {doc_id}"
- )
- # Backfill sidecar `surrounding` for the enabled modalities just
- # before VLM consumption. Universal coverage: native, MinerU,
- # Docling, and pre-existing LightRAG documents reused from disk
- # all go through this single entrypoint. Idempotent: re-runs
- # overwrite with stable output given unchanged block content.
- enabled_modalities = {
- mod
- for mod, on in (
- ("drawings", process_opts.images),
- ("tables", process_opts.tables),
- ("equations", process_opts.equations),
- )
- if on
- }
- tokenizer = getattr(self, "tokenizer", None)
- if enabled_modalities and tokenizer is not None:
- try:
- from lightrag.multimodal_context import (
- enrich_sidecars_with_surrounding,
- )
- enrich_counts = enrich_sidecars_with_surrounding(
- blocks_path=str(block_file),
- enabled_modalities=enabled_modalities,
- tokenizer=tokenizer,
- )
- if any(enrich_counts.values()):
- logger.info(
- "[analyze_multimodal] "
- + ", ".join(f"{k}={v}" for k, v in enrich_counts.items() if v)
- + f" surrounding backfilled: {doc_id}"
- )
- except Exception as enrich_err:
- logger.warning(
- f"[analyze_multimodal] surrounding enrichment failed for "
- f"d-id: {doc_id}, file: {file_path}: {enrich_err}"
- )
- try:
- lines = block_file.read_text(encoding="utf-8").splitlines()
- if not lines:
- return parsed_data
- meta = json.loads(lines[0])
- if not isinstance(meta, dict) or meta.get("type") != "meta":
- return parsed_data
- from lightrag.llm._vision_utils import (
- image_audit_metadata,
- image_cache_metadata,
- normalize_image_inputs,
- read_image_dimensions,
- )
- from lightrag.prompt_multimodal import (
- IMAGE_TYPE_ENUM,
- IMAGE_TYPE_FALLBACK,
- MULTIMODAL_PROMPTS,
- )
- from lightrag.constants import (
- DEFAULT_MM_ANALYSIS_PRIORITY,
- DEFAULT_MM_IMAGE_MIN_PIXEL,
- DEFAULT_SUMMARY_LANGUAGE,
- )
- global_config = self._build_global_config()
- addon_params = global_config.get("addon_params") or {}
- language = (
- global_config.get("_resolved_summary_language")
- or addon_params.get("language")
- or DEFAULT_SUMMARY_LANGUAGE
- )
- vlm_process_enable = bool(global_config.get("vlm_process_enable", False))
- max_image_bytes = max(
- 256 * 1024,
- int(os.getenv("VLM_MAX_IMAGE_BYTES", str(5 * 1024 * 1024))),
- )
- min_image_pixel = max(
- 1,
- int(os.getenv("VLM_MIN_IMAGE_PIXEL", str(DEFAULT_MM_IMAGE_MIN_PIXEL))),
- )
- # Multimodal analysis shares the entity-extraction cache flag
- # (both run with mode="default" — see handle_cache short-circuit
- # in lightrag.utils). When the flag is off we must NOT save the
- # response either, otherwise stale cache entries would still
- # accumulate while reads are blocked. cache_id attachment to
- # the sidecar item.llm_cache_list is likewise gated so a
- # disabled cache does not seed cache-cleanup metadata that
- # corresponds to entries that were never persisted.
- analysis_cache_enabled = bool(
- global_config.get("enable_llm_cache_for_entity_extract")
- )
- use_vlm_func = self.role_llm_funcs.get("vlm")
- use_extract_func = self.role_llm_funcs.get("extract")
- vlm_cache_identity = get_llm_cache_identity(global_config, role="vlm")
- extract_cache_identity = get_llm_cache_identity(
- global_config, role="extract"
- )
- _IMAGE_TYPE_VALUES = set(IMAGE_TYPE_ENUM)
- _VLM_RASTER_EXTS = {".png", ".jpg", ".jpeg", ".gif", ".webp"}
- def _json_extract(text: str) -> dict[str, Any]:
- """Tolerant JSON object recovery.
- Mirrors :func:`lightrag.operate._process_json_extraction_result`
- so weaker models that emit ```json ... ``` fenced output,
- trailing commas, or unquoted keys are still salvageable.
- The order of attempts is:
- 1. Strip a leading ```json fence if present.
- 2. Hand the cleaned string to ``json_repair.loads`` (handles
- minor structural slips like trailing commas).
- 3. Fall back to a greedy ``{...}`` regex slice for outputs
- that wrap the JSON object in prose, then re-run
- ``json_repair.loads`` on the slice.
- """
- if not text:
- return {}
- candidate = text.strip()
- fence_match = re.match(
- r"^```(?:json)?\s*\n(.*?)\n```$",
- candidate,
- re.DOTALL | re.IGNORECASE,
- )
- if fence_match:
- candidate = fence_match.group(1).strip()
- try:
- obj = json_repair.loads(candidate)
- if isinstance(obj, dict):
- return obj
- except Exception:
- pass
- m = re.search(r"\{[\s\S]*\}", candidate)
- if m:
- try:
- obj = json_repair.loads(m.group(0))
- if isinstance(obj, dict):
- return obj
- except Exception:
- pass
- return {}
- def _normalize_text(value: Any) -> str:
- if value is None:
- return ""
- if isinstance(value, str):
- return value.strip()
- if isinstance(value, (list, tuple)):
- return "\n".join(str(v).strip() for v in value if str(v).strip())
- return str(value).strip()
- def _captions_value(item_obj: dict[str, Any]) -> str:
- return _normalize_text(item_obj.get("caption")) or "n/a"
- def _footnotes_value(item_obj: dict[str, Any]) -> str:
- raw = item_obj.get("footnotes")
- if isinstance(raw, (list, tuple)):
- joined = "; ".join(str(v).strip() for v in raw if str(v).strip())
- return joined or "n/a"
- text = _normalize_text(raw)
- return text or "n/a"
- def _surrounding_value(item_obj: dict[str, Any], key: str) -> str:
- surrounding = item_obj.get("surrounding") or {}
- if not isinstance(surrounding, dict):
- return "n/a"
- value = _normalize_text(surrounding.get(key))
- return value or "n/a"
- def _resolve_image_path(
- path_str: str | None, sidecar_dir: Path
- ) -> Path | None:
- if not path_str:
- return None
- candidate = Path(path_str)
- if not candidate.is_absolute():
- sidecar_candidate = sidecar_dir / path_str
- if sidecar_candidate.exists() and sidecar_candidate.is_file():
- candidate = sidecar_candidate
- if candidate.exists() and candidate.is_file():
- return candidate
- return None
- def _failure_result(message: str) -> dict[str, Any]:
- return {
- "analyze_time": int(time.time()),
- "status": "failure",
- "message": message,
- }
- def _skipped_result(message: str) -> dict[str, Any]:
- return {
- "analyze_time": int(time.time()),
- "status": "skipped",
- "message": message,
- }
- async def _analyze_drawing(
- item_id: str, item: dict[str, Any], sidecar_dir: Path
- ) -> tuple[dict[str, Any], str | None]:
- path_str = (
- item.get("path") or item.get("img_path") or item.get("image_path")
- )
- candidate = _resolve_image_path(path_str, sidecar_dir)
- if candidate is None:
- return (
- _skipped_result(f"image file not found: {path_str or 'n/a'}"),
- None,
- )
- ext = candidate.suffix.lower()
- if ext not in _VLM_RASTER_EXTS:
- return (
- _skipped_result(f"unsupported image format: {ext}"),
- None,
- )
- dims = read_image_dimensions(candidate)
- if dims is not None and (
- dims[0] < min_image_pixel or dims[1] < min_image_pixel
- ):
- return (
- _skipped_result(
- f"image width or height is smaller than "
- f"{min_image_pixel}px"
- ),
- None,
- )
- if not vlm_process_enable or use_vlm_func is None:
- raise MultimodalAnalysisError(
- f"drawings/{item_id}: VLM analysis required but "
- "VLM role is not available "
- "(VLM_PROCESS_ENABLE or vlm role config)"
- )
- try:
- raw = candidate.read_bytes()
- except OSError as exc:
- raise MultimodalAnalysisError(
- f"drawings/{item_id}: cannot read image {candidate}: {exc}"
- ) from exc
- if not raw:
- raise MultimodalAnalysisError(
- f"drawings/{item_id}: image file is empty"
- )
- if len(raw) > max_image_bytes:
- return (
- _skipped_result(
- f"image too large: {len(raw)} bytes "
- f"(limit {max_image_bytes})"
- ),
- None,
- )
- mime, _ = mimetypes.guess_type(str(candidate))
- mime = mime or "image/png"
- img_payload = {
- "base64": base64.b64encode(raw).decode("ascii"),
- "mime_type": mime,
- "source_id": item_id,
- "source_file": str(candidate),
- "modality": "image",
- "doc_id": doc_id,
- }
- normalized_images = normalize_image_inputs([img_payload])
- prompt = MULTIMODAL_PROMPTS["image_analysis"].format(
- language=language,
- content="",
- captions=_captions_value(item),
- footnotes=_footnotes_value(item),
- leading=_surrounding_value(item, "leading"),
- trailing=_surrounding_value(item, "trailing"),
- item_id=item_id,
- file_path=file_path,
- )
- args_hash = compute_args_hash(
- prompt,
- "",
- "",
- serialize_llm_cache_identity(vlm_cache_identity),
- _serialize_cache_variant({"type": "json_object"}),
- _serialize_cache_variant(image_cache_metadata(normalized_images)),
- "drawing",
- )
- cache_id = generate_cache_key("default", "analysis", args_hash)
- cached = await handle_cache(
- self.llm_response_cache,
- args_hash,
- prompt,
- mode="default",
- cache_type="analysis",
- )
- if cached is not None:
- result_text = cached[0]
- fresh = False
- else:
- try:
- result_text = await use_vlm_func(
- prompt,
- stream=False,
- image_inputs=[img_payload],
- _priority=DEFAULT_MM_ANALYSIS_PRIORITY,
- )
- except Exception as exc:
- raise MultimodalAnalysisError(
- f"drawings/{item_id}: VLM call failed: {exc}"
- ) from exc
- fresh = True
- parsed = _json_extract(str(result_text))
- name = parsed.get("name")
- type_value = parsed.get("type")
- description = parsed.get("description")
- if not isinstance(name, str) or not name.strip():
- raise MultimodalAnalysisError(
- f"drawings/{item_id}: missing or invalid field 'name'"
- )
- if not isinstance(description, str) or not description.strip():
- raise MultimodalAnalysisError(
- f"drawings/{item_id}: missing or invalid field 'description'"
- )
- if not isinstance(type_value, str) or not type_value.strip():
- raise MultimodalAnalysisError(
- f"drawings/{item_id}: missing or invalid field 'type'"
- )
- if type_value not in _IMAGE_TYPE_VALUES:
- type_value = IMAGE_TYPE_FALLBACK
- cache_id_to_attach: str | None = None
- if fresh and analysis_cache_enabled:
- audit_blob = image_audit_metadata(normalized_images)
- original_prompt = prompt + (
- f"\n<vlm_images>"
- f"{json.dumps(audit_blob, ensure_ascii=False)}"
- "</vlm_images>"
- if audit_blob
- else ""
- )
- await save_to_cache(
- self.llm_response_cache,
- CacheData(
- args_hash=args_hash,
- content=str(result_text),
- prompt=original_prompt,
- mode="default",
- cache_type="analysis",
- chunk_id=None,
- ),
- )
- cache_id_to_attach = cache_id
- elif not fresh:
- # Cache hit: the entry exists, so attaching its id is
- # safe (and necessary for document-delete cleanup).
- cache_id_to_attach = cache_id
- return (
- {
- "name": name.strip(),
- "type": type_value,
- "description": description.strip(),
- "analyze_time": int(time.time()),
- "status": "success",
- "message": "",
- },
- cache_id_to_attach,
- )
- async def _analyze_text_modality(
- kind: str, item_id: str, item: dict[str, Any]
- ) -> tuple[dict[str, Any], str | None]:
- if use_extract_func is None:
- raise MultimodalAnalysisError(
- f"{kind}/{item_id}: EXTRACT role is required but not configured"
- )
- content_text = _normalize_text(item.get("content"))
- if not content_text:
- if kind == "table":
- # Defensive fallback for sidecars that still carry
- # empty-bodied table items (e.g. produced by an older
- # parser run, or by a parser that doesn't filter
- # MinerU-style misidentified blanks). Don't abort the
- # whole worker — record the skip and move on.
- logger.warning(
- f"[analyze_multimodal] table/{item_id}: missing "
- f"table content; skipping analysis ({file_path})"
- )
- return (
- _skipped_result("missing table content"),
- None,
- )
- raise MultimodalAnalysisError(
- f"{kind}/{item_id}: missing {kind} content"
- )
- template = MULTIMODAL_PROMPTS[f"{kind}_analysis"]
- def _render(content_value: str) -> str:
- return template.format(
- language=language,
- content=content_value,
- captions=_captions_value(item),
- footnotes=_footnotes_value(item),
- leading=_surrounding_value(item, "leading"),
- trailing=_surrounding_value(item, "trailing"),
- item_id=item_id,
- file_path=file_path,
- )
- prompt = _render(content_text)
- # Cap the EXTRACT prompt at MAX_EXTRACT_INPUT_TOKENS by
- # trimming the (typically huge) sidecar `content` field — the
- # other slots (surrounding/captions/footnotes) already have
- # their own per-field caps upstream. The cap is resolved
- # from the env var (falling back to
- # DEFAULT_MAX_EXTRACT_INPUT_TOKENS) so deployments can tune
- # it for their model's context window.
- tokenizer = getattr(self, "tokenizer", None)
- if tokenizer is not None:
- from lightrag.constants import DEFAULT_MAX_EXTRACT_INPUT_TOKENS
- from lightrag.multimodal_context import trim_content_to_budget
- SAFETY_BUFFER = 256
- max_extract_tokens = get_env_value(
- "MAX_EXTRACT_INPUT_TOKENS",
- DEFAULT_MAX_EXTRACT_INPUT_TOKENS,
- int,
- )
- total_tokens = len(tokenizer.encode(prompt))
- if max_extract_tokens > 0 and total_tokens > max_extract_tokens:
- frame_tokens = len(tokenizer.encode(_render("")))
- content_budget = (
- max_extract_tokens - frame_tokens - SAFETY_BUFFER
- )
- if content_budget <= 0:
- # The prompt template alone (with empty content)
- # already exceeds the cap — no content trim can
- # bring the request under the limit. Fail this
- # item rather than handing the LLM a payload we
- # know will trigger ``context_length_exceeded``.
- # Operators must raise MAX_EXTRACT_INPUT_TOKENS
- # above the template frame for analysis to
- # succeed; the document is reprocessable
- # idempotently once the cap is widened.
- raise MultimodalAnalysisError(
- f"{kind}/{item_id}: prompt frame "
- f"({frame_tokens} tokens) exceeds "
- f"MAX_EXTRACT_INPUT_TOKENS "
- f"({max_extract_tokens}); raise the cap"
- )
- trimmed, was_trimmed = trim_content_to_budget(
- content_text,
- kind=f"{kind}s",
- max_tokens=content_budget,
- tokenizer=tokenizer,
- )
- if was_trimmed:
- prompt = _render(trimmed)
- logger.warning(
- f"[analyze_multimodal] {kind}/{item_id} "
- f"content trimmed (prompt {total_tokens} "
- f"→ fit {max_extract_tokens}, "
- f"content_budget={content_budget})"
- )
- # Post-trim hard guard: ``trim_content_to_budget``
- # is constrained by ``content_budget`` so the final
- # prompt should fit within ``max_extract_tokens``;
- # defend against tokenizer rounding / future template
- # changes that could push it over. Refuse the call
- # rather than send an over-cap prompt to the LLM.
- final_tokens = len(tokenizer.encode(prompt))
- if final_tokens > max_extract_tokens:
- raise MultimodalAnalysisError(
- f"{kind}/{item_id}: trimmed prompt "
- f"({final_tokens} tokens) still exceeds "
- f"MAX_EXTRACT_INPUT_TOKENS "
- f"({max_extract_tokens})"
- )
- args_hash = compute_args_hash(
- prompt,
- "",
- "",
- serialize_llm_cache_identity(extract_cache_identity),
- _serialize_cache_variant({"type": "json_object"}),
- _serialize_cache_variant([]),
- kind,
- )
- cache_id = generate_cache_key("default", "analysis", args_hash)
- cached = await handle_cache(
- self.llm_response_cache,
- args_hash,
- prompt,
- mode="default",
- cache_type="analysis",
- )
- if cached is not None:
- result_text = cached[0]
- fresh = False
- else:
- try:
- result_text = await use_extract_func(
- prompt,
- stream=False,
- response_format={"type": "json_object"},
- _priority=DEFAULT_MM_ANALYSIS_PRIORITY,
- )
- except Exception as exc:
- raise MultimodalAnalysisError(
- f"{kind}/{item_id}: EXTRACT call failed: {exc}"
- ) from exc
- fresh = True
- parsed = _json_extract(str(result_text))
- name = parsed.get("name")
- description = parsed.get("description")
- if not isinstance(name, str) or not name.strip():
- raise MultimodalAnalysisError(
- f"{kind}/{item_id}: missing or invalid field 'name'"
- )
- if not isinstance(description, str) or not description.strip():
- raise MultimodalAnalysisError(
- f"{kind}/{item_id}: missing or invalid field 'description'"
- )
- result_obj: dict[str, Any] = {
- "name": name.strip(),
- "description": description.strip(),
- "analyze_time": int(time.time()),
- "status": "success",
- "message": "",
- }
- if kind == "equation":
- equation_value = parsed.get("equation")
- if (
- not isinstance(equation_value, str)
- or not equation_value.strip()
- ):
- raise MultimodalAnalysisError(
- f"equation/{item_id}: missing or invalid field 'equation'"
- )
- result_obj["equation"] = equation_value.strip()
- cache_id_to_attach: str | None = None
- if fresh and analysis_cache_enabled:
- await save_to_cache(
- self.llm_response_cache,
- CacheData(
- args_hash=args_hash,
- content=str(result_text),
- prompt=prompt,
- mode="default",
- cache_type="analysis",
- chunk_id=None,
- ),
- )
- cache_id_to_attach = cache_id
- elif not fresh:
- # Cache hit path (handle_cache already gated by flag):
- # safe to surface the existing cache_id for cleanup.
- cache_id_to_attach = cache_id
- return (result_obj, cache_id_to_attach)
- def _attach_cache_id(
- item_obj: dict[str, Any], cache_id: str | None
- ) -> None:
- if not cache_id:
- return
- existing = item_obj.get("llm_cache_list")
- if not isinstance(existing, list):
- existing = []
- if cache_id not in existing:
- existing.append(cache_id)
- item_obj["llm_cache_list"] = existing
async def _run_with_progress_log(coro, kind: str, item_id: str):
    """Await one per-item analysis coroutine and report how it ended.

    The report is emitted as soon as this single item finishes rather than
    after the whole batch, so each drawing/table/equation outcome is
    visible immediately:

    * the coroutine raises ``Exception`` -> a warning is logged, the same
      message is written to ``pipeline_status`` and the exception is
      re-raised unchanged;
    * the result is a tuple whose first element is a dict with
      ``status == "success"`` -> an info message is logged and written to
      ``pipeline_status``;
    * any other result -> a debug-only "skipped" log; ``pipeline_status``
      is not touched, so benign skips do not flood the history.

    Args:
        coro: Awaitable performing the analysis of one item.
        kind: Modality label used in the log message.
        item_id: Identifier of the item used in the log message.

    Returns:
        Whatever ``coro`` returned, unmodified.
    """

    async def _publish(message: str) -> None:
        # pipeline_status is only updated when both the status dict and
        # its lock were supplied by the enclosing scope.
        if pipeline_status is None or pipeline_status_lock is None:
            return
        async with pipeline_status_lock:
            pipeline_status["latest_message"] = message
            pipeline_status["history_messages"].append(message)

    try:
        outcome = await coro
    except Exception:
        failed_message = f"Analyzing {kind}/{item_id}: failed"
        logger.warning(failed_message)
        await _publish(failed_message)
        raise

    analysis = outcome[0] if isinstance(outcome, tuple) else {}
    succeeded = isinstance(analysis, dict) and analysis.get("status") == "success"
    if not succeeded:
        logger.debug(f"Analyzing {kind}/{item_id}: skipped")
        return outcome

    ok_message = f"Analyzing {kind}/{item_id}: ok"
    logger.info(ok_message)
    await _publish(ok_message)
    return outcome
- base_name = str(block_file)
- if base_name.endswith(".blocks.jsonl"):
- base_name = base_name[: -len(".blocks.jsonl")]
- sidecars = [
- (
- Path(base_name + ".drawings.json"),
- "drawings",
- "drawing",
- process_opts.images,
- ),
- (
- Path(base_name + ".tables.json"),
- "tables",
- "table",
- process_opts.tables,
- ),
- (
- Path(base_name + ".equations.json"),
- "equations",
- "equation",
- process_opts.equations,
- ),
- ]
- start_logged = False
- for sidecar_path, root_key, kind, enabled in sidecars:
- if not enabled or not sidecar_path.exists():
- continue
- try:
- payload = json.loads(sidecar_path.read_text(encoding="utf-8"))
- except Exception as exc:
- raise MultimodalAnalysisError(
- f"failed to read sidecar {sidecar_path}: {exc}"
- ) from exc
- items = payload.get(root_key, {})
- if not isinstance(items, dict):
- continue
- if (
- items
- and not start_logged
- and pipeline_status is not None
- and pipeline_status_lock is not None
- ):
- async with pipeline_status_lock:
- log_message = f"Analyzing multimodal: {doc_id}"
- logger.info(log_message)
- pipeline_status["latest_message"] = log_message
- pipeline_status["history_messages"].append(log_message)
- start_logged = True
- # Pre-schedule cancellation check: if the user cancelled
- # between _analyze_worker's boundary check and the moment
- # we are about to spawn VLM tasks for this sidecar, raise
- # here so no item task ever runs. Without this we'd briefly
- # create tasks and then cancel them on the very first poll
- # iteration — wasteful and harder to reason about.
- if pipeline_status is not None and pipeline_status_lock is not None:
- await self._raise_if_cancelled(
- pipeline_status, pipeline_status_lock
- )
- task_meta: dict[asyncio.Task, tuple[str, dict]] = {}
- for item_id, item in items.items():
- if not isinstance(item, dict):
- continue
- if kind == "drawing":
- inner_coro = _analyze_drawing(
- item_id, item, sidecar_path.parent
- )
- else:
- inner_coro = _analyze_text_modality(kind, item_id, item)
- task = asyncio.create_task(
- _run_with_progress_log(inner_coro, kind, item_id)
- )
- task_meta[task] = (item_id, item)
- if not task_meta:
- # No valid items in this sidecar — asyncio.wait([]) would
- # ValueError, so skip the wait loop entirely.
- continue
- # Fail-fast polling loop. Three trigger paths:
- # 1. an item task raises (e.g. MultimodalAnalysisError) →
- # asyncio.wait returns early via FIRST_EXCEPTION;
- # 2. an item task raises PipelineCancelledException →
- # same path, preserving the exception type;
- # 3. user clicks /cancel_pipeline mid-VLM → the
- # cancellation_requested check at the top of the next
- # poll iteration (≤ POLL_INTERVAL_SECONDS) fabricates
- # a PipelineCancelledException.
- #
- # Do NOT add a watcher coroutine to the wait set: it would be
- # an infinite loop that stays pending when all items succeed,
- # preventing FIRST_EXCEPTION from ever returning.
- pending: set[asyncio.Task] = set(task_meta.keys())
- fail_fast_exc: BaseException | None = None
- POLL_INTERVAL_SECONDS = 0.5
- while pending:
- if (
- pipeline_status is not None
- and pipeline_status_lock is not None
- and await self._cancellation_requested(
- pipeline_status, pipeline_status_lock
- )
- ):
- fail_fast_exc = PipelineCancelledException(
- "User cancelled during analyze"
- )
- break
- done_now, pending = await asyncio.wait(
- pending,
- timeout=POLL_INTERVAL_SECONDS,
- return_when=asyncio.FIRST_EXCEPTION,
- )
- for t in done_now:
- if t.cancelled():
- continue
- texc = t.exception()
- if texc is not None:
- # Preserve original exception type so the
- # _analyze_worker except dispatch can distinguish
- # PipelineCancelledException from
- # MultimodalAnalysisError.
- fail_fast_exc = texc
- break
- if fail_fast_exc is not None:
- break
- # If we broke early, cancel the still-running tasks.
- for t in pending:
- t.cancel()
- if pending:
- await asyncio.gather(*pending, return_exceptions=True)
- # Collect results — preserve completed successes so reprocess
- # can hit the LLM cache instead of re-running the VLM.
- for t, (item_id, item) in task_meta.items():
- if t.cancelled():
- item["llm_analyze_result"] = _failure_result("cancelled")
- continue
- texc = t.exception()
- if texc is None:
- result_obj, cache_id = t.result()
- item["llm_analyze_result"] = result_obj
- _attach_cache_id(item, cache_id)
- elif isinstance(texc, PipelineCancelledException):
- item["llm_analyze_result"] = _failure_result("cancelled")
- elif isinstance(texc, MultimodalAnalysisError):
- item["llm_analyze_result"] = _failure_result(str(texc))
- else:
- item["llm_analyze_result"] = _failure_result(
- f"unexpected error: {texc}"
- )
- try:
- sidecar_path.write_text(
- json.dumps(payload, ensure_ascii=False, indent=2),
- encoding="utf-8",
- )
- except OSError as exc:
- logger.warning(
- f"[analyze_multimodal] failed to write sidecar "
- f"{sidecar_path}: {exc}"
- )
- if fail_fast_exc is not None:
- # Best-effort cache flush so any cache_ids written by
- # already-completed sibling tasks survive a restart —
- # otherwise the sidecar references cache rows that
- # haven't been persisted yet. Mirrors
- # _finalize_doc_failure's PROCESS-stage behaviour.
- if self.llm_response_cache:
- try:
- await self.llm_response_cache.index_done_callback()
- except Exception as persist_error:
- logger.error(
- f"Failed to persist LLM cache after analyze "
- f"fail-fast: {persist_error}"
- )
- raise fail_fast_exc
- parsed_data["multimodal_processed"] = True
- logger.info(f"[analyze_multimodal] completed for d-id: {doc_id}")
- except PipelineCancelledException:
- # Must re-raise BEFORE the generic Exception handler below,
- # otherwise the doc would be returned as if analyze succeeded
- # and would advance to PROCESS instead of being marked FAILED.
- raise
- except MultimodalAnalysisError:
- raise
- except Exception as e:
- logger.warning(f"[analyze_multimodal] failed for d-id: {doc_id}: {e}")
- return parsed_data
def _build_mm_chunks_from_sidecars(
    self,
    doc_id: str,
    file_path: str,
    blocks_path: str,
    base_order_index: int,
    process_options: str | None = None,
) -> list[dict[str, Any]]:
    """Build multimodal chunks from sidecars carrying analysis results.

    Only items whose ``llm_analyze_result.status == "success"`` produce
    chunks. ``"skipped"`` items, items without an analysis dict and items
    with an unknown status are ignored; ``"failure"`` items raise
    :class:`MultimodalAnalysisError` so the document is marked failed (a
    failure should already have aborted the analyze phase — this is a
    defensive recheck).

    Each chunk follows the new schema: nested ``heading`` and
    ``sidecar`` dicts, no flat ``parent_headings`` / ``level`` /
    ``content_type`` fields. ``llm_cache_list`` is merged from the
    underlying sidecar item so document deletion can clean up the
    ``cache_type="analysis"`` entries it created.

    ``process_options`` gates which modality sidecars are read: a
    document re-processed after opting out of ``i`` / ``t`` / ``e``
    must NOT pick up stale success results from a prior pass. When
    ``None`` (e.g. ad-hoc unit tests), every modality is considered.

    A sidecar that cannot be read or parsed, or whose top-level JSON
    value is not an object, is logged with a warning and skipped.

    Args:
        doc_id: Document id, used as the prefix of every ``chunk_id``.
        file_path: Not referenced by this method; kept in the signature
            for callers.
        blocks_path: Path of the ``*.blocks.jsonl`` file; sidecar paths
            are derived from it by swapping the suffix.
        base_order_index: ``chunk_order_index`` of the first chunk built;
            later chunks count up from it.
        process_options: Raw process-options string, or ``None`` to allow
            every modality.

    Returns:
        The chunk dicts in sidecar order (drawings, tables, equations).
        Empty when ``blocks_path`` does not exist.

    Raises:
        MultimodalAnalysisError: when an item carries ``status="failure"``,
            when a success result lacks a required field, or when the
            multimodal chunk cannot be fit under the extraction token
            budget even after truncating description to
            :data:`DEFAULT_MM_CHUNK_DESCRIPTION_MIN_TOKENS`.
    """
    from lightrag.constants import (
        DEFAULT_MAX_EXTRACT_INPUT_TOKENS,
        DEFAULT_MM_CHUNK_DESCRIPTION_MIN_TOKENS,
    )
    from lightrag.parser.routing import parse_process_options

    block_file = Path(blocks_path)
    if not block_file.exists():
        return []

    base = str(block_file)
    if base.endswith(".blocks.jsonl"):
        base = base[: -len(".blocks.jsonl")]

    if process_options is None:
        allowed = {"drawing", "table", "equation"}
    else:
        opts = parse_process_options(process_options)
        allowed = set()
        if opts.images:
            allowed.add("drawing")
        if opts.tables:
            allowed.add("table")
        if opts.equations:
            allowed.add("equation")

    sidecar_defs = [
        (root, Path(base + suffix), kind)
        for root, suffix, kind in (
            ("drawings", ".drawings.json", "drawing"),
            ("tables", ".tables.json", "table"),
            ("equations", ".equations.json", "equation"),
        )
        if kind in allowed
    ]

    mm_chunks: list[dict[str, Any]] = []
    order = base_order_index

    def _norm_str_list(v: Any) -> list[str]:
        # Accept a list or a single scalar. ``None`` entries are dropped
        # so they never surface as the literal text "None".
        if v is None:
            return []
        values = v if isinstance(v, list) else [v]
        cleaned = (str(x).strip() for x in values if x is not None)
        return [s for s in cleaned if s]

    def _norm_parent_headings(value: Any) -> list[str]:
        if not isinstance(value, list):
            return []
        return [str(p).strip() for p in value if str(p or "").strip()]

    def _norm_level(raw: Any) -> int:
        # OverflowError covers non-finite floats, which ``json.loads``
        # accepts (``Infinity``) but ``int()`` rejects.
        try:
            return int(raw or 0)
        except (TypeError, ValueError, OverflowError):
            return 0

    def _build_heading_dict(item: dict[str, Any]) -> dict[str, Any] | None:
        heading_raw = item.get("heading")
        if isinstance(heading_raw, dict):
            # Nested form: everything lives under item["heading"].
            heading_text = str(heading_raw.get("heading") or "").strip()
            parents = _norm_parent_headings(heading_raw.get("parent_headings"))
            level = _norm_level(heading_raw.get("level"))
        else:
            # Flat form: heading text plus sibling keys on the item.
            heading_text = str(heading_raw or "").strip()
            parents = _norm_parent_headings(item.get("parent_headings"))
            level = _norm_level(item.get("level"))
        if not heading_text and not parents and level == 0:
            return None
        return {
            "level": level,
            "heading": heading_text,
            "parent_headings": parents,
        }

    def _render(
        kind: str,
        name: str,
        image_type: str,
        description: str,
        footnotes_joined: str,
        equation_body: str,
    ) -> str:
        # NOTE: the `[Image Name]` / `[Table Name]` / `[Equation Name]`
        # leading labels below are a contract consumed by
        # ``lightrag.operate._parse_mm_display_name`` (regex
        # ``_MM_DISPLAY_NAME_PATTERN``). If you rename or restructure
        # these labels, update that regex too, or relation descriptions
        # will silently fall back to sidecar ids. The
        # ``test_parse_mm_display_name_on_real_builder_output``
        # regression pins this contract end-to-end.
        if kind == "drawing":
            head = f"[Image Name]{name}\n[Image Type]{image_type}"
            footnote_label = "Image Footnotes"
        elif kind == "table":
            head = f"[Table Name]{name}"
            footnote_label = "Table Footnotes"
        else:  # equation
            head = f"{equation_body}\n[Equation Name]{name}"
            footnote_label = "Equation Footnotes"
        sections = [head, description]
        if footnotes_joined:
            sections.append(f"[{footnote_label}]{footnotes_joined}")
        return "\n\n".join(s for s in sections if s).strip()

    max_tokens = DEFAULT_MAX_EXTRACT_INPUT_TOKENS
    min_desc_tokens = DEFAULT_MM_CHUNK_DESCRIPTION_MIN_TOKENS

    for root_key, sidecar_path, kind in sidecar_defs:
        if not sidecar_path.exists():
            continue
        try:
            payload = json.loads(sidecar_path.read_text(encoding="utf-8"))
        except Exception as exc:
            # Still best-effort (the sidecar is skipped), but leave a
            # trace so missing chunks can be diagnosed.
            logger.warning(
                f"[build_mm_chunks] failed to read sidecar "
                f"{sidecar_path}: {exc}"
            )
            continue
        if not isinstance(payload, dict):
            # A top-level list/scalar has no ``.get``; skip it instead of
            # crashing with AttributeError.
            logger.warning(
                f"[build_mm_chunks] sidecar {sidecar_path} is not a JSON "
                f"object; skipped"
            )
            continue
        items = payload.get(root_key, {})
        if not isinstance(items, dict):
            continue

        # ``local_idx`` counts every entry, including ones that yield no
        # chunk, so a chunk_id depends only on the item's position.
        for local_idx, (item_id, item) in enumerate(items.items()):
            if not isinstance(item, dict):
                continue
            analysis = item.get("llm_analyze_result")
            if not isinstance(analysis, dict):
                continue
            status = analysis.get("status")
            if status == "skipped":
                continue
            if status == "failure":
                raise MultimodalAnalysisError(
                    f"{root_key}/{item_id}: llm_analyze_result.status='failure' "
                    f"({analysis.get('message') or 'no message'})"
                )
            if status != "success":
                # Treat unknown / legacy status as missing — no chunk.
                continue

            name = str(analysis.get("name") or "").strip()
            description = str(analysis.get("description") or "").strip()
            equation_body = str(analysis.get("equation") or "").strip()
            image_type = str(analysis.get("type") or "").strip()
            if not name:
                raise MultimodalAnalysisError(
                    f"{root_key}/{item_id}: success result missing 'name'"
                )
            if not description:
                raise MultimodalAnalysisError(
                    f"{root_key}/{item_id}: success result missing 'description'"
                )
            if kind == "drawing" and not image_type:
                raise MultimodalAnalysisError(
                    f"drawings/{item_id}: success result missing 'type'"
                )
            if kind == "equation" and not equation_body:
                raise MultimodalAnalysisError(
                    f"equations/{item_id}: success result missing 'equation'"
                )

            # Everything except the description is fixed for this item;
            # only the description is varied while fitting the budget.
            fixed_parts = {
                "kind": kind,
                "name": name,
                "image_type": image_type,
                "footnotes_joined": "; ".join(
                    _norm_str_list(item.get("footnotes"))
                ),
                "equation_body": equation_body,
            }
            tokenizer = self.tokenizer
            chunk_content = _render(description=description, **fixed_parts)
            tokens = len(tokenizer.encode(chunk_content))
            if tokens > max_tokens:
                # Truncate only the description, never name/type/equation.
                desc_tokens = tokenizer.encode(description)
                overflow = tokens - max_tokens
                keep = max(min_desc_tokens, len(desc_tokens) - overflow)
                while True:
                    truncated_desc = tokenizer.decode(desc_tokens[:keep])
                    chunk_content = _render(
                        description=truncated_desc, **fixed_parts
                    )
                    tokens = len(tokenizer.encode(chunk_content))
                    if tokens <= max_tokens or keep <= min_desc_tokens:
                        break
                    # Re-encoding the decoded prefix may not shrink by
                    # exactly the removed count, so keep shaving by the
                    # remaining overflow until it fits or hits the floor.
                    keep = max(min_desc_tokens, keep - (tokens - max_tokens))
                if tokens > max_tokens:
                    raise MultimodalAnalysisError(
                        f"{root_key}/{item_id}: multimodal chunk exceeds "
                        f"{max_tokens} tokens even after truncating description "
                        f"to {min_desc_tokens} tokens"
                    )
            if not chunk_content:
                continue

            heading_dict = _build_heading_dict(item)
            sidecar_block = {
                "type": kind,
                "id": str(item_id),
                "refs": [{"type": kind, "id": str(item_id)}],
            }
            raw_cache_list = item.get("llm_cache_list")
            # Drop ``None`` and blank entries so no bogus "None" cache id
            # is carried on the chunk.
            cache_list = (
                [
                    str(c)
                    for c in raw_cache_list
                    if c is not None and str(c).strip()
                ]
                if isinstance(raw_cache_list, list)
                else []
            )
            chunk_dict: dict[str, Any] = {
                "chunk_id": f"{doc_id}-mm-{kind}-{local_idx:03d}",
                "chunk_order_index": order,
                "content": chunk_content,
                "tokens": tokens,
                "sidecar": sidecar_block,
                "llm_cache_list": cache_list,
            }
            if heading_dict is not None:
                chunk_dict["heading"] = heading_dict
            mm_chunks.append(chunk_dict)
            order += 1
    return mm_chunks
|