| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762
27722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720272127222723272427252726272727282729273027312732273327342735273627372738273927402741274227432744274527462747274827492750275127522753275427552756275727582759276027612762276327642765276627672768276927702771277227732774277527762
77727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839284028412842284328442845284628472848284928502851285228532854285528562857285828592860286128622863286428652866286728682869287028712872287328742875287628772878287928802881288228832884288528862887288828892890289128922893289428952896289728982899290029012902290329042905290629072908290929102911291229132914291529162917291829192920292129222923292429252926292729282929293029312932293329342935293629372938293929402941294229432944294529462947294829492950295129522953295429552956295729582959296029612962296329642965296629672968296929702971297229732974297529762977297829792980298129822983298429852986298729882989299029912992299329942995299629972998299930003001300230033004300530063007300830093010301130123013301430153016301730183019302030213022302330243025302630273028302930303031303230333034303530363037303830393040304130423043304430453046304730483049305030513052305330543055305630573058305930603061306230633064306530663067306830693070307130723073307430753076307730783079308030813082308330843085308630873088308930903091309230933094309530963097309830993100310131023103310431053106310731083109311031113112311331143115311631173118311931203121312231233124312531263127312831293130313131323133313431353136313731383139314031413142314331443145314631473148314931503151315231533154315531563157315831593160316131623163316431653166316731683169317031713172317331743175317631773178317931803181318231833184318531863187318831893190319131923193319431953196319731983199320032013202320332043205320632073208320932103211321232133214321532163217321832193220322132223223322432253226322732283229323032313232323332343235323632373238323932403241324232433244324532463247324832493250325132523253325432553256325732583259326032613262326332643265326632673268326932703271327232733274327532763
27732783279328032813282328332843285328632873288328932903291329232933294329532963297329832993300330133023303330433053306330733083309331033113312331333143315331633173318331933203321332233233324332533263327332833293330333133323333333433353336333733383339334033413342334333443345334633473348334933503351335233533354335533563357335833593360336133623363336433653366336733683369337033713372337333743375337633773378337933803381338233833384338533863387338833893390339133923393339433953396339733983399340034013402340334043405340634073408340934103411341234133414341534163417341834193420342134223423342434253426342734283429343034313432343334343435343634373438343934403441344234433444344534463447344834493450345134523453345434553456345734583459346034613462346334643465346634673468346934703471347234733474347534763477347834793480348134823483348434853486348734883489349034913492349334943495349634973498349935003501350235033504350535063507350835093510351135123513351435153516351735183519352035213522352335243525352635273528352935303531353235333534353535363537353835393540354135423543354435453546354735483549355035513552355335543555355635573558355935603561356235633564356535663567356835693570357135723573357435753576357735783579358035813582358335843585358635873588358935903591359235933594359535963597359835993600360136023603360436053606360736083609361036113612361336143615361636173618361936203621362236233624362536263627362836293630363136323633363436353636363736383639364036413642364336443645364636473648364936503651365236533654365536563657365836593660366136623663366436653666366736683669367036713672367336743675367636773678367936803681368236833684368536863687368836893690369136923693369436953696369736983699370037013702370337043705370637073708370937103711371237133714371537163717371837193720372137223723372437253726372737283729373037313732373337343735373637373738373937403741374237433744374537463747374837493750375137523753375437553756375737583759376037613762376337643765376637673768376937703771377237733774377537763
77737783779378037813782378337843785378637873788378937903791379237933794379537963797379837993800380138023803380438053806380738083809381038113812381338143815381638173818381938203821382238233824382538263827382838293830383138323833383438353836383738383839384038413842384338443845384638473848384938503851385238533854385538563857385838593860386138623863386438653866386738683869387038713872387338743875387638773878387938803881388238833884388538863887388838893890389138923893389438953896389738983899390039013902390339043905390639073908390939103911391239133914391539163917391839193920392139223923392439253926392739283929393039313932393339343935393639373938393939403941394239433944394539463947394839493950395139523953395439553956395739583959396039613962396339643965396639673968396939703971397239733974397539763977397839793980398139823983398439853986398739883989399039913992399339943995399639973998399940004001400240034004400540064007400840094010401140124013401440154016401740184019402040214022402340244025402640274028402940304031403240334034403540364037403840394040404140424043404440454046404740484049405040514052405340544055405640574058405940604061406240634064406540664067406840694070407140724073407440754076407740784079408040814082408340844085408640874088408940904091409240934094409540964097409840994100410141024103410441054106410741084109411041114112411341144115411641174118411941204121412241234124412541264127412841294130413141324133413441354136413741384139414041414142414341444145414641474148414941504151415241534154415541564157415841594160416141624163416441654166416741684169417041714172417341744175417641774178417941804181418241834184418541864187418841894190419141924193419441954196419741984199420042014202420342044205420642074208420942104211421242134214421542164217421842194220422142224223422442254226422742284229423042314232423342344235423642374238423942404241424242434244424542464247424842494250425142524253425442554256425742584259426042614262426342644265426642674268426942704271427242734274427542764
277427842794280428142824283428442854286428742884289429042914292429342944295429642974298429943004301430243034304430543064307430843094310431143124313431443154316431743184319432043214322432343244325432643274328432943304331433243334334433543364337433843394340434143424343434443454346434743484349435043514352435343544355435643574358435943604361436243634364436543664367436843694370437143724373437443754376437743784379438043814382438343844385438643874388438943904391439243934394439543964397439843994400440144024403440444054406440744084409441044114412441344144415441644174418441944204421442244234424442544264427442844294430443144324433443444354436443744384439444044414442444344444445444644474448444944504451445244534454445544564457445844594460446144624463446444654466446744684469447044714472447344744475447644774478447944804481448244834484448544864487448844894490449144924493449444954496449744984499450045014502450345044505450645074508450945104511451245134514451545164517451845194520452145224523452445254526452745284529453045314532453345344535453645374538453945404541454245434544454545464547454845494550455145524553455445554556455745584559456045614562456345644565456645674568456945704571457245734574 |
- """
- This module contains all document-related routes for the LightRAG API.
- """
- import asyncio
- import re
- import shutil
- import time
- from uuid import uuid4
- from lightrag.utils import logger, get_pinyin_sort_key, performance_timing_log
- import aiofiles
- import traceback
- from datetime import datetime, timezone
- from pathlib import Path
- from typing import Dict, List, Optional, Any, Literal
- from io import BytesIO
- from fastapi import (
- APIRouter,
- BackgroundTasks,
- Depends,
- File,
- HTTPException,
- UploadFile,
- )
- from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator
- from lightrag import LightRAG
- from lightrag.base import DeletionResult, DocProcessingStatus, DocStatus
- from lightrag.constants import (
- FULL_DOCS_FORMAT_PENDING_PARSE,
- PARSER_ENGINE_LEGACY,
- PARSED_ARTIFACT_DIR_SUFFIXES,
- PARSED_DIR_NAME,
- PROCESS_OPTION_CHUNK_FIXED,
- PROCESS_OPTION_CHUNK_PARAGRAH,
- PROCESS_OPTION_CHUNK_RECURSIVE,
- PROCESS_OPTION_CHUNK_VECTOR,
- )
- from lightrag.parser.routing import (
- FilenameParserHintError,
- canonicalize_parser_hinted_basename,
- chunk_strategy_key,
- filename_parser_hint,
- resolve_chunk_options,
- resolve_file_parser_directives,
- )
- from lightrag.utils import (
- generate_track_id,
- move_file_to_parsed_dir,
- )
- from lightrag.api.utils_api import get_combined_auth_dependency
- from ..config import global_args
- # Function to format datetime to ISO format string with timezone information
def format_datetime(dt: Any) -> Optional[str]:
    """Render a timestamp as an ISO-8601 string that carries timezone info.

    Args:
        dt: A ``datetime``, an already formatted string, or ``None``.

    Returns:
        ``None`` when ``dt`` is ``None``; the input itself when it is already
        a string; otherwise the result of ``dt.isoformat()``. A naive
        ``datetime`` is tagged as UTC before formatting.
    """
    # None and pre-formatted strings are passed through untouched.
    if dt is None or isinstance(dt, str):
        return dt

    # A naive datetime carries no offset; label it as UTC so the rendered
    # string always includes timezone information.
    if isinstance(dt, datetime) and dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)

    # Any other object is expected to expose ``isoformat()`` itself.
    return dt.isoformat()
- # NOTE: the APIRouter instance is created INSIDE `create_document_routes`
- # (not at module scope). A module-level router is shared across processes,
- # and re-running the factory — which the test suite does to validate
- # create_app for different `--api-prefix` values — would re-decorate the
- # same router each time, accumulating duplicate routes and triggering
- # FastAPI's "Duplicate Operation ID" warnings.
# Prefix used for temporary files.
temp_prefix = "__tmp__"

# Single non-null sentinel that stands in for a missing document source.
UNKNOWN_FILE_SOURCE = "unknown_source"

# Legacy spellings of "no source"; normalize_file_path maps them to
# UNKNOWN_FILE_SOURCE.
LEGACY_EMPTY_FILE_PATH_SENTINELS = {"", "no-file-path"}

# Matches a trailing "_" followed by exactly three digits or by ten or more
# digits. NOTE(review): presumably a counter / epoch timestamp appended when a
# file is archived -- confirm at the usage site.
ARCHIVED_FILE_SUFFIX_RE = re.compile(r"_(?:\d{3}|\d{10,})$")
def normalize_file_path(file_path: str | None) -> str:
    """Map a document source to its canonical form, never returning ``None``.

    Args:
        file_path: Raw source path/name, or ``None`` when unknown.

    Returns:
        ``UNKNOWN_FILE_SOURCE`` when the input is ``None``, is one of the
        legacy empty sentinels after stripping whitespace, or canonicalizes
        to a falsy value; otherwise the canonicalized name.
    """
    cleaned = None if file_path is None else file_path.strip()
    if cleaned is None or cleaned in LEGACY_EMPTY_FILE_PATH_SENTINELS:
        return UNKNOWN_FILE_SOURCE

    canonical = canonicalize_parser_hinted_basename(cleaned)
    # A falsy canonical name is treated the same as a missing source.
    return canonical if canonical else UNKNOWN_FILE_SOURCE
def is_valid_file_source(file_source: str | None) -> bool:
    """Tell whether ``file_source`` names a real source.

    Returns:
        ``False`` for ``None`` and for any value that ``normalize_file_path``
        collapses to ``UNKNOWN_FILE_SOURCE``; ``True`` otherwise.
    """
    return (
        file_source is not None
        and normalize_file_path(file_source) != UNKNOWN_FILE_SOURCE
    )
def sanitize_filename(filename: str, input_dir: Path) -> str:
    """
    Sanitize uploaded filename to prevent Path Traversal attacks.

    Args:
        filename: The original filename from the upload
        input_dir: The target input directory

    Returns:
        str: Sanitized filename that is safe to use

    Raises:
        HTTPException: (status 400) if the filename is empty, is empty after
            sanitization, cannot be resolved, or resolves outside
            ``input_dir``
    """
    # Basic validation
    if not filename or not filename.strip():
        raise HTTPException(status_code=400, detail="Filename cannot be empty")

    # Remove control characters and null bytes FIRST. Doing this after the
    # ".." removal (as before) let a sequence such as ".\x00." collapse back
    # into ".." once the control character was dropped.
    clean_name = "".join(c for c in filename if ord(c) >= 32 and c != "\x7f")

    # Remove path separators, then traversal sequences. Separators go first so
    # that "./." cannot turn into ".." after the ".." pass has already run.
    clean_name = clean_name.replace("/", "").replace("\\", "")
    clean_name = clean_name.replace("..", "")

    # Remove leading/trailing whitespace and dots
    clean_name = clean_name.strip().strip(".")

    # Check if anything is left after sanitization
    if not clean_name:
        raise HTTPException(
            status_code=400, detail="Invalid filename after sanitization"
        )

    # Verify the final path stays within the input directory. Only the
    # filesystem calls that can raise live inside the try block.
    try:
        final_path = (input_dir / clean_name).resolve()
        base_dir = input_dir.resolve()
    except (OSError, ValueError) as exc:
        raise HTTPException(status_code=400, detail="Invalid filename") from exc

    if not final_path.is_relative_to(base_dir):
        raise HTTPException(status_code=400, detail="Unsafe filename detected")

    return clean_name
class ScanResponse(BaseModel):
    """Response model for document scanning operation

    Attributes:
        status: Status of the scanning operation. ``scanning_started`` when
            a new background scan has been scheduled;
            ``scanning_skipped_pipeline_busy`` when the request was rejected
            because indexing or another scan is already running.
        message: Optional message with additional details
        track_id: Tracking ID for monitoring scanning progress
    """

    # Example payload attached to the generated JSON schema.
    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "status": "scanning_started",
                "message": "Scanning process has been initiated in the background",
                "track_id": "scan_20250729_170612_abc123",
            }
        }
    )

    # Required; restricted to the two literal outcomes below.
    status: Literal["scanning_started", "scanning_skipped_pipeline_busy"] = Field(
        description="Status of the scanning operation"
    )
    # Optional free-form detail; defaults to None.
    message: Optional[str] = Field(
        None, description="Additional details about the scanning operation"
    )
    # Required identifier the client uses to follow the scan.
    track_id: str = Field(description="Tracking ID for monitoring scanning progress")
class ReprocessResponse(BaseModel):
    """Response model for reprocessing failed documents operation

    Attributes:
        status: Status of the reprocessing operation
        message: Message describing the operation result
        track_id: Always empty string. Reprocessed documents retain their original track_id.
    """

    # Example payload attached to the generated JSON schema.
    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "status": "reprocessing_started",
                "message": "Reprocessing of failed documents has been initiated in background",
                "track_id": "",
            }
        }
    )

    # Required; only one literal value is allowed.
    status: Literal["reprocessing_started"] = Field(
        description="Status of the reprocessing operation"
    )
    message: str = Field(description="Human-readable message describing the operation")
    # Defaults to "" -- the field description explains why no new id is issued.
    track_id: str = Field(
        "",
        description="Always empty string. Reprocessed documents retain their original track_id from initial upload.",
    )
class CancelPipelineResponse(BaseModel):
    """Response model for pipeline cancellation operation

    Attributes:
        status: Status of the cancellation request
        message: Message describing the operation result
    """

    # Example payload attached to the generated JSON schema.
    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "status": "cancellation_requested",
                "message": "Pipeline cancellation has been requested. Documents will be marked as FAILED.",
            }
        }
    )

    # Required; one of the two literal outcomes.
    status: Literal["cancellation_requested", "not_busy"] = Field(
        description="Status of the cancellation request"
    )
    message: str = Field(description="Human-readable message describing the operation")
# Closed set of chunking strategy names accepted in a ``chunking`` request
# block; each name has a matching entry in ``_CHUNKING_PARAMS_MODEL``.
TextChunkingStrategy = Literal[
    "fixed_token", "recursive_character", "semantic_vector", "paragraph_semantic"
]
class _StrictChunkParams(BaseModel):
    """Common base of the per-strategy chunking parameter models.

    ``strict=True`` turns off the Pydantic-v2 lax coercions that would
    otherwise let malformed requests through and fail later in the
    background chunker: bool-as-int (``true`` -> 1), numeric strings
    (``"5"`` -> 5), float-as-int. ``extra="forbid"`` makes unknown keys a
    422 (replacing a hand-rolled allow-list).

    ``chunk_token_size`` is shared by every strategy; ``None`` means
    "not supplied -- fall back to ``addon_params``/env default at process
    time".
    """

    # When given, must be an integer >= 1.
    chunk_token_size: Optional[int] = Field(None, ge=1)

    model_config = ConfigDict(strict=True, extra="forbid")
class _OverlapChunkParams(_StrictChunkParams):
    # Adds an overlap setting to the shared base and checks that an explicit
    # overlap stays below an explicit chunk size.

    # When given, must be an integer >= 0.
    chunk_overlap_token_size: Optional[int] = Field(None, ge=0)

    @model_validator(mode="after")
    def _overlap_lt_size(self) -> "_OverlapChunkParams":
        """Reject an explicit overlap that is not smaller than an explicit size."""
        size = self.chunk_token_size
        overlap = self.chunk_overlap_token_size

        # The comparison is only possible when BOTH values were supplied; a
        # missing chunk_token_size is resolved from addon_params/env at
        # process time, so there is nothing to compare against yet.
        if size is None or overlap is None:
            return self

        if overlap >= size:
            raise ValueError("chunk_overlap_token_size must be < chunk_token_size")
        return self
class FixedTokenChunkParams(_OverlapChunkParams):
    # Parameters accepted for the "fixed_token" strategy, on top of the
    # inherited size/overlap fields. Both are optional and default to None.
    split_by_character: Optional[str] = Field(default=None)
    split_by_character_only: Optional[bool] = Field(default=None)
class RecursiveCharacterChunkParams(_OverlapChunkParams):
    # Parameters accepted for the "recursive_character" strategy, on top of
    # the inherited size/overlap fields. Optional; defaults to None.
    separators: Optional[list[str]] = Field(default=None)
class ParagraphSemanticChunkParams(_OverlapChunkParams):
    # The "paragraph_semantic" strategy adds no fields of its own; it only
    # accepts the inherited size/overlap parameters.
    ...
class SemanticVectorChunkParams(_StrictChunkParams):
    # Parameters accepted for the "semantic_vector" strategy.

    # Allowed values were checked against the installed langchain_experimental
    # (text_splitter.py ``BreakpointThresholdType``) rather than recalled.
    breakpoint_threshold_type: Optional[
        Literal["percentile", "standard_deviation", "interquartile", "gradient"]
    ] = None

    # Under strict mode a ``float`` field still takes an ``int`` (JSON ``95``)
    # and widens it losslessly to ``95.0``; only ``str`` / ``bool`` are
    # refused, which is the intended behavior. Do NOT relax strict (numeric
    # strings would get through) and do NOT switch to ``int | float`` (ints
    # would stop being normalized to float). Locked by tests in
    # test_document_routes_chunking.
    breakpoint_threshold_amount: Optional[float] = None

    # When given, must be an integer >= 1.
    buffer_size: Optional[int] = Field(None, ge=1)

    sentence_split_regex: Optional[str] = None

    @field_validator("sentence_split_regex")
    @classmethod
    def _valid_sentence_split_regex(cls, v: Optional[str]) -> Optional[str]:
        """Fail fast on a pattern that ``re`` cannot compile."""
        # LangChain's SemanticChunker compiles this value during split_text,
        # i.e. in the background. Compiling it here surfaces a malformed
        # pattern (e.g. "(") synchronously as an HTTP 422 instead.
        if v is not None:
            try:
                re.compile(v)
            except re.error as exc:
                raise ValueError(
                    f"sentence_split_regex is not a valid regular expression: {exc}"
                ) from exc
        return v

    @model_validator(mode="after")
    def _amount_in_range(self) -> "SemanticVectorChunkParams":
        """Range-check ``breakpoint_threshold_amount`` when it was supplied."""
        amount = self.breakpoint_threshold_amount
        if amount is not None:
            # Every threshold type wants a positive magnitude, so this check
            # does not depend on the type and is safe at parse time.
            if amount <= 0:
                raise ValueError("breakpoint_threshold_amount must be > 0")

            # The (0, 100] ceiling only applies to percentile/gradient (they
            # feed np.percentile, which needs q in [0, 100]). It is enforced
            # here only when the type arrives in the SAME request: when the
            # type is omitted the effective type comes from addon_params/env
            # later, and assuming "percentile" would wrongly 422 a partial
            # override inheriting standard_deviation/interquartile (which
            # allow amounts > 100). The ceiling against the merged type is
            # applied by ``_validate_effective_semantic_amount`` in
            # ``_resolve_text_chunking``.
            has_ceiling = self.breakpoint_threshold_type in ("percentile", "gradient")
            if has_ceiling and amount > 100:
                raise ValueError(
                    "breakpoint_threshold_amount must be within (0, 100] "
                    "for percentile/gradient"
                )
        return self
