document_routes.py 186 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
73278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698369937003701370237033704370537063707370837093710371137123713371437153716371737183719372037213722372337243725372637273728372937303731373237333734373537363737373837393740374137423743374437453746374737483749375037513752375337543755375637573758375937603761376237633764376537663767376837693770377137723773377437753776377
73778377937803781378237833784378537863787378837893790379137923793379437953796379737983799380038013802380338043805380638073808380938103811381238133814381538163817381838193820382138223823382438253826382738283829383038313832383338343835383638373838383938403841384238433844384538463847384838493850385138523853385438553856385738583859386038613862386338643865386638673868386938703871387238733874387538763877387838793880388138823883388438853886388738883889389038913892389338943895389638973898389939003901390239033904390539063907390839093910391139123913391439153916391739183919392039213922392339243925392639273928392939303931393239333934393539363937393839393940394139423943394439453946394739483949395039513952395339543955395639573958395939603961396239633964396539663967396839693970397139723973397439753976397739783979398039813982398339843985398639873988398939903991399239933994399539963997399839994000400140024003400440054006400740084009401040114012401340144015401640174018401940204021402240234024402540264027402840294030403140324033403440354036403740384039404040414042404340444045404640474048404940504051405240534054405540564057405840594060406140624063406440654066406740684069407040714072407340744075407640774078407940804081408240834084408540864087408840894090409140924093409440954096409740984099410041014102410341044105410641074108410941104111411241134114411541164117411841194120412141224123412441254126412741284129413041314132413341344135413641374138413941404141414241434144414541464147414841494150415141524153415441554156415741584159416041614162416341644165416641674168416941704171417241734174417541764177417841794180418141824183418441854186418741884189419041914192419341944195419641974198419942004201420242034204420542064207420842094210421142124213421442154216421742184219422042214222422342244225422642274228422942304231423242334234423542364237423842394240424142424243424442454246424742484249425042514252425342544255425642574258425942604261426242634264426542664267426842694270427142724273427442754276427
7427842794280428142824283428442854286428742884289429042914292429342944295429642974298429943004301430243034304430543064307430843094310431143124313431443154316431743184319432043214322432343244325432643274328432943304331433243334334433543364337433843394340434143424343434443454346434743484349435043514352435343544355435643574358435943604361436243634364436543664367436843694370437143724373437443754376437743784379438043814382438343844385438643874388438943904391439243934394439543964397439843994400440144024403440444054406440744084409441044114412441344144415441644174418441944204421442244234424442544264427442844294430443144324433443444354436443744384439444044414442444344444445444644474448444944504451445244534454445544564457445844594460446144624463446444654466446744684469447044714472447344744475447644774478447944804481448244834484448544864487448844894490449144924493449444954496449744984499450045014502450345044505450645074508450945104511451245134514451545164517451845194520452145224523452445254526452745284529453045314532453345344535453645374538453945404541454245434544454545464547454845494550455145524553455445554556455745584559456045614562456345644565456645674568456945704571457245734574
  1. """
  2. This module contains all document-related routes for the LightRAG API.
  3. """
  4. import asyncio
  5. import re
  6. import shutil
  7. import time
  8. from uuid import uuid4
  9. from lightrag.utils import logger, get_pinyin_sort_key, performance_timing_log
  10. import aiofiles
  11. import traceback
  12. from datetime import datetime, timezone
  13. from pathlib import Path
  14. from typing import Dict, List, Optional, Any, Literal
  15. from io import BytesIO
  16. from fastapi import (
  17. APIRouter,
  18. BackgroundTasks,
  19. Depends,
  20. File,
  21. HTTPException,
  22. UploadFile,
  23. )
  24. from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator
  25. from lightrag import LightRAG
  26. from lightrag.base import DeletionResult, DocProcessingStatus, DocStatus
  27. from lightrag.constants import (
  28. FULL_DOCS_FORMAT_PENDING_PARSE,
  29. PARSER_ENGINE_LEGACY,
  30. PARSED_ARTIFACT_DIR_SUFFIXES,
  31. PARSED_DIR_NAME,
  32. PROCESS_OPTION_CHUNK_FIXED,
  33. PROCESS_OPTION_CHUNK_PARAGRAH,
  34. PROCESS_OPTION_CHUNK_RECURSIVE,
  35. PROCESS_OPTION_CHUNK_VECTOR,
  36. )
  37. from lightrag.parser.routing import (
  38. FilenameParserHintError,
  39. canonicalize_parser_hinted_basename,
  40. chunk_strategy_key,
  41. filename_parser_hint,
  42. resolve_chunk_options,
  43. resolve_file_parser_directives,
  44. )
  45. from lightrag.utils import (
  46. generate_track_id,
  47. move_file_to_parsed_dir,
  48. )
  49. from lightrag.api.utils_api import get_combined_auth_dependency
  50. from ..config import global_args
  51. # Function to format datetime to ISO format string with timezone information
  52. def format_datetime(dt: Any) -> Optional[str]:
  53. """Format datetime to ISO format string with timezone information
  54. Args:
  55. dt: Datetime object, string, or None
  56. Returns:
  57. ISO format string with timezone information, or None if input is None
  58. """
  59. if dt is None:
  60. return None
  61. if isinstance(dt, str):
  62. return dt
  63. # Check if datetime object has timezone information
  64. if isinstance(dt, datetime):
  65. # If datetime object has no timezone info (naive datetime), add UTC timezone
  66. if dt.tzinfo is None:
  67. dt = dt.replace(tzinfo=timezone.utc)
  68. # Return ISO format string with timezone information
  69. return dt.isoformat()
# NOTE: the APIRouter instance is created INSIDE `create_document_routes`
# (not at module scope). A module-level router would be one object shared
# by every call to the factory within the process, so re-running the
# factory — which the test suite does to validate create_app for different
# `--api-prefix` values — would re-decorate that same router each time,
# accumulating duplicate routes and triggering FastAPI's
# "Duplicate Operation ID" warnings.
  76. # Temporary file prefix
  77. temp_prefix = "__tmp__"
  78. UNKNOWN_FILE_SOURCE = "unknown_source"
  79. LEGACY_EMPTY_FILE_PATH_SENTINELS = {"", "no-file-path"}
  80. ARCHIVED_FILE_SUFFIX_RE = re.compile(r"_(?:\d{3}|\d{10,})$")
  81. def normalize_file_path(file_path: str | None) -> str:
  82. """Normalize missing document sources to a single non-null sentinel."""
  83. if file_path is None:
  84. return UNKNOWN_FILE_SOURCE
  85. normalized = file_path.strip()
  86. if normalized in LEGACY_EMPTY_FILE_PATH_SENTINELS:
  87. return UNKNOWN_FILE_SOURCE
  88. return canonicalize_parser_hinted_basename(normalized) or UNKNOWN_FILE_SOURCE
  89. def is_valid_file_source(file_source: str | None) -> bool:
  90. if file_source is None:
  91. return False
  92. return normalize_file_path(file_source) != UNKNOWN_FILE_SOURCE
  93. def sanitize_filename(filename: str, input_dir: Path) -> str:
  94. """
  95. Sanitize uploaded filename to prevent Path Traversal attacks.
  96. Args:
  97. filename: The original filename from the upload
  98. input_dir: The target input directory
  99. Returns:
  100. str: Sanitized filename that is safe to use
  101. Raises:
  102. HTTPException: If the filename is unsafe or invalid
  103. """
  104. # Basic validation
  105. if not filename or not filename.strip():
  106. raise HTTPException(status_code=400, detail="Filename cannot be empty")
  107. # Remove path separators and traversal sequences
  108. clean_name = filename.replace("/", "").replace("\\", "")
  109. clean_name = clean_name.replace("..", "")
  110. # Remove control characters and null bytes
  111. clean_name = "".join(c for c in clean_name if ord(c) >= 32 and c != "\x7f")
  112. # Remove leading/trailing whitespace and dots
  113. clean_name = clean_name.strip().strip(".")
  114. # Check if anything is left after sanitization
  115. if not clean_name:
  116. raise HTTPException(
  117. status_code=400, detail="Invalid filename after sanitization"
  118. )
  119. # Verify the final path stays within the input directory
  120. try:
  121. final_path = (input_dir / clean_name).resolve()
  122. if not final_path.is_relative_to(input_dir.resolve()):
  123. raise HTTPException(status_code=400, detail="Unsafe filename detected")
  124. except (OSError, ValueError):
  125. raise HTTPException(status_code=400, detail="Invalid filename")
  126. return clean_name
  127. class ScanResponse(BaseModel):
  128. """Response model for document scanning operation
  129. Attributes:
  130. status: Status of the scanning operation. ``scanning_started`` when
  131. a new background scan has been scheduled;
  132. ``scanning_skipped_pipeline_busy`` when the request was rejected
  133. because indexing or another scan is already running.
  134. message: Optional message with additional details
  135. track_id: Tracking ID for monitoring scanning progress
  136. """
  137. status: Literal["scanning_started", "scanning_skipped_pipeline_busy"] = Field(
  138. description="Status of the scanning operation"
  139. )
  140. message: Optional[str] = Field(
  141. default=None, description="Additional details about the scanning operation"
  142. )
  143. track_id: str = Field(description="Tracking ID for monitoring scanning progress")
  144. model_config = ConfigDict(
  145. json_schema_extra={
  146. "example": {
  147. "status": "scanning_started",
  148. "message": "Scanning process has been initiated in the background",
  149. "track_id": "scan_20250729_170612_abc123",
  150. }
  151. }
  152. )
  153. class ReprocessResponse(BaseModel):
  154. """Response model for reprocessing failed documents operation
  155. Attributes:
  156. status: Status of the reprocessing operation
  157. message: Message describing the operation result
  158. track_id: Always empty string. Reprocessed documents retain their original track_id.
  159. """
  160. status: Literal["reprocessing_started"] = Field(
  161. description="Status of the reprocessing operation"
  162. )
  163. message: str = Field(description="Human-readable message describing the operation")
  164. track_id: str = Field(
  165. default="",
  166. description="Always empty string. Reprocessed documents retain their original track_id from initial upload.",
  167. )
  168. model_config = ConfigDict(
  169. json_schema_extra={
  170. "example": {
  171. "status": "reprocessing_started",
  172. "message": "Reprocessing of failed documents has been initiated in background",
  173. "track_id": "",
  174. }
  175. }
  176. )
  177. class CancelPipelineResponse(BaseModel):
  178. """Response model for pipeline cancellation operation
  179. Attributes:
  180. status: Status of the cancellation request
  181. message: Message describing the operation result
  182. """
  183. status: Literal["cancellation_requested", "not_busy"] = Field(
  184. description="Status of the cancellation request"
  185. )
  186. message: str = Field(description="Human-readable message describing the operation")
  187. model_config = ConfigDict(
  188. json_schema_extra={
  189. "example": {
  190. "status": "cancellation_requested",
  191. "message": "Pipeline cancellation has been requested. Documents will be marked as FAILED.",
  192. }
  193. }
  194. )
  195. TextChunkingStrategy = Literal[
  196. "fixed_token",
  197. "recursive_character",
  198. "semantic_vector",
  199. "paragraph_semantic",
  200. ]
  201. class _StrictChunkParams(BaseModel):
  202. """Base for per-strategy chunking params.
  203. ``strict=True`` rejects the Pydantic-v2 lax coercions that would
  204. otherwise let malformed requests through and fail later in the
  205. background chunker: bool-as-int (``true`` -> 1), numeric strings
  206. (``"5"`` -> 5), float-as-int. ``extra="forbid"`` turns unknown keys
  207. into a 422 (replacing a hand-rolled allow-list). ``chunk_token_size``
  208. is shared by every strategy; ``None`` means "not supplied — fall back
  209. to ``addon_params``/env default at process time".
  210. """
  211. model_config = ConfigDict(extra="forbid", strict=True)
  212. chunk_token_size: Optional[int] = Field(default=None, ge=1)
  213. class _OverlapChunkParams(_StrictChunkParams):
  214. chunk_overlap_token_size: Optional[int] = Field(default=None, ge=0)
  215. @model_validator(mode="after")
  216. def _overlap_lt_size(self) -> "_OverlapChunkParams":
  217. # Only enforceable when BOTH are explicit; when chunk_token_size
  218. # is None the effective size is resolved from addon_params/env at
  219. # process time and can't be compared against here.
  220. if (
  221. self.chunk_token_size is not None
  222. and self.chunk_overlap_token_size is not None
  223. and self.chunk_overlap_token_size >= self.chunk_token_size
  224. ):
  225. raise ValueError("chunk_overlap_token_size must be < chunk_token_size")
  226. return self
  227. class FixedTokenChunkParams(_OverlapChunkParams):
  228. split_by_character: Optional[str] = None
  229. split_by_character_only: Optional[bool] = None
  230. class RecursiveCharacterChunkParams(_OverlapChunkParams):
  231. separators: Optional[list[str]] = None
  232. class ParagraphSemanticChunkParams(_OverlapChunkParams):
  233. pass
  234. class SemanticVectorChunkParams(_StrictChunkParams):
  235. # Enum verified against the installed langchain_experimental
  236. # (text_splitter.py ``BreakpointThresholdType``), not from memory.
  237. breakpoint_threshold_type: Optional[
  238. Literal["percentile", "standard_deviation", "interquartile", "gradient"]
  239. ] = None
  240. # A strict ``float`` field still accepts an ``int`` (e.g. JSON ``95``) and
  241. # widens it losslessly to ``95.0`` — strict only rejects ``str`` / ``bool``
  242. # here, which is exactly what we want. Do NOT relax strict (that would let
  243. # numeric strings through) or switch to ``int | float`` (that would stop
  244. # normalizing ints to float). Locked by tests in test_document_routes_chunking.
  245. breakpoint_threshold_amount: Optional[float] = None
  246. buffer_size: Optional[int] = Field(default=None, ge=1)
  247. sentence_split_regex: Optional[str] = None
  248. @field_validator("sentence_split_regex")
  249. @classmethod
  250. def _valid_sentence_split_regex(cls, v: Optional[str]) -> Optional[str]:
  251. # The value is fed to LangChain's SemanticChunker and compiled during
  252. # split_text. A malformed pattern (e.g. "(") would only blow up in the
  253. # background, so compile it here to reject synchronously (HTTP 422).
  254. if v is None:
  255. return v
  256. try:
  257. re.compile(v)
  258. except re.error as exc:
  259. raise ValueError(
  260. f"sentence_split_regex is not a valid regular expression: {exc}"
  261. ) from exc
  262. return v
  263. @model_validator(mode="after")
  264. def _amount_in_range(self) -> "SemanticVectorChunkParams":
  265. amt = self.breakpoint_threshold_amount
  266. if amt is None:
  267. return self
  268. # ``> 0`` is type-independent (every threshold type wants a positive
  269. # magnitude), so it is safe to enforce at parse time.
  270. if amt <= 0:
  271. raise ValueError("breakpoint_threshold_amount must be > 0")
  272. # The ``(0, 100]`` ceiling is percentile/gradient-specific (those feed
  273. # np.percentile, which requires q in [0, 100]). It depends on the
  274. # threshold TYPE, so only enforce it here when the type is supplied in
  275. # the SAME request. When the type is omitted, the effective type is
  276. # resolved from addon_params/env later — assuming "percentile" here
  277. # would wrongly 422 a partial override that inherits
  278. # standard_deviation/interquartile (which allow amounts > 100). The
  279. # ceiling against the merged type is applied by
  280. # ``_validate_effective_semantic_amount`` in ``_resolve_text_chunking``.
  281. if self.breakpoint_threshold_type in ("percentile", "gradient") and amt > 100:
  282. raise ValueError(
  283. "breakpoint_threshold_amount must be within (0, 100] "
  284. "for percentile/gradient"
  285. )
  286. return self
  287. _CHUNKING_PARAMS_MODEL: dict[str, type[_StrictChunkParams]] = {
  288. "fixed_token": FixedTokenChunkParams,
  289. "recursive_character": RecursiveCharacterChunkParams,
  290. "semantic_vector": SemanticVectorChunkParams,
  291. "paragraph_semantic": ParagraphSemanticChunkParams,
  292. }
  293. class TextChunkingConfig(BaseModel):
  294. """Chunking strategy + strategy-specific params for a text insert.
  295. Validation is delegated to the per-strategy typed model so unknown
  296. keys, wrong types, and out-of-range values all raise synchronously
  297. during request parsing (HTTP 422) — never later in the background
  298. indexing task, where the HTTP response has already been sent.
  299. """
  300. model_config = ConfigDict(extra="forbid")
  301. strategy: TextChunkingStrategy = "fixed_token"
  302. params: Dict[str, Any] = Field(default_factory=dict)
  303. @model_validator(mode="after")
  304. def _validate_params(self) -> "TextChunkingConfig":
  305. typed = _CHUNKING_PARAMS_MODEL[self.strategy].model_validate(self.params)
  306. # Normalize down to exactly the keys the caller supplied with a real
  307. # value (validated + coerced) so the enqueue-time merge overrides only
  308. # what was set. ``exclude_none`` additionally drops explicit nulls:
  309. # every param field means "inherit the addon_params/env default" when
  310. # None, so an explicit ``"chunk_token_size": null`` must NOT be merged
  311. # over the resolved default — otherwise the route would 200 and the
  312. # background chunker would do ``int(None)`` and fail the document.
  313. self.params = typed.model_dump(exclude_unset=True, exclude_none=True)
  314. return self
  315. class InsertTextRequest(BaseModel):
  316. """Request model for inserting a single text document
  317. Attributes:
  318. text: The text content to be inserted into the RAG system
  319. file_source: Source of the text (optional)
  320. chunking: Optional chunking strategy + params; omit to keep the
  321. default fixed-token behavior and addon_params defaults.
  322. """
  323. text: str = Field(
  324. min_length=1,
  325. description="The text to insert",
  326. )
  327. file_source: Optional[str] = Field(
  328. default=None, min_length=0, description="File Source"
  329. )
  330. chunking: Optional[TextChunkingConfig] = Field(
  331. default=None,
  332. description="Chunking strategy and params; omit for default fixed-token chunking",
  333. )
  334. @field_validator("text", mode="after")
  335. @classmethod
  336. def strip_text_after(cls, text: str) -> str:
  337. return text.strip()
  338. @field_validator("file_source", mode="before")
  339. @classmethod
  340. def normalize_source_before(cls, file_source: Optional[str]) -> str:
  341. return normalize_file_path(file_source)
  342. model_config = ConfigDict(
  343. json_schema_extra={
  344. "example": {
  345. "text": "This is a sample text to be inserted into the RAG system.",
  346. "file_source": "Source of the text (optional)",
  347. "chunking": {
  348. "strategy": "fixed_token",
  349. "params": {
  350. "chunk_token_size": 1200,
  351. "chunk_overlap_token_size": 100,
  352. "split_by_character": "\n\n",
  353. "split_by_character_only": True,
  354. },
  355. },
  356. }
  357. }
  358. )
  359. class InsertTextsRequest(BaseModel):
  360. """Request model for inserting multiple text documents
  361. Attributes:
  362. texts: List of text contents to be inserted into the RAG system
  363. file_sources: Sources of the texts (optional)
  364. """
  365. texts: list[str] = Field(
  366. min_length=1,
  367. description="The texts to insert",
  368. )
  369. file_sources: Optional[list[str]] = Field(
  370. default=None, min_length=0, description="Sources of the texts"
  371. )
  372. chunking: Optional[TextChunkingConfig] = Field(
  373. default=None,
  374. description="Shared chunking strategy and params for all texts; omit for default fixed-token chunking",
  375. )
  376. @field_validator("texts", mode="after")
  377. @classmethod
  378. def strip_texts_after(cls, texts: list[str]) -> list[str]:
  379. return [text.strip() for text in texts]
  380. @field_validator("file_sources", mode="before")
  381. @classmethod
  382. def normalize_sources_before(
  383. cls, file_sources: Optional[list[str]]
  384. ) -> Optional[list[str]]:
  385. if file_sources is None:
  386. return None
  387. return [normalize_file_path(file_source) for file_source in file_sources]
  388. model_config = ConfigDict(
  389. json_schema_extra={
  390. "example": {
  391. "texts": [
  392. "This is the first text to be inserted.",
  393. "This is the second text to be inserted.",
  394. ],
  395. "file_sources": [
  396. "First file source (optional)",
  397. ],
  398. "chunking": {
  399. "strategy": "recursive_character",
  400. "params": {"chunk_token_size": 1000},
  401. },
  402. }
  403. }
  404. )
  405. class InsertResponse(BaseModel):
  406. """Response model for document insertion operations
  407. Attributes:
  408. status: Status of the operation (success, partial_success, failure).
  409. Same-name conflicts are rejected with HTTP 409 rather than being
  410. reported as a "duplicated" 200 response, so this field never
  411. takes that value any more.
  412. message: Detailed message describing the operation result
  413. track_id: Tracking ID for monitoring processing status
  414. """
  415. status: Literal["success", "partial_success", "failure"] = Field(
  416. description="Status of the operation"
  417. )
  418. message: str = Field(description="Message describing the operation result")
  419. track_id: str = Field(description="Tracking ID for monitoring processing status")
  420. model_config = ConfigDict(
  421. json_schema_extra={
  422. "example": {
  423. "status": "success",
  424. "message": "File 'document.pdf' uploaded successfully. Processing will continue in background.",
  425. "track_id": "upload_20250729_170612_abc123",
  426. }
  427. }
  428. )
  429. class ClearDocumentsResponse(BaseModel):
  430. """Response model for document clearing operation
  431. Attributes:
  432. status: Status of the clear operation
  433. message: Detailed message describing the operation result
  434. """
  435. status: Literal["success", "partial_success", "busy", "fail"] = Field(
  436. description="Status of the clear operation"
  437. )
  438. message: str = Field(description="Message describing the operation result")
  439. model_config = ConfigDict(
  440. json_schema_extra={
  441. "example": {
  442. "status": "success",
  443. "message": "All documents cleared successfully. Deleted 15 files.",
  444. }
  445. }
  446. )
  447. class ClearCacheRequest(BaseModel):
  448. """Request model for clearing cache
  449. This model is kept for API compatibility but no longer accepts any parameters.
  450. All cache will be cleared regardless of the request content.
  451. """
  452. model_config = ConfigDict(json_schema_extra={"example": {}})
  453. class ClearCacheResponse(BaseModel):
  454. """Response model for cache clearing operation
  455. Attributes:
  456. status: Status of the clear operation
  457. message: Detailed message describing the operation result
  458. """
  459. status: Literal["success", "fail"] = Field(
  460. description="Status of the clear operation"
  461. )
  462. message: str = Field(description="Message describing the operation result")
  463. model_config = ConfigDict(
  464. json_schema_extra={
  465. "example": {
  466. "status": "success",
  467. "message": "Successfully cleared cache for modes: ['default', 'naive']",
  468. }
  469. }
  470. )