# Strategy name -> typed model used to validate that strategy's ``params``.
# Keys mirror the literals of ``TextChunkingStrategy``.
_CHUNKING_PARAMS_MODEL: dict[str, type[_StrictChunkParams]] = {
    "fixed_token": FixedTokenChunkParams,
    "recursive_character": RecursiveCharacterChunkParams,
    "semantic_vector": SemanticVectorChunkParams,
    "paragraph_semantic": ParagraphSemanticChunkParams,
}
class TextChunkingConfig(BaseModel):
    """Chunking strategy + strategy-specific params for a text insert.

    Validation is delegated to the per-strategy typed model so unknown
    keys, wrong types, and out-of-range values all raise synchronously
    during request parsing (HTTP 422) — never later in the background
    indexing task, where the HTTP response has already been sent.
    """

    model_config = ConfigDict(extra="forbid")

    # Defaults to fixed-token chunking when the caller names no strategy.
    strategy: TextChunkingStrategy = "fixed_token"
    # Raw params; replaced by the validated subset in ``_validate_params``.
    params: Dict[str, Any] = Field(default_factory=dict)

    @model_validator(mode="after")
    def _validate_params(self) -> "TextChunkingConfig":
        """Validate ``params`` with the model registered for ``strategy``."""
        params_model = _CHUNKING_PARAMS_MODEL[self.strategy]
        validated = params_model.model_validate(self.params)

        # Keep exactly the keys the caller supplied with a real value
        # (validated + coerced) so the enqueue-time merge overrides only what
        # was set. ``exclude_none`` also drops explicit nulls: for every param
        # field None means "inherit the addon_params/env default", so an
        # explicit ``"chunk_token_size": null`` must NOT be merged over the
        # resolved default -- otherwise the route would 200 and the background
        # chunker would do ``int(None)`` and fail the document.
        self.params = validated.model_dump(exclude_unset=True, exclude_none=True)
        return self
class InsertTextRequest(BaseModel):
    """Request model for inserting a single text document

    Attributes:
        text: The text content to be inserted into the RAG system
        file_source: Source of the text (optional)
        chunking: Optional chunking strategy + params; omit to keep the
            default fixed-token behavior and addon_params defaults.
    """

    text: str = Field(
        min_length=1,
        description="The text to insert",
    )
    file_source: Optional[str] = Field(
        default=None, min_length=0, description="File Source"
    )
    chunking: Optional[TextChunkingConfig] = Field(
        default=None,
        description="Chunking strategy and params; omit for default fixed-token chunking",
    )

    @field_validator("text", mode="after")
    @classmethod
    def strip_text_after(cls, text: str) -> str:
        """Strip surrounding whitespace from the validated text."""
        # NOTE(review): ``min_length=1`` is checked before this runs, so a
        # whitespace-only text passes validation and ends up as "" here.
        return text.strip()

    @field_validator("file_source", mode="before")
    @classmethod
    def normalize_source_before(cls, file_source: Any) -> Any:
        """Normalize a supplied source before type validation.

        A ``mode="before"`` validator receives the raw input, which is not
        guaranteed to be a string. Only ``None`` and ``str`` are normalized;
        any other value is returned untouched so that Pydantic's own type
        check rejects it with a validation error. Previously such a value
        reached ``str.strip`` and raised ``AttributeError``, which is not
        converted into a validation error.
        """
        if file_source is None or isinstance(file_source, str):
            return normalize_file_path(file_source)
        return file_source

    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "text": "This is a sample text to be inserted into the RAG system.",
                "file_source": "Source of the text (optional)",
                "chunking": {
                    "strategy": "fixed_token",
                    "params": {
                        "chunk_token_size": 1200,
                        "chunk_overlap_token_size": 100,
                        "split_by_character": "\n\n",
                        "split_by_character_only": True,
                    },
                },
            }
        }
    )
class InsertTextsRequest(BaseModel):
    """Request model for inserting multiple text documents

    Attributes:
        texts: List of text contents to be inserted into the RAG system
        file_sources: Sources of the texts (optional)
    """

    texts: list[str] = Field(
        min_length=1,
        description="The texts to insert",
    )
    file_sources: Optional[list[str]] = Field(
        default=None, min_length=0, description="Sources of the texts"
    )
    chunking: Optional[TextChunkingConfig] = Field(
        default=None,
        description="Shared chunking strategy and params for all texts; omit for default fixed-token chunking",
    )

    @field_validator("texts", mode="after")
    @classmethod
    def strip_texts_after(cls, texts: list[str]) -> list[str]:
        """Strip surrounding whitespace from every validated text."""
        return [text.strip() for text in texts]

    @field_validator("file_sources", mode="before")
    @classmethod
    def normalize_sources_before(cls, file_sources: Any) -> Any:
        """Normalize each supplied source before type validation.

        A ``mode="before"`` validator receives the raw input. Iterating it
        blindly turned a bare string into a list of its characters (and a
        dict into a list of its keys), and non-string items raised
        ``AttributeError`` inside ``str.strip``. Only list/tuple input is
        walked, and only ``None``/``str`` items are normalized; everything
        else is handed to Pydantic unchanged so it is rejected with a regular
        validation error.
        """
        if not isinstance(file_sources, (list, tuple)):
            # Covers None (field omitted as null) and malformed payloads.
            return file_sources
        return [
            normalize_file_path(source)
            if source is None or isinstance(source, str)
            else source
            for source in file_sources
        ]

    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "texts": [
                    "This is the first text to be inserted.",
                    "This is the second text to be inserted.",
                ],
                "file_sources": [
                    "First file source (optional)",
                ],
                "chunking": {
                    "strategy": "recursive_character",
                    "params": {"chunk_token_size": 1000},
                },
            }
        }
    )
class InsertResponse(BaseModel):
    """Result returned by document insertion endpoints.

    Attributes:
        status: Outcome of the operation: ``success``, ``partial_success``
            or ``failure``. Same-name conflicts are rejected with HTTP 409
            instead of a "duplicated" 200 response, so that value is no
            longer part of this field.
        message: Human-readable description of the outcome.
        track_id: Identifier used to monitor processing status.
    """

    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "status": "success",
                "message": "File 'document.pdf' uploaded successfully. Processing will continue in background.",
                "track_id": "upload_20250729_170612_abc123",
            }
        }
    )

    status: Literal["success", "partial_success", "failure"] = Field(
        description="Status of the operation",
    )
    message: str = Field(
        description="Message describing the operation result",
    )
    track_id: str = Field(
        description="Tracking ID for monitoring processing status",
    )
class ClearDocumentsResponse(BaseModel):
    """Result returned by the document clearing endpoint.

    Attributes:
        status: Outcome of the clear operation (``success``,
            ``partial_success``, ``busy`` or ``fail``).
        message: Human-readable description of the outcome.
    """

    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "status": "success",
                "message": "All documents cleared successfully. Deleted 15 files.",
            }
        }
    )

    status: Literal["success", "partial_success", "busy", "fail"] = Field(
        description="Status of the clear operation",
    )
    message: str = Field(
        description="Message describing the operation result",
    )
class ClearCacheRequest(BaseModel):
    """Empty request body for the cache clearing endpoint.

    Retained only for API compatibility: it declares no parameters, and the
    whole cache is cleared whatever the request contains.
    """

    model_config = ConfigDict(
        json_schema_extra={"example": {}},
    )
class ClearCacheResponse(BaseModel):
    """Result returned by the cache clearing endpoint.

    Attributes:
        status: Outcome of the clear operation (``success`` or ``fail``).
        message: Human-readable description of the outcome.
    """

    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "status": "success",
                "message": "Successfully cleared cache for modes: ['default', 'naive']",
            }
        }
    )

    status: Literal["success", "fail"] = Field(
        description="Status of the clear operation",
    )
    message: str = Field(
        description="Message describing the operation result",
    )
- """Response model for document status
- Attributes:
- id: Document identifier
- content_summary: Summary of document content
- content_length: Length of document content
- status: Current processing status
- created_at: Creation timestamp (ISO format string)
- updated_at: Last update timestamp (ISO format string)
- track_id: Tracking ID for monitoring progress (optional)
- chunks_count: Number of chunks (optional)
- error_msg: Error message if any (optional)
- metadata: Additional metadata (optional)
- file_path: Path to the document file
- """
class DeleteDocRequest(BaseModel):
    """Request body for deleting documents by ID.

    Attributes:
        doc_ids: IDs of the documents to delete; must be non-empty, free of
            blank entries and unique once surrounding whitespace is removed.
        delete_file: Also delete the corresponding file in the upload
            directory.
        delete_llm_cache: Also delete cached LLM extraction results for the
            documents.
    """

    doc_ids: List[str] = Field(
        ...,
        description="The IDs of the documents to delete.",
    )
    delete_file: bool = Field(
        default=False,
        description="Whether to delete the corresponding file in the upload directory.",
    )
    delete_llm_cache: bool = Field(
        default=False,
        description="Whether to delete cached LLM extraction results for the documents.",
    )

    @field_validator("doc_ids", mode="after")
    @classmethod
    def validate_doc_ids(cls, doc_ids: List[str]) -> List[str]:
        """Return the IDs stripped of whitespace, rejecting bad input.

        Raises:
            ValueError: If the list is empty, any ID is blank, or two IDs
                are equal after stripping.
        """
        if not doc_ids:
            raise ValueError("Document IDs list cannot be empty")

        cleaned: List[str] = []
        for raw_id in doc_ids:
            trimmed = raw_id.strip() if raw_id else ""
            if not trimmed:
                raise ValueError("Document ID cannot be empty")
            cleaned.append(trimmed)

        # A set collapses repeated IDs, so a shorter set means duplicates.
        if len(set(cleaned)) != len(cleaned):
            raise ValueError("Document IDs must be unique")
        return cleaned
class DeleteEntityRequest(BaseModel):
    """Request body for deleting a single entity by name.

    Attributes:
        entity_name: Name of the entity to delete; must not be blank and is
            stored without surrounding whitespace.
    """

    entity_name: str = Field(
        ...,
        description="The name of the entity to delete.",
    )

    @field_validator("entity_name", mode="after")
    @classmethod
    def validate_entity_name(cls, entity_name: str) -> str:
        """Return the stripped name, raising ``ValueError`` when it is blank."""
        trimmed = entity_name.strip() if entity_name else ""
        if not trimmed:
            raise ValueError("Entity name cannot be empty")
        return trimmed
class DeleteRelationRequest(BaseModel):
    """Request body for deleting the relation between two entities.

    Attributes:
        source_entity: Name of the source entity.
        target_entity: Name of the target entity.

    Both names must be non-blank and are stored without surrounding
    whitespace.
    """

    source_entity: str = Field(
        ...,
        description="The name of the source entity.",
    )
    target_entity: str = Field(
        ...,
        description="The name of the target entity.",
    )

    @field_validator("source_entity", "target_entity", mode="after")
    @classmethod
    def validate_entity_names(cls, entity_name: str) -> str:
        """Return the stripped name, raising ``ValueError`` when it is blank."""
        trimmed = entity_name.strip() if entity_name else ""
        if not trimmed:
            raise ValueError("Entity name cannot be empty")
        return trimmed
class DocStatusResponse(BaseModel):
    """Response model describing the status of a single document.

    Attributes:
        id: Document identifier.
        content_summary: Summary of document content.
        content_length: Length of document content in characters.
        status: Current processing status.
        created_at: Creation timestamp (ISO format string).
        updated_at: Last update timestamp (ISO format string).
        track_id: Tracking ID for monitoring progress (optional).
        chunks_count: Number of chunks the document was split into (optional).
        error_msg: Error message if processing failed (optional).
        metadata: Additional metadata about the document (optional).
        file_path: Path to the document file.
    """

    id: str = Field(description="Document identifier")
    content_summary: str = Field(description="Summary of document content")
    content_length: int = Field(description="Length of document content in characters")
    status: DocStatus = Field(description="Current processing status")
    created_at: str = Field(description="Creation timestamp (ISO format string)")
    updated_at: str = Field(description="Last update timestamp (ISO format string)")
    track_id: Optional[str] = Field(
        default=None, description="Tracking ID for monitoring progress"
    )
    chunks_count: Optional[int] = Field(
        default=None, description="Number of chunks the document was split into"
    )
    error_msg: Optional[str] = Field(
        default=None, description="Error message if processing failed"
    )
    metadata: Optional[dict[str, Any]] = Field(
        default=None, description="Additional metadata about the document"
    )
    file_path: str = Field(description="Path to the document file")

    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "id": "doc_123456",
                "content_summary": "Research paper on machine learning",
                "content_length": 15240,
                "status": "processed",
                "created_at": "2025-03-31T12:34:56",
                "updated_at": "2025-03-31T12:35:30",
                "track_id": "upload_20250729_170612_abc123",
                "chunks_count": 12,
                # The key must match the declared field name ``error_msg``.
                "error_msg": None,
                "metadata": {"author": "John Doe", "year": 2025},
                "file_path": "research_paper.pdf",
            }
        }
    )
class DocsStatusesResponse(BaseModel):
    """Response model for document statuses

    Attributes:
        statuses: Dictionary mapping document status to lists of document
            status responses; defaults to an empty dictionary.
    """

    statuses: Dict[DocStatus, List[DocStatusResponse]] = Field(
        default_factory=dict,
        description="Dictionary mapping document status to lists of document status responses",
    )

    # Nested entries mirror DocStatusResponse, so the error field is spelled
    # ``error_msg`` to match that model's declared field name.
    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "statuses": {
                    "PENDING": [
                        {
                            "id": "doc_123",
                            "content_summary": "Pending document",
                            "content_length": 5000,
                            "status": "pending",
                            "created_at": "2025-03-31T10:00:00",
                            "updated_at": "2025-03-31T10:00:00",
                            "track_id": "upload_20250331_100000_abc123",
                            "chunks_count": None,
                            "error_msg": None,
                            "metadata": None,
                            "file_path": "pending_doc.pdf",
                        }
                    ],
                    "PREPROCESSED": [
                        {
                            "id": "doc_789",
                            "content_summary": "Document pending final indexing",
                            "content_length": 7200,
                            "status": "preprocessed",
                            "created_at": "2025-03-31T09:30:00",
                            "updated_at": "2025-03-31T09:35:00",
                            "track_id": "upload_20250331_093000_xyz789",
                            "chunks_count": 10,
                            "error_msg": None,
                            "metadata": None,
                            "file_path": "preprocessed_doc.pdf",
                        }
                    ],
                    "PROCESSED": [
                        {
                            "id": "doc_456",
                            "content_summary": "Processed document",
                            "content_length": 8000,
                            "status": "processed",
                            "created_at": "2025-03-31T09:00:00",
                            "updated_at": "2025-03-31T09:05:00",
                            "track_id": "insert_20250331_090000_def456",
                            "chunks_count": 8,
                            "error_msg": None,
                            "metadata": {"author": "John Doe"},
                            "file_path": "processed_doc.pdf",
                        }
                    ],
                }
            }
        }
    )
class TrackStatusResponse(BaseModel):
    """Response model for tracking document processing status by track_id

    Attributes:
        track_id: The tracking ID
        documents: List of documents associated with this track_id
        total_count: Total number of documents for this track_id
        status_summary: Count of documents by status
    """

    track_id: str = Field(description="The tracking ID")
    documents: List[DocStatusResponse] = Field(
        description="List of documents associated with this track_id"
    )
    total_count: int = Field(description="Total number of documents for this track_id")
    status_summary: Dict[str, int] = Field(description="Count of documents by status")

    # The nested document mirrors DocStatusResponse, so the error field is
    # spelled ``error_msg`` to match that model's declared field name.
    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "track_id": "upload_20250729_170612_abc123",
                "documents": [
                    {
                        "id": "doc_123456",
                        "content_summary": "Research paper on machine learning",
                        "content_length": 15240,
                        "status": "PROCESSED",
                        "created_at": "2025-03-31T12:34:56",
                        "updated_at": "2025-03-31T12:35:30",
                        "track_id": "upload_20250729_170612_abc123",
                        "chunks_count": 12,
                        "error_msg": None,
                        "metadata": {"author": "John Doe", "year": 2025},
                        "file_path": "research_paper.pdf",
                    }
                ],
                "total_count": 1,
                "status_summary": {"PROCESSED": 1},
            }
        }
    )
class DocumentsRequest(BaseModel):
    """Query parameters for a paginated document listing.

    Attributes:
        status_filter: Legacy single-status filter; ignored when
            ``status_filters`` is set.
        status_filters: Statuses to include; ``None`` means all statuses.
        page: 1-based page number.
        page_size: Documents per page, between 10 and 200.
        sort_field: One of ``created_at``, ``updated_at``, ``id`` or
            ``file_path``.
        sort_direction: ``asc`` or ``desc``.
    """

    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "status_filters": ["PREPROCESSED", "PARSING", "ANALYZING"],
                "page": 1,
                "page_size": 50,
                "sort_field": "updated_at",
                "sort_direction": "desc",
            }
        }
    )

    status_filter: Optional[DocStatus] = Field(
        default=None,
        description="Legacy single-status filter, ignored when status_filters is set",
    )
    status_filters: Optional[List[DocStatus]] = Field(
        default=None,
        description="Filter by multiple document statuses",
    )
    page: int = Field(
        default=1,
        ge=1,
        description="Page number (1-based)",
    )
    page_size: int = Field(
        default=50,
        ge=10,
        le=200,
        description="Number of documents per page (10-200)",
    )
    sort_field: Literal["created_at", "updated_at", "id", "file_path"] = Field(
        default="updated_at",
        description="Field to sort by",
    )
    sort_direction: Literal["asc", "desc"] = Field(
        default="desc",
        description="Sort direction",
    )
class PaginationInfo(BaseModel):
    """Paging metadata attached to a paginated response.

    Attributes:
        page: Current page number.
        page_size: Number of items per page.
        total_count: Total number of items.
        total_pages: Total number of pages.
        has_next: Whether a following page exists.
        has_prev: Whether a preceding page exists.
    """

    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "page": 1,
                "page_size": 50,
                "total_count": 150,
                "total_pages": 3,
                "has_next": True,
                "has_prev": False,
            }
        }
    )

    page: int = Field(
        description="Current page number",
    )
    page_size: int = Field(
        description="Number of items per page",
    )
    total_count: int = Field(
        description="Total number of items",
    )
    total_pages: int = Field(
        description="Total number of pages",
    )
    has_next: bool = Field(
        description="Whether there is a next page",
    )
    has_prev: bool = Field(
        description="Whether there is a previous page",
    )
class PaginatedDocsResponse(BaseModel):
    """One page of documents together with paging and status totals.

    Attributes:
        documents: Documents on the current page.
        pagination: Paging metadata for the query.
        status_counts: Per-status document counts across all documents.
    """

    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "documents": [
                    {
                        "id": "doc_123456",
                        "content_summary": "Research paper on machine learning",
                        "content_length": 15240,
                        "status": "PROCESSED",
                        "created_at": "2025-03-31T12:34:56",
                        "updated_at": "2025-03-31T12:35:30",
                        "track_id": "upload_20250729_170612_abc123",
                        "chunks_count": 12,
                        "error_msg": None,
                        "metadata": {"author": "John Doe", "year": 2025},
                        "file_path": "research_paper.pdf",
                    }
                ],
                "pagination": {
                    "page": 1,
                    "page_size": 50,
                    "total_count": 150,
                    "total_pages": 3,
                    "has_next": True,
                    "has_prev": False,
                },
                "status_counts": {
                    "PENDING": 10,
                    "PROCESSING": 5,
                    "PREPROCESSED": 5,
                    "PROCESSED": 130,
                    "FAILED": 5,
                },
            }
        }
    )

    documents: List[DocStatusResponse] = Field(
        description="List of documents for the current page",
    )
    pagination: PaginationInfo = Field(
        description="Pagination information",
    )
    status_counts: Dict[str, int] = Field(
        description="Count of documents by status for all documents",
    )
class StatusCountsResponse(BaseModel):
    """Per-status document totals.

    Attributes:
        status_counts: Mapping from status name to number of documents.
    """

    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "status_counts": {
                    "PENDING": 10,
                    "PROCESSING": 5,
                    "PREPROCESSED": 5,
                    "PROCESSED": 130,
                    "FAILED": 5,
                }
            }
        }
    )

    status_counts: Dict[str, int] = Field(
        description="Count of documents by status",
    )
class PipelineStatusResponse(BaseModel):
    """Snapshot of the document pipeline state.

    Undeclared extra fields are accepted (``extra="allow"``).

    Attributes:
        autoscanned: Whether auto-scan has started.
        busy: Whether the pipeline is currently busy.
        job_name: Current job name (e.g., indexing files/indexing texts).
        job_start: Job start time as an ISO format string with timezone
            (optional).
        docs: Total number of documents to be indexed.
        batchs: Number of batches for processing documents.
        cur_batch: Current processing batch.
        request_pending: Flag for a pending processing request.
        latest_message: Latest message from pipeline processing.
        history_messages: List of history messages.
        update_status: Status of update flags for all namespaces.
    """

    model_config = ConfigDict(extra="allow")

    # Scan / run state
    autoscanned: bool = False
    busy: bool = False
    request_pending: bool = False

    # Current job
    job_name: str = "Default Job"
    job_start: Optional[str] = None

    # Progress counters
    docs: int = 0
    batchs: int = 0
    cur_batch: int = 0

    # Messages and per-namespace flags
    latest_message: str = ""
    history_messages: Optional[List[str]] = None
    update_status: Optional[dict] = None

    @field_validator("job_start", mode="before")
    @classmethod
    def parse_job_start(cls, value):
        """Convert the raw start time with ``format_datetime`` before validation."""
        formatted = format_datetime(value)
        return formatted
class DocumentManager:
    """Bookkeeping for the input directory that feeds document indexing.

    Resolves the effective input directory (optionally nested under a
    workspace sub-directory), creates it on construction, and remembers which
    files were already indexed so directory scans only report new ones.
    """

    def __init__(
        self,
        input_dir: str,
        workspace: str = "",  # New parameter for workspace isolation
        supported_extensions: tuple = (
            ".txt",
            ".md",
            ".mdx",  # MDX (Markdown + JSX)
            ".pdf",
            ".docx",
            ".pptx",
            ".xlsx",
            ".rtf",  # Rich Text Format
            ".odt",  # OpenDocument Text
            ".tex",  # LaTeX
            ".epub",  # Electronic Publication
            ".html",  # HyperText Markup Language
            ".htm",  # HyperText Markup Language
            ".csv",  # Comma-Separated Values
            ".json",  # JavaScript Object Notation
            ".xml",  # eXtensible Markup Language
            ".yaml",  # YAML Ain't Markup Language
            ".yml",  # YAML
            ".log",  # Log files
            ".conf",  # Configuration files
            ".ini",  # Initialization files
            ".properties",  # Java properties files
            ".sql",  # SQL scripts
            ".bat",  # Batch files
            ".sh",  # Shell scripts
            ".c",  # C source code
            ".h",  # C header
            ".cpp",  # C++ source code
            ".hpp",  # C++ header
            ".py",  # Python source code
            ".java",  # Java source code
            ".js",  # JavaScript source code
            ".ts",  # TypeScript source code
            ".swift",  # Swift source code
            ".go",  # Go source code
            ".rb",  # Ruby source code
            ".php",  # PHP source code
            ".css",  # Cascading Style Sheets
            ".scss",  # Sassy CSS
            ".less",  # LESS CSS
        ),
    ):
        """Set up the manager and make sure the input directory exists.

        Args:
            input_dir: Base input directory.
            workspace: Optional workspace name; when non-empty, files live in
                ``input_dir/workspace`` for data isolation.
            supported_extensions: Lower-case file extensions (with leading
                dot) that are accepted for indexing.
        """
        # Store the base input directory and workspace
        self.base_input_dir = Path(input_dir)
        self.workspace = workspace
        self.supported_extensions = supported_extensions
        self.indexed_files = set()

        # A workspace gets its own sub-directory; otherwise use the base dir.
        if workspace:
            self.input_dir = self.base_input_dir / workspace
        else:
            self.input_dir = self.base_input_dir

        # Create input directory if it doesn't exist
        self.input_dir.mkdir(parents=True, exist_ok=True)

    def scan_directory_for_new_files(self) -> List[Path]:
        """Return supported, not-yet-indexed files directly inside the input dir.

        The directory is listed once and each entry is checked with
        ``is_supported_file``, so extension matching is case-insensitive and
        agrees with the upload-side check. Only regular files are returned,
        sorted by path; a missing input directory yields an empty list.
        """
        logger.debug(f"Scanning for supported files in {self.input_dir}")
        try:
            entries = list(self.input_dir.iterdir())
        except FileNotFoundError:
            # The directory was removed after construction: nothing to scan.
            return []
        return sorted(
            entry
            for entry in entries
            if entry.is_file()
            and entry not in self.indexed_files
            and self.is_supported_file(entry.name)
        )

    def mark_as_indexed(self, file_path: Path):
        """Record ``file_path`` so later scans no longer report it."""
        self.indexed_files.add(file_path)

    def is_supported_file(self, filename: str) -> bool:
        """Return True when ``filename`` ends with a supported extension.

        The comparison lower-cases the filename, so ``REPORT.PDF`` matches
        ``.pdf``.
        """
        lowered = filename.lower()
        return any(lowered.endswith(ext) for ext in self.supported_extensions)
def validate_file_path_security(file_path_str: str, base_dir: Path) -> Optional[Path]:
    """Resolve ``file_path_str`` under ``base_dir``, rejecting path traversal.

    Args:
        file_path_str: The file path string to validate.
        base_dir: Directory the resulting path must stay inside.

    Returns:
        The resolved path when it lies within ``base_dir``; ``None`` when the
        input is blank, uses a Windows-style ``..`` traversal, resolves
        outside ``base_dir``, or cannot be processed at all.
    """
    if not (file_path_str and file_path_str.strip()):
        return None

    try:
        requested = file_path_str.strip()

        # Backslash-delimited ".." segments are refused outright, before the
        # separators get normalized below.
        uses_windows_traversal = ".." in requested and (
            "\\..\\" in requested
            or requested.startswith("..\\")
            or requested.endswith("\\..")
        )
        if uses_windows_traversal:
            return None

        # Treat Windows separators as forward slashes so such paths also
        # work on Unix systems.
        posix_style = requested.replace("\\", "/")

        # resolve() collapses ".." segments and follows symlinks, so the
        # containment check runs against the real location.
        resolved_target = (base_dir / posix_style).resolve()
        resolved_base = base_dir.resolve()
        if resolved_target.is_relative_to(resolved_base):
            return resolved_target

        # Anything that ends up outside the base directory is unsafe.
        return None
    except Exception as e:
        logger.warning(f"Invalid file path detected: {file_path_str} - {e!s}")
        return None
def get_doc_status_value(doc_status: Any) -> str:
    """Return the status of a dict or DocProcessingStatus-like object as a string.

    A ``DocStatus`` member yields its ``.value``; any other value is passed
    through ``str``, with missing or falsy statuses becoming ``""``.
    """
    if isinstance(doc_status, dict):
        raw_status = doc_status.get("status")
    else:
        raw_status = getattr(doc_status, "status", None)

    if not isinstance(raw_status, DocStatus):
        return str(raw_status or "")
    return raw_status.value
def get_doc_track_id(doc_status: Any) -> str:
    """Return the track_id of a dict or DocProcessingStatus-like object.

    Missing or falsy track IDs are returned as ``""``; anything else is
    converted with ``str``.
    """
    if isinstance(doc_status, dict):
        raw_track_id = doc_status.get("track_id")
    else:
        raw_track_id = getattr(doc_status, "track_id", None)
    return str(raw_track_id) if raw_track_id else str("")
async def get_existing_doc_by_file_path_candidates(
    doc_status: Any, file_path: Path | str
) -> dict[str, Any] | None:
    """Look up an already-stored document by its canonical basename.

    The path is canonicalized with ``normalize_file_path`` and handed to
    ``doc_status.get_doc_by_file_basename``. Returns the stored document
    data, or ``None`` when the basename is the unknown-source placeholder or
    no match exists.
    """
    canonical_name = normalize_file_path(str(file_path))
    if canonical_name != UNKNOWN_FILE_SOURCE:
        found = await doc_status.get_doc_by_file_basename(canonical_name)
        if found:
            # The match is a pair; only its second item (the data) is needed.
            _doc_key, doc_data = found
            return doc_data
    return None
async def _reserve_enqueue_slot(rag: LightRAG) -> bool:
    """Check exclusive-writer state and reserve a pending-enqueue slot
    in one critical section.

    Enqueues may run concurrently with the processing loop: the loop is
    notified via ``request_pending`` and picks up newly-enqueued docs after
    its current batch. That also covers the scan task's processing phase --
    once classification is done, the scan drives the processing pipeline
    like any other enqueuer and uploads can land alongside it.

    Two states block new uploads/inserts:

    - ``scanning_exclusive``: the scan task is in its CLASSIFICATION phase,
      reading doc_status to classify files (PROCESSED -> archive,
      FAILED-without-full_docs -> retry-as-new, etc.) and possibly deleting
      stale stubs. A concurrent enqueue would race against those reads and
      deletions. ``scanning`` alone (the processing phase) does NOT block
      uploads.
    - ``destructive_busy``: a /documents/clear or per-doc delete is in
      flight. These DROP storages and remove input files; an enqueue
      accepted in this window would write to a storage being torn down and
      silently lose the document after the client saw success.

    ``pending_enqueues`` is incremented so the scan endpoint can refuse
    while bg tasks are mid-enqueue. The counter does NOT gate
    ``apipeline_process_enqueue_documents`` -- concurrent processing is
    explicitly allowed and is what makes "upload while pipeline is busy"
    possible.

    A workspace whose ``pipeline_status`` has never been initialised
    (mocked test rigs) is treated as idle; no slot is reserved.

    Returns:
        True when a slot was reserved (caller MUST pair with
        ``_release_enqueue_slot``); False when pipeline_status is not
        bootstrapped.

    Raises:
        HTTPException(409): when ``pipeline_status['scanning_exclusive']``
            or ``pipeline_status['destructive_busy']`` is set.
    """
    from lightrag.exceptions import PipelineNotInitializedError
    from lightrag.kg.shared_storage import get_namespace_data, get_namespace_lock

    try:
        status = await get_namespace_data("pipeline_status", workspace=rag.workspace)
    except PipelineNotInitializedError:
        return False

    # Blocking flags in the order they are checked, each with its 409 detail.
    blockers = (
        (
            "scanning_exclusive",
            "Document scan is classifying files. "
            "Wait for the classification phase to finish before "
            "submitting new work.",
        ),
        (
            "destructive_busy",
            "Pipeline is clearing or deleting documents. "
            "Wait for the running job to finish before submitting "
            "new work.",
        ),
    )

    status_lock = get_namespace_lock("pipeline_status", workspace=rag.workspace)
    async with status_lock:
        for flag, detail in blockers:
            if status.get(flag):
                raise HTTPException(status_code=409, detail=detail)
        # Check and increment share the lock, so no writer can slip between.
        status["pending_enqueues"] = status.get("pending_enqueues", 0) + 1
    return True
async def check_pipeline_busy_or_raise(rag: LightRAG) -> None:
    """Raise HTTP 409 when the document pipeline is currently busy.

    Meant for short, fine-grained graph mutations (entity/relation
    edit/create/delete/merge). ``pipeline_status['busy']`` is read under the
    namespace lock and the call fails immediately on contention; no flag is
    set, so this check cannot block the pipeline itself.

    ``busy`` is set by the processing loop and by destructive jobs
    (``/documents/clear`` / per-doc delete). Both concurrently write the
    same graph storages these endpoints mutate, so the 409 mirrors the
    existing UI guard and tells clients to wait.

    A narrow race remains between this check and the underlying graph
    write: if the pipeline turns busy in that window, the per-edge/-node
    locks inside the storage layer are the last line of defense. The
    trade-off is deliberate -- holding ``busy`` here would serialise every
    UI edit against document ingestion, which is a worse user-visible
    failure mode than tolerating the race.

    Returns silently when ``pipeline_status`` was never bootstrapped,
    matching ``_acquire_destructive_busy`` so test rigs without a real
    shared-storage Manager keep working.

    Raises:
        HTTPException(409): when ``pipeline_status['busy']`` is set.
    """
    from lightrag.exceptions import PipelineNotInitializedError
    from lightrag.kg.shared_storage import get_namespace_data, get_namespace_lock

    try:
        status = await get_namespace_data("pipeline_status", workspace=rag.workspace)
    except PipelineNotInitializedError:
        return

    status_lock = get_namespace_lock("pipeline_status", workspace=rag.workspace)
    async with status_lock:
        if not status.get("busy"):
            return
        busy_detail = (
            "Pipeline is busy with another operation. "
            "Wait for the running job to finish before editing "
            "the knowledge graph."
        )
        raise HTTPException(status_code=409, detail=busy_detail)
async def _acquire_destructive_busy(rag: LightRAG) -> tuple[bool, str | None]:
    """Reserve the destructive busy slot for ``/documents/clear`` or
    ``/documents/delete_document``.

    Both jobs DROP storages and (for clear) remove input files, so they
    must serialise against:

    - any other ``busy`` work (processing loop, another destructive job),
    - an in-flight ``scanning`` task that reads/writes doc_status and
      INPUT/, and
    - any ``pending_enqueues`` reservation whose bg task has not yet written
      to doc_status -- accepting the destructive job in that window would
      drop storages while the enqueue is mid-write, losing a document the
      client already saw success for.

    All three checks and the flag write happen inside one
    ``pipeline_status_lock`` critical section, so a concurrent enqueue/scan
    reservation cannot squeeze past.

    The caller is responsible for clearing both flags in its finally block.

    Returns:
        ``(acquired, reason)``. ``(True, None)`` on success.
        ``(False, reason)`` with a human-readable reason when another writer
        holds the pipeline; the caller surfaces this to the client (HTTP 200
        with status="busy" for these endpoints). When ``pipeline_status``
        was never bootstrapped (test rigs) the result is ``(True, None)``
        because there is nothing to coordinate against.
    """
    from lightrag.exceptions import PipelineNotInitializedError
    from lightrag.kg.shared_storage import get_namespace_data, get_namespace_lock

    try:
        status = await get_namespace_data("pipeline_status", workspace=rag.workspace)
    except PipelineNotInitializedError:
        return True, None

    status_lock = get_namespace_lock("pipeline_status", workspace=rag.workspace)
    async with status_lock:
        # Find the first blocking condition, in priority order.
        refusal: str | None = None
        if status.get("busy"):
            refusal = "Pipeline is busy with another operation."
        elif status.get("scanning"):
            refusal = (
                "Document scan is in progress. "
                "Wait for the scan to complete before clearing or deleting."
            )
        elif status.get("pending_enqueues", 0) > 0:
            refusal = (
                "Document upload/insert is being enqueued. "
                "Wait for in-flight work to complete before clearing or "
                "deleting."
            )

        if refusal is not None:
            return False, refusal

        # Nothing is in the way: claim both flags while still under the lock.
        status["busy"] = True
        status["destructive_busy"] = True
    return True, None
async def _release_destructive_busy(rag: LightRAG) -> None:
    """Give back the slot taken by ``_acquire_destructive_busy``.

    Resets ``busy`` and ``destructive_busy`` (the clear/delete reservation)
    under the namespace lock. This is separate from
    ``_release_enqueue_slot``, which handles ``pending_enqueues`` (the
    upload/insert reservation). Returns quietly when ``pipeline_status`` was
    never initialised.
    """
    from lightrag.exceptions import PipelineNotInitializedError
    from lightrag.kg.shared_storage import get_namespace_data, get_namespace_lock

    try:
        status = await get_namespace_data("pipeline_status", workspace=rag.workspace)
    except PipelineNotInitializedError:
        return

    status_lock = get_namespace_lock("pipeline_status", workspace=rag.workspace)
    async with status_lock:
        for flag in ("busy", "destructive_busy"):
            status[flag] = False
async def _release_enqueue_slot(rag: LightRAG) -> None:
    """Give back a slot taken by ``_reserve_enqueue_slot``.

    This is a pure decrement. The bg task drives processing itself by
    calling ``apipeline_process_enqueue_documents`` after enqueue (a cheap
    no-op when the loop is already busy -- it just sets
    ``request_pending``). No drain coordination across sibling bg tasks is
    needed in the new contract: each task triggers processing independently
    and the loop's request_pending mechanism collapses duplicate triggers
    safely.

    The counter never drops below 0, so a stray release (e.g. from a
    workspace whose reservation returned False but whose bg task wrapper
    still calls release) is harmless. Returns quietly when
    ``pipeline_status`` was never initialised.
    """
    from lightrag.exceptions import PipelineNotInitializedError
    from lightrag.kg.shared_storage import get_namespace_data, get_namespace_lock

    try:
        status = await get_namespace_data("pipeline_status", workspace=rag.workspace)
    except PipelineNotInitializedError:
        return

    status_lock = get_namespace_lock("pipeline_status", workspace=rag.workspace)
    async with status_lock:
        pending = status.get("pending_enqueues", 0)
        if pending <= 0:
            # Nothing reserved: leave the counter untouched.
            return
        status["pending_enqueues"] = pending - 1
def find_existing_file_by_file_path(input_dir: Path, file_path: str) -> Path | None:
    """Locate the file in ``input_dir`` whose canonical basename is ``file_path``.

    Callers pass the stored canonical ``file_path`` (already hint-stripped).
    Each on-disk filename is normalized before comparison, so a hint-bearing
    variant on disk still matches the canonical stored value.

    Returns:
        The first matching regular file, or ``None`` when ``file_path`` is
        empty or the unknown-source placeholder, nothing matches, or the
        directory does not exist.
    """
    if not file_path or file_path == UNKNOWN_FILE_SOURCE:
        return None

    try:
        matches = (
            entry
            for entry in input_dir.iterdir()
            if entry.is_file() and normalize_file_path(entry.name) == file_path
        )
        return next(matches, None)
    except FileNotFoundError:
        return None
def canonicalize_archived_file_variant_basename(
    file_path: Path | str, *, strip_archive_suffix: bool = False
) -> str:
    """Return the canonical basename of an original file or archive variant.

    Only the final path component is considered. With
    ``strip_archive_suffix`` set, text matching ``ARCHIVED_FILE_SUFFIX_RE``
    is removed from the stem before the name is passed to
    ``normalize_file_path``.
    """
    leaf = Path(Path(file_path).name)
    stem = leaf.stem
    if strip_archive_suffix:
        stem = ARCHIVED_FILE_SUFFIX_RE.sub("", stem)
    return normalize_file_path(stem + leaf.suffix)
def _file_path_for_parsed_artifact_dir(dir_name: str) -> str | None:
    """Map a parser artifact directory name to its canonical source basename.

    Recognized layouts (suffix list in
    :data:`lightrag.constants.PARSED_ARTIFACT_DIR_SUFFIXES`):

    - ``<basename>.parsed[_NNN]/`` -- sidecar output (every engine)
    - ``<basename>.mineru_raw[_NNN]/`` -- MinerU preserved raw bundle
    - ``<basename>.docling_raw[_NNN]/`` -- Docling preserved raw bundle

    Raw bundles are kept across re-parses for cache reuse and on-demand
    diagnostics. They are cleaned only when the user deletes the document
    with ``delete_file=True``, so raw artifacts and source file go away
    together.

    Returns:
        The normalized source basename, or ``None`` when the name matches no
        known artifact suffix.
    """
    bare_name = ARCHIVED_FILE_SUFFIX_RE.sub("", dir_name)
    for marker in PARSED_ARTIFACT_DIR_SUFFIXES:
        if not bare_name.endswith(marker):
            continue
        source_name = bare_name[: -len(marker)]
        # A name that is nothing but the suffix has no source basename.
        if source_name:
            return normalize_file_path(source_name)
    return None
def delete_file_variants_by_file_path(
    input_dir: Path,
    file_path: str | None,
) -> tuple[list[str], list[str]]:
    """Remove input/__parsed__ files and artifact dirs for a canonical ``file_path``.

    Both ``input_dir`` and its ``PARSED_DIR_NAME`` sub-directory are listed.
    Files are matched by canonical basename (archive suffixes are stripped
    only inside the parsed directory). Inside the parsed directory, parser
    artifact directories that map to the same basename are removed too.

    Returns:
        ``(deleted, errors)``: deleted paths relative to the resolved
        ``input_dir``, and human-readable messages for everything that could
        not be scanned, validated or deleted. Both lists are empty when
        ``file_path`` is falsy or normalizes to the unknown-source
        placeholder.
    """
    if not file_path:
        return [], []
    target = normalize_file_path(file_path)
    if target == UNKNOWN_FILE_SOURCE:
        return [], []

    removed: list[str] = []
    problems: list[str] = []
    search_dirs = (input_dir, input_dir / PARSED_DIR_NAME)
    resolved_root = input_dir.resolve()

    for folder in search_dirs:
        try:
            entries = list(folder.iterdir())
        except FileNotFoundError:
            # A missing directory simply has nothing to delete.
            continue
        except Exception as e:
            problems.append(f"Failed to scan {folder}: {e}")
            continue

        inside_parsed = folder.name == PARSED_DIR_NAME
        for entry in entries:
            if entry.is_file():
                entry_canonical = canonicalize_archived_file_variant_basename(
                    entry.name,
                    strip_archive_suffix=inside_parsed,
                )
                if entry_canonical != target:
                    continue

                safe_file = validate_file_path_security(entry.name, folder)
                if safe_file is None:
                    problems.append(f"Unsafe file path skipped: {entry.name}")
                    continue

                try:
                    safe_file.unlink()
                    removed.append(str(safe_file.relative_to(resolved_root)))
                except Exception as e:
                    problems.append(f"Failed to delete {entry.name}: {e}")
                continue

            # Artifact directories only exist inside the parsed directory.
            if not (inside_parsed and entry.is_dir()):
                continue

            dir_canonical = _file_path_for_parsed_artifact_dir(entry.name)
            if dir_canonical is None or dir_canonical != target:
                continue

            safe_dir = validate_file_path_security(entry.name, folder)
            if safe_dir is None:
                problems.append(f"Unsafe artifact dir skipped: {entry.name}")
                continue

            try:
                shutil.rmtree(safe_dir)
                removed.append(str(safe_dir.relative_to(resolved_root)))
            except Exception as e:
                problems.append(f"Failed to delete artifact dir {entry.name}: {e}")

    return removed, problems