"""Response model for document status (describes ``DocStatusResponse``)

Attributes:
    id: Document identifier
    content_summary: Summary of document content
    content_length: Length of document content in characters
    status: Current processing status
    created_at: Creation timestamp (ISO format string)
    updated_at: Last update timestamp (ISO format string)
    track_id: Tracking ID for monitoring progress (optional)
    chunks_count: Number of chunks (optional)
    error_msg: Error message if processing failed (optional)
    metadata: Additional metadata (optional)
    file_path: Path to the document file
"""
  484. class DeleteDocRequest(BaseModel):
  485. doc_ids: List[str] = Field(..., description="The IDs of the documents to delete.")
  486. delete_file: bool = Field(
  487. default=False,
  488. description="Whether to delete the corresponding file in the upload directory.",
  489. )
  490. delete_llm_cache: bool = Field(
  491. default=False,
  492. description="Whether to delete cached LLM extraction results for the documents.",
  493. )
  494. @field_validator("doc_ids", mode="after")
  495. @classmethod
  496. def validate_doc_ids(cls, doc_ids: List[str]) -> List[str]:
  497. if not doc_ids:
  498. raise ValueError("Document IDs list cannot be empty")
  499. validated_ids = []
  500. for doc_id in doc_ids:
  501. if not doc_id or not doc_id.strip():
  502. raise ValueError("Document ID cannot be empty")
  503. validated_ids.append(doc_id.strip())
  504. # Check for duplicates
  505. if len(validated_ids) != len(set(validated_ids)):
  506. raise ValueError("Document IDs must be unique")
  507. return validated_ids
  508. class DeleteEntityRequest(BaseModel):
  509. entity_name: str = Field(..., description="The name of the entity to delete.")
  510. @field_validator("entity_name", mode="after")
  511. @classmethod
  512. def validate_entity_name(cls, entity_name: str) -> str:
  513. if not entity_name or not entity_name.strip():
  514. raise ValueError("Entity name cannot be empty")
  515. return entity_name.strip()
  516. class DeleteRelationRequest(BaseModel):
  517. source_entity: str = Field(..., description="The name of the source entity.")
  518. target_entity: str = Field(..., description="The name of the target entity.")
  519. @field_validator("source_entity", "target_entity", mode="after")
  520. @classmethod
  521. def validate_entity_names(cls, entity_name: str) -> str:
  522. if not entity_name or not entity_name.strip():
  523. raise ValueError("Entity name cannot be empty")
  524. return entity_name.strip()
  525. class DocStatusResponse(BaseModel):
  526. id: str = Field(description="Document identifier")
  527. content_summary: str = Field(description="Summary of document content")
  528. content_length: int = Field(description="Length of document content in characters")
  529. status: DocStatus = Field(description="Current processing status")
  530. created_at: str = Field(description="Creation timestamp (ISO format string)")
  531. updated_at: str = Field(description="Last update timestamp (ISO format string)")
  532. track_id: Optional[str] = Field(
  533. default=None, description="Tracking ID for monitoring progress"
  534. )
  535. chunks_count: Optional[int] = Field(
  536. default=None, description="Number of chunks the document was split into"
  537. )
  538. error_msg: Optional[str] = Field(
  539. default=None, description="Error message if processing failed"
  540. )
  541. metadata: Optional[dict[str, Any]] = Field(
  542. default=None, description="Additional metadata about the document"
  543. )
  544. file_path: str = Field(description="Path to the document file")
  545. model_config = ConfigDict(
  546. json_schema_extra={
  547. "example": {
  548. "id": "doc_123456",
  549. "content_summary": "Research paper on machine learning",
  550. "content_length": 15240,
  551. "status": "processed",
  552. "created_at": "2025-03-31T12:34:56",
  553. "updated_at": "2025-03-31T12:35:30",
  554. "track_id": "upload_20250729_170612_abc123",
  555. "chunks_count": 12,
  556. "error": None,
  557. "metadata": {"author": "John Doe", "year": 2025},
  558. "file_path": "research_paper.pdf",
  559. }
  560. }
  561. )
  562. class DocsStatusesResponse(BaseModel):
  563. """Response model for document statuses
  564. Attributes:
  565. statuses: Dictionary mapping document status to lists of document status responses
  566. """
  567. statuses: Dict[DocStatus, List[DocStatusResponse]] = Field(
  568. default_factory=dict,
  569. description="Dictionary mapping document status to lists of document status responses",
  570. )
  571. model_config = ConfigDict(
  572. json_schema_extra={
  573. "example": {
  574. "statuses": {
  575. "PENDING": [
  576. {
  577. "id": "doc_123",
  578. "content_summary": "Pending document",
  579. "content_length": 5000,
  580. "status": "pending",
  581. "created_at": "2025-03-31T10:00:00",
  582. "updated_at": "2025-03-31T10:00:00",
  583. "track_id": "upload_20250331_100000_abc123",
  584. "chunks_count": None,
  585. "error": None,
  586. "metadata": None,
  587. "file_path": "pending_doc.pdf",
  588. }
  589. ],
  590. "PREPROCESSED": [
  591. {
  592. "id": "doc_789",
  593. "content_summary": "Document pending final indexing",
  594. "content_length": 7200,
  595. "status": "preprocessed",
  596. "created_at": "2025-03-31T09:30:00",
  597. "updated_at": "2025-03-31T09:35:00",
  598. "track_id": "upload_20250331_093000_xyz789",
  599. "chunks_count": 10,
  600. "error": None,
  601. "metadata": None,
  602. "file_path": "preprocessed_doc.pdf",
  603. }
  604. ],
  605. "PROCESSED": [
  606. {
  607. "id": "doc_456",
  608. "content_summary": "Processed document",
  609. "content_length": 8000,
  610. "status": "processed",
  611. "created_at": "2025-03-31T09:00:00",
  612. "updated_at": "2025-03-31T09:05:00",
  613. "track_id": "insert_20250331_090000_def456",
  614. "chunks_count": 8,
  615. "error": None,
  616. "metadata": {"author": "John Doe"},
  617. "file_path": "processed_doc.pdf",
  618. }
  619. ],
  620. }
  621. }
  622. }
  623. )
  624. class TrackStatusResponse(BaseModel):
  625. """Response model for tracking document processing status by track_id
  626. Attributes:
  627. track_id: The tracking ID
  628. documents: List of documents associated with this track_id
  629. total_count: Total number of documents for this track_id
  630. status_summary: Count of documents by status
  631. """
  632. track_id: str = Field(description="The tracking ID")
  633. documents: List[DocStatusResponse] = Field(
  634. description="List of documents associated with this track_id"
  635. )
  636. total_count: int = Field(description="Total number of documents for this track_id")
  637. status_summary: Dict[str, int] = Field(description="Count of documents by status")
  638. model_config = ConfigDict(
  639. json_schema_extra={
  640. "example": {
  641. "track_id": "upload_20250729_170612_abc123",
  642. "documents": [
  643. {
  644. "id": "doc_123456",
  645. "content_summary": "Research paper on machine learning",
  646. "content_length": 15240,
  647. "status": "PROCESSED",
  648. "created_at": "2025-03-31T12:34:56",
  649. "updated_at": "2025-03-31T12:35:30",
  650. "track_id": "upload_20250729_170612_abc123",
  651. "chunks_count": 12,
  652. "error": None,
  653. "metadata": {"author": "John Doe", "year": 2025},
  654. "file_path": "research_paper.pdf",
  655. }
  656. ],
  657. "total_count": 1,
  658. "status_summary": {"PROCESSED": 1},
  659. }
  660. }
  661. )
  662. class DocumentsRequest(BaseModel):
  663. """Request model for paginated document queries
  664. Attributes:
  665. status_filter: Legacy single-status filter, ignored when status_filters is set
  666. status_filters: Filter by multiple document statuses, None for all statuses
  667. page: Page number (1-based)
  668. page_size: Number of documents per page (10-200)
  669. sort_field: Field to sort by ('created_at', 'updated_at', 'id', 'file_path')
  670. sort_direction: Sort direction ('asc' or 'desc')
  671. """
  672. status_filter: Optional[DocStatus] = Field(
  673. default=None,
  674. description="Legacy single-status filter, ignored when status_filters is set",
  675. )
  676. status_filters: Optional[List[DocStatus]] = Field(
  677. default=None, description="Filter by multiple document statuses"
  678. )
  679. page: int = Field(default=1, ge=1, description="Page number (1-based)")
  680. page_size: int = Field(
  681. default=50, ge=10, le=200, description="Number of documents per page (10-200)"
  682. )
  683. sort_field: Literal["created_at", "updated_at", "id", "file_path"] = Field(
  684. default="updated_at", description="Field to sort by"
  685. )
  686. sort_direction: Literal["asc", "desc"] = Field(
  687. default="desc", description="Sort direction"
  688. )
  689. model_config = ConfigDict(
  690. json_schema_extra={
  691. "example": {
  692. "status_filters": ["PREPROCESSED", "PARSING", "ANALYZING"],
  693. "page": 1,
  694. "page_size": 50,
  695. "sort_field": "updated_at",
  696. "sort_direction": "desc",
  697. }
  698. }
  699. )
  700. class PaginationInfo(BaseModel):
  701. """Pagination information
  702. Attributes:
  703. page: Current page number
  704. page_size: Number of items per page
  705. total_count: Total number of items
  706. total_pages: Total number of pages
  707. has_next: Whether there is a next page
  708. has_prev: Whether there is a previous page
  709. """
  710. page: int = Field(description="Current page number")
  711. page_size: int = Field(description="Number of items per page")
  712. total_count: int = Field(description="Total number of items")
  713. total_pages: int = Field(description="Total number of pages")
  714. has_next: bool = Field(description="Whether there is a next page")
  715. has_prev: bool = Field(description="Whether there is a previous page")
  716. model_config = ConfigDict(
  717. json_schema_extra={
  718. "example": {
  719. "page": 1,
  720. "page_size": 50,
  721. "total_count": 150,
  722. "total_pages": 3,
  723. "has_next": True,
  724. "has_prev": False,
  725. }
  726. }
  727. )
  728. class PaginatedDocsResponse(BaseModel):
  729. """Response model for paginated document queries
  730. Attributes:
  731. documents: List of documents for the current page
  732. pagination: Pagination information
  733. status_counts: Count of documents by status for all documents
  734. """
  735. documents: List[DocStatusResponse] = Field(
  736. description="List of documents for the current page"
  737. )
  738. pagination: PaginationInfo = Field(description="Pagination information")
  739. status_counts: Dict[str, int] = Field(
  740. description="Count of documents by status for all documents"
  741. )
  742. model_config = ConfigDict(
  743. json_schema_extra={
  744. "example": {
  745. "documents": [
  746. {
  747. "id": "doc_123456",
  748. "content_summary": "Research paper on machine learning",
  749. "content_length": 15240,
  750. "status": "PROCESSED",
  751. "created_at": "2025-03-31T12:34:56",
  752. "updated_at": "2025-03-31T12:35:30",
  753. "track_id": "upload_20250729_170612_abc123",
  754. "chunks_count": 12,
  755. "error_msg": None,
  756. "metadata": {"author": "John Doe", "year": 2025},
  757. "file_path": "research_paper.pdf",
  758. }
  759. ],
  760. "pagination": {
  761. "page": 1,
  762. "page_size": 50,
  763. "total_count": 150,
  764. "total_pages": 3,
  765. "has_next": True,
  766. "has_prev": False,
  767. },
  768. "status_counts": {
  769. "PENDING": 10,
  770. "PROCESSING": 5,
  771. "PREPROCESSED": 5,
  772. "PROCESSED": 130,
  773. "FAILED": 5,
  774. },
  775. }
  776. }
  777. )
  778. class StatusCountsResponse(BaseModel):
  779. """Response model for document status counts
  780. Attributes:
  781. status_counts: Count of documents by status
  782. """
  783. status_counts: Dict[str, int] = Field(description="Count of documents by status")
  784. model_config = ConfigDict(
  785. json_schema_extra={
  786. "example": {
  787. "status_counts": {
  788. "PENDING": 10,
  789. "PROCESSING": 5,
  790. "PREPROCESSED": 5,
  791. "PROCESSED": 130,
  792. "FAILED": 5,
  793. }
  794. }
  795. }
  796. )
  797. class PipelineStatusResponse(BaseModel):
  798. """Response model for pipeline status
  799. Attributes:
  800. autoscanned: Whether auto-scan has started
  801. busy: Whether the pipeline is currently busy
  802. job_name: Current job name (e.g., indexing files/indexing texts)
  803. job_start: Job start time as ISO format string with timezone (optional)
  804. docs: Total number of documents to be indexed
  805. batchs: Number of batches for processing documents
  806. cur_batch: Current processing batch
  807. request_pending: Flag for pending request for processing
  808. latest_message: Latest message from pipeline processing
  809. history_messages: List of history messages
  810. update_status: Status of update flags for all namespaces
  811. """
  812. autoscanned: bool = False
  813. busy: bool = False
  814. job_name: str = "Default Job"
  815. job_start: Optional[str] = None
  816. docs: int = 0
  817. batchs: int = 0
  818. cur_batch: int = 0
  819. request_pending: bool = False
  820. latest_message: str = ""
  821. history_messages: Optional[List[str]] = None
  822. update_status: Optional[dict] = None
  823. @field_validator("job_start", mode="before")
  824. @classmethod
  825. def parse_job_start(cls, value):
  826. """Process datetime and return as ISO format string with timezone"""
  827. return format_datetime(value)
  828. model_config = ConfigDict(extra="allow")
  829. class DocumentManager:
  830. def __init__(
  831. self,
  832. input_dir: str,
  833. workspace: str = "", # New parameter for workspace isolation
  834. supported_extensions: tuple = (
  835. ".txt",
  836. ".md",
  837. ".mdx", # MDX (Markdown + JSX)
  838. ".pdf",
  839. ".docx",
  840. ".pptx",
  841. ".xlsx",
  842. ".rtf", # Rich Text Format
  843. ".odt", # OpenDocument Text
  844. ".tex", # LaTeX
  845. ".epub", # Electronic Publication
  846. ".html", # HyperText Markup Language
  847. ".htm", # HyperText Markup Language
  848. ".csv", # Comma-Separated Values
  849. ".json", # JavaScript Object Notation
  850. ".xml", # eXtensible Markup Language
  851. ".yaml", # YAML Ain't Markup Language
  852. ".yml", # YAML
  853. ".log", # Log files
  854. ".conf", # Configuration files
  855. ".ini", # Initialization files
  856. ".properties", # Java properties files
  857. ".sql", # SQL scripts
  858. ".bat", # Batch files
  859. ".sh", # Shell scripts
  860. ".c", # C source code
  861. ".h", # C header
  862. ".cpp", # C++ source code
  863. ".hpp", # C++ header
  864. ".py", # Python source code
  865. ".java", # Java source code
  866. ".js", # JavaScript source code
  867. ".ts", # TypeScript source code
  868. ".swift", # Swift source code
  869. ".go", # Go source code
  870. ".rb", # Ruby source code
  871. ".php", # PHP source code
  872. ".css", # Cascading Style Sheets
  873. ".scss", # Sassy CSS
  874. ".less", # LESS CSS
  875. ),
  876. ):
  877. # Store the base input directory and workspace
  878. self.base_input_dir = Path(input_dir)
  879. self.workspace = workspace
  880. self.supported_extensions = supported_extensions
  881. self.indexed_files = set()
  882. # Create workspace-specific input directory
  883. # If workspace is provided, create a subdirectory for data isolation
  884. if workspace:
  885. self.input_dir = self.base_input_dir / workspace
  886. else:
  887. self.input_dir = self.base_input_dir
  888. # Create input directory if it doesn't exist
  889. self.input_dir.mkdir(parents=True, exist_ok=True)
  890. def scan_directory_for_new_files(self) -> List[Path]:
  891. """Scan input directory for new files"""
  892. new_files = []
  893. for ext in self.supported_extensions:
  894. logger.debug(f"Scanning for {ext} files in {self.input_dir}")
  895. for file_path in self.input_dir.glob(f"*{ext}"):
  896. if file_path not in self.indexed_files:
  897. new_files.append(file_path)
  898. return new_files
  899. def mark_as_indexed(self, file_path: Path):
  900. self.indexed_files.add(file_path)
  901. def is_supported_file(self, filename: str) -> bool:
  902. return any(filename.lower().endswith(ext) for ext in self.supported_extensions)
  903. def validate_file_path_security(file_path_str: str, base_dir: Path) -> Optional[Path]:
  904. """
  905. Validate file path security to prevent Path Traversal attacks.
  906. Args:
  907. file_path_str: The file path string to validate
  908. base_dir: The base directory that the file must be within
  909. Returns:
  910. Path: Safe file path if valid, None if unsafe or invalid
  911. """
  912. if not file_path_str or not file_path_str.strip():
  913. return None
  914. try:
  915. # Clean the file path string
  916. clean_path_str = file_path_str.strip()
  917. # Check for obvious path traversal patterns before processing
  918. # This catches both Unix (..) and Windows (..\) style traversals
  919. if ".." in clean_path_str:
  920. # Additional check for Windows-style backslash traversal
  921. if (
  922. "\\..\\" in clean_path_str
  923. or clean_path_str.startswith("..\\")
  924. or clean_path_str.endswith("\\..")
  925. ):
  926. # logger.warning(
  927. # f"Security violation: Windows path traversal attempt detected - {file_path_str}"
  928. # )
  929. return None
  930. # Normalize path separators (convert backslashes to forward slashes)
  931. # This helps handle Windows-style paths on Unix systems
  932. normalized_path = clean_path_str.replace("\\", "/")
  933. # Create path object and resolve it (handles symlinks and relative paths)
  934. candidate_path = (base_dir / normalized_path).resolve()
  935. base_dir_resolved = base_dir.resolve()
  936. # Check if the resolved path is within the base directory
  937. if not candidate_path.is_relative_to(base_dir_resolved):
  938. # logger.warning(
  939. # f"Security violation: Path traversal attempt detected - {file_path_str}"
  940. # )
  941. return None
  942. return candidate_path
  943. except (OSError, ValueError, Exception) as e:
  944. logger.warning(f"Invalid file path detected: {file_path_str} - {str(e)}")
  945. return None
  946. def get_doc_status_value(doc_status: Any) -> str:
  947. """Read status from dict or DocProcessingStatus-like objects."""
  948. status = (
  949. doc_status.get("status")
  950. if isinstance(doc_status, dict)
  951. else getattr(doc_status, "status", None)
  952. )
  953. if isinstance(status, DocStatus):
  954. return status.value
  955. return str(status or "")
  956. def get_doc_track_id(doc_status: Any) -> str:
  957. """Read track_id from dict or DocProcessingStatus-like objects."""
  958. track_id = (
  959. doc_status.get("track_id")
  960. if isinstance(doc_status, dict)
  961. else getattr(doc_status, "track_id", None)
  962. )
  963. return str(track_id or "")
  964. async def get_existing_doc_by_file_path_candidates(
  965. doc_status: Any, file_path: Path | str
  966. ) -> dict[str, Any] | None:
  967. """Find an existing document by canonical basename."""
  968. basename = normalize_file_path(str(file_path))
  969. if basename == UNKNOWN_FILE_SOURCE:
  970. return None
  971. match = await doc_status.get_doc_by_file_basename(basename)
  972. if not match:
  973. return None
  974. _, existing_doc_data = match
  975. return existing_doc_data
  976. async def _reserve_enqueue_slot(rag: LightRAG) -> bool:
  977. """Atomically check exclusive-writer state and reserve a
  978. pending-enqueue slot.
  979. Concurrent enqueues are permitted while the processing loop is
  980. running — the loop is notified via ``request_pending`` and picks up
  981. newly-enqueued docs after its current batch. This includes the
  982. scan task's processing phase: once classification is done, the
  983. scan transitions to driving the processing pipeline like any
  984. other enqueuer, and uploads can land alongside it.
  985. Two states block new uploads/inserts:
  986. - ``scanning_exclusive``: scan task is in its CLASSIFICATION
  987. phase — reading doc_status to classify files (PROCESSED →
  988. archive, FAILED-without-full_docs → retry-as-new, etc.) and
  989. possibly deleting stale stubs. Concurrent enqueue would race
  990. against scan's reads / stub deletions. ``scanning`` alone
  991. (the processing phase) does NOT block uploads.
  992. - ``destructive_busy``: a /documents/clear or per-doc delete is in
  993. flight. These DROP storages and remove input files; an enqueue
  994. accepted in this window would write to a storage that is being
  995. torn down and silently lose the document after the client saw
  996. success.
  997. ``pending_enqueues`` is incremented so the scan endpoint can refuse
  998. while bg tasks are mid-enqueue. The counter does NOT gate
  999. ``apipeline_process_enqueue_documents`` — concurrent processing is
  1000. explicitly allowed and is what makes "upload while pipeline is
  1001. busy" possible.
  1002. A workspace whose ``pipeline_status`` has never been initialised
  1003. (mocked test rigs) is treated as idle; no slot is reserved.
  1004. Returns:
  1005. True when a slot was reserved (caller MUST pair with
  1006. ``_release_enqueue_slot``); False when pipeline_status is not
  1007. bootstrapped.
  1008. Raises:
  1009. HTTPException(409): when
  1010. ``pipeline_status['scanning_exclusive']`` or
  1011. ``pipeline_status['destructive_busy']`` is set.
  1012. """
  1013. from lightrag.exceptions import PipelineNotInitializedError
  1014. from lightrag.kg.shared_storage import get_namespace_data, get_namespace_lock
  1015. try:
  1016. pipeline_status = await get_namespace_data(
  1017. "pipeline_status", workspace=rag.workspace
  1018. )
  1019. except PipelineNotInitializedError:
  1020. return False
  1021. pipeline_status_lock = get_namespace_lock(
  1022. "pipeline_status", workspace=rag.workspace
  1023. )
  1024. async with pipeline_status_lock:
  1025. if pipeline_status.get("scanning_exclusive"):
  1026. raise HTTPException(
  1027. status_code=409,
  1028. detail=(
  1029. "Document scan is classifying files. "
  1030. "Wait for the classification phase to finish before "
  1031. "submitting new work."
  1032. ),
  1033. )
  1034. if pipeline_status.get("destructive_busy"):
  1035. raise HTTPException(
  1036. status_code=409,
  1037. detail=(
  1038. "Pipeline is clearing or deleting documents. "
  1039. "Wait for the running job to finish before submitting "
  1040. "new work."
  1041. ),
  1042. )
  1043. pipeline_status["pending_enqueues"] = (
  1044. pipeline_status.get("pending_enqueues", 0) + 1
  1045. )
  1046. return True
  1047. async def check_pipeline_busy_or_raise(rag: LightRAG) -> None:
  1048. """Refuse the request with HTTP 409 when the document pipeline is busy.
  1049. Intended for short, fine-grained graph mutations (entity/relation
  1050. edit/create/delete/merge). Reads ``pipeline_status['busy']`` under
  1051. the namespace lock and raises immediately on contention -- it does
  1052. NOT set any flag, so it cannot block the pipeline itself.
  1053. ``busy`` is set by the processing loop and by destructive jobs
  1054. (``/documents/clear`` / per-doc delete). Both paths concurrently
  1055. write the same graph storages that these endpoints mutate, so a
  1056. 409 here mirrors the existing UI guard and tells clients to wait.
  1057. A narrow race remains between this check and the underlying graph
  1058. write: if the pipeline transitions to busy in that window, the
  1059. per-edge/-node locks inside the storage layer are the last line of
  1060. defense. That trade-off is deliberate -- holding ``busy`` here
  1061. would serialise every UI edit against document ingestion, which is
  1062. a worse user-visible failure mode than tolerating the race.
  1063. No-op (returns silently) when ``pipeline_status`` was never
  1064. bootstrapped, matching the behaviour of ``_acquire_destructive_busy``
  1065. so test rigs without a real shared-storage Manager keep working.
  1066. """
  1067. from lightrag.exceptions import PipelineNotInitializedError
  1068. from lightrag.kg.shared_storage import get_namespace_data, get_namespace_lock
  1069. try:
  1070. pipeline_status = await get_namespace_data(
  1071. "pipeline_status", workspace=rag.workspace
  1072. )
  1073. except PipelineNotInitializedError:
  1074. return
  1075. pipeline_status_lock = get_namespace_lock(
  1076. "pipeline_status", workspace=rag.workspace
  1077. )
  1078. async with pipeline_status_lock:
  1079. if pipeline_status.get("busy"):
  1080. raise HTTPException(
  1081. status_code=409,
  1082. detail=(
  1083. "Pipeline is busy with another operation. "
  1084. "Wait for the running job to finish before editing "
  1085. "the knowledge graph."
  1086. ),
  1087. )
  1088. async def _acquire_destructive_busy(rag: LightRAG) -> tuple[bool, str | None]:
  1089. """Atomically reserve the destructive busy slot for ``/documents/clear``
  1090. or ``/documents/delete_document``.
  1091. Both jobs DROP storages and (for clear) remove input files. They
  1092. must serialise against:
  1093. - any other ``busy`` work (processing loop, another destructive job),
  1094. - an in-flight ``scanning`` task that reads/writes doc_status and
  1095. INPUT/, and
  1096. - any ``pending_enqueues`` reservation whose bg task has not yet
  1097. written to doc_status — accepting the destructive job in that
  1098. window would drop storages while the enqueue is mid-write,
  1099. losing a document the client already saw success for.
  1100. All three checks happen inside a single ``pipeline_status_lock``
  1101. critical section together with the flag write, so a concurrent
  1102. enqueue/scan reservation cannot squeeze past us.
  1103. Caller is responsible for clearing both flags in its finally block.
  1104. Returns:
  1105. (acquired, reason). ``acquired=True`` and ``reason=None`` on
  1106. success. ``acquired=False`` with a human-readable ``reason``
  1107. when another writer has the lock; the caller surfaces this to
  1108. the client (HTTP 200 with status="busy" for these endpoints).
  1109. For test rigs where ``pipeline_status`` was never bootstrapped,
  1110. returns (True, None) — there is nothing to coordinate against.
  1111. """
  1112. from lightrag.exceptions import PipelineNotInitializedError
  1113. from lightrag.kg.shared_storage import get_namespace_data, get_namespace_lock
  1114. try:
  1115. pipeline_status = await get_namespace_data(
  1116. "pipeline_status", workspace=rag.workspace
  1117. )
  1118. except PipelineNotInitializedError:
  1119. return True, None
  1120. pipeline_status_lock = get_namespace_lock(
  1121. "pipeline_status", workspace=rag.workspace
  1122. )
  1123. async with pipeline_status_lock:
  1124. if pipeline_status.get("busy"):
  1125. return False, "Pipeline is busy with another operation."
  1126. if pipeline_status.get("scanning"):
  1127. return False, (
  1128. "Document scan is in progress. "
  1129. "Wait for the scan to complete before clearing or deleting."
  1130. )
  1131. if pipeline_status.get("pending_enqueues", 0) > 0:
  1132. return False, (
  1133. "Document upload/insert is being enqueued. "
  1134. "Wait for in-flight work to complete before clearing or "
  1135. "deleting."
  1136. )
  1137. pipeline_status["busy"] = True
  1138. pipeline_status["destructive_busy"] = True
  1139. return True, None
  1140. async def _release_destructive_busy(rag: LightRAG) -> None:
  1141. """Release the destructive busy slot acquired by
  1142. ``_acquire_destructive_busy``. Never raises.
  1143. Distinct from ``_release_enqueue_slot``: that helper clears
  1144. ``pending_enqueues`` (the upload/insert reservation), this one
  1145. clears ``busy + destructive_busy`` (the clear/delete reservation).
  1146. """
  1147. from lightrag.exceptions import PipelineNotInitializedError
  1148. from lightrag.kg.shared_storage import get_namespace_data, get_namespace_lock
  1149. try:
  1150. pipeline_status = await get_namespace_data(
  1151. "pipeline_status", workspace=rag.workspace
  1152. )
  1153. except PipelineNotInitializedError:
  1154. return
  1155. pipeline_status_lock = get_namespace_lock(
  1156. "pipeline_status", workspace=rag.workspace
  1157. )
  1158. async with pipeline_status_lock:
  1159. pipeline_status["busy"] = False
  1160. pipeline_status["destructive_busy"] = False
  1161. async def _release_enqueue_slot(rag: LightRAG) -> None:
  1162. """Release a slot reserved by ``_reserve_enqueue_slot``.
  1163. Pure decrement; the bg task itself drives processing by calling
  1164. ``apipeline_process_enqueue_documents`` after enqueue (the call is
  1165. a cheap no-op when the loop is already busy — it just sets
  1166. ``request_pending``). Drain coordination across sibling bg tasks
  1167. is unnecessary in the new contract: each task triggers processing
  1168. independently and the loop's request_pending mechanism collapses
  1169. duplicate triggers safely.
  1170. Decrement is clamped at 0 so a stray release (e.g. from a workspace
  1171. whose reservation returned False but whose bg task wrapper still
  1172. calls release) is harmless. Never raises.
  1173. """
  1174. from lightrag.exceptions import PipelineNotInitializedError
  1175. from lightrag.kg.shared_storage import get_namespace_data, get_namespace_lock
  1176. try:
  1177. pipeline_status = await get_namespace_data(
  1178. "pipeline_status", workspace=rag.workspace
  1179. )
  1180. except PipelineNotInitializedError:
  1181. return
  1182. pipeline_status_lock = get_namespace_lock(
  1183. "pipeline_status", workspace=rag.workspace
  1184. )
  1185. async with pipeline_status_lock:
  1186. current = pipeline_status.get("pending_enqueues", 0)
  1187. if current > 0:
  1188. pipeline_status["pending_enqueues"] = current - 1
  1189. def find_existing_file_by_file_path(input_dir: Path, file_path: str) -> Path | None:
  1190. """Find an input-dir file whose canonical basename matches ``file_path``.
  1191. Callers pass the stored canonical ``file_path`` (already hint-stripped);
  1192. on-disk filenames are normalized before comparison so a hint-bearing
  1193. variant on disk still matches a canonical stored ``file_path``.
  1194. """
  1195. if not file_path or file_path == UNKNOWN_FILE_SOURCE:
  1196. return None
  1197. try:
  1198. for candidate in input_dir.iterdir():
  1199. if not candidate.is_file():
  1200. continue
  1201. if normalize_file_path(candidate.name) == file_path:
  1202. return candidate
  1203. except FileNotFoundError:
  1204. return None
  1205. return None
  1206. def canonicalize_archived_file_variant_basename(
  1207. file_path: Path | str, *, strip_archive_suffix: bool = False
  1208. ) -> str:
  1209. """Canonical basename for original files and numbered archive variants."""
  1210. name = Path(file_path).name
  1211. path = Path(name)
  1212. stem = (
  1213. ARCHIVED_FILE_SUFFIX_RE.sub("", path.stem)
  1214. if strip_archive_suffix
  1215. else path.stem
  1216. )
  1217. return normalize_file_path(f"{stem}{path.suffix}")
  1218. def _file_path_for_parsed_artifact_dir(dir_name: str) -> str | None:
  1219. """Return the canonical source basename for a parser artifact dir.
  1220. Recognized layouts (suffix list in
  1221. :data:`lightrag.constants.PARSED_ARTIFACT_DIR_SUFFIXES`):
  1222. - ``<basename>.parsed[_NNN]/`` — sidecar output (every engine)
  1223. - ``<basename>.mineru_raw[_NNN]/`` — MinerU preserved raw bundle
  1224. - ``<basename>.docling_raw[_NNN]/`` — Docling preserved raw bundle
  1225. Raw bundles are preserved across re-parses for cache reuse and on-demand
  1226. diagnostics; they are cleaned only when the user deletes the document
  1227. with ``delete_file=True`` so the raw artifacts and source file go away
  1228. together.
  1229. """
  1230. stripped = ARCHIVED_FILE_SUFFIX_RE.sub("", dir_name)
  1231. for suffix in PARSED_ARTIFACT_DIR_SUFFIXES:
  1232. if stripped.endswith(suffix):
  1233. basename = stripped[: -len(suffix)]
  1234. if basename:
  1235. return normalize_file_path(basename)
  1236. return None
  1237. def delete_file_variants_by_file_path(
  1238. input_dir: Path,
  1239. file_path: str | None,
  1240. ) -> tuple[list[str], list[str]]:
  1241. """Delete input/__parsed__ source files matching a canonical ``file_path``."""
  1242. if not file_path:
  1243. return [], []
  1244. canonical = normalize_file_path(file_path)
  1245. if canonical == UNKNOWN_FILE_SOURCE:
  1246. return [], []
  1247. canonical_names = {canonical}
  1248. deleted_files: list[str] = []
  1249. errors: list[str] = []
  1250. candidate_dirs = [input_dir, input_dir / PARSED_DIR_NAME]
  1251. input_dir_resolved = input_dir.resolve()
  1252. for candidate_dir in candidate_dirs:
  1253. try:
  1254. candidates = list(candidate_dir.iterdir())
  1255. except FileNotFoundError:
  1256. continue
  1257. except Exception as e:
  1258. errors.append(f"Failed to scan {candidate_dir}: {e}")
  1259. continue
  1260. in_parsed_dir = candidate_dir.name == PARSED_DIR_NAME
  1261. for candidate in candidates:
  1262. if candidate.is_file():
  1263. if (
  1264. canonicalize_archived_file_variant_basename(
  1265. candidate.name,
  1266. strip_archive_suffix=in_parsed_dir,
  1267. )
  1268. not in canonical_names
  1269. ):
  1270. continue
  1271. safe_candidate = validate_file_path_security(
  1272. candidate.name, candidate_dir
  1273. )
  1274. if safe_candidate is None:
  1275. errors.append(f"Unsafe file path skipped: {candidate.name}")
  1276. continue
  1277. try:
  1278. safe_candidate.unlink()
  1279. deleted_files.append(
  1280. str(safe_candidate.relative_to(input_dir_resolved))
  1281. )
  1282. except Exception as e:
  1283. errors.append(f"Failed to delete {candidate.name}: {e}")
  1284. continue
  1285. if in_parsed_dir and candidate.is_dir():
  1286. canonical_for_dir = _file_path_for_parsed_artifact_dir(candidate.name)
  1287. if (
  1288. canonical_for_dir is None
  1289. or canonical_for_dir not in canonical_names
  1290. ):
  1291. continue
  1292. safe_candidate = validate_file_path_security(
  1293. candidate.name, candidate_dir
  1294. )
  1295. if safe_candidate is None:
  1296. errors.append(f"Unsafe artifact dir skipped: {candidate.name}")
  1297. continue
  1298. try:
  1299. shutil.rmtree(safe_candidate)
  1300. deleted_files.append(
  1301. str(safe_candidate.relative_to(input_dir_resolved))
  1302. )
  1303. except Exception as e:
  1304. errors.append(
  1305. f"Failed to delete artifact dir {candidate.name}: {e}"
  1306. )
  1307. return deleted_files, errors
  1308. async def record_scan_warning(rag: LightRAG, message: str) -> None:
  1309. logger.warning(message)
  1310. try:
  1311. from lightrag.kg import shared_storage
  1312. if not getattr(shared_storage, "_initialized", False):
  1313. return
  1314. workspace = getattr(rag, "workspace", "")
  1315. pipeline_status = await shared_storage.get_namespace_data(
  1316. "pipeline_status", workspace=workspace
  1317. )
  1318. pipeline_status_lock = shared_storage.get_namespace_lock(
  1319. "pipeline_status", workspace=workspace
  1320. )
  1321. async with pipeline_status_lock:
  1322. pipeline_status["latest_message"] = message
  1323. pipeline_status["history_messages"].append(message)
  1324. except Exception:
  1325. pass
  1326. # Document processing helper functions (synchronous)
  1327. # These functions run in thread pool via asyncio.to_thread() to avoid blocking the event loop
  1328. def _extract_pdf_pypdf(file_bytes: bytes, password: str = None) -> str:
  1329. """Extract PDF content using pypdf (synchronous).
  1330. Args:
  1331. file_bytes: PDF file content as bytes
  1332. password: Optional password for encrypted PDFs
  1333. Returns:
  1334. str: Extracted text content
  1335. Raises:
  1336. Exception: If PDF is encrypted and password is incorrect or missing
  1337. """
  1338. from pypdf import PdfReader # type: ignore
  1339. pdf_file = BytesIO(file_bytes)
  1340. reader = PdfReader(pdf_file)
  1341. # Check if PDF is encrypted
  1342. if reader.is_encrypted:
  1343. # Try empty password first (covers permission-only encrypted PDFs)
  1344. decrypt_result = reader.decrypt(password or "")
  1345. if decrypt_result == 0:
  1346. if password:
  1347. raise Exception("Incorrect PDF password")
  1348. else:
  1349. raise Exception("PDF is encrypted but no password provided")
  1350. # Extract text from all pages
  1351. content = ""
  1352. for page in reader.pages:
  1353. content += page.extract_text() + "\n"
  1354. return content
  1355. def _extract_docx(file_bytes: bytes) -> str:
  1356. """Extract DOCX content including tables in document order (synchronous).
  1357. Args:
  1358. file_bytes: DOCX file content as bytes
  1359. Returns:
  1360. str: Extracted text content with tables in their original positions.
  1361. Tables are separated from paragraphs with blank lines for clarity.
  1362. """
  1363. from docx import Document # type: ignore
  1364. from docx.table import Table # type: ignore
  1365. from docx.text.paragraph import Paragraph # type: ignore
  1366. docx_file = BytesIO(file_bytes)
  1367. doc = Document(docx_file)
  1368. def escape_cell(cell_value: str | None) -> str:
  1369. """Escape characters that would break tab-delimited layout.
  1370. Escape order is critical: backslashes first, then tabs/newlines.
  1371. This prevents double-escaping issues.
  1372. Args:
  1373. cell_value: The cell value to escape (can be None or str)
  1374. Returns:
  1375. str: Escaped cell value safe for tab-delimited format
  1376. """
  1377. if cell_value is None:
  1378. return ""
  1379. text = str(cell_value)
  1380. # CRITICAL: Escape backslash first to avoid double-escaping
  1381. return (
  1382. text.replace("\\", "\\\\") # Must be first: \ -> \\
  1383. .replace("\t", "&emsp;&emsp;") # Tab -> \t (visible)
  1384. .replace("\r\n", "<br>") # Windows newline -> \n
  1385. .replace("\r", "<br>") # Mac newline -> \n
  1386. .replace("\n", "<br>") # Unix newline -> \n
  1387. )
  1388. content_parts = []
  1389. in_table = False # Track if we're currently processing a table
  1390. # Iterate through all body elements in document order
  1391. for element in doc.element.body:
  1392. # Check if element is a paragraph
  1393. if element.tag.endswith("p"):
  1394. # If coming out of a table, add blank line after table
  1395. if in_table:
  1396. content_parts.append("") # Blank line after table
  1397. in_table = False
  1398. paragraph = Paragraph(element, doc)
  1399. text = paragraph.text
  1400. # Always append to preserve document spacing (including blank paragraphs)
  1401. content_parts.append(text)
  1402. # Check if element is a table
  1403. elif element.tag.endswith("tbl"):
  1404. # Add blank line before table (if content exists)
  1405. if content_parts and not in_table:
  1406. content_parts.append("") # Blank line before table
  1407. in_table = True
  1408. table = Table(element, doc)
  1409. for row in table.rows:
  1410. row_text = []
  1411. for cell in row.cells:
  1412. cell_text = cell.text
  1413. # Escape special characters to preserve tab-delimited structure
  1414. row_text.append(escape_cell(cell_text))
  1415. # Only add row if at least one cell has content
  1416. if any(cell for cell in row_text):
  1417. content_parts.append("\t".join(row_text))
  1418. return "\n".join(content_parts)
  1419. def _extract_pptx(file_bytes: bytes) -> str:
  1420. """Extract PPTX content (synchronous).
  1421. Args:
  1422. file_bytes: PPTX file content as bytes
  1423. Returns:
  1424. str: Extracted text content
  1425. """
  1426. from pptx import Presentation # type: ignore
  1427. pptx_file = BytesIO(file_bytes)
  1428. prs = Presentation(pptx_file)
  1429. content = ""
  1430. for slide in prs.slides:
  1431. for shape in slide.shapes:
  1432. if hasattr(shape, "text"):
  1433. content += shape.text + "\n"
  1434. return content
  1435. def _extract_xlsx(file_bytes: bytes) -> str:
  1436. """Extract XLSX content in tab-delimited format with clear sheet separation.
  1437. This function processes Excel workbooks and converts them to a structured text format
  1438. suitable for LLM prompts and RAG systems. Each sheet is clearly delimited with
  1439. separator lines, and special characters are escaped to preserve the tab-delimited structure.
  1440. Features:
  1441. - Each sheet is wrapped with '====================' separators for visual distinction
  1442. - Special characters (tabs, newlines, backslashes) are escaped to prevent structure corruption
  1443. - Column alignment is preserved across all rows to maintain tabular structure
  1444. - Empty rows are preserved as blank lines to maintain row structure
  1445. - Uses sheet.max_column to determine column width efficiently
  1446. Args:
  1447. file_bytes: XLSX file content as bytes
  1448. Returns:
  1449. str: Extracted text content with all sheets in tab-delimited format.
  1450. Format: Sheet separators, sheet name, then tab-delimited rows.
  1451. Example output:
  1452. ==================== Sheet: Data ====================
  1453. Name\tAge\tCity
  1454. Alice\t30\tNew York
  1455. Bob\t25\tLondon
  1456. ==================== Sheet: Summary ====================
  1457. Total\t2
  1458. ====================
  1459. """
  1460. from openpyxl import load_workbook # type: ignore
  1461. xlsx_file = BytesIO(file_bytes)
  1462. wb = load_workbook(xlsx_file)
  1463. def escape_cell(cell_value: str | int | float | None) -> str:
  1464. """Escape characters that would break tab-delimited layout.
  1465. Escape order is critical: backslashes first, then tabs/newlines.
  1466. This prevents double-escaping issues.
  1467. Args:
  1468. cell_value: The cell value to escape (can be None, str, int, or float)
  1469. Returns:
  1470. str: Escaped cell value safe for tab-delimited format
  1471. """
  1472. if cell_value is None:
  1473. return ""
  1474. text = str(cell_value)
  1475. # CRITICAL: Escape backslash first to avoid double-escaping
  1476. return (
  1477. text.replace("\\", "\\\\") # Must be first: \ -> \\
  1478. .replace("\t", "\\t") # Tab -> \t (visible)
  1479. .replace("\r\n", "\\n") # Windows newline -> \n
  1480. .replace("\r", "\\n") # Mac newline -> \n
  1481. .replace("\n", "\\n") # Unix newline -> \n
  1482. )
  1483. def escape_sheet_title(title: str) -> str:
  1484. """Escape sheet title to prevent formatting issues in separators.
  1485. Args:
  1486. title: Original sheet title
  1487. Returns:
  1488. str: Sanitized sheet title with tabs/newlines replaced
  1489. """
  1490. return str(title).replace("\n", " ").replace("\t", " ").replace("\r", " ")
  1491. content_parts: list[str] = []
  1492. sheet_separator = "=" * 20
  1493. for idx, sheet in enumerate(wb):
  1494. if idx > 0:
  1495. content_parts.append("") # Blank line between sheets for readability
  1496. # Escape sheet title to handle edge cases with special characters
  1497. safe_title = escape_sheet_title(sheet.title)
  1498. content_parts.append(f"{sheet_separator} Sheet: {safe_title} {sheet_separator}")
  1499. # Use sheet.max_column to get the maximum column width directly
  1500. max_columns = sheet.max_column if sheet.max_column else 0
  1501. # Extract rows with consistent width to preserve column alignment
  1502. for row in sheet.iter_rows(values_only=True):
  1503. row_parts = []
  1504. # Build row up to max_columns width
  1505. for idx in range(max_columns):
  1506. if idx < len(row):
  1507. row_parts.append(escape_cell(row[idx]))
  1508. else:
  1509. row_parts.append("") # Pad short rows
  1510. # Check if row is completely empty
  1511. if all(part == "" for part in row_parts):
  1512. # Preserve empty rows as blank lines (maintains row structure)
  1513. content_parts.append("")
  1514. else:
  1515. # Join all columns to maintain consistent column count
  1516. content_parts.append("\t".join(row_parts))
  1517. # Final separator for symmetry (makes parsing easier)
  1518. content_parts.append(sheet_separator)
  1519. return "\n".join(content_parts)
  1520. async def pipeline_enqueue_file(
  1521. rag: LightRAG,
  1522. file_path: Path,
  1523. track_id: str = None,
  1524. from_scan: bool = False,
  1525. ) -> tuple[bool, str]:
  1526. """Add a file to the queue for processing
  1527. Args:
  1528. rag: LightRAG instance
  1529. file_path: Path to the saved file
  1530. track_id: Optional tracking ID, if not provided will be generated
  1531. from_scan: True only when invoked by the scan-owned background task,
  1532. which already holds ``pipeline_status["scanning"]``. Forwarded to
  1533. ``apipeline_enqueue_documents`` so the scan can enqueue the files
  1534. it just discovered without tripping the scanning guard there.
  1535. Returns:
  1536. tuple: (success: bool, track_id: str)
  1537. """
  1538. # Generate track_id if not provided
  1539. if track_id is None:
  1540. track_id = generate_track_id("unknown")
  1541. try:
  1542. content = ""
  1543. ext = file_path.suffix.lower()
  1544. file_size = 0
  1545. # Get file size for error reporting
  1546. try:
  1547. stat = await asyncio.to_thread(file_path.stat)
  1548. file_size = stat.st_size
  1549. except Exception:
  1550. file_size = 0
  1551. try:
  1552. extraction_engine, process_options = resolve_file_parser_directives(
  1553. file_path
  1554. )
  1555. except FilenameParserHintError as e:
  1556. error_files = [
  1557. {
  1558. "file_path": str(file_path.name),
  1559. "error_description": "[File Extraction]Filename hint error",
  1560. "original_error": str(e),
  1561. "file_size": file_size,
  1562. }
  1563. ]
  1564. await rag.apipeline_enqueue_error_documents(error_files, track_id)
  1565. logger.error(
  1566. f"[File Extraction]Invalid filename hint in {file_path.name}: {e}"
  1567. )
  1568. return False, track_id
  1569. api_process_options = process_options or PROCESS_OPTION_CHUNK_FIXED
  1570. if extraction_engine != PARSER_ENGINE_LEGACY:
  1571. try:
  1572. enqueue_kwargs = {
  1573. "file_paths": str(file_path),
  1574. "track_id": track_id,
  1575. "docs_format": FULL_DOCS_FORMAT_PENDING_PARSE,
  1576. "parse_engine": extraction_engine,
  1577. "process_options": api_process_options,
  1578. "from_scan": from_scan,
  1579. }
  1580. enqueue_result = await rag.apipeline_enqueue_documents(
  1581. "", **enqueue_kwargs
  1582. )
  1583. if enqueue_result is None:
  1584. try:
  1585. await move_file_to_parsed_dir(file_path)
  1586. except Exception as move_error:
  1587. logger.error(
  1588. f"Failed to move duplicate file {file_path.name} to {PARSED_DIR_NAME} directory: {move_error}"
  1589. )
  1590. return False, track_id
  1591. logger.info(
  1592. f"[File Extraction]Deferred {file_path.name} to {extraction_engine} parser"
  1593. )
  1594. return True, track_id
  1595. except Exception as e:
  1596. error_files = [
  1597. {
  1598. "file_path": str(file_path.name),
  1599. "error_description": "[File Extraction]Parser enqueue error",
  1600. "original_error": f"Failed to enqueue file for parser: {str(e)}",
  1601. "file_size": file_size,
  1602. }
  1603. ]
  1604. await rag.apipeline_enqueue_error_documents(error_files, track_id)
  1605. logger.error(
  1606. f"[File Extraction]Error enqueuing {file_path.name} for {extraction_engine}: {str(e)}"
  1607. )
  1608. return False, track_id
  1609. file = None
  1610. try:
  1611. async with aiofiles.open(file_path, "rb") as f:
  1612. file = await f.read()
  1613. except PermissionError as e:
  1614. error_files = [
  1615. {
  1616. "file_path": str(file_path.name),
  1617. "error_description": "[File Extraction]Permission denied - cannot read file",
  1618. "original_error": str(e),
  1619. "file_size": file_size,
  1620. }
  1621. ]
  1622. await rag.apipeline_enqueue_error_documents(error_files, track_id)
  1623. logger.error(
  1624. f"[File Extraction]Permission denied reading file: {file_path.name}"
  1625. )
  1626. return False, track_id
  1627. except FileNotFoundError as e:
  1628. error_files = [
  1629. {
  1630. "file_path": str(file_path.name),
  1631. "error_description": "[File Extraction]File not found",
  1632. "original_error": str(e),
  1633. "file_size": file_size,
  1634. }
  1635. ]
  1636. await rag.apipeline_enqueue_error_documents(error_files, track_id)
  1637. logger.error(f"[File Extraction]File not found: {file_path.name}")
  1638. return False, track_id
  1639. except Exception as e:
  1640. error_files = [
  1641. {
  1642. "file_path": str(file_path.name),
  1643. "error_description": "[File Extraction]File reading error",
  1644. "original_error": str(e),
  1645. "file_size": file_size,
  1646. }
  1647. ]
  1648. await rag.apipeline_enqueue_error_documents(error_files, track_id)
  1649. logger.error(
  1650. f"[File Extraction]Error reading file {file_path.name}: {str(e)}"
  1651. )
  1652. return False, track_id
  1653. # Process based on file type
  1654. try:
  1655. match ext:
  1656. case (
  1657. ".txt"
  1658. | ".md"
  1659. | ".mdx"
  1660. | ".html"
  1661. | ".htm"
  1662. | ".tex"
  1663. | ".json"
  1664. | ".xml"
  1665. | ".yaml"
  1666. | ".yml"
  1667. | ".rtf"
  1668. | ".odt"
  1669. | ".epub"
  1670. | ".csv"
  1671. | ".log"
  1672. | ".conf"
  1673. | ".ini"
  1674. | ".properties"
  1675. | ".sql"
  1676. | ".bat"
  1677. | ".sh"
  1678. | ".c"
  1679. | ".h"
  1680. | ".cpp"
  1681. | ".hpp"
  1682. | ".py"
  1683. | ".java"
  1684. | ".js"
  1685. | ".ts"
  1686. | ".swift"
  1687. | ".go"
  1688. | ".rb"
  1689. | ".php"
  1690. | ".css"
  1691. | ".scss"
  1692. | ".less"
  1693. ):
  1694. try:
  1695. # Try to decode as UTF-8 (offloaded to thread to avoid blocking the event loop)
  1696. content = await asyncio.to_thread(file.decode, "utf-8")
  1697. # Validate content
  1698. if not content or len(content.strip()) == 0:
  1699. error_files = [
  1700. {
  1701. "file_path": str(file_path.name),
  1702. "error_description": "[File Extraction]Empty file content",
  1703. "original_error": "File contains no content or only whitespace",
  1704. "file_size": file_size,
  1705. }
  1706. ]
  1707. await rag.apipeline_enqueue_error_documents(
  1708. error_files, track_id
  1709. )
  1710. logger.error(
  1711. f"[File Extraction]Empty content in file: {file_path.name}"
  1712. )
  1713. return False, track_id
  1714. # Check if content looks like binary data string representation
  1715. if content.startswith("b'") or content.startswith('b"'):
  1716. error_files = [
  1717. {
  1718. "file_path": str(file_path.name),
  1719. "error_description": "[File Extraction]Binary data in text file",
  1720. "original_error": "File appears to contain binary data representation instead of text",
  1721. "file_size": file_size,
  1722. }
  1723. ]
  1724. await rag.apipeline_enqueue_error_documents(
  1725. error_files, track_id
  1726. )
  1727. logger.error(
  1728. f"[File Extraction]File {file_path.name} appears to contain binary data representation instead of text"
  1729. )
  1730. return False, track_id
  1731. except UnicodeDecodeError as e:
  1732. error_files = [
  1733. {
  1734. "file_path": str(file_path.name),
  1735. "error_description": "[File Extraction]UTF-8 encoding error, please convert it to UTF-8 before processing",
  1736. "original_error": f"File is not valid UTF-8 encoded text: {str(e)}",
  1737. "file_size": file_size,
  1738. }
  1739. ]
  1740. await rag.apipeline_enqueue_error_documents(
  1741. error_files, track_id
  1742. )
  1743. logger.error(
  1744. f"[File Extraction]File {file_path.name} is not valid UTF-8 encoded text. Please convert it to UTF-8 before processing."
  1745. )
  1746. return False, track_id
  1747. case ".pdf":
  1748. try:
  1749. content = await asyncio.to_thread(
  1750. _extract_pdf_pypdf,
  1751. file,
  1752. global_args.pdf_decrypt_password,
  1753. )
  1754. except Exception as e:
  1755. error_files = [
  1756. {
  1757. "file_path": str(file_path.name),
  1758. "error_description": "[File Extraction]PDF processing error",
  1759. "original_error": f"Failed to extract text from PDF: {str(e)}",
  1760. "file_size": file_size,
  1761. }
  1762. ]
  1763. await rag.apipeline_enqueue_error_documents(
  1764. error_files, track_id
  1765. )
  1766. logger.error(
  1767. f"[File Extraction]Error processing PDF {file_path.name}: {str(e)}"
  1768. )
  1769. return False, track_id
  1770. case ".docx":
  1771. try:
  1772. content = await asyncio.to_thread(_extract_docx, file)
  1773. except Exception as e:
  1774. error_files = [
  1775. {
  1776. "file_path": str(file_path.name),
  1777. "error_description": "[File Extraction]DOCX processing error",
  1778. "original_error": f"Failed to extract text from DOCX: {str(e)}",
  1779. "file_size": file_size,
  1780. }
  1781. ]
  1782. await rag.apipeline_enqueue_error_documents(
  1783. error_files, track_id
  1784. )
  1785. logger.error(
  1786. f"[File Extraction]Error processing DOCX {file_path.name}: {str(e)}"
  1787. )
  1788. return False, track_id
  1789. case ".pptx":
  1790. try:
  1791. content = await asyncio.to_thread(_extract_pptx, file)
  1792. except Exception as e:
  1793. error_files = [
  1794. {
  1795. "file_path": str(file_path.name),
  1796. "error_description": "[File Extraction]PPTX processing error",
  1797. "original_error": f"Failed to extract text from PPTX: {str(e)}",
  1798. "file_size": file_size,
  1799. }
  1800. ]
  1801. await rag.apipeline_enqueue_error_documents(
  1802. error_files, track_id
  1803. )
  1804. logger.error(
  1805. f"[File Extraction]Error processing PPTX {file_path.name}: {str(e)}"
  1806. )
  1807. return False, track_id
  1808. case ".xlsx":
  1809. try:
  1810. content = await asyncio.to_thread(_extract_xlsx, file)
  1811. except Exception as e:
  1812. error_files = [
  1813. {
  1814. "file_path": str(file_path.name),
  1815. "error_description": "[File Extraction]XLSX processing error",
  1816. "original_error": f"Failed to extract text from XLSX: {str(e)}",
  1817. "file_size": file_size,
  1818. }
  1819. ]
  1820. await rag.apipeline_enqueue_error_documents(
  1821. error_files, track_id
  1822. )
  1823. logger.error(
  1824. f"[File Extraction]Error processing XLSX {file_path.name}: {str(e)}"
  1825. )
  1826. return False, track_id
  1827. case _:
  1828. error_files = [
  1829. {
  1830. "file_path": str(file_path.name),
  1831. "error_description": f"[File Extraction]Unsupported file type: {ext}",
  1832. "original_error": f"File extension {ext} is not supported",
  1833. "file_size": file_size,
  1834. }
  1835. ]
  1836. await rag.apipeline_enqueue_error_documents(error_files, track_id)
  1837. logger.error(
  1838. f"[File Extraction]Unsupported file type: {file_path.name} (extension {ext})"
  1839. )
  1840. return False, track_id
  1841. except Exception as e:
  1842. error_files = [
  1843. {
  1844. "file_path": str(file_path.name),
  1845. "error_description": "[File Extraction]File format processing error",
  1846. "original_error": f"Unexpected error during file extracting: {str(e)}",
  1847. "file_size": file_size,
  1848. }
  1849. ]
  1850. await rag.apipeline_enqueue_error_documents(error_files, track_id)
  1851. logger.error(
  1852. f"[File Extraction]Unexpected error during {file_path.name} extracting: {str(e)}"
  1853. )
  1854. return False, track_id
  1855. # Insert into the RAG queue
  1856. if content:
  1857. # Check if content contains only whitespace characters
  1858. if not content.strip():
  1859. error_files = [
  1860. {
  1861. "file_path": str(file_path.name),
  1862. "error_description": "[File Extraction]File contains only whitespace",
  1863. "original_error": "File content contains only whitespace characters",
  1864. "file_size": file_size,
  1865. }
  1866. ]
  1867. await rag.apipeline_enqueue_error_documents(error_files, track_id)
  1868. logger.warning(
  1869. f"[File Extraction]File contains only whitespace characters: {file_path.name}"
  1870. )
  1871. return False, track_id
  1872. try:
  1873. enqueue_kwargs = {
  1874. "file_paths": file_path.name,
  1875. "track_id": track_id,
  1876. "parse_engine": PARSER_ENGINE_LEGACY,
  1877. "process_options": api_process_options,
  1878. "from_scan": from_scan,
  1879. }
  1880. enqueue_result = await rag.apipeline_enqueue_documents(
  1881. content, **enqueue_kwargs
  1882. )
  1883. if enqueue_result is None:
  1884. try:
  1885. await move_file_to_parsed_dir(file_path)
  1886. except Exception as move_error:
  1887. logger.error(
  1888. f"Failed to move duplicate file {file_path.name} to {PARSED_DIR_NAME} directory: {move_error}"
  1889. )
  1890. return False, track_id
  1891. logger.info(
  1892. f"Successfully extracted and enqueued file: {file_path.name}"
  1893. )
  1894. # Move file to __parsed__ directory after enqueuing (LR2-PRD: parsed output dir)
  1895. try:
  1896. await move_file_to_parsed_dir(file_path)
  1897. except Exception as move_error:
  1898. logger.error(
  1899. f"Failed to move file {file_path.name} to {PARSED_DIR_NAME} directory: {move_error}"
  1900. )
  1901. # Don't affect the main function's success status
  1902. return True, track_id
  1903. except Exception as e:
  1904. error_files = [
  1905. {
  1906. "file_path": str(file_path.name),
  1907. "error_description": "Document enqueue error",
  1908. "original_error": f"Failed to enqueue document: {str(e)}",
  1909. "file_size": file_size,
  1910. }
  1911. ]
  1912. await rag.apipeline_enqueue_error_documents(error_files, track_id)
  1913. logger.error(f"Error enqueueing document {file_path.name}: {str(e)}")
  1914. return False, track_id
  1915. else:
  1916. error_files = [
  1917. {
  1918. "file_path": str(file_path.name),
  1919. "error_description": "No content extracted",
  1920. "original_error": "No content could be extracted from file",
  1921. "file_size": file_size,
  1922. }
  1923. ]
  1924. await rag.apipeline_enqueue_error_documents(error_files, track_id)
  1925. logger.error(f"No content extracted from file: {file_path.name}")
  1926. return False, track_id
  1927. except Exception as e:
  1928. # Catch-all for any unexpected errors
  1929. try:
  1930. file_size = file_path.stat().st_size if file_path.exists() else 0
  1931. except Exception:
  1932. file_size = 0
  1933. error_files = [
  1934. {
  1935. "file_path": str(file_path.name),
  1936. "error_description": "Unexpected processing error",
  1937. "original_error": f"Unexpected error: {str(e)}",
  1938. "file_size": file_size,
  1939. }
  1940. ]
  1941. await rag.apipeline_enqueue_error_documents(error_files, track_id)
  1942. logger.error(f"Enqueuing file {file_path.name} error: {str(e)}")
  1943. logger.error(traceback.format_exc())
  1944. return False, track_id
  1945. finally:
  1946. if file_path.name.startswith(temp_prefix):
  1947. try:
  1948. file_path.unlink()
  1949. except Exception as e:
  1950. logger.error(f"Error deleting file {file_path}: {str(e)}")
  1951. async def pipeline_index_file(rag: LightRAG, file_path: Path, track_id: str = None):
  1952. """Index a file with track_id
  1953. Args:
  1954. rag: LightRAG instance
  1955. file_path: Path to the saved file
  1956. track_id: Optional tracking ID
  1957. """
  1958. try:
  1959. success, _ = await pipeline_enqueue_file(rag, file_path, track_id)
  1960. if success:
  1961. await rag.apipeline_process_enqueue_documents()
  1962. except Exception as e:
  1963. logger.error(f"Error indexing file {file_path.name}: {str(e)}")
  1964. logger.error(traceback.format_exc())
  1965. async def pipeline_index_files(
  1966. rag: LightRAG,
  1967. file_paths: List[Path],
  1968. track_id: str = None,
  1969. from_scan: bool = False,
  1970. ):
  1971. """Index multiple files sequentially to avoid high CPU load
  1972. Args:
  1973. rag: LightRAG instance
  1974. file_paths: Paths to the files to index
  1975. track_id: Optional tracking ID to pass to all files
  1976. from_scan: True only when invoked by the scan-owned background task.
  1977. Forwarded to ``pipeline_enqueue_file`` so the per-file enqueue
  1978. calls bypass the scanning guard inside
  1979. ``apipeline_enqueue_documents`` (whose ``scanning`` flag the
  1980. scan task itself owns).
  1981. """
  1982. if not file_paths:
  1983. return
  1984. try:
  1985. enqueued = False
  1986. # Use get_pinyin_sort_key for Chinese pinyin sorting
  1987. sorted_file_paths = sorted(
  1988. file_paths, key=lambda p: get_pinyin_sort_key(str(p))
  1989. )
  1990. # Process files sequentially with track_id
  1991. for file_path in sorted_file_paths:
  1992. success, _ = await pipeline_enqueue_file(
  1993. rag,
  1994. file_path,
  1995. track_id,
  1996. from_scan=from_scan,
  1997. )
  1998. if success:
  1999. enqueued = True
  2000. # Process the queue only if at least one file was successfully enqueued
  2001. if enqueued:
  2002. await rag.apipeline_process_enqueue_documents()
  2003. except Exception as e:
  2004. logger.error(f"Error indexing files: {str(e)}")
  2005. logger.error(traceback.format_exc())
  2006. _STRATEGY_TO_PROCESS_OPTION: Dict[str, str] = {
  2007. "fixed_token": PROCESS_OPTION_CHUNK_FIXED,
  2008. "recursive_character": PROCESS_OPTION_CHUNK_RECURSIVE,
  2009. "semantic_vector": PROCESS_OPTION_CHUNK_VECTOR,
  2010. "paragraph_semantic": PROCESS_OPTION_CHUNK_PARAGRAH,
  2011. }
  2012. def _resolve_text_chunking(
  2013. chunking: Optional[TextChunkingConfig], rag: LightRAG
  2014. ) -> tuple[str, dict]:
  2015. """Freeze a ``chunking`` request into ``(process_options, chunk_options)``.
  2016. When ``chunking`` is ``None`` this reproduces today's behavior exactly:
  2017. fixed-token strategy with the snapshot built from
  2018. ``rag.addon_params['chunker']``.
  2019. Otherwise the validated, strategy-specific params are merged into the
  2020. selected strategy's sub-dict. ``chunk_token_size`` rides along inside
  2021. ``params`` like any other key — every strategy (F included, after the
  2022. ``process_single_document`` cleanup) reads its size from its own
  2023. sub-dict, with the top-level snapshot value as the shared fallback.
  2024. Raises:
  2025. ValueError: when the request lowers ``chunk_token_size`` below the
  2026. *effective* ``chunk_overlap_token_size``. The overlap is often
  2027. inherited from ``addon_params``/env (the overlay fills
  2028. ``fixed_token``/``recursive_character``/``paragraph_semantic``
  2029. overlap with ``CHUNK_*_OVERLAP_SIZE`` / ``CHUNK_OVERLAP_SIZE``),
  2030. so this can only be checked here against the resolved snapshot,
  2031. not in the request model. Callers on the request path invoke
  2032. this synchronously so the failure surfaces as HTTP 422 before any
  2033. background work is scheduled.
  2034. """
  2035. if chunking is None:
  2036. # No request-driven config: reproduce today's behavior verbatim,
  2037. # including not introducing new validation on the default path.
  2038. process_options = PROCESS_OPTION_CHUNK_FIXED
  2039. return process_options, resolve_chunk_options(
  2040. rag.addon_params, process_options=process_options
  2041. )
  2042. process_options = _STRATEGY_TO_PROCESS_OPTION[chunking.strategy]
  2043. chunk_options = resolve_chunk_options(
  2044. rag.addon_params, process_options=process_options
  2045. )
  2046. strategy_key = chunk_strategy_key(process_options)
  2047. chunk_options[strategy_key].update(chunking.params)
  2048. _validate_effective_chunk_overlap(chunk_options, strategy_key, chunking.strategy)
  2049. _validate_effective_semantic_amount(chunk_options, strategy_key)
  2050. return process_options, chunk_options
  2051. def _validate_effective_chunk_overlap(
  2052. chunk_options: dict, strategy_key: str, strategy_name: str
  2053. ) -> None:
  2054. """Reject a resolved snapshot whose overlap is >= its chunk size.
  2055. Operates on the fully-resolved ``chunk_options`` so it catches the case
  2056. the request model cannot: ``chunk_token_size`` supplied in the request
  2057. while ``chunk_overlap_token_size`` is inherited from addon_params/env
  2058. (e.g. ``chunk_token_size=50`` with the default overlap ``100``). The
  2059. effective size is the strategy sub-dict value, falling back to the
  2060. top-level snapshot size; the effective overlap is the sub-dict value
  2061. (``semantic_vector`` carries none, so it is skipped).
  2062. """
  2063. sub = chunk_options.get(strategy_key) or {}
  2064. # Fixed-token delimiter-only mode (split_by_character set AND
  2065. # split_by_character_only=True) never applies overlap:
  2066. # chunking_by_token_size only validates each delimiter segment against
  2067. # chunk_token_size and raises on an oversize segment — the overlap field
  2068. # is unused. Enforcing overlap < size there would wrongly 422 a valid
  2069. # request such as paragraph splitting with a small chunk_token_size.
  2070. # (split_by_character_only is itself a no-op when split_by_character is
  2071. # falsy, so both must be effective for overlap to be skipped.)
  2072. if (
  2073. strategy_key == "fixed_token"
  2074. and sub.get("split_by_character")
  2075. and sub.get("split_by_character_only")
  2076. ):
  2077. return
  2078. overlap = sub.get("chunk_overlap_token_size")
  2079. if overlap is None:
  2080. return
  2081. size = sub.get("chunk_token_size")
  2082. if size is None:
  2083. size = chunk_options.get("chunk_token_size")
  2084. if size is not None and overlap >= size:
  2085. raise ValueError(
  2086. f"chunking for strategy '{strategy_name}': effective "
  2087. f"chunk_overlap_token_size ({overlap}) must be < chunk_token_size "
  2088. f"({size}). The overlap is inherited from addon_params/env when "
  2089. f"not set in the request; raise chunk_token_size or lower "
  2090. f"chunk_overlap_token_size."
  2091. )
  2092. def _validate_effective_semantic_amount(chunk_options: dict, strategy_key: str) -> None:
  2093. """Reject a resolved semantic_vector snapshot whose breakpoint amount
  2094. exceeds the percentile/gradient ceiling.
  2095. Uses the *effective* ``breakpoint_threshold_type`` from the merged
  2096. snapshot — the request model cannot, because the type may be inherited
  2097. from ``addon_params``/``CHUNK_V_BREAKPOINT_THRESHOLD_TYPE`` while the
  2098. request overrides only ``breakpoint_threshold_amount``. ``percentile`` /
  2099. ``gradient`` feed ``np.percentile`` (q must be in ``[0, 100]``);
  2100. ``standard_deviation`` / ``interquartile`` are multipliers with no upper
  2101. bound, so a request amount > 100 is valid for them.
  2102. """
  2103. if strategy_key != "semantic_vector":
  2104. return
  2105. sub = chunk_options.get(strategy_key) or {}
  2106. amt = sub.get("breakpoint_threshold_amount")
  2107. if amt is None:
  2108. return
  2109. kind = sub.get("breakpoint_threshold_type") or "percentile"
  2110. if kind in ("percentile", "gradient") and amt > 100:
  2111. raise ValueError(
  2112. f"chunking for strategy 'semantic_vector': "
  2113. f"breakpoint_threshold_amount ({amt}) must be within (0, 100] for "
  2114. f"breakpoint_threshold_type '{kind}'. The type is inherited from "
  2115. f"addon_params/env when not set in the request."
  2116. )
  2117. async def pipeline_index_texts(
  2118. rag: LightRAG,
  2119. texts: List[str],
  2120. file_sources: List[str] = None,
  2121. track_id: str = None,
  2122. chunking: Optional[TextChunkingConfig] = None,
  2123. ):
  2124. """Index a list of texts with track_id
  2125. Args:
  2126. rag: LightRAG instance
  2127. texts: The texts to index
  2128. file_sources: Sources of the texts
  2129. track_id: Optional tracking ID
  2130. chunking: Optional chunking strategy + params (already validated by
  2131. the request model); when None, default fixed-token chunking is used
  2132. """
  2133. if not texts:
  2134. return
  2135. if not file_sources or len(file_sources) != len(texts):
  2136. raise ValueError("A valid file source is required for each text")
  2137. normalized_file_sources = [normalize_file_path(source) for source in file_sources]
  2138. if any(source == UNKNOWN_FILE_SOURCE for source in normalized_file_sources):
  2139. raise ValueError("A valid file source is required for each text")
  2140. if len(set(normalized_file_sources)) != len(normalized_file_sources):
  2141. raise ValueError("File sources must be unique by filename")
  2142. process_options, chunk_options = _resolve_text_chunking(chunking, rag)
  2143. await rag.apipeline_enqueue_documents(
  2144. input=texts,
  2145. file_paths=normalized_file_sources,
  2146. track_id=track_id,
  2147. process_options=process_options,
  2148. chunk_options=chunk_options,
  2149. )
  2150. await rag.apipeline_process_enqueue_documents()