async def record_scan_warning(rag: LightRAG, message: str) -> None:
    """Log a scan warning and mirror it into the shared pipeline status.

    The warning is always written to the logger.  Publishing it to the
    ``pipeline_status`` namespace (``latest_message`` and
    ``history_messages``) is best effort: it is skipped when shared storage
    is not initialized, and any failure while doing so is logged at debug
    level instead of being raised.

    Args:
        rag: LightRAG instance; its ``workspace`` attribute (default ``""``)
            selects the pipeline-status namespace.
        message: Warning text to record.
    """
    logger.warning(message)
    try:
        from lightrag.kg import shared_storage

        if not getattr(shared_storage, "_initialized", False):
            return
        workspace = getattr(rag, "workspace", "")
        pipeline_status = await shared_storage.get_namespace_data(
            "pipeline_status", workspace=workspace
        )
        pipeline_status_lock = shared_storage.get_namespace_lock(
            "pipeline_status", workspace=workspace
        )
        async with pipeline_status_lock:
            pipeline_status["latest_message"] = message
            pipeline_status["history_messages"].append(message)
    except Exception as e:
        # The warning itself was already logged above; never let status
        # bookkeeping break the scan, but leave a trace for debugging.
        logger.debug(f"Could not record scan warning in pipeline status: {e}")
- # Document processing helper functions (synchronous)
- # These functions run in thread pool via asyncio.to_thread() to avoid blocking the event loop
- def _extract_pdf_pypdf(file_bytes: bytes, password: str = None) -> str:
- """Extract PDF content using pypdf (synchronous).
- Args:
- file_bytes: PDF file content as bytes
- password: Optional password for encrypted PDFs
- Returns:
- str: Extracted text content
- Raises:
- Exception: If PDF is encrypted and password is incorrect or missing
- """
- from pypdf import PdfReader # type: ignore
- pdf_file = BytesIO(file_bytes)
- reader = PdfReader(pdf_file)
- # Check if PDF is encrypted
- if reader.is_encrypted:
- # Try empty password first (covers permission-only encrypted PDFs)
- decrypt_result = reader.decrypt(password or "")
- if decrypt_result == 0:
- if password:
- raise Exception("Incorrect PDF password")
- else:
- raise Exception("PDF is encrypted but no password provided")
- # Extract text from all pages
- content = ""
- for page in reader.pages:
- content += page.extract_text() + "\n"
- return content
- def _extract_docx(file_bytes: bytes) -> str:
- """Extract DOCX content including tables in document order (synchronous).
- Args:
- file_bytes: DOCX file content as bytes
- Returns:
- str: Extracted text content with tables in their original positions.
- Tables are separated from paragraphs with blank lines for clarity.
- """
- from docx import Document # type: ignore
- from docx.table import Table # type: ignore
- from docx.text.paragraph import Paragraph # type: ignore
- docx_file = BytesIO(file_bytes)
- doc = Document(docx_file)
- def escape_cell(cell_value: str | None) -> str:
- """Escape characters that would break tab-delimited layout.
- Escape order is critical: backslashes first, then tabs/newlines.
- This prevents double-escaping issues.
- Args:
- cell_value: The cell value to escape (can be None or str)
- Returns:
- str: Escaped cell value safe for tab-delimited format
- """
- if cell_value is None:
- return ""
- text = str(cell_value)
- # CRITICAL: Escape backslash first to avoid double-escaping
- return (
- text.replace("\\", "\\\\") # Must be first: \ -> \\
- .replace("\t", "  ") # Tab -> \t (visible)
- .replace("\r\n", "<br>") # Windows newline -> \n
- .replace("\r", "<br>") # Mac newline -> \n
- .replace("\n", "<br>") # Unix newline -> \n
- )
- content_parts = []
- in_table = False # Track if we're currently processing a table
- # Iterate through all body elements in document order
- for element in doc.element.body:
- # Check if element is a paragraph
- if element.tag.endswith("p"):
- # If coming out of a table, add blank line after table
- if in_table:
- content_parts.append("") # Blank line after table
- in_table = False
- paragraph = Paragraph(element, doc)
- text = paragraph.text
- # Always append to preserve document spacing (including blank paragraphs)
- content_parts.append(text)
- # Check if element is a table
- elif element.tag.endswith("tbl"):
- # Add blank line before table (if content exists)
- if content_parts and not in_table:
- content_parts.append("") # Blank line before table
- in_table = True
- table = Table(element, doc)
- for row in table.rows:
- row_text = []
- for cell in row.cells:
- cell_text = cell.text
- # Escape special characters to preserve tab-delimited structure
- row_text.append(escape_cell(cell_text))
- # Only add row if at least one cell has content
- if any(cell for cell in row_text):
- content_parts.append("\t".join(row_text))
- return "\n".join(content_parts)
- def _extract_pptx(file_bytes: bytes) -> str:
- """Extract PPTX content (synchronous).
- Args:
- file_bytes: PPTX file content as bytes
- Returns:
- str: Extracted text content
- """
- from pptx import Presentation # type: ignore
- pptx_file = BytesIO(file_bytes)
- prs = Presentation(pptx_file)
- content = ""
- for slide in prs.slides:
- for shape in slide.shapes:
- if hasattr(shape, "text"):
- content += shape.text + "\n"
- return content
- def _extract_xlsx(file_bytes: bytes) -> str:
- """Extract XLSX content in tab-delimited format with clear sheet separation.
- This function processes Excel workbooks and converts them to a structured text format
- suitable for LLM prompts and RAG systems. Each sheet is clearly delimited with
- separator lines, and special characters are escaped to preserve the tab-delimited structure.
- Features:
- - Each sheet is wrapped with '====================' separators for visual distinction
- - Special characters (tabs, newlines, backslashes) are escaped to prevent structure corruption
- - Column alignment is preserved across all rows to maintain tabular structure
- - Empty rows are preserved as blank lines to maintain row structure
- - Uses sheet.max_column to determine column width efficiently
- Args:
- file_bytes: XLSX file content as bytes
- Returns:
- str: Extracted text content with all sheets in tab-delimited format.
- Format: Sheet separators, sheet name, then tab-delimited rows.
- Example output:
- ==================== Sheet: Data ====================
- Name\tAge\tCity
- Alice\t30\tNew York
- Bob\t25\tLondon
- ==================== Sheet: Summary ====================
- Total\t2
- ====================
- """
- from openpyxl import load_workbook # type: ignore
- xlsx_file = BytesIO(file_bytes)
- wb = load_workbook(xlsx_file)
- def escape_cell(cell_value: str | int | float | None) -> str:
- """Escape characters that would break tab-delimited layout.
- Escape order is critical: backslashes first, then tabs/newlines.
- This prevents double-escaping issues.
- Args:
- cell_value: The cell value to escape (can be None, str, int, or float)
- Returns:
- str: Escaped cell value safe for tab-delimited format
- """
- if cell_value is None:
- return ""
- text = str(cell_value)
- # CRITICAL: Escape backslash first to avoid double-escaping
- return (
- text.replace("\\", "\\\\") # Must be first: \ -> \\
- .replace("\t", "\\t") # Tab -> \t (visible)
- .replace("\r\n", "\\n") # Windows newline -> \n
- .replace("\r", "\\n") # Mac newline -> \n
- .replace("\n", "\\n") # Unix newline -> \n
- )
- def escape_sheet_title(title: str) -> str:
- """Escape sheet title to prevent formatting issues in separators.
- Args:
- title: Original sheet title
- Returns:
- str: Sanitized sheet title with tabs/newlines replaced
- """
- return str(title).replace("\n", " ").replace("\t", " ").replace("\r", " ")
- content_parts: list[str] = []
- sheet_separator = "=" * 20
- for idx, sheet in enumerate(wb):
- if idx > 0:
- content_parts.append("") # Blank line between sheets for readability
- # Escape sheet title to handle edge cases with special characters
- safe_title = escape_sheet_title(sheet.title)
- content_parts.append(f"{sheet_separator} Sheet: {safe_title} {sheet_separator}")
- # Use sheet.max_column to get the maximum column width directly
- max_columns = sheet.max_column if sheet.max_column else 0
- # Extract rows with consistent width to preserve column alignment
- for row in sheet.iter_rows(values_only=True):
- row_parts = []
- # Build row up to max_columns width
- for idx in range(max_columns):
- if idx < len(row):
- row_parts.append(escape_cell(row[idx]))
- else:
- row_parts.append("") # Pad short rows
- # Check if row is completely empty
- if all(part == "" for part in row_parts):
- # Preserve empty rows as blank lines (maintains row structure)
- content_parts.append("")
- else:
- # Join all columns to maintain consistent column count
- content_parts.append("\t".join(row_parts))
- # Final separator for symmetry (makes parsing easier)
- content_parts.append(sheet_separator)
- return "\n".join(content_parts)
- async def pipeline_enqueue_file(
- rag: LightRAG,
- file_path: Path,
- track_id: str = None,
- from_scan: bool = False,
- ) -> tuple[bool, str]:
- """Add a file to the queue for processing
- Args:
- rag: LightRAG instance
- file_path: Path to the saved file
- track_id: Optional tracking ID, if not provided will be generated
- from_scan: True only when invoked by the scan-owned background task,
- which already holds ``pipeline_status["scanning"]``. Forwarded to
- ``apipeline_enqueue_documents`` so the scan can enqueue the files
- it just discovered without tripping the scanning guard there.
- Returns:
- tuple: (success: bool, track_id: str)
- """
- # Generate track_id if not provided
- if track_id is None:
- track_id = generate_track_id("unknown")
- try:
- content = ""
- ext = file_path.suffix.lower()
- file_size = 0
- # Get file size for error reporting
- try:
- stat = await asyncio.to_thread(file_path.stat)
- file_size = stat.st_size
- except Exception:
- file_size = 0
- try:
- extraction_engine, process_options = resolve_file_parser_directives(
- file_path
- )
- except FilenameParserHintError as e:
- error_files = [
- {
- "file_path": str(file_path.name),
- "error_description": "[File Extraction]Filename hint error",
- "original_error": str(e),
- "file_size": file_size,
- }
- ]
- await rag.apipeline_enqueue_error_documents(error_files, track_id)
- logger.error(
- f"[File Extraction]Invalid filename hint in {file_path.name}: {e}"
- )
- return False, track_id
- api_process_options = process_options or PROCESS_OPTION_CHUNK_FIXED
- if extraction_engine != PARSER_ENGINE_LEGACY:
- try:
- enqueue_kwargs = {
- "file_paths": str(file_path),
- "track_id": track_id,
- "docs_format": FULL_DOCS_FORMAT_PENDING_PARSE,
- "parse_engine": extraction_engine,
- "process_options": api_process_options,
- "from_scan": from_scan,
- }
- enqueue_result = await rag.apipeline_enqueue_documents(
- "", **enqueue_kwargs
- )
- if enqueue_result is None:
- try:
- await move_file_to_parsed_dir(file_path)
- except Exception as move_error:
- logger.error(
- f"Failed to move duplicate file {file_path.name} to {PARSED_DIR_NAME} directory: {move_error}"
- )
- return False, track_id
- logger.info(
- f"[File Extraction]Deferred {file_path.name} to {extraction_engine} parser"
- )
- return True, track_id
- except Exception as e:
- error_files = [
- {
- "file_path": str(file_path.name),
- "error_description": "[File Extraction]Parser enqueue error",
- "original_error": f"Failed to enqueue file for parser: {str(e)}",
- "file_size": file_size,
- }
- ]
- await rag.apipeline_enqueue_error_documents(error_files, track_id)
- logger.error(
- f"[File Extraction]Error enqueuing {file_path.name} for {extraction_engine}: {str(e)}"
- )
- return False, track_id
- file = None
- try:
- async with aiofiles.open(file_path, "rb") as f:
- file = await f.read()
- except PermissionError as e:
- error_files = [
- {
- "file_path": str(file_path.name),
- "error_description": "[File Extraction]Permission denied - cannot read file",
- "original_error": str(e),
- "file_size": file_size,
- }
- ]
- await rag.apipeline_enqueue_error_documents(error_files, track_id)
- logger.error(
- f"[File Extraction]Permission denied reading file: {file_path.name}"
- )
- return False, track_id
- except FileNotFoundError as e:
- error_files = [
- {
- "file_path": str(file_path.name),
- "error_description": "[File Extraction]File not found",
- "original_error": str(e),
- "file_size": file_size,
- }
- ]
- await rag.apipeline_enqueue_error_documents(error_files, track_id)
- logger.error(f"[File Extraction]File not found: {file_path.name}")
- return False, track_id
- except Exception as e:
- error_files = [
- {
- "file_path": str(file_path.name),
- "error_description": "[File Extraction]File reading error",
- "original_error": str(e),
- "file_size": file_size,
- }
- ]
- await rag.apipeline_enqueue_error_documents(error_files, track_id)
- logger.error(
- f"[File Extraction]Error reading file {file_path.name}: {str(e)}"
- )
- return False, track_id
- # Process based on file type
- try:
- match ext:
- case (
- ".txt"
- | ".md"
- | ".mdx"
- | ".html"
- | ".htm"
- | ".tex"
- | ".json"
- | ".xml"
- | ".yaml"
- | ".yml"
- | ".rtf"
- | ".odt"
- | ".epub"
- | ".csv"
- | ".log"
- | ".conf"
- | ".ini"
- | ".properties"
- | ".sql"
- | ".bat"
- | ".sh"
- | ".c"
- | ".h"
- | ".cpp"
- | ".hpp"
- | ".py"
- | ".java"
- | ".js"
- | ".ts"
- | ".swift"
- | ".go"
- | ".rb"
- | ".php"
- | ".css"
- | ".scss"
- | ".less"
- ):
- try:
- # Try to decode as UTF-8 (offloaded to thread to avoid blocking the event loop)
- content = await asyncio.to_thread(file.decode, "utf-8")
- # Validate content
- if not content or len(content.strip()) == 0:
- error_files = [
- {
- "file_path": str(file_path.name),
- "error_description": "[File Extraction]Empty file content",
- "original_error": "File contains no content or only whitespace",
- "file_size": file_size,
- }
- ]
- await rag.apipeline_enqueue_error_documents(
- error_files, track_id
- )
- logger.error(
- f"[File Extraction]Empty content in file: {file_path.name}"
- )
- return False, track_id
- # Check if content looks like binary data string representation
- if content.startswith("b'") or content.startswith('b"'):
- error_files = [
- {
- "file_path": str(file_path.name),
- "error_description": "[File Extraction]Binary data in text file",
- "original_error": "File appears to contain binary data representation instead of text",
- "file_size": file_size,
- }
- ]
- await rag.apipeline_enqueue_error_documents(
- error_files, track_id
- )
- logger.error(
- f"[File Extraction]File {file_path.name} appears to contain binary data representation instead of text"
- )
- return False, track_id
- except UnicodeDecodeError as e:
- error_files = [
- {
- "file_path": str(file_path.name),
- "error_description": "[File Extraction]UTF-8 encoding error, please convert it to UTF-8 before processing",
- "original_error": f"File is not valid UTF-8 encoded text: {str(e)}",
- "file_size": file_size,
- }
- ]
- await rag.apipeline_enqueue_error_documents(
- error_files, track_id
- )
- logger.error(
- f"[File Extraction]File {file_path.name} is not valid UTF-8 encoded text. Please convert it to UTF-8 before processing."
- )
- return False, track_id
- case ".pdf":
- try:
- content = await asyncio.to_thread(
- _extract_pdf_pypdf,
- file,
- global_args.pdf_decrypt_password,
- )
- except Exception as e:
- error_files = [
- {
- "file_path": str(file_path.name),
- "error_description": "[File Extraction]PDF processing error",
- "original_error": f"Failed to extract text from PDF: {str(e)}",
- "file_size": file_size,
- }
- ]
- await rag.apipeline_enqueue_error_documents(
- error_files, track_id
- )
- logger.error(
- f"[File Extraction]Error processing PDF {file_path.name}: {str(e)}"
- )
- return False, track_id
- case ".docx":
- try:
- content = await asyncio.to_thread(_extract_docx, file)
- except Exception as e:
- error_files = [
- {
- "file_path": str(file_path.name),
- "error_description": "[File Extraction]DOCX processing error",
- "original_error": f"Failed to extract text from DOCX: {str(e)}",
- "file_size": file_size,
- }
- ]
- await rag.apipeline_enqueue_error_documents(
- error_files, track_id
- )
- logger.error(
- f"[File Extraction]Error processing DOCX {file_path.name}: {str(e)}"
- )
- return False, track_id
- case ".pptx":
- try:
- content = await asyncio.to_thread(_extract_pptx, file)
- except Exception as e:
- error_files = [
- {
- "file_path": str(file_path.name),
- "error_description": "[File Extraction]PPTX processing error",
- "original_error": f"Failed to extract text from PPTX: {str(e)}",
- "file_size": file_size,
- }
- ]
- await rag.apipeline_enqueue_error_documents(
- error_files, track_id
- )
- logger.error(
- f"[File Extraction]Error processing PPTX {file_path.name}: {str(e)}"
- )
- return False, track_id
- case ".xlsx":
- try:
- content = await asyncio.to_thread(_extract_xlsx, file)
- except Exception as e:
- error_files = [
- {
- "file_path": str(file_path.name),
- "error_description": "[File Extraction]XLSX processing error",
- "original_error": f"Failed to extract text from XLSX: {str(e)}",
- "file_size": file_size,
- }
- ]
- await rag.apipeline_enqueue_error_documents(
- error_files, track_id
- )
- logger.error(
- f"[File Extraction]Error processing XLSX {file_path.name}: {str(e)}"
- )
- return False, track_id
- case _:
- error_files = [
- {
- "file_path": str(file_path.name),
- "error_description": f"[File Extraction]Unsupported file type: {ext}",
- "original_error": f"File extension {ext} is not supported",
- "file_size": file_size,
- }
- ]
- await rag.apipeline_enqueue_error_documents(error_files, track_id)
- logger.error(
- f"[File Extraction]Unsupported file type: {file_path.name} (extension {ext})"
- )
- return False, track_id
- except Exception as e:
- error_files = [
- {
- "file_path": str(file_path.name),
- "error_description": "[File Extraction]File format processing error",
- "original_error": f"Unexpected error during file extracting: {str(e)}",
- "file_size": file_size,
- }
- ]
- await rag.apipeline_enqueue_error_documents(error_files, track_id)
- logger.error(
- f"[File Extraction]Unexpected error during {file_path.name} extracting: {str(e)}"
- )
- return False, track_id
- # Insert into the RAG queue
- if content:
- # Check if content contains only whitespace characters
- if not content.strip():
- error_files = [
- {
- "file_path": str(file_path.name),
- "error_description": "[File Extraction]File contains only whitespace",
- "original_error": "File content contains only whitespace characters",
- "file_size": file_size,
- }
- ]
- await rag.apipeline_enqueue_error_documents(error_files, track_id)
- logger.warning(
- f"[File Extraction]File contains only whitespace characters: {file_path.name}"
- )
- return False, track_id
- try:
- enqueue_kwargs = {
- "file_paths": file_path.name,
- "track_id": track_id,
- "parse_engine": PARSER_ENGINE_LEGACY,
- "process_options": api_process_options,
- "from_scan": from_scan,
- }
- enqueue_result = await rag.apipeline_enqueue_documents(
- content, **enqueue_kwargs
- )
- if enqueue_result is None:
- try:
- await move_file_to_parsed_dir(file_path)
- except Exception as move_error:
- logger.error(
- f"Failed to move duplicate file {file_path.name} to {PARSED_DIR_NAME} directory: {move_error}"
- )
- return False, track_id
- logger.info(
- f"Successfully extracted and enqueued file: {file_path.name}"
- )
- # Move file to __parsed__ directory after enqueuing (LR2-PRD: parsed output dir)
- try:
- await move_file_to_parsed_dir(file_path)
- except Exception as move_error:
- logger.error(
- f"Failed to move file {file_path.name} to {PARSED_DIR_NAME} directory: {move_error}"
- )
- # Don't affect the main function's success status
- return True, track_id
- except Exception as e:
- error_files = [
- {
- "file_path": str(file_path.name),
- "error_description": "Document enqueue error",
- "original_error": f"Failed to enqueue document: {str(e)}",
- "file_size": file_size,
- }
- ]
- await rag.apipeline_enqueue_error_documents(error_files, track_id)
- logger.error(f"Error enqueueing document {file_path.name}: {str(e)}")
- return False, track_id
- else:
- error_files = [
- {
- "file_path": str(file_path.name),
- "error_description": "No content extracted",
- "original_error": "No content could be extracted from file",
- "file_size": file_size,
- }
- ]
- await rag.apipeline_enqueue_error_documents(error_files, track_id)
- logger.error(f"No content extracted from file: {file_path.name}")
- return False, track_id
- except Exception as e:
- # Catch-all for any unexpected errors
- try:
- file_size = file_path.stat().st_size if file_path.exists() else 0
- except Exception:
- file_size = 0
- error_files = [
- {
- "file_path": str(file_path.name),
- "error_description": "Unexpected processing error",
- "original_error": f"Unexpected error: {str(e)}",
- "file_size": file_size,
- }
- ]
- await rag.apipeline_enqueue_error_documents(error_files, track_id)
- logger.error(f"Enqueuing file {file_path.name} error: {str(e)}")
- logger.error(traceback.format_exc())
- return False, track_id
- finally:
- if file_path.name.startswith(temp_prefix):
- try:
- file_path.unlink()
- except Exception as e:
- logger.error(f"Error deleting file {file_path}: {str(e)}")
async def pipeline_index_file(rag: LightRAG, file_path: Path, track_id: str = None):
    """Enqueue one file and, if that worked, process the queue.

    Errors are logged together with their traceback and not re-raised.

    Args:
        rag: LightRAG instance
        file_path: Path to the saved file
        track_id: Optional tracking ID
    """
    try:
        enqueued, _ = await pipeline_enqueue_file(rag, file_path, track_id)
        if not enqueued:
            return
        await rag.apipeline_process_enqueue_documents()
    except Exception as e:
        logger.error(f"Error indexing file {file_path.name}: {str(e)}")
        logger.error(traceback.format_exc())
async def pipeline_index_files(
    rag: LightRAG,
    file_paths: List[Path],
    track_id: str = None,
    from_scan: bool = False,
):
    """Enqueue several files one after another, then process the queue once.

    Files are handled sequentially (to avoid high CPU load) in pinyin sort
    order.  The queue is processed only when at least one file was
    enqueued.  Errors are logged with their traceback and not re-raised.

    Args:
        rag: LightRAG instance
        file_paths: Paths to the files to index
        track_id: Optional tracking ID to pass to all files
        from_scan: True only when invoked by the scan-owned background task.
            Forwarded to ``pipeline_enqueue_file`` so the per-file enqueue
            calls bypass the scanning guard inside
            ``apipeline_enqueue_documents`` (whose ``scanning`` flag the
            scan task itself owns).
    """
    if not file_paths:
        return

    try:
        any_enqueued = False
        # get_pinyin_sort_key gives Chinese names a pinyin ordering.
        for path in sorted(file_paths, key=lambda p: get_pinyin_sort_key(str(p))):
            ok, _ = await pipeline_enqueue_file(
                rag,
                path,
                track_id,
                from_scan=from_scan,
            )
            any_enqueued = any_enqueued or ok

        if any_enqueued:
            await rag.apipeline_process_enqueue_documents()
    except Exception as e:
        logger.error(f"Error indexing files: {str(e)}")
        logger.error(traceback.format_exc())
# Maps the ``strategy`` name carried by a text-chunking request to the
# matching process-option constant; ``_resolve_text_chunking`` indexes this
# table with ``chunking.strategy``.
# NOTE(review): ``PROCESS_OPTION_CHUNK_PARAGRAH`` is spelled without the
# second "P" here; presumably that is how the constant is declared. Confirm.
_STRATEGY_TO_PROCESS_OPTION: Dict[str, str] = {
    "fixed_token": PROCESS_OPTION_CHUNK_FIXED,
    "recursive_character": PROCESS_OPTION_CHUNK_RECURSIVE,
    "semantic_vector": PROCESS_OPTION_CHUNK_VECTOR,
    "paragraph_semantic": PROCESS_OPTION_CHUNK_PARAGRAH,
}
def _resolve_text_chunking(
    chunking: Optional[TextChunkingConfig], rag: LightRAG
) -> tuple[str, dict]:
    """Turn a ``chunking`` request into ``(process_options, chunk_options)``.

    Without a request (``chunking is None``) the fixed-token strategy is
    used and the options snapshot comes straight from
    ``resolve_chunk_options(rag.addon_params, ...)``; no validation is run
    on that default path.

    With a request, the strategy selects the process option, the request's
    ``params`` are merged into that strategy's sub-dict of the snapshot
    (``chunk_token_size`` included, like any other key), and the merged
    snapshot is validated.

    Raises:
        ValueError: when the merged snapshot is inconsistent, e.g. the
            request lowers ``chunk_token_size`` to or below the *effective*
            ``chunk_overlap_token_size``.  The overlap is often inherited
            from ``addon_params``/env, so this can only be checked against
            the resolved snapshot, not in the request model.  Callers on the
            request path invoke this synchronously so the failure surfaces
            as HTTP 422 before any background work is scheduled.
    """
    if chunking is None:
        process_options = PROCESS_OPTION_CHUNK_FIXED
    else:
        process_options = _STRATEGY_TO_PROCESS_OPTION[chunking.strategy]

    chunk_options = resolve_chunk_options(
        rag.addon_params, process_options=process_options
    )
    if chunking is None:
        # Default path: hand back the snapshot untouched and unvalidated.
        return process_options, chunk_options

    strategy_key = chunk_strategy_key(process_options)
    chunk_options[strategy_key].update(chunking.params)

    _validate_effective_chunk_overlap(chunk_options, strategy_key, chunking.strategy)
    _validate_effective_semantic_amount(chunk_options, strategy_key)
    return process_options, chunk_options
- def _validate_effective_chunk_overlap(
- chunk_options: dict, strategy_key: str, strategy_name: str
- ) -> None:
- """Reject a resolved snapshot whose overlap is >= its chunk size.
- Operates on the fully-resolved ``chunk_options`` so it catches the case
- the request model cannot: ``chunk_token_size`` supplied in the request
- while ``chunk_overlap_token_size`` is inherited from addon_params/env
- (e.g. ``chunk_token_size=50`` with the default overlap ``100``). The
- effective size is the strategy sub-dict value, falling back to the
- top-level snapshot size; the effective overlap is the sub-dict value
- (``semantic_vector`` carries none, so it is skipped).
- """
- sub = chunk_options.get(strategy_key) or {}
- # Fixed-token delimiter-only mode (split_by_character set AND
- # split_by_character_only=True) never applies overlap:
- # chunking_by_token_size only validates each delimiter segment against
- # chunk_token_size and raises on an oversize segment — the overlap field
- # is unused. Enforcing overlap < size there would wrongly 422 a valid
- # request such as paragraph splitting with a small chunk_token_size.
- # (split_by_character_only is itself a no-op when split_by_character is
- # falsy, so both must be effective for overlap to be skipped.)
- if (
- strategy_key == "fixed_token"
- and sub.get("split_by_character")
- and sub.get("split_by_character_only")
- ):
- return
- overlap = sub.get("chunk_overlap_token_size")
- if overlap is None:
- return
- size = sub.get("chunk_token_size")
- if size is None:
- size = chunk_options.get("chunk_token_size")
- if size is not None and overlap >= size:
- raise ValueError(
- f"chunking for strategy '{strategy_name}': effective "
- f"chunk_overlap_token_size ({overlap}) must be < chunk_token_size "
- f"({size}). The overlap is inherited from addon_params/env when "
- f"not set in the request; raise chunk_token_size or lower "
- f"chunk_overlap_token_size."
- )
- def _validate_effective_semantic_amount(chunk_options: dict, strategy_key: str) -> None:
- """Reject a resolved semantic_vector snapshot whose breakpoint amount
- exceeds the percentile/gradient ceiling.
- Uses the *effective* ``breakpoint_threshold_type`` from the merged
- snapshot — the request model cannot, because the type may be inherited
- from ``addon_params``/``CHUNK_V_BREAKPOINT_THRESHOLD_TYPE`` while the
- request overrides only ``breakpoint_threshold_amount``. ``percentile`` /
- ``gradient`` feed ``np.percentile`` (q must be in ``[0, 100]``);
- ``standard_deviation`` / ``interquartile`` are multipliers with no upper
- bound, so a request amount > 100 is valid for them.
- """
- if strategy_key != "semantic_vector":
- return
- sub = chunk_options.get(strategy_key) or {}
- amt = sub.get("breakpoint_threshold_amount")
- if amt is None:
- return
- kind = sub.get("breakpoint_threshold_type") or "percentile"
- if kind in ("percentile", "gradient") and amt > 100:
- raise ValueError(
- f"chunking for strategy 'semantic_vector': "
- f"breakpoint_threshold_amount ({amt}) must be within (0, 100] for "
- f"breakpoint_threshold_type '{kind}'. The type is inherited from "
- f"addon_params/env when not set in the request."
- )
async def pipeline_index_texts(
    rag: LightRAG,
    texts: List[str],
    file_sources: List[str] = None,
    track_id: str = None,
    chunking: Optional[TextChunkingConfig] = None,
):
    """Enqueue a list of texts under ``track_id`` and process the queue.

    Does nothing when ``texts`` is empty.

    Args:
        rag: LightRAG instance
        texts: The texts to index
        file_sources: Sources of the texts, one per text
        track_id: Optional tracking ID
        chunking: Optional chunking strategy + params (already validated by
            the request model); when None, default fixed-token chunking is used

    Raises:
        ValueError: when a source is missing for any text, when a source
            normalizes to the unknown-source marker, or when two sources
            normalize to the same filename.
    """
    if not texts:
        return

    missing_source = "A valid file source is required for each text"
    if not file_sources or len(file_sources) != len(texts):
        raise ValueError(missing_source)

    normalized_file_sources = [normalize_file_path(src) for src in file_sources]
    if UNKNOWN_FILE_SOURCE in normalized_file_sources:
        raise ValueError(missing_source)
    if len(set(normalized_file_sources)) != len(normalized_file_sources):
        raise ValueError("File sources must be unique by filename")

    process_options, chunk_options = _resolve_text_chunking(chunking, rag)
    await rag.apipeline_enqueue_documents(
        input=texts,
        file_paths=normalized_file_sources,
        track_id=track_id,
        process_options=process_options,
        chunk_options=chunk_options,
    )
    await rag.apipeline_process_enqueue_documents()
async def run_scanning_process(
    rag: LightRAG, doc_manager: DocumentManager, track_id: Optional[str] = None
):
    """Background task to scan the input directory and index documents.

    The task runs in two phases:

    1. Classification - files returned by
       ``doc_manager.scan_directory_for_new_files()`` are grouped by their
       canonical name (``normalize_file_path``), one file per group is kept,
       and each kept file is looked up in ``rag.doc_status`` to decide
       whether it is already PROCESSED (archived and skipped), must be
       resumed, or is new.
    2. Processing - new files are handed to ``pipeline_index_files`` and
       resumable ones are driven by
       ``rag.apipeline_process_enqueue_documents``.

    Exceptions raised by the body are logged and not re-raised. The
    ``scanning`` / ``scanning_exclusive`` flags in ``pipeline_status`` are
    always cleared in ``finally``.

    Args:
        rag: LightRAG instance
        doc_manager: DocumentManager instance
        track_id: Optional tracking ID to pass to all scanned files
    """
    # The scan endpoint set ``scanning=True`` AND
    # ``scanning_exclusive=True`` synchronously before scheduling this
    # task. ``scanning`` covers the whole lifecycle (refuses
    # overlapping scans); ``scanning_exclusive`` covers only the
    # classification phase below — we clear it before invoking
    # pipeline_index_files so concurrent uploads can land while the
    # scan-driven processing finishes. Both MUST be cleared in
    # finally so subsequent uploads / scans can proceed even if the
    # body raises. When pipeline_status is not initialised (mocked
    # test rigs), the flags were never set so there's nothing to
    # clear — track that here to skip the namespace fetch.
    from lightrag.exceptions import PipelineNotInitializedError
    from lightrag.kg.shared_storage import get_namespace_data, get_namespace_lock

    pipeline_status = None
    pipeline_status_lock = None
    try:
        pipeline_status = await get_namespace_data(
            "pipeline_status", workspace=rag.workspace
        )
        pipeline_status_lock = get_namespace_lock(
            "pipeline_status", workspace=rag.workspace
        )
    except PipelineNotInitializedError:
        pass

    async def _release_scanning_exclusive() -> None:
        """Clear ``scanning_exclusive`` when pipeline_status is available."""
        if pipeline_status is not None and pipeline_status_lock is not None:
            async with pipeline_status_lock:
                pipeline_status["scanning_exclusive"] = False

    try:
        scanned_files = doc_manager.scan_directory_for_new_files()
        total_files = len(scanned_files)
        logger.info(f"Found {total_files} files to index.")

        if scanned_files:
            # Group canonical-equivalent files so we can prefer hint-bearing
            # variants over plain ones. Within each group sort order is
            # preserved as a deterministic tiebreaker.
            files_by_canonical_name: dict[str, list[Path]] = {}
            for file_path in sorted(
                scanned_files, key=lambda p: get_pinyin_sort_key(str(p))
            ):
                canonical_name = normalize_file_path(str(file_path))
                files_by_canonical_name.setdefault(canonical_name, []).append(
                    file_path
                )

            unique_files: list[Path] = []
            for canonical_name, group in files_by_canonical_name.items():
                # Prefer the first file carrying a supported parser hint so
                # the user's explicit engine choice wins over plain variants;
                # otherwise fall back to the first sorted entry.
                chosen = next(
                    (f for f in group if filename_parser_hint(f.name) is not None),
                    group[0],
                )
                unique_files.append(chosen)
                for duplicate in group:
                    if duplicate is chosen:
                        continue
                    warning = (
                        "Skipping duplicate file in scan batch: "
                        f"{duplicate.name} duplicates {chosen.name} "
                        f"(canonical: {canonical_name})"
                    )
                    await record_scan_warning(rag, warning)
                    try:
                        await move_file_to_parsed_dir(duplicate)
                    except Exception as move_error:
                        logger.error(
                            f"Failed to move duplicate scan file {duplicate.name} to {PARSED_DIR_NAME}: {move_error}"
                        )

            # Partition unique_files into:
            #   * processed_files — already PROCESSED, archived and skipped.
            #   * resume_files    — same canonical basename matches an existing
            #                       non-PROCESSED doc_status row (PARSING /
            #                       FAILED / PROCESSING / ANALYZING / PENDING).
            #                       These must NOT go through pipeline_enqueue_file
            #                       because apipeline_enqueue_documents would
            #                       treat the same canonical name as a duplicate
            #                       (returning None) and pipeline_enqueue_file
            #                       would then archive the source as if it were
            #                       a duplicate — corrupting pending-parse cases
            #                       that still need the source on disk. The
            #                       pipeline's resume logic, triggered via
            #                       apipeline_process_enqueue_documents, will
            #                       advance them based on their existing
            #                       doc_status row.
            #   * new_files       — no existing record; standard enqueue path.
            new_files: list[Path] = []
            resume_files: list[Path] = []
            processed_files: list[str] = []

            for file_path in unique_files:
                filename = file_path.name
                # Inline the canonical-basename lookup so we keep both the
                # doc_id and the data: the FAILED-without-full_docs sub-case
                # below needs the doc_id to delete the stale stub.
                basename = normalize_file_path(str(file_path))
                existing_match = (
                    await rag.doc_status.get_doc_by_file_basename(basename)
                    if basename != UNKNOWN_FILE_SOURCE
                    else None
                )
                existing_doc_id, existing_doc_data = (
                    existing_match if existing_match else (None, None)
                )

                if (
                    existing_doc_data
                    and get_doc_status_value(existing_doc_data)
                    == DocStatus.PROCESSED.value
                ):
                    # File is already PROCESSED, skip it with warning and archive it.
                    processed_files.append(filename)
                    warning = f"Skipping already processed file: {filename}"
                    await record_scan_warning(rag, warning)
                    try:
                        await move_file_to_parsed_dir(file_path)
                    except Exception as move_error:
                        logger.error(
                            f"Failed to move already processed file {filename} to {PARSED_DIR_NAME}: {move_error}"
                        )
                elif existing_doc_data:
                    # FAILED rows recorded by apipeline_enqueue_error_documents
                    # never write a full_docs entry — extraction blew up before
                    # any content was stored. _validate_and_fix_document_consistency
                    # preserves them for manual review and removes them from the
                    # processing list, so the resume path can never advance them.
                    # When the user fixes the file and re-scans we want a real
                    # retry: drop the stale stub and treat the file as new so
                    # the standard enqueue path re-extracts content.
                    status_value = get_doc_status_value(existing_doc_data)
                    if status_value == DocStatus.FAILED.value:
                        full_doc = await rag.full_docs.get_by_id(existing_doc_id)
                        if full_doc is None:
                            try:
                                await rag.doc_status.delete([existing_doc_id])
                            except Exception as delete_error:
                                logger.error(
                                    "Failed to delete stale failed-extraction "
                                    f"doc_status stub {existing_doc_id} "
                                    f"({filename}): {delete_error}"
                                )
                                # Fall through to resume — at worst the row
                                # remains preserved (current behaviour) rather
                                # than re-enqueued.
                                resume_files.append(file_path)
                                continue
                            logger.info(
                                "Retrying previously failed extraction; "
                                f"removed stale doc_status stub: {filename} "
                                f"(doc_id: {existing_doc_id})"
                            )
                            new_files.append(file_path)
                            continue
                    logger.info(
                        "Resuming previously unfinished file from scan: "
                        f"{filename} (Status: {status_value})"
                    )
                    resume_files.append(file_path)
                else:
                    new_files.append(file_path)

            # Classification phase complete — release ``scanning_exclusive``
            # so concurrent uploads/inserts can land in doc_status while
            # the scan-driven processing finishes. ``scanning`` stays
            # True for the rest of the task lifecycle (releases in
            # finally) so the /scan endpoint still refuses overlapping
            # scans. Any per-file enqueue or duplicate detected during
            # the processing phase is handled by
            # apipeline_enqueue_documents' in-batch dedup, identical to
            # the upload-during-busy case.
            await _release_scanning_exclusive()

            # New files take the standard enqueue + process path. When at
            # least one new file is successfully enqueued, pipeline_index_files
            # internally invokes apipeline_process_enqueue_documents, which
            # selects work by doc_status state and so will also pick up any
            # resume_files in the same run.
            if new_files:
                await pipeline_index_files(
                    rag,
                    new_files,
                    track_id,
                    from_scan=True,
                )

            # Resume targets must always trigger the pipeline explicitly:
            # pipeline_index_files only runs apipeline_process_enqueue_documents
            # after at least one new file successfully enqueues, so when every
            # new file is rejected (unsupported extension, empty body, content
            # / filename duplicate, ...) the resume rows would otherwise stay
            # stuck until an unrelated indexing run. When new files DID
            # enqueue, the inner call already drained the queue and this is a
            # cheap no-op that returns "No documents to process".
            if resume_files:
                await rag.apipeline_process_enqueue_documents()

            total_active = len(new_files) + len(resume_files)
            if total_active or processed_files:
                summary_parts: list[str] = []
                if total_active:
                    summary_parts.append(f"{total_active} files Processed")
                if processed_files:
                    summary_parts.append(f"{len(processed_files)} skipped")
                logger.info(f"Scanning process completed: {' '.join(summary_parts)}.")
            else:
                logger.info(
                    "No files to process after filtering already processed files."
                )
        else:
            # No new files to index — classification is trivially done;
            # release ``scanning_exclusive`` before driving the queue so
            # concurrent uploads can land while process_enqueue runs.
            await _release_scanning_exclusive()
            logger.info(
                "No upload file found, check if there are any documents in the queue..."
            )
            await rag.apipeline_process_enqueue_documents()

    except Exception as e:
        logger.error(f"Error during scanning process: {str(e)}")
        logger.error(traceback.format_exc())
    finally:
        # Always release both scanning flags so future uploads / scans
        # are not blocked by a crashed task. Skip when pipeline_status
        # was never initialised for this workspace (test rigs).
        if pipeline_status is not None and pipeline_status_lock is not None:
            async with pipeline_status_lock:
                pipeline_status["scanning"] = False
                pipeline_status["scanning_exclusive"] = False