async def run_scanning_process(
    rag: LightRAG, doc_manager: DocumentManager, track_id: Optional[str] = None
):
    """Background task that scans the input directory and indexes documents.

    Flow:
      1. Ask ``doc_manager`` for candidate files.
      2. Collapse files that normalize to the same canonical name. Prefer the
         first variant (in pinyin sort order) whose filename carries a parser
         hint, else the first variant. Every other variant is recorded as a
         scan warning and moved to the parsed directory.
      3. Classify each surviving file against ``doc_status`` by canonical
         basename:
         PROCESSED -> warning + moved to the parsed directory, skipped.
         FAILED with no ``full_docs`` entry -> stale stub deleted and the file
         retried as new (resumed instead if the delete fails).
         Any other existing row -> resumed.
         No row -> new.
      4. Release ``scanning_exclusive``, enqueue new files through
         ``pipeline_index_files`` and, when anything needs resuming, call
         ``apipeline_process_enqueue_documents`` explicitly.

    When the scan finds no files at all, the queue is still processed so
    already-pending documents can run. Exceptions are logged with their
    traceback and not re-raised. Both scanning flags are always cleared in
    ``finally`` when pipeline_status is available.

    Args:
        rag: LightRAG instance
        doc_manager: DocumentManager instance
        track_id: Optional tracking ID to pass to all scanned files
    """
    # The scan endpoint set ``scanning=True`` AND
    # ``scanning_exclusive=True`` synchronously before scheduling this
    # task. ``scanning`` covers the whole lifecycle (refuses
    # overlapping scans); ``scanning_exclusive`` covers only the
    # classification phase below — we clear it before invoking
    # pipeline_index_files so concurrent uploads can land while the
    # scan-driven processing finishes. Both MUST be cleared in
    # finally so subsequent uploads / scans can proceed even if the
    # body raises. When pipeline_status is not initialised (mocked
    # test rigs), the flags were never set so there's nothing to
    # clear — track that here to skip the namespace fetch.
    from lightrag.exceptions import PipelineNotInitializedError
    from lightrag.kg.shared_storage import get_namespace_data, get_namespace_lock

    # Both stay None when the workspace has no pipeline_status; every flag
    # update below is guarded on both being set.
    pipeline_status = None
    pipeline_status_lock = None
    try:
        pipeline_status = await get_namespace_data(
            "pipeline_status", workspace=rag.workspace
        )
        pipeline_status_lock = get_namespace_lock(
            "pipeline_status", workspace=rag.workspace
        )
    except PipelineNotInitializedError:
        pass

    try:
        new_files = doc_manager.scan_directory_for_new_files()
        total_files = len(new_files)
        logger.info(f"Found {total_files} files to index.")

        if new_files:
            # Group canonical-equivalent files so we can prefer hint-bearing
            # variants over plain ones. Within each group sort order is
            # preserved as a deterministic tiebreaker.
            files_by_canonical_name: dict[str, list[Path]] = {}
            for file_path in sorted(
                new_files, key=lambda p: get_pinyin_sort_key(str(p))
            ):
                canonical_name = normalize_file_path(str(file_path))
                files_by_canonical_name.setdefault(canonical_name, []).append(file_path)

            unique_files: list[Path] = []
            for canonical_name, group in files_by_canonical_name.items():
                # Prefer the first file carrying a supported parser hint so
                # the user's explicit engine choice wins over plain variants;
                # otherwise fall back to the first sorted entry.
                chosen = next(
                    (f for f in group if filename_parser_hint(f.name) is not None),
                    group[0],
                )
                unique_files.append(chosen)
                # Each non-chosen variant is reported as a scan warning and
                # moved to the parsed directory. The move is best-effort:
                # failures are only logged.
                for duplicate in group:
                    if duplicate is chosen:
                        continue
                    warning = (
                        "Skipping duplicate file in scan batch: "
                        f"{duplicate.name} duplicates {chosen.name} "
                        f"(canonical: {canonical_name})"
                    )
                    await record_scan_warning(rag, warning)
                    try:
                        await move_file_to_parsed_dir(duplicate)
                    except Exception as move_error:
                        logger.error(
                            f"Failed to move duplicate scan file {duplicate.name} to {PARSED_DIR_NAME}: {move_error}"
                        )

            # Partition unique_files into:
            # * processed_files — already PROCESSED, archived and skipped.
            # * resume_files — same canonical basename matches an existing
            #   non-PROCESSED doc_status row (PARSING /
            #   FAILED / PROCESSING / ANALYZING / PENDING).
            #   These must NOT go through pipeline_enqueue_file
            #   because apipeline_enqueue_documents would
            #   treat the same canonical name as a duplicate
            #   (returning None) and pipeline_enqueue_file
            #   would then archive the source as if it were
            #   a duplicate — corrupting pending-parse cases
            #   that still need the source on disk. The
            #   pipeline's resume logic, triggered via
            #   apipeline_process_enqueue_documents, will
            #   advance them based on their existing
            #   doc_status row.
            # * new_files — no existing record; standard enqueue path.
            # Note: this rebinds ``new_files`` (previously the raw scan result);
            # the raw list is not used after this point.
            new_files: list[Path] = []
            resume_files: list[Path] = []
            processed_files: list[str] = []
            for file_path in unique_files:
                filename = file_path.name
                # Inline the canonical-basename lookup so we keep both the
                # doc_id and the data: the FAILED-without-full_docs sub-case
                # below needs the doc_id to delete the stale stub.
                basename = normalize_file_path(str(file_path))
                existing_match = (
                    await rag.doc_status.get_doc_by_file_basename(basename)
                    if basename != UNKNOWN_FILE_SOURCE
                    else None
                )
                existing_doc_id, existing_doc_data = (
                    existing_match if existing_match else (None, None)
                )

                if (
                    existing_doc_data
                    and get_doc_status_value(existing_doc_data)
                    == DocStatus.PROCESSED.value
                ):
                    # File is already PROCESSED, skip it with warning and archive it.
                    processed_files.append(filename)
                    warning = f"Skipping already processed file: " f"{filename}"
                    await record_scan_warning(rag, warning)
                    try:
                        await move_file_to_parsed_dir(file_path)
                    except Exception as move_error:
                        logger.error(
                            f"Failed to move already processed file {filename} to {PARSED_DIR_NAME}: {move_error}"
                        )
                elif existing_doc_data:
                    # FAILED rows recorded by apipeline_enqueue_error_documents
                    # never write a full_docs entry — extraction blew up before
                    # any content was stored. _validate_and_fix_document_consistency
                    # preserves them for manual review and removes them from the
                    # processing list, so the resume path can never advance them.
                    # When the user fixes the file and re-scans we want a real
                    # retry: drop the stale stub and treat the file as new so
                    # the standard enqueue path re-extracts content.
                    status_value = get_doc_status_value(existing_doc_data)
                    if status_value == DocStatus.FAILED.value:
                        full_doc = await rag.full_docs.get_by_id(existing_doc_id)
                        if full_doc is None:
                            try:
                                await rag.doc_status.delete([existing_doc_id])
                            except Exception as delete_error:
                                logger.error(
                                    "Failed to delete stale failed-extraction "
                                    f"doc_status stub {existing_doc_id} "
                                    f"({filename}): {delete_error}"
                                )
                                # Fall through to resume — at worst the row
                                # remains preserved (current behaviour) rather
                                # than re-enqueued.
                                resume_files.append(file_path)
                                continue
                            logger.info(
                                "Retrying previously failed extraction; "
                                f"removed stale doc_status stub: {filename} "
                                f"(doc_id: {existing_doc_id})"
                            )
                            new_files.append(file_path)
                            continue
                    # Any other non-PROCESSED row, including FAILED rows that
                    # do have full_docs content, is left to the resume path.
                    logger.info(
                        "Resuming previously unfinished file from scan: "
                        f"{filename} (Status: {status_value})"
                    )
                    resume_files.append(file_path)
                else:
                    new_files.append(file_path)

            # Classification phase complete — release ``scanning_exclusive``
            # so concurrent uploads/inserts can land in doc_status while
            # the scan-driven processing finishes. ``scanning`` stays
            # True for the rest of the task lifecycle (releases in
            # finally) so the /scan endpoint still refuses overlapping
            # scans. Any per-file enqueue or duplicate detected during
            # the processing phase is handled by
            # apipeline_enqueue_documents' in-batch dedup, identical to
            # the upload-during-busy case.
            if pipeline_status is not None and pipeline_status_lock is not None:
                async with pipeline_status_lock:
                    pipeline_status["scanning_exclusive"] = False

            # New files take the standard enqueue + process path. When at
            # least one new file is successfully enqueued, pipeline_index_files
            # internally invokes apipeline_process_enqueue_documents, which
            # selects work by doc_status state and so will also pick up any
            # resume_files in the same run.
            if new_files:
                await pipeline_index_files(
                    rag,
                    new_files,
                    track_id,
                    from_scan=True,
                )

            # Resume targets must always trigger the pipeline explicitly:
            # pipeline_index_files only runs apipeline_process_enqueue_documents
            # after at least one new file successfully enqueues, so when every
            # new file is rejected (unsupported extension, empty body, content
            # / filename duplicate, ...) the resume rows would otherwise stay
            # stuck until an unrelated indexing run. When new files DID
            # enqueue, the inner call already drained the queue and this is a
            # cheap no-op that returns "No documents to process".
            # NOTE(review): when every unique file was already PROCESSED (or
            # only duplicates were found), neither call runs here, unlike the
            # empty-scan branch below which always drives the queue —
            # confirm this asymmetry is intended.
            if resume_files:
                await rag.apipeline_process_enqueue_documents()

            # Summary counts files routed to new/resume, not files that
            # actually finished processing successfully.
            total_active = len(new_files) + len(resume_files)
            if total_active or processed_files:
                summary_parts: list[str] = []
                if total_active:
                    summary_parts.append(f"{total_active} files Processed")
                if processed_files:
                    summary_parts.append(f"{len(processed_files)} skipped")
                logger.info(f"Scanning process completed: {' '.join(summary_parts)}.")
            else:
                logger.info(
                    "No files to process after filtering already processed files."
                )
        else:
            # No new files to index — classification is trivially done;
            # release ``scanning_exclusive`` before driving the queue so
            # concurrent uploads can land while process_enqueue runs.
            if pipeline_status is not None and pipeline_status_lock is not None:
                async with pipeline_status_lock:
                    pipeline_status["scanning_exclusive"] = False
            logger.info(
                "No upload file found, check if there are any documents in the queue..."
            )
            await rag.apipeline_process_enqueue_documents()
    except Exception as e:
        logger.error(f"Error during scanning process: {str(e)}")
        logger.error(traceback.format_exc())
    finally:
        # Always release both scanning flags so future uploads / scans
        # are not blocked by a crashed task. Skip when pipeline_status
        # was never initialised for this workspace (test rigs).
        if pipeline_status is not None and pipeline_status_lock is not None:
            async with pipeline_status_lock:
                pipeline_status["scanning"] = False
                pipeline_status["scanning_exclusive"] = False