async def background_delete_documents(
    rag: LightRAG,
    doc_manager: DocumentManager,
    doc_ids: List[str],
    delete_file: bool = False,
    delete_llm_cache: bool = False,
):
    """Background task to delete multiple documents one by one.

    Progress and errors are mirrored into the workspace ``pipeline_status``
    (``latest_message`` / ``history_messages``). Cancellation is honoured at
    the start of each document: the remaining ids are recorded as failed and
    the loop stops. On exit the ``busy`` / ``destructive_busy`` flags are
    always released, and a pending indexing request (``request_pending``)
    triggers ``rag.apipeline_process_enqueue_documents()``.

    Args:
        rag: LightRAG instance used for ``adelete_by_doc_id``.
        doc_manager: DocumentManager whose ``input_dir`` holds source files.
        doc_ids: Ids of the documents to delete, processed in order.
        delete_file: Also delete the source file variants of each
            successfully deleted document.
        delete_llm_cache: Forwarded to ``adelete_by_doc_id``.
    """
    from lightrag.kg.shared_storage import (
        get_namespace_data,
        get_namespace_lock,
    )

    pipeline_status = await get_namespace_data(
        "pipeline_status", workspace=rag.workspace
    )
    pipeline_status_lock = get_namespace_lock(
        "pipeline_status", workspace=rag.workspace
    )

    total_docs = len(doc_ids)
    successful_deletions = []
    failed_deletions = []

    async def _report(message: str, *, set_latest: bool = True) -> None:
        """Append ``message`` to the job history, optionally as latest message.

        Must NOT be called while ``pipeline_status_lock`` is already held by
        this task: the helper acquires the lock itself.
        """
        async with pipeline_status_lock:
            if set_latest:
                pipeline_status["latest_message"] = message
            pipeline_status["history_messages"].append(message)

    # The /documents/delete_document endpoint has already reserved the
    # destructive slot synchronously: ``busy=True`` and
    # ``destructive_busy=True`` were set before the client got
    # ``deletion_started``, after checking busy + scanning +
    # pending_enqueues>0 atomically. Here we only update the
    # job-info fields; the busy reservation was acquired by the
    # endpoint and is released in the finally block below.
    async with pipeline_status_lock:
        pipeline_status.update(
            {
                # Job name can not be changed, it's verified in adelete_by_doc_id()
                "job_name": f"Deleting {total_docs} Documents",
                "job_start": datetime.now().isoformat(),
                "docs": total_docs,
                "batchs": total_docs,
                "cur_batch": 0,
                "latest_message": "Starting document deletion process",
            }
        )
        # Use slice assignment to clear the list in place
        pipeline_status["history_messages"][:] = ["Starting document deletion process"]
        if delete_llm_cache:
            pipeline_status["history_messages"].append(
                "LLM cache cleanup requested for this deletion job"
            )

    try:
        # Loop through each document ID and delete them one by one
        for i, doc_id in enumerate(doc_ids, 1):
            # Check for cancellation at the start of each document deletion.
            # The check and the "starting" message share one lock acquisition,
            # so this section stays inline instead of using _report().
            async with pipeline_status_lock:
                if pipeline_status.get("cancellation_requested", False):
                    cancel_msg = f"Deletion cancelled by user at document {i}/{total_docs}. {len(successful_deletions)} deleted, {total_docs - i + 1} remaining."
                    logger.info(cancel_msg)
                    pipeline_status["latest_message"] = cancel_msg
                    pipeline_status["history_messages"].append(cancel_msg)
                    # Add remaining documents to failed list with cancellation reason
                    failed_deletions.extend(
                        doc_ids[i - 1 :]
                    )  # i-1 because enumerate starts at 1
                    break  # Exit the loop, remaining documents unchanged

                start_msg = f"Deleting document {i}/{total_docs}: {doc_id}"
                logger.info(start_msg)
                pipeline_status["cur_batch"] = i
                pipeline_status["latest_message"] = start_msg
                pipeline_status["history_messages"].append(start_msg)

            # Placeholder shown in error messages when the delete call itself
            # raises before a result (and thus a file path) is available.
            file_path = "#"
            try:
                result = await rag.adelete_by_doc_id(
                    doc_id, delete_llm_cache=delete_llm_cache
                )
                file_path = getattr(result, "file_path", "-")

                if result.status == "success":
                    successful_deletions.append(doc_id)
                    success_msg = (
                        f"Document deleted {i}/{total_docs}: {doc_id}[{file_path}]"
                    )
                    logger.info(success_msg)
                    # Success is recorded in the history only; latest_message
                    # is left untouched here.
                    await _report(success_msg, set_latest=False)

                    # Handle file deletion if requested and source information is available
                    if (
                        delete_file
                        and result.file_path
                        and result.file_path != UNKNOWN_FILE_SOURCE
                    ):
                        try:
                            deleted_files, file_delete_errors = (
                                delete_file_variants_by_file_path(
                                    doc_manager.input_dir,
                                    result.file_path,
                                )
                            )
                            for file_delete_error in file_delete_errors:
                                logger.warning(file_delete_error)
                                await _report(file_delete_error)

                            if deleted_files:
                                file_delete_msg = (
                                    "Successfully deleted source files: "
                                    + ", ".join(deleted_files)
                                )
                                logger.info(file_delete_msg)
                                await _report(file_delete_msg)
                            else:
                                file_error_msg = (
                                    "File deletion skipped, missing or unsafe file: "
                                    f"{result.file_path}"
                                )
                                logger.warning(file_error_msg)
                                await _report(file_error_msg)
                        except Exception as file_error:
                            file_error_msg = f"Failed to delete file {result.file_path}: {str(file_error)}"
                            logger.error(file_error_msg)
                            await _report(file_error_msg)
                    elif delete_file:
                        no_file_msg = (
                            f"File deletion skipped, missing file path: {doc_id}"
                        )
                        logger.warning(no_file_msg)
                        await _report(no_file_msg)
                else:
                    failed_deletions.append(doc_id)
                    error_msg = f"Failed to delete {i}/{total_docs}: {doc_id}[{file_path}] - {result.message}"
                    logger.error(error_msg)
                    await _report(error_msg)

            except Exception as e:
                failed_deletions.append(doc_id)
                error_msg = f"Error deleting document {i}/{total_docs}: {doc_id}[{file_path}] - {str(e)}"
                logger.error(error_msg)
                logger.error(traceback.format_exc())
                await _report(error_msg)

    except Exception as e:
        error_msg = f"Critical error during batch deletion: {str(e)}"
        logger.error(error_msg)
        logger.error(traceback.format_exc())
        await _report(error_msg, set_latest=False)
    finally:
        # Final summary and check for pending requests
        async with pipeline_status_lock:
            pipeline_status["busy"] = False
            pipeline_status["destructive_busy"] = False
            # NOTE(review): this key ("pending_requests") differs from the
            # "request_pending" key read below — confirm whether any reader
            # depends on it before renaming or removing.
            pipeline_status["pending_requests"] = False  # Reset pending requests flag
            pipeline_status["cancellation_requested"] = (
                False  # Always reset cancellation flag
            )
            completion_msg = f"Deletion completed: {len(successful_deletions)} successful, {len(failed_deletions)} failed"
            pipeline_status["latest_message"] = completion_msg
            pipeline_status["history_messages"].append(completion_msg)

            # Check if there are pending document indexing requests
            has_pending_request = pipeline_status.get("request_pending", False)

        # If there are pending requests, start document processing pipeline
        if has_pending_request:
            try:
                logger.info(
                    "Processing pending document indexing requests after deletion"
                )
                await rag.apipeline_process_enqueue_documents()
            except Exception as e:
                logger.error(f"Error processing pending documents after deletion: {e}")
- def create_document_routes(
- rag: LightRAG, doc_manager: DocumentManager, api_key: Optional[str] = None
- ):
- # Fresh router per call — see the note above the temp_prefix constant.
- router = APIRouter(
- prefix="/documents",
- tags=["documents"],
- )
- # Create combined auth dependency for document routes
- combined_auth = get_combined_auth_dependency(api_key)
@router.post(
    "/scan", response_model=ScanResponse, dependencies=[Depends(combined_auth)]
)
async def scan_for_new_documents(background_tasks: BackgroundTasks):
    """
    Trigger the scanning process for new documents.

    Refuses to start a new scan with
    ``status='scanning_skipped_pipeline_busy'`` (and does not
    schedule a background task) when any of these is set:

    - ``pipeline_status["busy"]`` — the processing loop or another
      destructive job is running.
    - ``pipeline_status["scanning"]`` — another scan is already
      running (any phase: classification or processing).
    - ``pipeline_status["pending_enqueues"] > 0`` — an /upload,
      /text or /texts endpoint has reserved a slot whose bg task
      has not yet written to doc_status; starting a scan now would
      race scan's classification reads against that pending write.

    Both ``scanning`` and ``scanning_exclusive`` are acquired
    synchronously here so a subsequent fast-follow request hits the
    guard rather than racing against the not-yet-started task.
    ``run_scanning_process`` clears ``scanning_exclusive`` once
    classification is done, allowing concurrent uploads to land
    while the scan-driven processing finishes.

    Returns:
        ScanResponse: A response object containing the scanning status and track_id
    """
    from lightrag.exceptions import PipelineNotInitializedError
    from lightrag.kg.shared_storage import get_namespace_data, get_namespace_lock

    # Generate track_id with "scan" prefix for scanning operation
    track_id = generate_track_id("scan")

    def _schedule_scan() -> ScanResponse:
        # Queue the background task (which clears the scanning flags in its
        # finally block) and build the "started" response.
        background_tasks.add_task(run_scanning_process, rag, doc_manager, track_id)
        return ScanResponse(
            status="scanning_started",
            message="Scanning process has been initiated in the background",
            track_id=track_id,
        )

    def _refuse(log_reason: str, client_message: str) -> ScanResponse:
        # Log why the scan is refused and build the "skipped" response; no
        # background task is scheduled on this path.
        logger.warning(log_reason)
        return ScanResponse(
            status="scanning_skipped_pipeline_busy",
            message=client_message,
            track_id=track_id,
        )

    try:
        status_store = await get_namespace_data(
            "pipeline_status", workspace=rag.workspace
        )
    except PipelineNotInitializedError:
        # Workspace pipeline_status not yet bootstrapped (e.g. mocked
        # test rigs). Treat as idle and allow the scan to proceed; the
        # scanning flag has nowhere to live so it is effectively skipped.
        return _schedule_scan()

    status_lock = get_namespace_lock("pipeline_status", workspace=rag.workspace)

    # Atomically acquire the scanning flag. Scan is the exclusive
    # writer in this contract — it reads doc_status to make
    # classification decisions (PROCESSED / resume / retry-as-new /
    # archive) and would race with concurrent writers — so refuse if:
    #   * pipeline is processing (busy=True): scan + processing both
    #     read/mutate doc_status; serialise.
    #   * another scan is in flight (scanning=True).
    #   * any /upload, /text, /texts endpoint has reserved a
    #     pending-enqueue slot (see _reserve_enqueue_slot): the bg
    #     task has not yet written doc_status and we would otherwise
    #     race with its mid-flight write.
    async with status_lock:
        if status_store.get("busy"):
            return _refuse(
                "Scan request skipped: pipeline is busy processing documents",
                "Pipeline is currently busy processing documents. "
                "Wait for the running job to finish before triggering another scan.",
            )

        if status_store.get("scanning"):
            return _refuse(
                "Scan request skipped: another scan is already in progress",
                "Another scan is already in progress. "
                "Wait for it to finish before triggering a new one.",
            )

        pending_enqueues = status_store.get("pending_enqueues", 0)
        if pending_enqueues > 0:
            return _refuse(
                "Scan request skipped: "
                f"{pending_enqueues} pending enqueue(s) reserved by "
                "upload/insert endpoints",
                "Document upload/insert is being enqueued. "
                "Wait for in-flight work to complete before triggering a scan.",
            )

        # ``scanning`` covers the whole scan task lifecycle (used by
        # this endpoint to refuse overlapping scans).
        # ``scanning_exclusive`` is True only during the
        # classification phase: run_scanning_process clears it once
        # classification is done so concurrent uploads can land
        # while the scan-driven processing finishes.
        status_store["scanning"] = True
        status_store["scanning_exclusive"] = True

    # Start the scanning process in the background with track_id. The
    # task is responsible for clearing both flags in its finally block.
    return _schedule_scan()
@router.post(
    "/upload", response_model=InsertResponse, dependencies=[Depends(combined_auth)]
)
async def upload_to_input_dir(
    background_tasks: BackgroundTasks, file: UploadFile = File(...)
):
    """
    Upload a file to the input directory and index it.

    This API endpoint accepts a file through an HTTP POST request, checks if the
    uploaded file is of a supported type, saves it in the specified input directory,
    indexes it for retrieval, and returns a success status with relevant details.

    **File Size Limit:**
    - Configurable via `MAX_UPLOAD_SIZE` environment variable (default: 100MB)
    - Set to `None` or `0` for unlimited upload size
    - Returns HTTP 413 (Request Entity Too Large) if file exceeds limit

    **Duplicate Detection Behavior:**

    This endpoint handles two types of duplicate scenarios differently:

    1. **Filename Duplicate (Synchronous Detection)**:
       - Detected immediately, before any file is written.
       - File name is treated as the unique document key. Both
         ``doc_status`` and the INPUT directory are checked under the
         canonical (parser-hint stripped) basename so ``abc.docx`` and
         ``abc.[native].docx`` map to the same record.
       - **HTTP 409** is returned when a same-name record already exists.
         The response detail names the conflict source ("Document
         storage already contains ..." or "Input directory already
         contains ..."). Clients must delete the existing document
         (``DELETE /documents/{doc_id}``) before re-uploading; there is
         no longer a 200 ``status="duplicated"`` soft-fail response.

    2. **Content Duplicate (Asynchronous Detection)**:
       - Detected during background processing after content extraction
       - Returns `status="success"` with a new track_id immediately
       - The duplicate is detected later when processing the file content
       - Use `/documents/track_status/{track_id}` to check the final result:
         - Document will have `status="FAILED"`
         - `error_msg` contains "Content already exists. Original doc_id: xxx"
         - `metadata.is_duplicate=true` with reference to original document
         - `metadata.original_doc_id` points to the existing document
         - `metadata.original_track_id` shows the original upload's track_id

    **Why Different Behavior?**
    - Filename check is fast (simple lookup), done synchronously
    - Content extraction is expensive (PDF/DOCX parsing), done asynchronously
    - This design prevents blocking the client during expensive operations

    **Concurrency Constraint:**
    - The endpoint refuses with HTTP 409 only while one of the
      following exclusive-writer states is set:
      ``pipeline_status["scanning_exclusive"]`` (a scan is in its
      classification phase, reading and possibly mutating doc_status)
      or ``pipeline_status["destructive_busy"]`` (``/documents/clear``
      or per-doc delete is dropping storages / removing input files).
      Wait for the running job to finish before re-submitting.
    - ``busy=True`` from the processing loop, and a scan in its
      processing phase (``scanning=True`` with
      ``scanning_exclusive=False``), do NOT block uploads — uploads
      are accepted concurrently and the running pipeline picks them
      up via its ``request_pending`` mechanism.

    Args:
        background_tasks: FastAPI BackgroundTasks for async processing
        file (UploadFile): The file to be uploaded. It must have an allowed extension.

    Returns:
        InsertResponse: A response object containing the upload status and a message.
            - status="success": File accepted and queued for processing

    Raises:
        HTTPException: 400 unsupported file type, 409 same-name
            conflict or scan-classifying / destructive job in
            flight, 413 file too large, 500 other errors.
    """
    slot_reserved = False
    try:
        # Reject upload while a scan is in its CLASSIFICATION
        # phase or a destructive job (clear / per-doc delete) is
        # in flight, AND reserve a pending-enqueue slot so a scan
        # request that arrives before the bg task runs cannot
        # transition scanning_exclusive=True under us. Concurrent
        # processing (``busy=True``) and a scan in its processing
        # phase (``scanning=True`` with
        # ``scanning_exclusive=False``) are permitted: the running
        # loop's ``request_pending`` mechanism picks up our doc
        # after the current batch.
        slot_reserved = await _reserve_enqueue_slot(rag)

        # Sanitize filename to prevent Path Traversal attacks
        safe_filename = sanitize_filename(file.filename, doc_manager.input_dir)

        if not doc_manager.is_supported_file(safe_filename):
            raise HTTPException(
                status_code=400,
                detail=f"Unsupported file type. Supported types: {doc_manager.supported_extensions}",
            )

        # Check file size limit (if configured)
        if (
            global_args.max_upload_size is not None
            and global_args.max_upload_size > 0
        ):
            # Safe access to file size (not available in older Starlette versions)
            file_size = getattr(file, "size", None)

            # Pre-flight size check (only if size is available)
            if file_size is not None:
                if file_size > global_args.max_upload_size:
                    raise HTTPException(
                        status_code=413,
                        detail=f"File too large. Maximum size: {global_args.max_upload_size / 1024 / 1024:.1f}MB, uploaded: {file_size / 1024 / 1024:.1f}MB",
                    )
            else:
                # If size not available, we'll check during streaming
                logger.debug(
                    f"File size not available in UploadFile for {safe_filename}, will check during streaming"
                )

        file_path = doc_manager.input_dir / safe_filename

        # Strict name pre-check. Both the INPUT directory and doc_status
        # must be free of any same-canonical-basename record before we
        # accept the upload. Replacing an existing document requires an
        # explicit DELETE first; we no longer write a "duplicated" 200
        # response that silently no-ops.
        existing_doc_data = await get_existing_doc_by_file_path_candidates(
            rag.doc_status, file_path
        )
        if existing_doc_data:
            status = get_doc_status_value(existing_doc_data) or "unknown"
            raise HTTPException(
                status_code=409,
                detail=(
                    f"Document storage already contains '{safe_filename}' "
                    f"(Status: {status}). Delete the existing record before re-uploading."
                ),
            )

        # INPUT directory check, using canonical parser-hint names.
        # Fast path: exact filename match avoids iterdir on large input directories.
        canonical_filename = normalize_file_path(safe_filename)
        if file_path.exists():
            existing_input_file: Path | None = file_path
        else:
            existing_input_file = find_existing_file_by_file_path(
                doc_manager.input_dir, canonical_filename
            )
        if existing_input_file:
            raise HTTPException(
                status_code=409,
                detail=(
                    f"Input directory already contains a file with the same "
                    f"canonical basename ('{existing_input_file.name}'). "
                    f"Remove or rename it before re-uploading."
                ),
            )

        # Async streaming write with size check
        bytes_written = 0
        chunk_size = 1024 * 1024  # 1MB chunks
        needs_cleanup = False

        try:
            async with aiofiles.open(file_path, "wb") as out_file:
                while True:
                    # Read chunk from upload stream
                    chunk = await file.read(chunk_size)
                    if not chunk:
                        break

                    # Check size limit during streaming (if not checked before)
                    if (
                        global_args.max_upload_size is not None
                        and global_args.max_upload_size > 0
                    ):
                        bytes_written += len(chunk)
                        if bytes_written > global_args.max_upload_size:
                            needs_cleanup = True
                            break

                    # Write chunk to file
                    await out_file.write(chunk)
        except BaseException:
            # The write was interrupted (I/O error, cancellation, ...). The
            # pre-checks above established that no file existed at this
            # path, so whatever is on disk now is our partial upload.
            # Remove it: otherwise it would block a retry with a 409 and
            # could be indexed in truncated form by a later scan.
            try:
                file_path.unlink(missing_ok=True)
            except OSError as cleanup_error:
                logger.error(
                    f"Error cleaning up partial upload {safe_filename}: {cleanup_error}"
                )
            raise

        # Cleanup after file is closed
        if needs_cleanup:
            try:
                file_path.unlink()
            except Exception as cleanup_error:
                logger.error(
                    f"Error cleaning up oversized file {safe_filename}: {cleanup_error}"
                )
            raise HTTPException(
                status_code=413,
                detail=f"File too large. Maximum size: {global_args.max_upload_size / 1024 / 1024:.1f}MB, uploaded: {bytes_written / 1024 / 1024:.1f}MB",
            )

        track_id = generate_track_id("upload")

        # Bg task: enqueue + trigger processing, then release the slot.
        # ``pipeline_index_file`` does both: it calls
        # ``pipeline_enqueue_file`` (writes doc_status / full_docs) and
        # then ``apipeline_process_enqueue_documents``. The latter is
        # safe to invoke even when the loop is already busy — it
        # collapses to a ``request_pending=True`` nudge and returns,
        # so concurrent uploads/inserts cooperate via the running
        # loop's request_pending mechanism.
        async def _indexing_task():
            try:
                await pipeline_index_file(rag, file_path, track_id)
            finally:
                await _release_enqueue_slot(rag)

        background_tasks.add_task(_indexing_task)
        # Ownership of the slot transferred to the bg task — the
        # finally block below must NOT release it again.
        slot_reserved = False

        return InsertResponse(
            status="success",
            message=f"File '{safe_filename}' uploaded successfully. Processing will continue in background.",
            track_id=track_id,
        )

    except HTTPException:
        # Re-raise HTTP exceptions (400, 413, etc.)
        raise
    except Exception as e:
        logger.error(f"Error /documents/upload: {file.filename}: {str(e)}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        # If we reserved a slot but never scheduled the bg task
        # (e.g. early validation rejection or streaming-write
        # failure), release here. No drain coordination needed —
        # any sibling bg task triggers its own processing pass.
        if slot_reserved:
            await _release_enqueue_slot(rag)
@router.post(
    "/text", response_model=InsertResponse, dependencies=[Depends(combined_auth)]
)
async def insert_text(
    request: InsertTextRequest, background_tasks: BackgroundTasks
):
    """
    Insert text into the RAG system.

    This endpoint allows you to insert text data into the RAG system for later retrieval
    and use in generating responses.

    **Concurrency Constraint:**
    - Refuses with HTTP 409 only while
      ``pipeline_status["scanning_exclusive"]`` (a scan is in its
      classification phase) or ``pipeline_status["destructive_busy"]``
      (clear / per-doc delete is in flight) is set. ``busy=True``
      from the processing loop, and a scan in its processing phase,
      do NOT block — the running pipeline picks up the new doc via
      ``request_pending``.

    Args:
        request (InsertTextRequest): The request body containing the text to be inserted.
        background_tasks: FastAPI BackgroundTasks for async processing

    Returns:
        InsertResponse: A response object containing the status of the operation.

    Raises:
        HTTPException: 400 invalid file_source, 409 same-name conflict
            or scan/destructive job in flight, 422 invalid chunking
            configuration, 500 other errors.
    """
    slot_reserved = False
    try:
        # Reject text insertion while a scan is in progress AND reserve
        # a pending-enqueue slot — see /upload for the rationale.
        slot_reserved = await _reserve_enqueue_slot(rag)

        # A usable file_source is mandatory: it is the name checked against
        # doc_status below.
        if not is_valid_file_source(request.file_source):
            raise HTTPException(
                status_code=400,
                detail="A valid file_source is required for text insertion",
            )

        # Check if file_source already exists in doc_status storage
        normalized_file_source = normalize_file_path(request.file_source)
        existing_doc_data = await get_existing_doc_by_file_path_candidates(
            rag.doc_status, normalized_file_source
        )
        if existing_doc_data:
            status = get_doc_status_value(existing_doc_data) or "unknown"
            raise HTTPException(
                status_code=409,
                detail=(
                    f"Document storage already contains '{normalized_file_source}' "
                    f"(Status: {status}). Delete the existing record before re-inserting."
                ),
            )

        # Resolve + validate chunking synchronously so an invalid
        # effective config (e.g. chunk_token_size below the inherited
        # overlap) fails with HTTP 422 here, before any background work is
        # scheduled. pipeline_index_texts re-resolves from the same
        # addon_params inside the task.
        try:
            _resolve_text_chunking(request.chunking, rag)
        except ValueError as exc:
            raise HTTPException(status_code=422, detail=str(exc)) from exc

        # Generate track_id for text insertion
        track_id = generate_track_id("insert")

        async def _indexing_task():
            # Enqueue + process the text, then give the reserved slot back
            # whether or not indexing succeeded.
            try:
                await pipeline_index_texts(
                    rag,
                    [request.text],
                    file_sources=[normalized_file_source],
                    track_id=track_id,
                    chunking=request.chunking,
                )
            finally:
                await _release_enqueue_slot(rag)

        background_tasks.add_task(_indexing_task)
        # Ownership of the slot moved to the background task; the finally
        # block below must not release it a second time.
        slot_reserved = False

        return InsertResponse(
            status="success",
            message="Text successfully received. Processing will continue in background.",
            track_id=track_id,
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error /documents/text: {str(e)}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        # Slot reserved but no background task scheduled (validation
        # rejection or unexpected error): release it here.
        if slot_reserved:
            await _release_enqueue_slot(rag)
@router.post(
    "/texts",
    response_model=InsertResponse,
    dependencies=[Depends(combined_auth)],
)
async def insert_texts(
    request: InsertTextsRequest, background_tasks: BackgroundTasks
):
    """
    Insert a batch of texts into the RAG system with a single request.

    The batch is validated synchronously; the actual indexing is handed to one
    background task, so the response only confirms that the batch was accepted.

    **Concurrency Constraint:**
    - Refuses with HTTP 409 only while
      ``pipeline_status["scanning_exclusive"]`` (a scan is in its
      classification phase) or ``pipeline_status["destructive_busy"]``
      (clear / per-doc delete is in flight) is set. ``busy=True``
      from the processing loop, and a scan in its processing phase,
      do NOT block — the running pipeline picks up the new docs via
      ``request_pending``.

    Args:
        request (InsertTextsRequest): Request body carrying ``texts``,
            one ``file_sources`` entry per text, and optional ``chunking``.
        background_tasks: FastAPI BackgroundTasks used to run the indexing.

    Returns:
        InsertResponse: ``status="success"`` plus the ``track_id`` assigned
        to this batch.

    Raises:
        HTTPException: 400 when ``file_sources`` is missing, does not match
            ``texts`` in length, contains an unknown source, or contains
            duplicates after normalization; 409 on a same-name conflict or
            when a scan/destructive job is in flight; 422 when the chunking
            options fail to resolve; 500 for any other error.
    """
    missing_source_detail = "A valid file_source is required for each text"
    holds_slot = False
    try:
        # Reserving the pending-enqueue slot is also the step that rejects
        # the request while a scan is in progress — see /upload for the
        # rationale.
        holds_slot = await _reserve_enqueue_slot(rag)

        # Every text needs exactly one accompanying file_source.
        raw_sources = request.file_sources
        if not raw_sources or len(raw_sources) != len(request.texts):
            raise HTTPException(status_code=400, detail=missing_source_detail)

        normalized_file_sources = [
            normalize_file_path(raw_source) for raw_source in raw_sources
        ]
        if UNKNOWN_FILE_SOURCE in normalized_file_sources:
            raise HTTPException(status_code=400, detail=missing_source_detail)
        if len(set(normalized_file_sources)) < len(normalized_file_sources):
            raise HTTPException(
                status_code=400,
                detail="file_sources must be unique by filename",
            )

        # Refuse the whole batch as soon as one source is already recorded
        # in doc_status storage.
        for source_name in normalized_file_sources:
            stored_doc = await get_existing_doc_by_file_path_candidates(
                rag.doc_status, source_name
            )
            if not stored_doc:
                continue
            stored_state = get_doc_status_value(stored_doc) or "unknown"
            raise HTTPException(
                status_code=409,
                detail=(
                    f"Document storage already contains '{source_name}' "
                    f"(Status: {stored_state}). Delete the existing record before re-inserting."
                ),
            )

        # Resolve the shared chunking now so an invalid effective config
        # (e.g. chunk_token_size below the inherited overlap) surfaces as
        # HTTP 422 before any background work is scheduled.
        # pipeline_index_texts re-resolves from the same addon_params
        # inside the task.
        try:
            _resolve_text_chunking(request.chunking, rag)
        except ValueError as chunking_error:
            raise HTTPException(status_code=422, detail=str(chunking_error))

        track_id = generate_track_id("insert")

        async def _index_batch():
            # The background task owns the slot from here on and always
            # gives it back, whether indexing succeeds or not.
            try:
                await pipeline_index_texts(
                    rag,
                    request.texts,
                    file_sources=normalized_file_sources,
                    track_id=track_id,
                    chunking=request.chunking,
                )
            finally:
                await _release_enqueue_slot(rag)

        background_tasks.add_task(_index_batch)
        # Ownership handed over: the endpoint's finally must not release.
        holds_slot = False

        return InsertResponse(
            status="success",
            message="Texts successfully received. Processing will continue in background.",
            track_id=track_id,
        )
    except HTTPException:
        raise
    except Exception as exc:
        logger.error(f"Error /documents/texts: {str(exc)}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=str(exc))
    finally:
        # Only reached with the slot still held when the request was
        # rejected or failed before the background task was scheduled.
        if holds_slot:
            await _release_enqueue_slot(rag)
@router.delete(
    "", response_model=ClearDocumentsResponse, dependencies=[Depends(combined_auth)]
)
async def clear_documents():
    """
    Clear all documents from the RAG system.

    This endpoint deletes all documents, entities, relationships, and files from the system.
    It uses the storage drop methods to properly clean up all data and removes all files
    from the input directory (files in subdirectories are left untouched).

    **Concurrency Constraint:**
    - Atomically reserves the destructive slot (sets ``busy=True``
      and ``destructive_busy=True``) before dropping anything.
      Refuses with ``status="busy"`` when ANY of these is set:
      ``pipeline_status["busy"]`` (processing loop or another
      destructive job in flight), ``pipeline_status["scanning"]``
      (a scan is anywhere in its lifecycle), or
      ``pipeline_status["pending_enqueues"] > 0`` (an /upload,
      /text or /texts has reserved a slot whose bg task has not
      yet written to doc_status).

    Returns:
        ClearDocumentsResponse: A response object containing the status and message.
            - status="success": All documents and files were successfully cleared.
            - status="partial_success": Document clear job exit with some errors.
            - status="busy": Operation could not be completed because another
              writer (busy / scanning / pending enqueue) holds the pipeline.
            - status="fail": All storage drop operations failed, with message
            - message: Detailed information about the operation results, including counts
              of deleted files and any errors encountered.

    Raises:
        HTTPException: Raised when a serious error occurs during the clearing process,
                      with status code 500 and error details in the detail field.
    """
    from lightrag.kg.shared_storage import (
        get_namespace_data,
        get_namespace_lock,
    )

    # Get pipeline status and lock
    pipeline_status = await get_namespace_data(
        "pipeline_status", workspace=rag.workspace
    )
    pipeline_status_lock = get_namespace_lock(
        "pipeline_status", workspace=rag.workspace
    )

    def _record_history(message: str) -> None:
        # Append to the shared history list when the key is present.
        if "history_messages" in pipeline_status:
            pipeline_status["history_messages"].append(message)

    # Atomically reserve the destructive slot. Checks busy +
    # scanning + pending_enqueues>0 in a single critical section
    # before flipping busy=True and destructive_busy=True together.
    # ``destructive_busy`` blocks reservation and the enqueue
    # last-line guard: clear is about to drop every storage and
    # remove every input file, so a concurrent upload accepted in
    # this window would write to storages mid-drop and silently
    # lose the document.
    acquired, reason = await _acquire_destructive_busy(rag)
    if not acquired:
        return ClearDocumentsResponse(status="busy", message=reason)

    # Everything after a successful reservation runs inside the try so the
    # finally below always hands the slot back, including when the initial
    # status update itself fails.
    try:
        async with pipeline_status_lock:
            pipeline_status.update(
                {
                    "job_name": "Clearing Documents",
                    "job_start": datetime.now().isoformat(),
                    "docs": 0,
                    "batchs": 0,
                    "cur_batch": 0,
                    "request_pending": False,  # Clear any previous request
                    "latest_message": "Starting document clearing process",
                }
            )
            # Cleaning history_messages without breaking it as a shared list object
            del pipeline_status["history_messages"][:]
            pipeline_status["history_messages"].append(
                "Starting document clearing process"
            )

        # Keep only the storages that exist, so each drop result below can
        # be paired with the storage that produced it.
        candidate_storages = [
            rag.text_chunks,
            rag.full_docs,
            rag.full_entities,
            rag.full_relations,
            rag.entity_chunks,
            rag.relation_chunks,
            rag.entities_vdb,
            rag.relationships_vdb,
            rag.chunks_vdb,
            rag.chunk_entity_relation_graph,
            rag.doc_status,
        ]
        storages = [
            storage for storage in candidate_storages if storage is not None
        ]

        _record_history("Starting to drop storage components")

        # Drop every storage concurrently; failures come back as values
        # instead of aborting the other drops.
        drop_results = await asyncio.gather(
            *(storage.drop() for storage in storages), return_exceptions=True
        )

        # Check for errors and log results
        errors = []
        storage_success_count = 0
        storage_error_count = 0
        for storage, result in zip(storages, drop_results):
            storage_name = storage.__class__.__name__
            if isinstance(result, Exception):
                error_msg = f"Error dropping {storage_name}: {str(result)}"
                errors.append(error_msg)
                logger.error(error_msg)
                storage_error_count += 1
            else:
                namespace = storage.namespace
                workspace = storage.workspace
                logger.info(
                    f"Successfully dropped {storage_name}: {workspace}/{namespace}"
                )
                storage_success_count += 1

        # Log storage drop results
        if storage_error_count > 0:
            _record_history(
                f"Dropped {storage_success_count} storage components with {storage_error_count} errors"
            )
        else:
            _record_history(
                f"Successfully dropped all {storage_success_count} storage components"
            )

        # If all storage operations failed, return error status and don't proceed with file deletion
        if storage_success_count == 0 and storage_error_count > 0:
            error_message = "All storage drop operations failed. Aborting document clearing process."
            logger.error(error_message)
            _record_history(error_message)
            return ClearDocumentsResponse(status="fail", message=error_message)

        _record_history("Starting to delete files in input directory")

        # Delete only files in the current directory, preserve files in subdirectories
        deleted_files_count = 0
        file_errors_count = 0
        for file_path in doc_manager.input_dir.glob("*"):
            if file_path.is_file():
                try:
                    file_path.unlink()
                    deleted_files_count += 1
                except Exception as e:
                    logger.error(f"Error deleting file {file_path}: {str(e)}")
                    file_errors_count += 1

        # File failures always count toward the final status, whether or
        # not the history list is available for logging.
        if file_errors_count > 0:
            _record_history(
                f"Deleted {deleted_files_count} files with {file_errors_count} errors"
            )
            errors.append(f"Failed to delete {file_errors_count} files")
        else:
            _record_history(f"Successfully deleted {deleted_files_count} files")

        # Prepare final result message
        if errors:
            final_message = f"Cleared documents with some errors. Deleted {deleted_files_count} files."
            status = "partial_success"
        else:
            final_message = f"All documents cleared successfully. Deleted {deleted_files_count} files."
            status = "success"

        _record_history(final_message)

        # Return response based on results
        return ClearDocumentsResponse(status=status, message=final_message)
    except Exception as e:
        error_msg = f"Error clearing documents: {str(e)}"
        logger.error(error_msg)
        logger.error(traceback.format_exc())
        _record_history(error_msg)
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        # Reset busy + destructive_busy after completion so the next
        # reservation / scan sees an idle pipeline.
        async with pipeline_status_lock:
            pipeline_status["busy"] = False
            pipeline_status["destructive_busy"] = False
            completion_msg = "Document clearing process completed"
            pipeline_status["latest_message"] = completion_msg
            if "history_messages" in pipeline_status:
                pipeline_status["history_messages"].append(completion_msg)
@router.get(
    "/pipeline_status",
    dependencies=[Depends(combined_auth)],
    response_model=PipelineStatusResponse,
)
async def get_pipeline_status() -> PipelineStatusResponse:
    """
    Report the current state of the document indexing pipeline.

    The shared ``pipeline_status`` namespace of this workspace is copied into
    a plain dict, enriched with the per-namespace update flags, and returned.

    Returns:
        PipelineStatusResponse: A response object containing:
            - autoscanned (bool): Whether auto-scan has started
            - busy (bool): Whether the pipeline is currently busy
            - job_name (str): Current job name (e.g., indexing files/indexing texts)
            - job_start (str, optional): Job start time as ISO format string
            - docs (int): Total number of documents to be indexed
            - batchs (int): Number of batches for processing documents
            - cur_batch (int): Current processing batch
            - request_pending (bool): Flag for pending request for processing
            - latest_message (str): Latest message from pipeline processing
            - history_messages (List[str], optional): History messages, limited to the
              latest 1000 entries and preceded by a truncation notice when more exist
            - update_status (dict): Update flags of each namespace as plain booleans

    Raises:
        HTTPException: If an error occurs while retrieving pipeline status (500)
    """
    try:
        from lightrag.kg.shared_storage import (
            get_namespace_data,
            get_namespace_lock,
            get_all_update_flags_status,
        )

        pipeline_status = await get_namespace_data(
            "pipeline_status", workspace=rag.workspace
        )
        pipeline_status_lock = get_namespace_lock(
            "pipeline_status", workspace=rag.workspace
        )

        # Flags may be wrapper objects exposing ``.value`` (multiprocess
        # case) or plain values (single process); reduce both to bool.
        raw_flags = await get_all_update_flags_status(workspace=rag.workspace)
        update_flags = {
            namespace: [
                bool(flag.value) if hasattr(flag, "value") else bool(flag)
                for flag in flags
            ]
            for namespace, flags in raw_flags.items()
        }

        # Copy the shared dict (possibly a Manager.dict) and its history
        # list (possibly a Manager.list) into plain containers while
        # holding the lock.
        async with pipeline_status_lock:
            snapshot = dict(pipeline_status)
            if "history_messages" in snapshot:
                snapshot["history_messages"] = list(snapshot["history_messages"])

        snapshot["update_status"] = update_flags

        # Cap the history at the latest 1000 entries, announcing how many
        # older ones were left out.
        if "history_messages" in snapshot:
            history = snapshot["history_messages"]
            total_count = len(history)
            if total_count > 1000:
                truncated_count = total_count - 1000
                notice = (
                    f"[Truncated history messages: {truncated_count}/{total_count}]"
                )
                snapshot["history_messages"] = [notice] + history[-1000:]

        # Normalize job_start through format_datetime when it is set.
        if snapshot.get("job_start"):
            snapshot["job_start"] = format_datetime(snapshot["job_start"])

        return PipelineStatusResponse(**snapshot)
    except Exception as exc:
        logger.error(f"Error getting pipeline status: {str(exc)}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=str(exc))
# TODO: Deprecated, use /documents/paginated instead
@router.get(
    "", response_model=DocsStatusesResponse, dependencies=[Depends(combined_auth)]
)
async def documents() -> DocsStatusesResponse:
    """
    Get the status of all documents in the system. This endpoint is deprecated; use /documents/paginated instead.
    To prevent excessive resource consumption, a maximum of 1,000 records is returned.

    Documents are fetched per processing status (PENDING, PARSING, ANALYZING,
    PROCESSING, PREPROCESSED, PROCESSED, FAILED) and then picked round-robin
    across those statuses, so no single status can use up the 1000-document
    budget while others still have documents to show.

    Returns:
        DocsStatusesResponse: A response object containing a dictionary where keys are
                            DocStatus values and values are lists of DocStatusResponse
                            objects representing documents in each status category.
                            Maximum 1000 documents total will be returned.

    Raises:
        HTTPException: If an error occurs while retrieving document statuses (500).
    """
    try:
        statuses = (
            DocStatus.PENDING,
            DocStatus.PARSING,
            DocStatus.ANALYZING,
            DocStatus.PROCESSING,
            DocStatus.PREPROCESSED,
            DocStatus.PROCESSED,
            DocStatus.FAILED,
        )

        # One lookup per status, all awaited together.
        results: List[Dict[str, DocProcessingStatus]] = await asyncio.gather(
            *[rag.get_docs_by_status(status) for status in statuses]
        )

        # Pair each status with its (doc_id, doc_status) entries as a list
        # so entries can be addressed by position.
        buckets = [
            (status, list(found.items()))
            for status, found in zip(statuses, results)
        ]

        response = DocsStatusesResponse()
        max_documents = 1000
        total_documents = 0

        # Round-robin: sweep the statuses once per depth, taking the entry
        # at that depth from every status that still has one.
        deepest = max((len(entries) for _, entries in buckets), default=0)
        for depth in range(deepest):
            if total_documents >= max_documents:
                break
            for status, entries in buckets:
                if total_documents >= max_documents:
                    break
                if depth >= len(entries):
                    # This status is exhausted; the others continue.
                    continue

                doc_id, doc_status = entries[depth]
                if status not in response.statuses:
                    response.statuses[status] = []
                response.statuses[status].append(
                    DocStatusResponse(
                        id=doc_id,
                        content_summary=doc_status.content_summary,
                        content_length=doc_status.content_length,
                        status=doc_status.status,
                        created_at=format_datetime(doc_status.created_at),
                        updated_at=format_datetime(doc_status.updated_at),
                        track_id=doc_status.track_id,
                        chunks_count=doc_status.chunks_count,
                        error_msg=doc_status.error_msg,
                        metadata=doc_status.metadata,
                        file_path=normalize_file_path(doc_status.file_path),
                    )
                )
                total_documents += 1

        return response
    except Exception as exc:
        logger.error(f"Error GET /documents: {str(exc)}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=str(exc))
class DeleteDocByIdResponse(BaseModel):
    """Response model for single document deletion operation.

    Carries the outcome label, a human-readable explanation, and the ID(s)
    the request referred to.
    """

    # Restricted to the three outcome labels listed in the Literal.
    status: Literal["deletion_started", "busy", "not_allowed"] = Field(
        description="Status of the deletion operation",
    )
    # Free-form explanation accompanying the status.
    message: str = Field(
        description="Message describing the operation result",
    )
    # ID string echoed back to the caller.
    doc_id: str = Field(
        description="The ID of the document to delete",
    )
@router.delete(
    "/delete_document",
    response_model=DeleteDocByIdResponse,
    dependencies=[Depends(combined_auth)],
    summary="Delete a document and all its associated data by its ID.",
)
async def delete_document(
    delete_request: DeleteDocRequest,
    background_tasks: BackgroundTasks,
) -> DeleteDocByIdResponse:
    """
    Schedule background deletion of the requested documents and their data.

    The endpoint itself only reserves the pipeline and queues
    ``background_delete_documents``; the deletion work runs after the
    response has been sent. The operation is irreversible and interacts
    with the pipeline status.

    **Concurrency Constraint:**
    - Atomically reserves the destructive slot (sets ``busy=True``
      and ``destructive_busy=True``) **synchronously** before
      returning ``deletion_started``, so a /scan or /upload that
      arrives before the bg task runs cannot race the delete.
      Refuses with ``status="busy"`` when ANY of these is set:
      ``pipeline_status["busy"]``, ``pipeline_status["scanning"]``,
      or ``pipeline_status["pending_enqueues"] > 0``.

    Args:
        delete_request (DeleteDocRequest): Document IDs plus the
            ``delete_file`` and ``delete_llm_cache`` options, which are
            forwarded to the background task.
        background_tasks: FastAPI BackgroundTasks for async processing

    Returns:
        DeleteDocByIdResponse: The result of the deletion operation.
            - status="deletion_started": The document deletion has been initiated in the background.
            - status="busy": Another writer (busy / scanning / pending enqueue) holds the
              pipeline; nothing scheduled, retry after the running job finishes.

    Raises:
        HTTPException:
            - 500: If an unexpected internal error occurs during initialization.
    """
    doc_ids = delete_request.doc_ids
    owns_slot = False

    try:
        # Reserve the destructive slot BEFORE answering
        # ``deletion_started``. Otherwise destructive_busy would only be
        # set once the bg task runs, leaving a window in which a /scan or
        # /upload could race the delete after the client already got a
        # success response. The reservation checks busy + scanning +
        # pending_enqueues>0 in a single critical section.
        acquired, reason = await _acquire_destructive_busy(rag)
        if acquired:
            owns_slot = True
            background_tasks.add_task(
                background_delete_documents,
                rag,
                doc_manager,
                doc_ids,
                delete_request.delete_file,
                delete_request.delete_llm_cache,
            )
            # The bg task now owns the slot and releases it in its own
            # finally; the endpoint must not release it a second time.
            owns_slot = False

            started_message = f"Document deletion for {len(doc_ids)} documents has been initiated. Processing will continue in background."
            return DeleteDocByIdResponse(
                status="deletion_started",
                message=started_message,
                doc_id=", ".join(doc_ids),
            )

        # Reservation refused: report busy without scheduling anything.
        return DeleteDocByIdResponse(
            status="busy",
            message=reason or "Cannot delete documents while pipeline is busy",
            doc_id=", ".join(doc_ids),
        )
    except Exception as exc:
        error_msg = f"Error initiating document deletion for {delete_request.doc_ids}: {str(exc)}"
        logger.error(error_msg)
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=error_msg)
    finally:
        # Still owning the slot here means the bg task was never
        # scheduled (an error hit between acquire and add_task); release
        # so the next reservation / scan / enqueue can proceed.
        if owns_slot:
            await _release_destructive_busy(rag)
@router.post(
    "/clear_cache",
    response_model=ClearCacheResponse,
    dependencies=[Depends(combined_auth)],
)
async def clear_cache(request: ClearCacheRequest):
    """
    Clear every entry from the LLM response cache storage.

    All cached LLM responses are removed regardless of mode. The request body
    exists only for API compatibility and is not read.

    Args:
        request (ClearCacheRequest): The request body (ignored for compatibility).

    Returns:
        ClearCacheResponse: ``status="success"`` with a confirmation message.

    Raises:
        HTTPException: If an error occurs during cache clearing (500).
    """
    try:
        # aclear_cache takes no modes argument: everything is cleared.
        await rag.aclear_cache()
        return ClearCacheResponse(
            status="success",
            message="Successfully cleared all cache",
        )
    except Exception as exc:
        logger.error(f"Error clearing cache: {str(exc)}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=str(exc))
@router.delete(
    "/delete_entity",
    response_model=DeletionResult,
    dependencies=[Depends(combined_auth)],
)
async def delete_entity(request: DeleteEntityRequest):
    """
    Delete an entity and all its relationships from the knowledge graph.

    ``check_pipeline_busy_or_raise`` is awaited first; any HTTPException it
    raises is passed through to the client unchanged.

    Args:
        request (DeleteEntityRequest): The request body containing the entity name.

    Returns:
        DeletionResult: The outcome of the deletion, with ``doc_id`` set to
        an empty string.

    Raises:
        HTTPException: If the entity is not found (404) or an error occurs (500).
    """
    try:
        await check_pipeline_busy_or_raise(rag)
        outcome = await rag.adelete_by_entity(entity_name=request.entity_name)

        # Translate the failure statuses into their HTTP equivalents.
        for failure_status, http_code in (("not_found", 404), ("fail", 500)):
            if outcome.status == failure_status:
                raise HTTPException(status_code=http_code, detail=outcome.message)

        # This is an entity operation, not a document one, so doc_id is blanked.
        outcome.doc_id = ""
        return outcome
    except HTTPException:
        raise
    except Exception as exc:
        error_msg = f"Error deleting entity '{request.entity_name}': {str(exc)}"
        logger.error(error_msg)
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=error_msg)
@router.delete(
    "/delete_relation",
    response_model=DeletionResult,
    dependencies=[Depends(combined_auth)],
)
async def delete_relation(request: DeleteRelationRequest):
    """
    Delete a relationship between two entities from the knowledge graph.

    ``check_pipeline_busy_or_raise`` is awaited first; any HTTPException it
    raises is passed through to the client unchanged.

    Args:
        request (DeleteRelationRequest): The request body containing the source and target entity names.

    Returns:
        DeletionResult: The outcome of the deletion, with ``doc_id`` set to
        an empty string.

    Raises:
        HTTPException: If the relation is not found (404) or an error occurs (500).
    """
    try:
        await check_pipeline_busy_or_raise(rag)
        outcome = await rag.adelete_by_relation(
            source_entity=request.source_entity,
            target_entity=request.target_entity,
        )

        # Translate the failure statuses into their HTTP equivalents.
        for failure_status, http_code in (("not_found", 404), ("fail", 500)):
            if outcome.status == failure_status:
                raise HTTPException(status_code=http_code, detail=outcome.message)

        # This is a relation operation, not a document one, so doc_id is blanked.
        outcome.doc_id = ""
        return outcome
    except HTTPException:
        raise
    except Exception as exc:
        error_msg = f"Error deleting relation from '{request.source_entity}' to '{request.target_entity}': {str(exc)}"
        logger.error(error_msg)
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=error_msg)
@router.get(
    "/track_status/{track_id}",
    response_model=TrackStatusResponse,
    dependencies=[Depends(combined_auth)],
)
async def get_track_status(track_id: str) -> TrackStatusResponse:
    """
    Look up the processing status of every document sharing a tracking ID.

    Lets a client follow the progress of files it uploaded or texts it
    inserted, using the tracking ID it was given at that time.

    Args:
        track_id (str): The tracking ID returned from upload, text, or texts endpoints

    Returns:
        TrackStatusResponse: A response object containing:
            - track_id: The tracking ID, with surrounding whitespace removed
            - documents: List of documents associated with this track_id
            - total_count: Total number of documents for this track_id
            - status_summary: Number of documents per status, keyed by ``str(status)``

    Raises:
        HTTPException: If track_id is invalid (400) or an error occurs (500).
    """
    try:
        # A missing or whitespace-only ID is rejected; otherwise the
        # trimmed value is used for the lookup and in the response.
        track_id = (track_id or "").strip()
        if not track_id:
            raise HTTPException(status_code=400, detail="Track ID cannot be empty")

        tracked_docs = await rag.aget_docs_by_track_id(track_id)

        documents = [
            DocStatusResponse(
                id=doc_id,
                content_summary=doc.content_summary,
                content_length=doc.content_length,
                status=doc.status,
                created_at=format_datetime(doc.created_at),
                updated_at=format_datetime(doc.updated_at),
                track_id=doc.track_id,
                chunks_count=doc.chunks_count,
                error_msg=doc.error_msg,
                metadata=doc.metadata,
                file_path=normalize_file_path(doc.file_path),
            )
            for doc_id, doc in tracked_docs.items()
        ]

        # Tally documents per status; str() covers both DocStatus enum
        # members and plain strings coming out of deserialization.
        status_summary = {}
        for doc in tracked_docs.values():
            status_key = str(doc.status)
            status_summary[status_key] = status_summary.get(status_key, 0) + 1

        return TrackStatusResponse(
            track_id=track_id,
            documents=documents,
            total_count=len(documents),
            status_summary=status_summary,
        )
    except HTTPException:
        raise
    except Exception as exc:
        logger.error(f"Error getting track status for {track_id}: {str(exc)}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=str(exc))
@router.post(
    "/paginated",
    response_model=PaginatedDocsResponse,
    dependencies=[Depends(combined_auth)],
)
async def get_documents_paginated(
    request: DocumentsRequest,
) -> PaginatedDocsResponse:
    """
    Get one page of documents, with filtering and sorting.

    The page query and the per-status counts run as two concurrent tasks.
    Each stage of the request is reported through ``performance_timing_log``
    under a short per-request trace id.

    Args:
        request (DocumentsRequest): The request body containing pagination parameters

    Returns:
        PaginatedDocsResponse: A response object containing:
            - documents: List of documents for the current page
            - pagination: Pagination information (page, total_count, etc.)
            - status_counts: Count of documents by status for all documents

    Raises:
        HTTPException: If an error occurs while retrieving documents (500).
    """
    trace_id = uuid4().hex[:8]
    request_start = time.perf_counter()

    def _timing(template: str, *values) -> None:
        # Every timing line carries the trace id as its first argument.
        performance_timing_log(template, trace_id, *values)

    status_filter_value = None
    if request.status_filter is not None:
        status_filter_value = request.status_filter.value
    workspace = getattr(rag, "workspace", None)
    _timing(
        "[documents/paginated][%s] Request start workspace=%s status_filter=%s page=%s page_size=%s sort_field=%s sort_direction=%s",
        workspace,
        status_filter_value,
        request.page,
        request.page_size,
        request.sort_field,
        request.sort_direction,
    )

    try:

        async def _timed(label: str, awaitable):
            # Await ``awaitable`` and log how long it took, on success
            # and on failure alike; failures are re-raised.
            started_at = time.perf_counter()
            _timing("[documents/paginated][%s] %s started", label)
            try:
                outcome = await awaitable
            except Exception:
                _timing(
                    "[documents/paginated][%s] %s failed after %.4fs",
                    label,
                    time.perf_counter() - started_at,
                )
                raise
            else:
                _timing(
                    "[documents/paginated][%s] %s completed in %.4fs",
                    label,
                    time.perf_counter() - started_at,
                )
                return outcome

        # Start both storage queries so they can overlap.
        spawn_start = time.perf_counter()
        page_query = rag.doc_status.get_docs_paginated(
            status_filter=request.status_filter,
            status_filters=request.status_filters,
            page=request.page,
            page_size=request.page_size,
            sort_field=request.sort_field,
            sort_direction=request.sort_direction,
        )
        docs_task = asyncio.create_task(_timed("get_docs_paginated", page_query))
        counts_query = rag.doc_status.get_all_status_counts()
        status_counts_task = asyncio.create_task(
            _timed("get_all_status_counts", counts_query)
        )
        _timing(
            "[documents/paginated][%s] Query tasks created in %.4fs",
            time.perf_counter() - spawn_start,
        )

        await_start = time.perf_counter()
        page_result, status_counts = await asyncio.gather(
            docs_task, status_counts_task
        )
        documents_with_ids, total_count = page_result
        _timing(
            "[documents/paginated][%s] Query tasks awaited in %.4fs",
            time.perf_counter() - await_start,
        )

        # Build the response models for the rows on this page.
        assembly_start = time.perf_counter()
        doc_responses = [
            DocStatusResponse(
                id=doc_id,
                content_summary=doc.content_summary,
                content_length=doc.content_length,
                status=doc.status,
                created_at=format_datetime(doc.created_at),
                updated_at=format_datetime(doc.updated_at),
                track_id=doc.track_id,
                chunks_count=doc.chunks_count,
                error_msg=doc.error_msg,
                metadata=doc.metadata,
                file_path=normalize_file_path(doc.file_path),
            )
            for doc_id, doc in documents_with_ids
        ]

        # Ceiling division gives the page count for the requested size.
        page_size = request.page_size
        total_pages = (total_count + page_size - 1) // page_size
        pagination = PaginationInfo(
            page=request.page,
            page_size=page_size,
            total_count=total_count,
            total_pages=total_pages,
            has_next=request.page < total_pages,
            has_prev=request.page > 1,
        )

        response = PaginatedDocsResponse(
            documents=doc_responses,
            pagination=pagination,
            status_counts=status_counts,
        )

        assembly_elapsed = time.perf_counter() - assembly_start
        total_elapsed = time.perf_counter() - request_start
        _timing(
            "[documents/paginated][%s] Response assembled in %.4fs",
            assembly_elapsed,
        )
        _timing(
            "[documents/paginated][%s] Request completed in %.4fs returned_rows=%s total_count=%s status_count_keys=%s",
            total_elapsed,
            len(doc_responses),
            total_count,
            sorted(status_counts.keys()),
        )
        return response
    except Exception as exc:
        _timing(
            "[documents/paginated][%s] Request failed after %.4fs",
            time.perf_counter() - request_start,
        )
        logger.error(f"Error getting paginated documents: {str(exc)}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=str(exc))
@router.get(
    "/status_counts",
    response_model=StatusCountsResponse,
    dependencies=[Depends(combined_auth)],
)
async def get_document_status_counts() -> StatusCountsResponse:
    """
    Report how many documents are in each processing status.

    The per-status totals (PENDING, PROCESSING, PROCESSED, FAILED) are read
    from ``rag.doc_status.get_all_status_counts()`` and wrapped, unchanged,
    in the response model.

    Returns:
        StatusCountsResponse: A response object containing status counts

    Raises:
        HTTPException: If an error occurs while retrieving status counts (500).
    """
    try:
        counts_by_status = await rag.doc_status.get_all_status_counts()
        payload = StatusCountsResponse(status_counts=counts_by_status)
        return payload
    except Exception as exc:
        # Route boundary: log the message and the full traceback, then
        # surface the failure to the client as a 500 carrying the error text.
        failure_text = str(exc)
        logger.error(f"Error getting document status counts: {failure_text}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=failure_text)
@router.post(
    "/reprocess_failed",
    response_model=ReprocessResponse,
    dependencies=[Depends(combined_auth)],
)
async def reprocess_failed_documents(background_tasks: BackgroundTasks):
    """
    Reprocess failed and pending documents.

    Schedules the document processing pipeline as a background task. The
    pipeline automatically picks up documents in these statuses:

    - FAILED: Documents that failed during previous processing attempts
    - PENDING: Documents waiting to be processed
    - PROCESSING: Documents with abnormally terminated processing
      (e.g., server crashes)

    Use it to recover from server crashes, network errors, LLM service
    outages, or other temporary failures that caused processing to fail.
    The work runs in the background and can be followed through the pipeline
    status. Reprocessed documents keep the track_id assigned at their initial
    upload, so monitor progress with that original track_id.

    Args:
        background_tasks: FastAPI background task collector that receives the
            pipeline coroutine function.

    Returns:
        ReprocessResponse: Response with status and message.
            track_id is always empty string because reprocessed documents
            retain their original track_id from initial upload.

    Raises:
        HTTPException: If an error occurs while initiating reprocessing (500).
    """
    try:
        # Only schedule the pipeline here; it runs after the response is sent.
        # No new track_id is issued: documents keep the one from upload.
        pipeline_entry = rag.apipeline_process_enqueue_documents
        background_tasks.add_task(pipeline_entry)
        logger.info("Reprocessing of failed documents initiated")

        acknowledgement = ReprocessResponse(
            status="reprocessing_started",
            message="Reprocessing of failed documents has been initiated in background. Documents retain their original track_id.",
        )
        return acknowledgement
    except Exception as exc:
        # Route boundary: log message plus traceback, then answer with a 500.
        failure_text = str(exc)
        logger.error(
            f"Error initiating reprocessing of failed documents: {failure_text}"
        )
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=failure_text)
@router.post(
    "/cancel_pipeline",
    response_model=CancelPipelineResponse,
    dependencies=[Depends(combined_auth)],
)
async def cancel_pipeline():
    """
    Request cancellation of the currently running pipeline.

    This endpoint sets a cancellation flag in the pipeline status. The
    pipeline will:

    1. Check this flag at key processing points
    2. Stop processing new documents
    3. Cancel all running document processing tasks
    4. Mark all PROCESSING documents as FAILED with reason "User cancelled"

    The cancellation is graceful and ensures data consistency. Documents that
    have completed processing will remain in PROCESSED status.

    Returns:
        CancelPipelineResponse: Response with status and message
            - status="cancellation_requested": Cancellation flag has been set
            - status="not_busy": Pipeline is not currently running

    Raises:
        HTTPException: If an error occurs while setting cancellation flag (500).
    """
    try:
        # Imported inside the handler, as in the original code.
        from lightrag.kg.shared_storage import (
            get_namespace_data,
            get_namespace_lock,
        )

        namespace = "pipeline_status"
        pipeline_status = await get_namespace_data(
            namespace, workspace=rag.workspace
        )
        pipeline_status_lock = get_namespace_lock(
            namespace, workspace=rag.workspace
        )

        # Read the busy flag and write the cancellation flag under one lock
        # so the check and the update cannot interleave with other holders.
        async with pipeline_status_lock:
            is_busy = pipeline_status.get("busy", False)
            if not is_busy:
                return CancelPipelineResponse(
                    status="not_busy",
                    message="Pipeline is not currently running. No cancellation needed.",
                )

            # Raise the flag and record the request in the status messages.
            notice = "Pipeline cancellation requested by user"
            pipeline_status["cancellation_requested"] = True
            logger.info(notice)
            pipeline_status["latest_message"] = notice
            pipeline_status["history_messages"].append(notice)

        return CancelPipelineResponse(
            status="cancellation_requested",
            message="Pipeline cancellation has been requested. Documents will be marked as FAILED.",
        )
    except Exception as exc:
        # Route boundary: log message plus traceback, then answer with a 500.
        failure_text = str(exc)
        logger.error(f"Error requesting pipeline cancellation: {failure_text}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=failure_text)
- return router
|