async def background_delete_documents(
    rag: LightRAG,
    doc_manager: DocumentManager,
    doc_ids: List[str],
    delete_file: bool = False,
    delete_llm_cache: bool = False,
):
    """Background task to delete multiple documents.

    Deletes ``doc_ids`` one at a time via ``rag.adelete_by_doc_id`` and mirrors
    progress into the shared ``pipeline_status``: job info, ``cur_batch``,
    ``latest_message`` and ``history_messages``. The ``cancellation_requested``
    flag is checked before each document. On cancellation, the current and all
    remaining IDs are counted as failed and the loop stops.

    When ``delete_file`` is True and a successful deletion result carries a
    ``file_path`` other than ``UNKNOWN_FILE_SOURCE``, matching source files
    under ``doc_manager.input_dir`` are removed via
    ``delete_file_variants_by_file_path``. File-removal problems are logged
    and recorded but do not turn the document deletion into a failure.

    The ``busy`` / ``destructive_busy`` reservation is taken by the calling
    endpoint before this task runs. It is released unconditionally in
    ``finally``, together with the cancellation flag. If ``request_pending``
    was set meanwhile, the processing pipeline is started afterwards, outside
    the lock.

    Args:
        rag: LightRAG instance.
        doc_manager: Provides ``input_dir`` for optional source-file removal.
        doc_ids: Document IDs to delete, processed in the given order.
        delete_file: Also remove the matching source file(s) from disk.
        delete_llm_cache: Forwarded to ``rag.adelete_by_doc_id``.
    """
    from lightrag.kg.shared_storage import (
        get_namespace_data,
        get_namespace_lock,
    )

    pipeline_status = await get_namespace_data(
        "pipeline_status", workspace=rag.workspace
    )
    pipeline_status_lock = get_namespace_lock(
        "pipeline_status", workspace=rag.workspace
    )

    total_docs = len(doc_ids)
    successful_deletions = []
    failed_deletions = []

    # The /documents/delete_document endpoint has already reserved the
    # destructive slot synchronously: ``busy=True`` and
    # ``destructive_busy=True`` were set before the client got
    # ``deletion_started``, after checking busy + scanning +
    # pending_enqueues>0 atomically. Here we only update the
    # job-info fields; the busy reservation was acquired by the
    # endpoint and is released in the finally block below.
    async with pipeline_status_lock:
        pipeline_status.update(
            {
                # Job name can not be changed, it's verified in adelete_by_doc_id()
                "job_name": f"Deleting {total_docs} Documents",
                "job_start": datetime.now().isoformat(),
                "docs": total_docs,
                "batchs": total_docs,
                "cur_batch": 0,
                "latest_message": "Starting document deletion process",
            }
        )
        # Use slice assignment to clear the list in place
        pipeline_status["history_messages"][:] = ["Starting document deletion process"]
        if delete_llm_cache:
            pipeline_status["history_messages"].append(
                "LLM cache cleanup requested for this deletion job"
            )

    try:
        # Loop through each document ID and delete them one by one
        for i, doc_id in enumerate(doc_ids, 1):
            # Check for cancellation at the start of each document deletion
            async with pipeline_status_lock:
                if pipeline_status.get("cancellation_requested", False):
                    cancel_msg = f"Deletion cancelled by user at document {i}/{total_docs}. {len(successful_deletions)} deleted, {total_docs - i + 1} remaining."
                    logger.info(cancel_msg)
                    pipeline_status["latest_message"] = cancel_msg
                    pipeline_status["history_messages"].append(cancel_msg)
                    # Add remaining documents to failed list with cancellation reason
                    failed_deletions.extend(
                        doc_ids[i - 1 :]
                    )  # i-1 because enumerate starts at 1
                    break  # Exit the loop, remaining documents unchanged

                start_msg = f"Deleting document {i}/{total_docs}: {doc_id}"
                logger.info(start_msg)
                pipeline_status["cur_batch"] = i
                pipeline_status["latest_message"] = start_msg
                pipeline_status["history_messages"].append(start_msg)

            # Placeholder shown in the error message if adelete_by_doc_id raises
            # before a result (and thus a file path) is available.
            file_path = "#"
            try:
                result = await rag.adelete_by_doc_id(
                    doc_id, delete_llm_cache=delete_llm_cache
                )
                # ``"result" in locals()`` is always True here because
                # ``result`` was bound by the statement above; the effective
                # behaviour is getattr with a "-" fallback.
                file_path = (
                    getattr(result, "file_path", "-") if "result" in locals() else "-"
                )
                if result.status == "success":
                    successful_deletions.append(doc_id)
                    success_msg = (
                        f"Document deleted {i}/{total_docs}: {doc_id}[{file_path}]"
                    )
                    logger.info(success_msg)
                    async with pipeline_status_lock:
                        pipeline_status["history_messages"].append(success_msg)

                    # Handle file deletion if requested and source information is available
                    if (
                        delete_file
                        and result.file_path
                        and result.file_path != UNKNOWN_FILE_SOURCE
                    ):
                        try:
                            deleted_files, file_delete_errors = (
                                delete_file_variants_by_file_path(
                                    doc_manager.input_dir,
                                    result.file_path,
                                )
                            )
                            # Per-variant errors are reported individually but
                            # do not stop the remaining variants or the job.
                            for file_delete_error in file_delete_errors:
                                logger.warning(file_delete_error)
                                async with pipeline_status_lock:
                                    pipeline_status["latest_message"] = (
                                        file_delete_error
                                    )
                                    pipeline_status["history_messages"].append(
                                        file_delete_error
                                    )
                            if deleted_files:
                                file_delete_msg = (
                                    "Successfully deleted source files: "
                                    + ", ".join(deleted_files)
                                )
                                logger.info(file_delete_msg)
                                async with pipeline_status_lock:
                                    pipeline_status["latest_message"] = file_delete_msg
                                    pipeline_status["history_messages"].append(
                                        file_delete_msg
                                    )
                            else:
                                file_error_msg = (
                                    "File deletion skipped, missing or unsafe file: "
                                    f"{result.file_path}"
                                )
                                logger.warning(file_error_msg)
                                async with pipeline_status_lock:
                                    pipeline_status["latest_message"] = file_error_msg
                                    pipeline_status["history_messages"].append(
                                        file_error_msg
                                    )
                        except Exception as file_error:
                            file_error_msg = f"Failed to delete file {result.file_path}: {str(file_error)}"
                            logger.error(file_error_msg)
                            async with pipeline_status_lock:
                                pipeline_status["latest_message"] = file_error_msg
                                pipeline_status["history_messages"].append(
                                    file_error_msg
                                )
                    elif delete_file:
                        no_file_msg = (
                            f"File deletion skipped, missing file path: {doc_id}"
                        )
                        logger.warning(no_file_msg)
                        async with pipeline_status_lock:
                            pipeline_status["latest_message"] = no_file_msg
                            pipeline_status["history_messages"].append(no_file_msg)
                else:
                    failed_deletions.append(doc_id)
                    error_msg = f"Failed to delete {i}/{total_docs}: {doc_id}[{file_path}] - {result.message}"
                    logger.error(error_msg)
                    async with pipeline_status_lock:
                        pipeline_status["latest_message"] = error_msg
                        pipeline_status["history_messages"].append(error_msg)

            except Exception as e:
                # A per-document failure is recorded and the loop continues
                # with the next document.
                failed_deletions.append(doc_id)
                error_msg = f"Error deleting document {i}/{total_docs}: {doc_id}[{file_path}] - {str(e)}"
                logger.error(error_msg)
                logger.error(traceback.format_exc())
                async with pipeline_status_lock:
                    pipeline_status["latest_message"] = error_msg
                    pipeline_status["history_messages"].append(error_msg)

    except Exception as e:
        error_msg = f"Critical error during batch deletion: {str(e)}"
        logger.error(error_msg)
        logger.error(traceback.format_exc())
        async with pipeline_status_lock:
            pipeline_status["history_messages"].append(error_msg)
    finally:
        # Final summary and check for pending requests
        async with pipeline_status_lock:
            pipeline_status["busy"] = False
            pipeline_status["destructive_busy"] = False
            # NOTE(review): this writes ``pending_requests``, a different key
            # from the ``request_pending`` flag read a few lines below; nothing
            # in this function reads ``pending_requests`` — confirm whether it
            # is used elsewhere or is a leftover/typo.
            pipeline_status["pending_requests"] = False  # Reset pending requests flag
            pipeline_status["cancellation_requested"] = (
                False  # Always reset cancellation flag
            )
            completion_msg = f"Deletion completed: {len(successful_deletions)} successful, {len(failed_deletions)} failed"
            pipeline_status["latest_message"] = completion_msg
            pipeline_status["history_messages"].append(completion_msg)

            # Check if there are pending document indexing requests
            has_pending_request = pipeline_status.get("request_pending", False)

        # If there are pending requests, start document processing pipeline.
        # This runs after the lock is released, so the pipeline can take the
        # same lock itself.
        if has_pending_request:
            try:
                logger.info(
                    "Processing pending document indexing requests after deletion"
                )
                await rag.apipeline_process_enqueue_documents()
            except Exception as e:
                logger.error(f"Error processing pending documents after deletion: {e}")
  2566. def create_document_routes(
  2567. rag: LightRAG, doc_manager: DocumentManager, api_key: Optional[str] = None
  2568. ):
  2569. # Fresh router per call — see the note above the temp_prefix constant.
  2570. router = APIRouter(
  2571. prefix="/documents",
  2572. tags=["documents"],
  2573. )
  2574. # Create combined auth dependency for document routes
  2575. combined_auth = get_combined_auth_dependency(api_key)
  2576. @router.post(
  2577. "/scan", response_model=ScanResponse, dependencies=[Depends(combined_auth)]
  2578. )
  2579. async def scan_for_new_documents(background_tasks: BackgroundTasks):
  2580. """
  2581. Trigger the scanning process for new documents.
  2582. Refuses to start a new scan with
  2583. ``status='scanning_skipped_pipeline_busy'`` (and does not
  2584. schedule a background task) when any of these is set:
  2585. - ``pipeline_status["busy"]`` — the processing loop or another
  2586. destructive job is running.
  2587. - ``pipeline_status["scanning"]`` — another scan is already
  2588. running (any phase: classification or processing).
  2589. - ``pipeline_status["pending_enqueues"] > 0`` — an /upload,
  2590. /text or /texts endpoint has reserved a slot whose bg task
  2591. has not yet written to doc_status; starting a scan now would
  2592. race scan's classification reads against that pending write.
  2593. Both ``scanning`` and ``scanning_exclusive`` are acquired
  2594. synchronously here so a subsequent fast-follow request hits the
  2595. guard rather than racing against the not-yet-started task.
  2596. ``run_scanning_process`` clears ``scanning_exclusive`` once
  2597. classification is done, allowing concurrent uploads to land
  2598. while the scan-driven processing finishes.
  2599. Returns:
  2600. ScanResponse: A response object containing the scanning status and track_id
  2601. """
  2602. from lightrag.exceptions import PipelineNotInitializedError
  2603. from lightrag.kg.shared_storage import get_namespace_data, get_namespace_lock
  2604. # Generate track_id with "scan" prefix for scanning operation
  2605. track_id = generate_track_id("scan")
  2606. try:
  2607. pipeline_status = await get_namespace_data(
  2608. "pipeline_status", workspace=rag.workspace
  2609. )
  2610. except PipelineNotInitializedError:
  2611. # Workspace pipeline_status not yet bootstrapped (e.g. mocked
  2612. # test rigs). Treat as idle and allow the scan to proceed; the
  2613. # scanning flag has nowhere to live so it is effectively skipped.
  2614. background_tasks.add_task(run_scanning_process, rag, doc_manager, track_id)
  2615. return ScanResponse(
  2616. status="scanning_started",
  2617. message="Scanning process has been initiated in the background",
  2618. track_id=track_id,
  2619. )
  2620. pipeline_status_lock = get_namespace_lock(
  2621. "pipeline_status", workspace=rag.workspace
  2622. )
  2623. # Atomically acquire the scanning flag. Scan is the exclusive
  2624. # writer in this contract — it reads doc_status to make
  2625. # classification decisions (PROCESSED / resume / retry-as-new /
  2626. # archive) and would race with concurrent writers — so refuse if:
  2627. # * pipeline is processing (busy=True): scan + processing both
  2628. # read/mutate doc_status; serialise.
  2629. # * another scan is in flight (scanning=True).
  2630. # * any /upload, /text, /texts endpoint has reserved a
  2631. # pending-enqueue slot (see _reserve_enqueue_slot): the bg
  2632. # task has not yet written doc_status and we would otherwise
  2633. # race with its mid-flight write.
  2634. async with pipeline_status_lock:
  2635. if pipeline_status.get("busy"):
  2636. logger.warning(
  2637. "Scan request skipped: pipeline is busy processing documents"
  2638. )
  2639. return ScanResponse(
  2640. status="scanning_skipped_pipeline_busy",
  2641. message=(
  2642. "Pipeline is currently busy processing documents. "
  2643. "Wait for the running job to finish before triggering another scan."
  2644. ),
  2645. track_id=track_id,
  2646. )
  2647. if pipeline_status.get("scanning"):
  2648. logger.warning(
  2649. "Scan request skipped: another scan is already in progress"
  2650. )
  2651. return ScanResponse(
  2652. status="scanning_skipped_pipeline_busy",
  2653. message=(
  2654. "Another scan is already in progress. "
  2655. "Wait for it to finish before triggering a new one."
  2656. ),
  2657. track_id=track_id,
  2658. )
  2659. pending_enqueues = pipeline_status.get("pending_enqueues", 0)
  2660. if pending_enqueues > 0:
  2661. logger.warning(
  2662. "Scan request skipped: "
  2663. f"{pending_enqueues} pending enqueue(s) reserved by "
  2664. "upload/insert endpoints"
  2665. )
  2666. return ScanResponse(
  2667. status="scanning_skipped_pipeline_busy",
  2668. message=(
  2669. "Document upload/insert is being enqueued. "
  2670. "Wait for in-flight work to complete before triggering a scan."
  2671. ),
  2672. track_id=track_id,
  2673. )
  2674. # ``scanning`` covers the whole scan task lifecycle (used by
  2675. # this endpoint to refuse overlapping scans).
  2676. # ``scanning_exclusive`` is True only during the
  2677. # classification phase: run_scanning_process clears it once
  2678. # classification is done so concurrent uploads can land
  2679. # while the scan-driven processing finishes.
  2680. pipeline_status["scanning"] = True
  2681. pipeline_status["scanning_exclusive"] = True
  2682. # Start the scanning process in the background with track_id. The
  2683. # task is responsible for clearing both flags in its finally block.
  2684. background_tasks.add_task(run_scanning_process, rag, doc_manager, track_id)
  2685. return ScanResponse(
  2686. status="scanning_started",
  2687. message="Scanning process has been initiated in the background",
  2688. track_id=track_id,
  2689. )
  2690. @router.post(
  2691. "/upload", response_model=InsertResponse, dependencies=[Depends(combined_auth)]
  2692. )
  2693. async def upload_to_input_dir(
  2694. background_tasks: BackgroundTasks, file: UploadFile = File(...)
  2695. ):
  2696. """
  2697. Upload a file to the input directory and index it.
  2698. This API endpoint accepts a file through an HTTP POST request, checks if the
  2699. uploaded file is of a supported type, saves it in the specified input directory,
  2700. indexes it for retrieval, and returns a success status with relevant details.
  2701. **File Size Limit:**
  2702. - Configurable via `MAX_UPLOAD_SIZE` environment variable (default: 100MB)
  2703. - Set to `None` or `0` for unlimited upload size
  2704. - Returns HTTP 413 (Request Entity Too Large) if file exceeds limit
  2705. **Duplicate Detection Behavior:**
  2706. This endpoint handles two types of duplicate scenarios differently:
  2707. 1. **Filename Duplicate (Synchronous Detection)**:
  2708. - Detected immediately, before any file is written.
  2709. - File name is treated as the unique document key. Both
  2710. ``doc_status`` and the INPUT directory are checked under the
  2711. canonical (parser-hint stripped) basename so ``abc.docx`` and
  2712. ``abc.[native].docx`` map to the same record.
  2713. - **HTTP 409** is returned when a same-name record already exists.
  2714. The response detail names the conflict source ("Document
  2715. storage already contains ..." or "Input directory already
  2716. contains ..."). Clients must delete the existing document
  2717. (``DELETE /documents/{doc_id}``) before re-uploading; there is
  2718. no longer a 200 ``status="duplicated"`` soft-fail response.
  2719. 2. **Content Duplicate (Asynchronous Detection)**:
  2720. - Detected during background processing after content extraction
  2721. - Returns `status="success"` with a new track_id immediately
  2722. - The duplicate is detected later when processing the file content
  2723. - Use `/documents/track_status/{track_id}` to check the final result:
  2724. - Document will have `status="FAILED"`
  2725. - `error_msg` contains "Content already exists. Original doc_id: xxx"
  2726. - `metadata.is_duplicate=true` with reference to original document
  2727. - `metadata.original_doc_id` points to the existing document
  2728. - `metadata.original_track_id` shows the original upload's track_id
  2729. **Why Different Behavior?**
  2730. - Filename check is fast (simple lookup), done synchronously
  2731. - Content extraction is expensive (PDF/DOCX parsing), done asynchronously
  2732. - This design prevents blocking the client during expensive operations
  2733. **Concurrency Constraint:**
  2734. - The endpoint refuses with HTTP 409 only while one of the
  2735. following exclusive-writer states is set:
  2736. ``pipeline_status["scanning_exclusive"]`` (a scan is in its
  2737. classification phase, reading and possibly mutating doc_status)
  2738. or ``pipeline_status["destructive_busy"]`` (``/documents/clear``
  2739. or per-doc delete is dropping storages / removing input files).
  2740. Wait for the running job to finish before re-submitting.
  2741. - ``busy=True`` from the processing loop, and a scan in its
  2742. processing phase (``scanning=True`` with
  2743. ``scanning_exclusive=False``), do NOT block uploads — uploads
  2744. are accepted concurrently and the running pipeline picks them
  2745. up via its ``request_pending`` mechanism.
  2746. Args:
  2747. background_tasks: FastAPI BackgroundTasks for async processing
  2748. file (UploadFile): The file to be uploaded. It must have an allowed extension.
  2749. Returns:
  2750. InsertResponse: A response object containing the upload status and a message.
  2751. - status="success": File accepted and queued for processing
  2752. Raises:
  2753. HTTPException: 400 unsupported file type, 409 same-name
  2754. conflict or scan-classifying / destructive job in
  2755. flight, 413 file too large, 500 other errors.
  2756. """
  2757. slot_reserved = False
  2758. try:
  2759. # Reject upload while a scan is in its CLASSIFICATION
  2760. # phase or a destructive job (clear / per-doc delete) is
  2761. # in flight, AND reserve a pending-enqueue slot so a scan
  2762. # request that arrives before the bg task runs cannot
  2763. # transition scanning_exclusive=True under us. Concurrent
  2764. # processing (``busy=True``) and a scan in its processing
  2765. # phase (``scanning=True`` with
  2766. # ``scanning_exclusive=False``) are permitted: the running
  2767. # loop's ``request_pending`` mechanism picks up our doc
  2768. # after the current batch.
  2769. slot_reserved = await _reserve_enqueue_slot(rag)
  2770. # Sanitize filename to prevent Path Traversal attacks
  2771. safe_filename = sanitize_filename(file.filename, doc_manager.input_dir)
  2772. if not doc_manager.is_supported_file(safe_filename):
  2773. raise HTTPException(
  2774. status_code=400,
  2775. detail=f"Unsupported file type. Supported types: {doc_manager.supported_extensions}",
  2776. )
  2777. # Check file size limit (if configured)
  2778. if (
  2779. global_args.max_upload_size is not None
  2780. and global_args.max_upload_size > 0
  2781. ):
  2782. # Safe access to file size (not available in older Starlette versions)
  2783. file_size = getattr(file, "size", None)
  2784. # Pre-flight size check (only if size is available)
  2785. if file_size is not None:
  2786. if file_size > global_args.max_upload_size:
  2787. raise HTTPException(
  2788. status_code=413,
  2789. detail=f"File too large. Maximum size: {global_args.max_upload_size / 1024 / 1024:.1f}MB, uploaded: {file_size / 1024 / 1024:.1f}MB",
  2790. )
  2791. else:
  2792. # If size not available, we'll check during streaming
  2793. logger.debug(
  2794. f"File size not available in UploadFile for {safe_filename}, will check during streaming"
  2795. )
  2796. file_path = doc_manager.input_dir / safe_filename
  2797. # Strict name pre-check. Both the INPUT directory and doc_status
  2798. # must be free of any same-canonical-basename record before we
  2799. # accept the upload. Replacing an existing document requires an
  2800. # explicit DELETE first; we no longer write a "duplicated" 200
  2801. # response that silently no-ops.
  2802. existing_doc_data = await get_existing_doc_by_file_path_candidates(
  2803. rag.doc_status, file_path
  2804. )
  2805. if existing_doc_data:
  2806. status = get_doc_status_value(existing_doc_data) or "unknown"
  2807. raise HTTPException(
  2808. status_code=409,
  2809. detail=(
  2810. f"Document storage already contains '{safe_filename}' "
  2811. f"(Status: {status}). Delete the existing record before re-uploading."
  2812. ),
  2813. )
  2814. # INPUT directory check, using canonical parser-hint names.
  2815. # Fast path: exact filename match avoids iterdir on large input directories.
  2816. canonical_filename = normalize_file_path(safe_filename)
  2817. if file_path.exists():
  2818. existing_input_file: Path | None = file_path
  2819. else:
  2820. existing_input_file = find_existing_file_by_file_path(
  2821. doc_manager.input_dir, canonical_filename
  2822. )
  2823. if existing_input_file:
  2824. raise HTTPException(
  2825. status_code=409,
  2826. detail=(
  2827. f"Input directory already contains a file with the same "
  2828. f"canonical basename ('{existing_input_file.name}'). "
  2829. f"Remove or rename it before re-uploading."
  2830. ),
  2831. )
  2832. # Async streaming write with size check
  2833. bytes_written = 0
  2834. chunk_size = 1024 * 1024 # 1MB chunks
  2835. needs_cleanup = False
  2836. async with aiofiles.open(file_path, "wb") as out_file:
  2837. while True:
  2838. # Read chunk from upload stream
  2839. chunk = await file.read(chunk_size)
  2840. if not chunk:
  2841. break
  2842. # Check size limit during streaming (if not checked before)
  2843. if (
  2844. global_args.max_upload_size is not None
  2845. and global_args.max_upload_size > 0
  2846. ):
  2847. bytes_written += len(chunk)
  2848. if bytes_written > global_args.max_upload_size:
  2849. needs_cleanup = True
  2850. break
  2851. # Write chunk to file
  2852. await out_file.write(chunk)
  2853. # Cleanup after file is closed
  2854. if needs_cleanup:
  2855. try:
  2856. file_path.unlink()
  2857. except Exception as cleanup_error:
  2858. logger.error(
  2859. f"Error cleaning up oversized file {safe_filename}: {cleanup_error}"
  2860. )
  2861. raise HTTPException(
  2862. status_code=413,
  2863. detail=f"File too large. Maximum size: {global_args.max_upload_size / 1024 / 1024:.1f}MB, uploaded: {bytes_written / 1024 / 1024:.1f}MB",
  2864. )
  2865. track_id = generate_track_id("upload")
  2866. # Bg task: enqueue + trigger processing, then release the slot.
  2867. # ``pipeline_index_file`` does both: it calls
  2868. # ``pipeline_enqueue_file`` (writes doc_status / full_docs) and
  2869. # then ``apipeline_process_enqueue_documents``. The latter is
  2870. # safe to invoke even when the loop is already busy — it
  2871. # collapses to a ``request_pending=True`` nudge and returns,
  2872. # so concurrent uploads/inserts cooperate via the running
  2873. # loop's request_pending mechanism.
  2874. async def _indexing_task():
  2875. try:
  2876. await pipeline_index_file(rag, file_path, track_id)
  2877. finally:
  2878. await _release_enqueue_slot(rag)
  2879. background_tasks.add_task(_indexing_task)
  2880. # Ownership of the slot transferred to the bg task — the
  2881. # finally block below must NOT release it again.
  2882. slot_reserved = False
  2883. return InsertResponse(
  2884. status="success",
  2885. message=f"File '{safe_filename}' uploaded successfully. Processing will continue in background.",
  2886. track_id=track_id,
  2887. )
  2888. except HTTPException:
  2889. # Re-raise HTTP exceptions (400, 413, etc.)
  2890. raise
  2891. except Exception as e:
  2892. logger.error(f"Error /documents/upload: {file.filename}: {str(e)}")
  2893. logger.error(traceback.format_exc())
  2894. raise HTTPException(status_code=500, detail=str(e))
  2895. finally:
  2896. # If we reserved a slot but never scheduled the bg task
  2897. # (e.g. early validation rejection or streaming-write
  2898. # failure), release here. No drain coordination needed —
  2899. # any sibling bg task triggers its own processing pass.
  2900. if slot_reserved:
  2901. await _release_enqueue_slot(rag)
  2902. @router.post(
  2903. "/text", response_model=InsertResponse, dependencies=[Depends(combined_auth)]
  2904. )
  2905. async def insert_text(
  2906. request: InsertTextRequest, background_tasks: BackgroundTasks
  2907. ):
  2908. """
  2909. Insert text into the RAG system.
  2910. This endpoint allows you to insert text data into the RAG system for later retrieval
  2911. and use in generating responses.
  2912. **Concurrency Constraint:**
  2913. - Refuses with HTTP 409 only while
  2914. ``pipeline_status["scanning_exclusive"]`` (a scan is in its
  2915. classification phase) or ``pipeline_status["destructive_busy"]``
  2916. (clear / per-doc delete is in flight) is set. ``busy=True``
  2917. from the processing loop, and a scan in its processing phase,
  2918. do NOT block — the running pipeline picks up the new doc via
  2919. ``request_pending``.
  2920. Args:
  2921. request (InsertTextRequest): The request body containing the text to be inserted.
  2922. background_tasks: FastAPI BackgroundTasks for async processing
  2923. Returns:
  2924. InsertResponse: A response object containing the status of the operation.
  2925. Raises:
  2926. HTTPException: 400 invalid file_source, 409 same-name conflict
  2927. or scan/destructive job in flight, 500 other errors.
  2928. """
  2929. slot_reserved = False
  2930. try:
  2931. # Reject text insertion while a scan is in progress AND reserve
  2932. # a pending-enqueue slot — see /upload for the rationale.
  2933. slot_reserved = await _reserve_enqueue_slot(rag)
  2934. # Check if file_source already exists in doc_status storage
  2935. if not is_valid_file_source(request.file_source):
  2936. raise HTTPException(
  2937. status_code=400,
  2938. detail="A valid file_source is required for text insertion",
  2939. )
  2940. normalized_file_source = normalize_file_path(request.file_source)
  2941. existing_doc_data = await get_existing_doc_by_file_path_candidates(
  2942. rag.doc_status, normalized_file_source
  2943. )
  2944. if existing_doc_data:
  2945. status = get_doc_status_value(existing_doc_data) or "unknown"
  2946. raise HTTPException(
  2947. status_code=409,
  2948. detail=(
  2949. f"Document storage already contains '{normalized_file_source}' "
  2950. f"(Status: {status}). Delete the existing record before re-inserting."
  2951. ),
  2952. )
  2953. # Resolve + validate chunking synchronously so an invalid
  2954. # effective config (e.g. chunk_token_size below the inherited
  2955. # overlap) fails with HTTP 422 here, before any background work is
  2956. # scheduled. pipeline_index_texts re-resolves from the same
  2957. # addon_params inside the task.
  2958. try:
  2959. _resolve_text_chunking(request.chunking, rag)
  2960. except ValueError as exc:
  2961. raise HTTPException(status_code=422, detail=str(exc))
  2962. # Generate track_id for text insertion
  2963. track_id = generate_track_id("insert")
  2964. async def _indexing_task():
  2965. try:
  2966. await pipeline_index_texts(
  2967. rag,
  2968. [request.text],
  2969. file_sources=[normalized_file_source],
  2970. track_id=track_id,
  2971. chunking=request.chunking,
  2972. )
  2973. finally:
  2974. await _release_enqueue_slot(rag)
  2975. background_tasks.add_task(_indexing_task)
  2976. slot_reserved = False
  2977. return InsertResponse(
  2978. status="success",
  2979. message="Text successfully received. Processing will continue in background.",
  2980. track_id=track_id,
  2981. )
  2982. except HTTPException:
  2983. raise
  2984. except Exception as e:
  2985. logger.error(f"Error /documents/text: {str(e)}")
  2986. logger.error(traceback.format_exc())
  2987. raise HTTPException(status_code=500, detail=str(e))
  2988. finally:
  2989. if slot_reserved:
  2990. await _release_enqueue_slot(rag)
  2991. @router.post(
  2992. "/texts",
  2993. response_model=InsertResponse,
  2994. dependencies=[Depends(combined_auth)],
  2995. )
  2996. async def insert_texts(
  2997. request: InsertTextsRequest, background_tasks: BackgroundTasks
  2998. ):
  2999. """
  3000. Insert multiple texts into the RAG system.
  3001. This endpoint allows you to insert multiple text entries into the RAG system
  3002. in a single request.
  3003. **Concurrency Constraint:**
  3004. - Refuses with HTTP 409 only while
  3005. ``pipeline_status["scanning_exclusive"]`` (a scan is in its
  3006. classification phase) or ``pipeline_status["destructive_busy"]``
  3007. (clear / per-doc delete is in flight) is set. ``busy=True``
  3008. from the processing loop, and a scan in its processing phase,
  3009. do NOT block — the running pipeline picks up the new docs via
  3010. ``request_pending``.
  3011. Args:
  3012. request (InsertTextsRequest): The request body containing the list of texts.
  3013. background_tasks: FastAPI BackgroundTasks for async processing
  3014. Returns:
  3015. InsertResponse: A response object containing the status of the operation.
  3016. Raises:
  3017. HTTPException: 400 invalid file_sources, 409 same-name
  3018. conflict or scan/destructive job in flight, 500 other
  3019. errors.
  3020. """
  3021. slot_reserved = False
  3022. try:
  3023. # Reject batch text insertion while a scan is in progress AND
  3024. # reserve a pending-enqueue slot — see /upload for the rationale.
  3025. slot_reserved = await _reserve_enqueue_slot(rag)
  3026. # Check if any file_sources already exist in doc_status storage
  3027. if not request.file_sources or len(request.file_sources) != len(
  3028. request.texts
  3029. ):
  3030. raise HTTPException(
  3031. status_code=400,
  3032. detail="A valid file_source is required for each text",
  3033. )
  3034. normalized_file_sources = [
  3035. normalize_file_path(file_source) for file_source in request.file_sources
  3036. ]
  3037. if any(
  3038. file_source == UNKNOWN_FILE_SOURCE
  3039. for file_source in normalized_file_sources
  3040. ):
  3041. raise HTTPException(
  3042. status_code=400,
  3043. detail="A valid file_source is required for each text",
  3044. )
  3045. if len(set(normalized_file_sources)) != len(normalized_file_sources):
  3046. raise HTTPException(
  3047. status_code=400,
  3048. detail="file_sources must be unique by filename",
  3049. )
  3050. for file_source in normalized_file_sources:
  3051. existing_doc_data = await get_existing_doc_by_file_path_candidates(
  3052. rag.doc_status, file_source
  3053. )
  3054. if existing_doc_data:
  3055. status = get_doc_status_value(existing_doc_data) or "unknown"
  3056. raise HTTPException(
  3057. status_code=409,
  3058. detail=(
  3059. f"Document storage already contains '{file_source}' "
  3060. f"(Status: {status}). Delete the existing record before re-inserting."
  3061. ),
  3062. )
  3063. # Resolve + validate the shared chunking synchronously so an
  3064. # invalid effective config (e.g. chunk_token_size below the
  3065. # inherited overlap) fails with HTTP 422 here, before any
  3066. # background work is scheduled. pipeline_index_texts re-resolves
  3067. # from the same addon_params inside the task.
  3068. try:
  3069. _resolve_text_chunking(request.chunking, rag)
  3070. except ValueError as exc:
  3071. raise HTTPException(status_code=422, detail=str(exc))
  3072. # Generate track_id for texts insertion
  3073. track_id = generate_track_id("insert")
  3074. async def _indexing_task():
  3075. try:
  3076. await pipeline_index_texts(
  3077. rag,
  3078. request.texts,
  3079. file_sources=normalized_file_sources,
  3080. track_id=track_id,
  3081. chunking=request.chunking,
  3082. )
  3083. finally:
  3084. await _release_enqueue_slot(rag)
  3085. background_tasks.add_task(_indexing_task)
  3086. slot_reserved = False
  3087. return InsertResponse(
  3088. status="success",
  3089. message="Texts successfully received. Processing will continue in background.",
  3090. track_id=track_id,
  3091. )
  3092. except HTTPException:
  3093. raise
  3094. except Exception as e:
  3095. logger.error(f"Error /documents/texts: {str(e)}")
  3096. logger.error(traceback.format_exc())
  3097. raise HTTPException(status_code=500, detail=str(e))
  3098. finally:
  3099. if slot_reserved:
  3100. await _release_enqueue_slot(rag)
  3101. @router.delete(
  3102. "", response_model=ClearDocumentsResponse, dependencies=[Depends(combined_auth)]
  3103. )
  3104. async def clear_documents():
  3105. """
  3106. Clear all documents from the RAG system.
  3107. This endpoint deletes all documents, entities, relationships, and files from the system.
  3108. It uses the storage drop methods to properly clean up all data and removes all files
  3109. from the input directory.
  3110. **Concurrency Constraint:**
  3111. - Atomically reserves the destructive slot (sets ``busy=True``
  3112. and ``destructive_busy=True``) before dropping anything.
  3113. Refuses with ``status="busy"`` when ANY of these is set:
  3114. ``pipeline_status["busy"]`` (processing loop or another
  3115. destructive job in flight), ``pipeline_status["scanning"]``
  3116. (a scan is anywhere in its lifecycle), or
  3117. ``pipeline_status["pending_enqueues"] > 0`` (an /upload,
  3118. /text or /texts has reserved a slot whose bg task has not
  3119. yet written to doc_status).
  3120. Returns:
  3121. ClearDocumentsResponse: A response object containing the status and message.
  3122. - status="success": All documents and files were successfully cleared.
  3123. - status="partial_success": Document clear job exit with some errors.
  3124. - status="busy": Operation could not be completed because another
  3125. writer (busy / scanning / pending enqueue) holds the pipeline.
  3126. - status="fail": All storage drop operations failed, with message
  3127. - message: Detailed information about the operation results, including counts
  3128. of deleted files and any errors encountered.
  3129. Raises:
  3130. HTTPException: Raised when a serious error occurs during the clearing process,
  3131. with status code 500 and error details in the detail field.
  3132. """
  3133. from lightrag.kg.shared_storage import (
  3134. get_namespace_data,
  3135. get_namespace_lock,
  3136. )
  3137. # Get pipeline status and lock
  3138. pipeline_status = await get_namespace_data(
  3139. "pipeline_status", workspace=rag.workspace
  3140. )
  3141. pipeline_status_lock = get_namespace_lock(
  3142. "pipeline_status", workspace=rag.workspace
  3143. )
  3144. # Atomically reserve the destructive slot. Checks busy +
  3145. # scanning + pending_enqueues>0 in a single critical section
  3146. # before flipping busy=True and destructive_busy=True together.
  3147. # ``destructive_busy`` blocks reservation and the enqueue
  3148. # last-line guard: clear is about to drop every storage and
  3149. # remove every input file, so a concurrent upload accepted in
  3150. # this window would write to storages mid-drop and silently
  3151. # lose the document.
  3152. acquired, reason = await _acquire_destructive_busy(rag)
  3153. if not acquired:
  3154. return ClearDocumentsResponse(status="busy", message=reason)
  3155. async with pipeline_status_lock:
  3156. pipeline_status.update(
  3157. {
  3158. "job_name": "Clearing Documents",
  3159. "job_start": datetime.now().isoformat(),
  3160. "docs": 0,
  3161. "batchs": 0,
  3162. "cur_batch": 0,
  3163. "request_pending": False, # Clear any previous request
  3164. "latest_message": "Starting document clearing process",
  3165. }
  3166. )
  3167. # Cleaning history_messages without breaking it as a shared list object
  3168. del pipeline_status["history_messages"][:]
  3169. pipeline_status["history_messages"].append(
  3170. "Starting document clearing process"
  3171. )
  3172. try:
  3173. # Use drop method to clear all data
  3174. drop_tasks = []
  3175. storages = [
  3176. rag.text_chunks,
  3177. rag.full_docs,
  3178. rag.full_entities,
  3179. rag.full_relations,
  3180. rag.entity_chunks,
  3181. rag.relation_chunks,
  3182. rag.entities_vdb,
  3183. rag.relationships_vdb,
  3184. rag.chunks_vdb,
  3185. rag.chunk_entity_relation_graph,
  3186. rag.doc_status,
  3187. ]
  3188. # Log storage drop start
  3189. if "history_messages" in pipeline_status:
  3190. pipeline_status["history_messages"].append(
  3191. "Starting to drop storage components"
  3192. )
  3193. for storage in storages:
  3194. if storage is not None:
  3195. drop_tasks.append(storage.drop())
  3196. # Wait for all drop tasks to complete
  3197. drop_results = await asyncio.gather(*drop_tasks, return_exceptions=True)
  3198. # Check for errors and log results
  3199. errors = []
  3200. storage_success_count = 0
  3201. storage_error_count = 0
  3202. for i, result in enumerate(drop_results):
  3203. storage_name = storages[i].__class__.__name__
  3204. if isinstance(result, Exception):
  3205. error_msg = f"Error dropping {storage_name}: {str(result)}"
  3206. errors.append(error_msg)
  3207. logger.error(error_msg)
  3208. storage_error_count += 1
  3209. else:
  3210. namespace = storages[i].namespace
  3211. workspace = storages[i].workspace
  3212. logger.info(
  3213. f"Successfully dropped {storage_name}: {workspace}/{namespace}"
  3214. )
  3215. storage_success_count += 1
  3216. # Log storage drop results
  3217. if "history_messages" in pipeline_status:
  3218. if storage_error_count > 0:
  3219. pipeline_status["history_messages"].append(
  3220. f"Dropped {storage_success_count} storage components with {storage_error_count} errors"
  3221. )
  3222. else:
  3223. pipeline_status["history_messages"].append(
  3224. f"Successfully dropped all {storage_success_count} storage components"
  3225. )
  3226. # If all storage operations failed, return error status and don't proceed with file deletion
  3227. if storage_success_count == 0 and storage_error_count > 0:
  3228. error_message = "All storage drop operations failed. Aborting document clearing process."
  3229. logger.error(error_message)
  3230. if "history_messages" in pipeline_status:
  3231. pipeline_status["history_messages"].append(error_message)
  3232. return ClearDocumentsResponse(status="fail", message=error_message)
  3233. # Log file deletion start
  3234. if "history_messages" in pipeline_status:
  3235. pipeline_status["history_messages"].append(
  3236. "Starting to delete files in input directory"
  3237. )
  3238. # Delete only files in the current directory, preserve files in subdirectories
  3239. deleted_files_count = 0
  3240. file_errors_count = 0
  3241. for file_path in doc_manager.input_dir.glob("*"):
  3242. if file_path.is_file():
  3243. try:
  3244. file_path.unlink()
  3245. deleted_files_count += 1
  3246. except Exception as e:
  3247. logger.error(f"Error deleting file {file_path}: {str(e)}")
  3248. file_errors_count += 1
  3249. # Log file deletion results
  3250. if "history_messages" in pipeline_status:
  3251. if file_errors_count > 0:
  3252. pipeline_status["history_messages"].append(
  3253. f"Deleted {deleted_files_count} files with {file_errors_count} errors"
  3254. )
  3255. errors.append(f"Failed to delete {file_errors_count} files")
  3256. else:
  3257. pipeline_status["history_messages"].append(
  3258. f"Successfully deleted {deleted_files_count} files"
  3259. )
  3260. # Prepare final result message
  3261. final_message = ""
  3262. if errors:
  3263. final_message = f"Cleared documents with some errors. Deleted {deleted_files_count} files."
  3264. status = "partial_success"
  3265. else:
  3266. final_message = f"All documents cleared successfully. Deleted {deleted_files_count} files."
  3267. status = "success"
  3268. # Log final result
  3269. if "history_messages" in pipeline_status:
  3270. pipeline_status["history_messages"].append(final_message)
  3271. # Return response based on results
  3272. return ClearDocumentsResponse(status=status, message=final_message)
  3273. except Exception as e:
  3274. error_msg = f"Error clearing documents: {str(e)}"
  3275. logger.error(error_msg)
  3276. logger.error(traceback.format_exc())
  3277. if "history_messages" in pipeline_status:
  3278. pipeline_status["history_messages"].append(error_msg)
  3279. raise HTTPException(status_code=500, detail=str(e))
  3280. finally:
  3281. # Reset busy + destructive_busy after completion so the next
  3282. # reservation / scan sees an idle pipeline.
  3283. async with pipeline_status_lock:
  3284. pipeline_status["busy"] = False
  3285. pipeline_status["destructive_busy"] = False
  3286. completion_msg = "Document clearing process completed"
  3287. pipeline_status["latest_message"] = completion_msg
  3288. if "history_messages" in pipeline_status:
  3289. pipeline_status["history_messages"].append(completion_msg)
  3290. @router.get(
  3291. "/pipeline_status",
  3292. dependencies=[Depends(combined_auth)],
  3293. response_model=PipelineStatusResponse,
  3294. )
  3295. async def get_pipeline_status() -> PipelineStatusResponse:
  3296. """
  3297. Get the current status of the document indexing pipeline.
  3298. This endpoint returns information about the current state of the document processing pipeline,
  3299. including the processing status, progress information, and history messages.
  3300. Returns:
  3301. PipelineStatusResponse: A response object containing:
  3302. - autoscanned (bool): Whether auto-scan has started
  3303. - busy (bool): Whether the pipeline is currently busy
  3304. - job_name (str): Current job name (e.g., indexing files/indexing texts)
  3305. - job_start (str, optional): Job start time as ISO format string
  3306. - docs (int): Total number of documents to be indexed
  3307. - batchs (int): Number of batches for processing documents
  3308. - cur_batch (int): Current processing batch
  3309. - request_pending (bool): Flag for pending request for processing
  3310. - latest_message (str): Latest message from pipeline processing
  3311. - history_messages (List[str], optional): List of history messages (limited to latest 1000 entries,
  3312. with truncation message if more than 1000 messages exist)
  3313. Raises:
  3314. HTTPException: If an error occurs while retrieving pipeline status (500)
  3315. """
  3316. try:
  3317. from lightrag.kg.shared_storage import (
  3318. get_namespace_data,
  3319. get_namespace_lock,
  3320. get_all_update_flags_status,
  3321. )
  3322. pipeline_status = await get_namespace_data(
  3323. "pipeline_status", workspace=rag.workspace
  3324. )
  3325. pipeline_status_lock = get_namespace_lock(
  3326. "pipeline_status", workspace=rag.workspace
  3327. )
  3328. # Get update flags status for all namespaces
  3329. update_status = await get_all_update_flags_status(workspace=rag.workspace)
  3330. # Convert MutableBoolean objects to regular boolean values
  3331. processed_update_status = {}
  3332. for namespace, flags in update_status.items():
  3333. processed_flags = []
  3334. for flag in flags:
  3335. # Handle both multiprocess and single process cases
  3336. if hasattr(flag, "value"):
  3337. processed_flags.append(bool(flag.value))
  3338. else:
  3339. processed_flags.append(bool(flag))
  3340. processed_update_status[namespace] = processed_flags
  3341. async with pipeline_status_lock:
  3342. # Convert to regular dict if it's a Manager.dict
  3343. status_dict = dict(pipeline_status)
  3344. # Add processed update_status to the status dictionary
  3345. status_dict["update_status"] = processed_update_status
  3346. # Convert history_messages to a regular list if it's a Manager.list
  3347. # and limit to latest 1000 entries with truncation message if needed
  3348. if "history_messages" in status_dict:
  3349. history_list = list(status_dict["history_messages"])
  3350. total_count = len(history_list)
  3351. if total_count > 1000:
  3352. # Calculate truncated message count
  3353. truncated_count = total_count - 1000
  3354. # Take only the latest 1000 messages
  3355. latest_messages = history_list[-1000:]
  3356. # Add truncation message at the beginning
  3357. truncation_message = (
  3358. f"[Truncated history messages: {truncated_count}/{total_count}]"
  3359. )
  3360. status_dict["history_messages"] = [
  3361. truncation_message
  3362. ] + latest_messages
  3363. else:
  3364. # No truncation needed, return all messages
  3365. status_dict["history_messages"] = history_list
  3366. # Ensure job_start is properly formatted as a string with timezone information
  3367. if "job_start" in status_dict and status_dict["job_start"]:
  3368. # Use format_datetime to ensure consistent formatting
  3369. status_dict["job_start"] = format_datetime(status_dict["job_start"])
  3370. return PipelineStatusResponse(**status_dict)
  3371. except Exception as e:
  3372. logger.error(f"Error getting pipeline status: {str(e)}")
  3373. logger.error(traceback.format_exc())
  3374. raise HTTPException(status_code=500, detail=str(e))
  3375. # TODO: Deprecated, use /documents/paginated instead
  3376. @router.get(
  3377. "", response_model=DocsStatusesResponse, dependencies=[Depends(combined_auth)]
  3378. )
  3379. async def documents() -> DocsStatusesResponse:
  3380. """
  3381. Get the status of all documents in the system. This endpoint is deprecated; use /documents/paginated instead.
  3382. To prevent excessive resource consumption, a maximum of 1,000 records is returned.
  3383. This endpoint retrieves the current status of all documents, grouped by their
  3384. processing status (PENDING, PROCESSING, PREPROCESSED, PROCESSED, FAILED). The results are
  3385. limited to 1000 total documents with fair distribution across all statuses.
  3386. Returns:
  3387. DocsStatusesResponse: A response object containing a dictionary where keys are
  3388. DocStatus values and values are lists of DocStatusResponse
  3389. objects representing documents in each status category.
  3390. Maximum 1000 documents total will be returned.
  3391. Raises:
  3392. HTTPException: If an error occurs while retrieving document statuses (500).
  3393. """
  3394. try:
  3395. statuses = (
  3396. DocStatus.PENDING,
  3397. DocStatus.PARSING,
  3398. DocStatus.ANALYZING,
  3399. DocStatus.PROCESSING,
  3400. DocStatus.PREPROCESSED,
  3401. DocStatus.PROCESSED,
  3402. DocStatus.FAILED,
  3403. )
  3404. tasks = [rag.get_docs_by_status(status) for status in statuses]
  3405. results: List[Dict[str, DocProcessingStatus]] = await asyncio.gather(*tasks)
  3406. response = DocsStatusesResponse()
  3407. total_documents = 0
  3408. max_documents = 1000
  3409. # Convert results to lists for easier processing
  3410. status_documents = []
  3411. for idx, result in enumerate(results):
  3412. status = statuses[idx]
  3413. docs_list = []
  3414. for doc_id, doc_status in result.items():
  3415. docs_list.append((doc_id, doc_status))
  3416. status_documents.append((status, docs_list))
  3417. # Fair distribution: round-robin across statuses
  3418. status_indices = [0] * len(
  3419. status_documents
  3420. ) # Track current index for each status
  3421. current_status_idx = 0
  3422. while total_documents < max_documents:
  3423. # Check if we have any documents left to process
  3424. has_remaining = False
  3425. for status_idx, (status, docs_list) in enumerate(status_documents):
  3426. if status_indices[status_idx] < len(docs_list):
  3427. has_remaining = True
  3428. break
  3429. if not has_remaining:
  3430. break
  3431. # Try to get a document from the current status
  3432. status, docs_list = status_documents[current_status_idx]
  3433. current_index = status_indices[current_status_idx]
  3434. if current_index < len(docs_list):
  3435. doc_id, doc_status = docs_list[current_index]
  3436. if status not in response.statuses:
  3437. response.statuses[status] = []
  3438. response.statuses[status].append(
  3439. DocStatusResponse(
  3440. id=doc_id,
  3441. content_summary=doc_status.content_summary,
  3442. content_length=doc_status.content_length,
  3443. status=doc_status.status,
  3444. created_at=format_datetime(doc_status.created_at),
  3445. updated_at=format_datetime(doc_status.updated_at),
  3446. track_id=doc_status.track_id,
  3447. chunks_count=doc_status.chunks_count,
  3448. error_msg=doc_status.error_msg,
  3449. metadata=doc_status.metadata,
  3450. file_path=normalize_file_path(doc_status.file_path),
  3451. )
  3452. )
  3453. status_indices[current_status_idx] += 1
  3454. total_documents += 1
  3455. # Move to next status (round-robin)
  3456. current_status_idx = (current_status_idx + 1) % len(status_documents)
  3457. return response
  3458. except Exception as e:
  3459. logger.error(f"Error GET /documents: {str(e)}")
  3460. logger.error(traceback.format_exc())
  3461. raise HTTPException(status_code=500, detail=str(e))
class DeleteDocByIdResponse(BaseModel):
    """Response model for single document deletion operation."""

    # "deletion_started": the deletion was scheduled as a background task.
    # "busy": the pipeline was occupied, so nothing was scheduled.
    # "not_allowed": not returned by the delete_document handler in this
    # section; NOTE(review): presumably used elsewhere or kept for API
    # compatibility — confirm.
    status: Literal["deletion_started", "busy", "not_allowed"] = Field(
        description="Status of the deletion operation"
    )
    message: str = Field(description="Message describing the operation result")
    # Despite the singular name, delete_document fills this with all requested
    # document IDs joined by ", ".
    doc_id: str = Field(description="The ID of the document to delete")
@router.delete(
    "/delete_document",
    response_model=DeleteDocByIdResponse,
    dependencies=[Depends(combined_auth)],
    summary="Delete a document and all its associated data by its ID.",
)
async def delete_document(
    delete_request: DeleteDocRequest,
    background_tasks: BackgroundTasks,
) -> DeleteDocByIdResponse:
    """
    Delete documents and all their associated data by their IDs using background processing.

    Deletes specific documents and all their associated data, including their status,
    text chunks, vector embeddings, and any related graph data. When requested,
    cached LLM extraction responses are removed after graph deletion/rebuild completes.
    The deletion process runs in the background to avoid blocking the client connection.
    This operation is irreversible and will interact with the pipeline status.

    **Concurrency Constraint:**
    - Atomically reserves the destructive slot (sets ``busy=True``
      and ``destructive_busy=True``) **synchronously** before
      returning ``deletion_started``, so a /scan or /upload that
      arrives before the bg task runs cannot race the delete.
      Refuses with ``status="busy"`` when ANY of these is set:
      ``pipeline_status["busy"]``, ``pipeline_status["scanning"]``,
      or ``pipeline_status["pending_enqueues"] > 0``.

    Args:
        delete_request (DeleteDocRequest): The request containing the document IDs and deletion options.
        background_tasks: FastAPI BackgroundTasks for async processing

    Returns:
        DeleteDocByIdResponse: The result of the deletion operation. Its ``doc_id``
        field holds all requested IDs joined with ", ".
        - status="deletion_started": The document deletion has been initiated in the background.
        - status="busy": Another writer (busy / scanning / pending enqueue) holds the
          pipeline; nothing scheduled, retry after the running job finishes.

    Raises:
        HTTPException:
            - 500: If an unexpected internal error occurs during initialization.
    """
    doc_ids = delete_request.doc_ids
    # True only while THIS handler owns a reserved destructive slot, i.e.
    # after a successful reservation and before the hand-off to the
    # background task. The finally clause releases the slot only in that state.
    slot_acquired = False
    try:
        # Atomically reserve the destructive slot BEFORE returning
        # ``deletion_started``. Without this, the bg task would set
        # destructive_busy only when it later runs — leaving a
        # window where a /scan or /upload can race the delete after
        # the client has already received success. The check
        # covers busy + scanning + pending_enqueues>0 in a single
        # critical section.
        acquired, reason = await _acquire_destructive_busy(rag)
        if not acquired:
            # Nothing was reserved, so there is nothing to release. Report the
            # refusal reason, or a generic message when none was given.
            return DeleteDocByIdResponse(
                status="busy",
                message=reason or "Cannot delete documents while pipeline is busy",
                doc_id=", ".join(doc_ids),
            )
        slot_acquired = True
        # Schedule the actual deletion; it runs later, after this handler
        # returns, with the request's delete_file / delete_llm_cache options.
        background_tasks.add_task(
            background_delete_documents,
            rag,
            doc_manager,
            doc_ids,
            delete_request.delete_file,
            delete_request.delete_llm_cache,
        )
        # Ownership of the slot transferred to the bg task — it
        # will release in its finally. The endpoint's finally
        # below must NOT release it again.
        slot_acquired = False
        return DeleteDocByIdResponse(
            status="deletion_started",
            message=f"Document deletion for {len(doc_ids)} documents has been initiated. Processing will continue in background.",
            doc_id=", ".join(doc_ids),
        )
    except Exception as e:
        error_msg = f"Error initiating document deletion for {delete_request.doc_ids}: {str(e)}"
        logger.error(error_msg)
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=error_msg)
    finally:
        # If we reserved but never scheduled the bg task (e.g. an
        # unexpected error between acquire and add_task), release
        # so the next reservation / scan / enqueue can proceed.
        if slot_acquired:
            await _release_destructive_busy(rag)
  3552. @router.post(
  3553. "/clear_cache",
  3554. response_model=ClearCacheResponse,
  3555. dependencies=[Depends(combined_auth)],
  3556. )
  3557. async def clear_cache(request: ClearCacheRequest):
  3558. """
  3559. Clear all cache data from the LLM response cache storage.
  3560. This endpoint clears all cached LLM responses regardless of mode.
  3561. The request body is accepted for API compatibility but is ignored.
  3562. Args:
  3563. request (ClearCacheRequest): The request body (ignored for compatibility).
  3564. Returns:
  3565. ClearCacheResponse: A response object containing the status and message.
  3566. Raises:
  3567. HTTPException: If an error occurs during cache clearing (500).
  3568. """
  3569. try:
  3570. # Call the aclear_cache method (no modes parameter)
  3571. await rag.aclear_cache()
  3572. # Prepare success message
  3573. message = "Successfully cleared all cache"
  3574. return ClearCacheResponse(status="success", message=message)
  3575. except Exception as e:
  3576. logger.error(f"Error clearing cache: {str(e)}")
  3577. logger.error(traceback.format_exc())
  3578. raise HTTPException(status_code=500, detail=str(e))
  3579. @router.delete(
  3580. "/delete_entity",
  3581. response_model=DeletionResult,
  3582. dependencies=[Depends(combined_auth)],
  3583. )
  3584. async def delete_entity(request: DeleteEntityRequest):
  3585. """
  3586. Delete an entity and all its relationships from the knowledge graph.
  3587. Args:
  3588. request (DeleteEntityRequest): The request body containing the entity name.
  3589. Returns:
  3590. DeletionResult: An object containing the outcome of the deletion process.
  3591. Raises:
  3592. HTTPException: If the entity is not found (404) or an error occurs (500).
  3593. """
  3594. try:
  3595. await check_pipeline_busy_or_raise(rag)
  3596. result = await rag.adelete_by_entity(entity_name=request.entity_name)
  3597. if result.status == "not_found":
  3598. raise HTTPException(status_code=404, detail=result.message)
  3599. if result.status == "fail":
  3600. raise HTTPException(status_code=500, detail=result.message)
  3601. # Set doc_id to empty string since this is an entity operation, not document
  3602. result.doc_id = ""
  3603. return result
  3604. except HTTPException:
  3605. raise
  3606. except Exception as e:
  3607. error_msg = f"Error deleting entity '{request.entity_name}': {str(e)}"
  3608. logger.error(error_msg)
  3609. logger.error(traceback.format_exc())
  3610. raise HTTPException(status_code=500, detail=error_msg)
  3611. @router.delete(
  3612. "/delete_relation",
  3613. response_model=DeletionResult,
  3614. dependencies=[Depends(combined_auth)],
  3615. )
  3616. async def delete_relation(request: DeleteRelationRequest):
  3617. """
  3618. Delete a relationship between two entities from the knowledge graph.
  3619. Args:
  3620. request (DeleteRelationRequest): The request body containing the source and target entity names.
  3621. Returns:
  3622. DeletionResult: An object containing the outcome of the deletion process.
  3623. Raises:
  3624. HTTPException: If the relation is not found (404) or an error occurs (500).
  3625. """
  3626. try:
  3627. await check_pipeline_busy_or_raise(rag)
  3628. result = await rag.adelete_by_relation(
  3629. source_entity=request.source_entity,
  3630. target_entity=request.target_entity,
  3631. )
  3632. if result.status == "not_found":
  3633. raise HTTPException(status_code=404, detail=result.message)
  3634. if result.status == "fail":
  3635. raise HTTPException(status_code=500, detail=result.message)
  3636. # Set doc_id to empty string since this is a relation operation, not document
  3637. result.doc_id = ""
  3638. return result
  3639. except HTTPException:
  3640. raise
  3641. except Exception as e:
  3642. error_msg = f"Error deleting relation from '{request.source_entity}' to '{request.target_entity}': {str(e)}"
  3643. logger.error(error_msg)
  3644. logger.error(traceback.format_exc())
  3645. raise HTTPException(status_code=500, detail=error_msg)
  3646. @router.get(
  3647. "/track_status/{track_id}",
  3648. response_model=TrackStatusResponse,
  3649. dependencies=[Depends(combined_auth)],
  3650. )
  3651. async def get_track_status(track_id: str) -> TrackStatusResponse:
  3652. """
  3653. Get the processing status of documents by tracking ID.
  3654. This endpoint retrieves all documents associated with a specific tracking ID,
  3655. allowing users to monitor the processing progress of their uploaded files or inserted texts.
  3656. Args:
  3657. track_id (str): The tracking ID returned from upload, text, or texts endpoints
  3658. Returns:
  3659. TrackStatusResponse: A response object containing:
  3660. - track_id: The tracking ID
  3661. - documents: List of documents associated with this track_id
  3662. - total_count: Total number of documents for this track_id
  3663. Raises:
  3664. HTTPException: If track_id is invalid (400) or an error occurs (500).
  3665. """
  3666. try:
  3667. # Validate track_id
  3668. if not track_id or not track_id.strip():
  3669. raise HTTPException(status_code=400, detail="Track ID cannot be empty")
  3670. track_id = track_id.strip()
  3671. # Get documents by track_id
  3672. docs_by_track_id = await rag.aget_docs_by_track_id(track_id)
  3673. # Convert to response format
  3674. documents = []
  3675. status_summary = {}
  3676. for doc_id, doc_status in docs_by_track_id.items():
  3677. documents.append(
  3678. DocStatusResponse(
  3679. id=doc_id,
  3680. content_summary=doc_status.content_summary,
  3681. content_length=doc_status.content_length,
  3682. status=doc_status.status,
  3683. created_at=format_datetime(doc_status.created_at),
  3684. updated_at=format_datetime(doc_status.updated_at),
  3685. track_id=doc_status.track_id,
  3686. chunks_count=doc_status.chunks_count,
  3687. error_msg=doc_status.error_msg,
  3688. metadata=doc_status.metadata,
  3689. file_path=normalize_file_path(doc_status.file_path),
  3690. )
  3691. )
  3692. # Build status summary
  3693. # Handle both DocStatus enum and string cases for robust deserialization
  3694. status_key = str(doc_status.status)
  3695. status_summary[status_key] = status_summary.get(status_key, 0) + 1
  3696. return TrackStatusResponse(
  3697. track_id=track_id,
  3698. documents=documents,
  3699. total_count=len(documents),
  3700. status_summary=status_summary,
  3701. )
  3702. except HTTPException:
  3703. raise
  3704. except Exception as e:
  3705. logger.error(f"Error getting track status for {track_id}: {str(e)}")
  3706. logger.error(traceback.format_exc())
  3707. raise HTTPException(status_code=500, detail=str(e))
  3708. @router.post(
  3709. "/paginated",
  3710. response_model=PaginatedDocsResponse,
  3711. dependencies=[Depends(combined_auth)],
  3712. )
  3713. async def get_documents_paginated(
  3714. request: DocumentsRequest,
  3715. ) -> PaginatedDocsResponse:
  3716. """
  3717. Get documents with pagination support.
  3718. This endpoint retrieves documents with pagination, filtering, and sorting capabilities.
  3719. It provides better performance for large document collections by loading only the
  3720. requested page of data.
  3721. Args:
  3722. request (DocumentsRequest): The request body containing pagination parameters
  3723. Returns:
  3724. PaginatedDocsResponse: A response object containing:
  3725. - documents: List of documents for the current page
  3726. - pagination: Pagination information (page, total_count, etc.)
  3727. - status_counts: Count of documents by status for all documents
  3728. Raises:
  3729. HTTPException: If an error occurs while retrieving documents (500).
  3730. """
  3731. trace_id = uuid4().hex[:8]
  3732. request_start = time.perf_counter()
  3733. status_filter_value = (
  3734. request.status_filter.value if request.status_filter is not None else None
  3735. )
  3736. workspace = getattr(rag, "workspace", None)
  3737. performance_timing_log(
  3738. "[documents/paginated][%s] Request start workspace=%s status_filter=%s page=%s page_size=%s sort_field=%s sort_direction=%s",
  3739. trace_id,
  3740. workspace,
  3741. status_filter_value,
  3742. request.page,
  3743. request.page_size,
  3744. request.sort_field,
  3745. request.sort_direction,
  3746. )
  3747. try:
  3748. async def _timed_call(operation_name: str, operation):
  3749. operation_start = time.perf_counter()
  3750. performance_timing_log(
  3751. "[documents/paginated][%s] %s started",
  3752. trace_id,
  3753. operation_name,
  3754. )
  3755. try:
  3756. result = await operation
  3757. except Exception:
  3758. elapsed = time.perf_counter() - operation_start
  3759. performance_timing_log(
  3760. "[documents/paginated][%s] %s failed after %.4fs",
  3761. trace_id,
  3762. operation_name,
  3763. elapsed,
  3764. )
  3765. raise
  3766. elapsed = time.perf_counter() - operation_start
  3767. performance_timing_log(
  3768. "[documents/paginated][%s] %s completed in %.4fs",
  3769. trace_id,
  3770. operation_name,
  3771. elapsed,
  3772. )
  3773. return result
  3774. query_task_create_start = time.perf_counter()
  3775. docs_task = asyncio.create_task(
  3776. _timed_call(
  3777. "get_docs_paginated",
  3778. rag.doc_status.get_docs_paginated(
  3779. status_filter=request.status_filter,
  3780. status_filters=request.status_filters,
  3781. page=request.page,
  3782. page_size=request.page_size,
  3783. sort_field=request.sort_field,
  3784. sort_direction=request.sort_direction,
  3785. ),
  3786. )
  3787. )
  3788. status_counts_task = asyncio.create_task(
  3789. _timed_call(
  3790. "get_all_status_counts",
  3791. rag.doc_status.get_all_status_counts(),
  3792. )
  3793. )
  3794. query_task_create_elapsed = time.perf_counter() - query_task_create_start
  3795. performance_timing_log(
  3796. "[documents/paginated][%s] Query tasks created in %.4fs",
  3797. trace_id,
  3798. query_task_create_elapsed,
  3799. )
  3800. query_await_start = time.perf_counter()
  3801. (documents_with_ids, total_count), status_counts = await asyncio.gather(
  3802. docs_task, status_counts_task
  3803. )
  3804. query_await_elapsed = time.perf_counter() - query_await_start
  3805. performance_timing_log(
  3806. "[documents/paginated][%s] Query tasks awaited in %.4fs",
  3807. trace_id,
  3808. query_await_elapsed,
  3809. )
  3810. # Convert documents to response format
  3811. response_assembly_start = time.perf_counter()
  3812. doc_responses = []
  3813. for doc_id, doc in documents_with_ids:
  3814. doc_responses.append(
  3815. DocStatusResponse(
  3816. id=doc_id,
  3817. content_summary=doc.content_summary,
  3818. content_length=doc.content_length,
  3819. status=doc.status,
  3820. created_at=format_datetime(doc.created_at),
  3821. updated_at=format_datetime(doc.updated_at),
  3822. track_id=doc.track_id,
  3823. chunks_count=doc.chunks_count,
  3824. error_msg=doc.error_msg,
  3825. metadata=doc.metadata,
  3826. file_path=normalize_file_path(doc.file_path),
  3827. )
  3828. )
  3829. # Calculate pagination info
  3830. total_pages = (total_count + request.page_size - 1) // request.page_size
  3831. has_next = request.page < total_pages
  3832. has_prev = request.page > 1
  3833. pagination = PaginationInfo(
  3834. page=request.page,
  3835. page_size=request.page_size,
  3836. total_count=total_count,
  3837. total_pages=total_pages,
  3838. has_next=has_next,
  3839. has_prev=has_prev,
  3840. )
  3841. response = PaginatedDocsResponse(
  3842. documents=doc_responses,
  3843. pagination=pagination,
  3844. status_counts=status_counts,
  3845. )
  3846. response_assembly_elapsed = time.perf_counter() - response_assembly_start
  3847. total_elapsed = time.perf_counter() - request_start
  3848. performance_timing_log(
  3849. "[documents/paginated][%s] Response assembled in %.4fs",
  3850. trace_id,
  3851. response_assembly_elapsed,
  3852. )
  3853. performance_timing_log(
  3854. "[documents/paginated][%s] Request completed in %.4fs returned_rows=%s total_count=%s status_count_keys=%s",
  3855. trace_id,
  3856. total_elapsed,
  3857. len(doc_responses),
  3858. total_count,
  3859. sorted(status_counts.keys()),
  3860. )
  3861. return response
  3862. except Exception as e:
  3863. total_elapsed = time.perf_counter() - request_start
  3864. performance_timing_log(
  3865. "[documents/paginated][%s] Request failed after %.4fs",
  3866. trace_id,
  3867. total_elapsed,
  3868. )
  3869. logger.error(f"Error getting paginated documents: {str(e)}")
  3870. logger.error(traceback.format_exc())
  3871. raise HTTPException(status_code=500, detail=str(e))
  3872. @router.get(
  3873. "/status_counts",
  3874. response_model=StatusCountsResponse,
  3875. dependencies=[Depends(combined_auth)],
  3876. )
  3877. async def get_document_status_counts() -> StatusCountsResponse:
  3878. """
  3879. Get counts of documents by status.
  3880. This endpoint retrieves the count of documents in each processing status
  3881. (PENDING, PROCESSING, PROCESSED, FAILED) for all documents in the system.
  3882. Returns:
  3883. StatusCountsResponse: A response object containing status counts
  3884. Raises:
  3885. HTTPException: If an error occurs while retrieving status counts (500).
  3886. """
  3887. try:
  3888. status_counts = await rag.doc_status.get_all_status_counts()
  3889. return StatusCountsResponse(status_counts=status_counts)
  3890. except Exception as e:
  3891. logger.error(f"Error getting document status counts: {str(e)}")
  3892. logger.error(traceback.format_exc())
  3893. raise HTTPException(status_code=500, detail=str(e))
  3894. @router.post(
  3895. "/reprocess_failed",
  3896. response_model=ReprocessResponse,
  3897. dependencies=[Depends(combined_auth)],
  3898. )
  3899. async def reprocess_failed_documents(background_tasks: BackgroundTasks):
  3900. """
  3901. Reprocess failed and pending documents.
  3902. This endpoint triggers the document processing pipeline which automatically
  3903. picks up and reprocesses documents in the following statuses:
  3904. - FAILED: Documents that failed during previous processing attempts
  3905. - PENDING: Documents waiting to be processed
  3906. - PROCESSING: Documents with abnormally terminated processing (e.g., server crashes)
  3907. This is useful for recovering from server crashes, network errors, LLM service
  3908. outages, or other temporary failures that caused document processing to fail.
  3909. The processing happens in the background and can be monitored by checking the
  3910. pipeline status. The reprocessed documents retain their original track_id from
  3911. initial upload, so use their original track_id to monitor progress.
  3912. Returns:
  3913. ReprocessResponse: Response with status and message.
  3914. track_id is always empty string because reprocessed documents retain
  3915. their original track_id from initial upload.
  3916. Raises:
  3917. HTTPException: If an error occurs while initiating reprocessing (500).
  3918. """
  3919. try:
  3920. # Start the reprocessing in the background
  3921. # Note: Reprocessed documents retain their original track_id from initial upload
  3922. background_tasks.add_task(rag.apipeline_process_enqueue_documents)
  3923. logger.info("Reprocessing of failed documents initiated")
  3924. return ReprocessResponse(
  3925. status="reprocessing_started",
  3926. message="Reprocessing of failed documents has been initiated in background. Documents retain their original track_id.",
  3927. )
  3928. except Exception as e:
  3929. logger.error(f"Error initiating reprocessing of failed documents: {str(e)}")
  3930. logger.error(traceback.format_exc())
  3931. raise HTTPException(status_code=500, detail=str(e))
  3932. @router.post(
  3933. "/cancel_pipeline",
  3934. response_model=CancelPipelineResponse,
  3935. dependencies=[Depends(combined_auth)],
  3936. )
  3937. async def cancel_pipeline():
  3938. """
  3939. Request cancellation of the currently running pipeline.
  3940. This endpoint sets a cancellation flag in the pipeline status. The pipeline will:
  3941. 1. Check this flag at key processing points
  3942. 2. Stop processing new documents
  3943. 3. Cancel all running document processing tasks
  3944. 4. Mark all PROCESSING documents as FAILED with reason "User cancelled"
  3945. The cancellation is graceful and ensures data consistency. Documents that have
  3946. completed processing will remain in PROCESSED status.
  3947. Returns:
  3948. CancelPipelineResponse: Response with status and message
  3949. - status="cancellation_requested": Cancellation flag has been set
  3950. - status="not_busy": Pipeline is not currently running
  3951. Raises:
  3952. HTTPException: If an error occurs while setting cancellation flag (500).
  3953. """
  3954. try:
  3955. from lightrag.kg.shared_storage import (
  3956. get_namespace_data,
  3957. get_namespace_lock,
  3958. )
  3959. pipeline_status = await get_namespace_data(
  3960. "pipeline_status", workspace=rag.workspace
  3961. )
  3962. pipeline_status_lock = get_namespace_lock(
  3963. "pipeline_status", workspace=rag.workspace
  3964. )
  3965. async with pipeline_status_lock:
  3966. if not pipeline_status.get("busy", False):
  3967. return CancelPipelineResponse(
  3968. status="not_busy",
  3969. message="Pipeline is not currently running. No cancellation needed.",
  3970. )
  3971. # Set cancellation flag
  3972. pipeline_status["cancellation_requested"] = True
  3973. cancel_msg = "Pipeline cancellation requested by user"
  3974. logger.info(cancel_msg)
  3975. pipeline_status["latest_message"] = cancel_msg
  3976. pipeline_status["history_messages"].append(cancel_msg)
  3977. return CancelPipelineResponse(
  3978. status="cancellation_requested",
  3979. message="Pipeline cancellation has been requested. Documents will be marked as FAILED.",
  3980. )
  3981. except Exception as e:
  3982. logger.error(f"Error requesting pipeline cancellation: {str(e)}")
  3983. logger.error(traceback.format_exc())
  3984. raise HTTPException(status_code=500, detail=str(e))
  3985. return router