# test_document_routes_docx_archive.py
  1. import importlib
  2. import sys
  3. from io import BytesIO
  4. from pathlib import Path
  5. from types import SimpleNamespace
  6. from uuid import uuid4
  7. import pytest
  8. _original_argv = sys.argv[:]
  9. sys.argv = [sys.argv[0]]
  10. _document_routes = importlib.import_module("lightrag.api.routers.document_routes")
  11. _lightrag = importlib.import_module("lightrag.lightrag")
  12. _pipeline = importlib.import_module("lightrag.pipeline")
  13. _base = importlib.import_module("lightrag.base")
  14. _constants = importlib.import_module("lightrag.constants")
  15. _utils = importlib.import_module("lightrag.utils")
  16. _parser_routing = importlib.import_module("lightrag.parser.routing")
  17. sys.argv = _original_argv
  18. DocStatus = _base.DocStatus
  19. DeletionResult = _base.DeletionResult
  20. FULL_DOCS_FORMAT_LIGHTRAG = _constants.FULL_DOCS_FORMAT_LIGHTRAG
  21. FULL_DOCS_FORMAT_PENDING_PARSE = _constants.FULL_DOCS_FORMAT_PENDING_PARSE
  22. PARSED_DIR_NAME = _constants.PARSED_DIR_NAME
  23. PROCESS_OPTION_CHUNK_FIXED = _constants.PROCESS_OPTION_CHUNK_FIXED
  24. compute_mdhash_id = _utils.compute_mdhash_id
  25. LightRAG = _lightrag.LightRAG
  26. resolve_stored_document_parser_engine = (
  27. _parser_routing.resolve_stored_document_parser_engine
  28. )
  29. pipeline_index_file = _document_routes.pipeline_index_file
  30. pipeline_index_files = _document_routes.pipeline_index_files
  31. pipeline_index_texts = _document_routes.pipeline_index_texts
  32. pipeline_enqueue_file = _document_routes.pipeline_enqueue_file
  33. run_scanning_process = _document_routes.run_scanning_process
  34. DocumentManager = _document_routes.DocumentManager
  35. create_document_routes = _document_routes.create_document_routes
  36. pytestmark = pytest.mark.offline
  37. @pytest.fixture(autouse=True)
  38. def _ensure_shared_storage_initialized():
  39. """Initialize the shared_storage module-level dicts before each test.
  40. The scan endpoint and the enqueue/scanning guards read
  41. ``pipeline_status`` via ``get_namespace_data``, which raises if
  42. shared dicts have never been initialized. Tests using mocked
  43. ``LightRAG`` instances don't run ``initialize_storages``, so we set
  44. up the shared store here and reset pipeline_status state per-test
  45. to avoid leakage.
  46. """
  47. import importlib
  48. shared_storage = importlib.import_module("lightrag.kg.shared_storage")
  49. shared_storage.initialize_share_data()
  50. yield
  51. # Reset pipeline_status to a clean state so subsequent tests don't
  52. # inherit ``busy`` / ``scanning`` flags set by prior runs.
  53. if shared_storage._shared_dicts is not None:
  54. for key in list(shared_storage._shared_dicts.keys()):
  55. if key.endswith("pipeline_status") or key == "pipeline_status":
  56. ns = shared_storage._shared_dicts[key]
  57. if isinstance(ns, dict):
  58. ns["busy"] = False
  59. ns["scanning"] = False
  60. class _FakeDocStatus:
  61. def __init__(self):
  62. self.docs = {}
  63. async def get_by_id(self, doc_id):
  64. return self.docs.get(doc_id)
  65. class _FakeRag:
  66. def __init__(self, final_status=DocStatus.PROCESSED):
  67. self.doc_status = _FakeDocStatus()
  68. self.final_status = final_status
  69. self.enqueued = []
  70. self.errors = []
  71. # _resolve_text_chunking reads addon_params; {} -> default chunker config.
  72. self.addon_params = {}
  73. async def apipeline_enqueue_documents(
  74. self,
  75. input,
  76. ids=None,
  77. file_paths=None,
  78. track_id=None,
  79. docs_format=None,
  80. parse_engine=None,
  81. process_options=None,
  82. chunk_options=None,
  83. from_scan=False,
  84. ):
  85. item = {
  86. "input": input,
  87. "file_path": file_paths,
  88. "track_id": track_id,
  89. "docs_format": docs_format,
  90. "parse_engine": parse_engine,
  91. "process_options": process_options,
  92. "chunk_options": chunk_options,
  93. "from_scan": from_scan,
  94. }
  95. self.enqueued.append(item)
  96. return track_id
  97. async def apipeline_process_enqueue_documents(self):
  98. for item in self.enqueued:
  99. file_path = item["file_path"]
  100. doc_id = compute_mdhash_id(file_path, prefix="doc-")
  101. self.doc_status.docs[doc_id] = {
  102. "status": self.final_status,
  103. "file_path": file_path,
  104. "track_id": item["track_id"],
  105. }
  106. async def apipeline_enqueue_error_documents(self, error_files, track_id=None):
  107. self.errors.append((error_files, track_id))
  108. class _DuplicateEnqueueRag(_FakeRag):
  109. async def apipeline_enqueue_documents(self, *args, **kwargs):
  110. self.enqueued.append({"args": args, "kwargs": kwargs})
  111. return None
  112. class _ScanDocStatus:
  113. def __init__(self, docs_by_path):
  114. self.docs_by_path = docs_by_path
  115. self.deleted_ids: list[str] = []
  116. async def get_doc_by_file_path(self, file_path):
  117. return self.docs_by_path.get(file_path)
  118. async def get_doc_by_file_basename(self, basename):
  119. from pathlib import Path as _Path
  120. for stored_path, doc in self.docs_by_path.items():
  121. if _Path(stored_path).name == basename:
  122. return stored_path, doc
  123. return None
  124. async def delete(self, ids):
  125. for doc_id in ids:
  126. self.docs_by_path.pop(doc_id, None)
  127. self.deleted_ids.append(doc_id)
  128. class _ScanFullDocs:
  129. """Minimal full_docs double for run_scanning_process.
  130. The scan now consults ``full_docs.get_by_id`` to distinguish a
  131. resumable FAILED row (content was stored, only a downstream step
  132. failed) from an extraction-error stub recorded by
  133. ``apipeline_enqueue_error_documents`` (no full_docs entry exists).
  134. """
  135. def __init__(self, docs_by_id):
  136. self.docs_by_id = docs_by_id
  137. async def get_by_id(self, doc_id):
  138. return self.docs_by_id.get(doc_id)
  139. class _ScanRag:
  140. def __init__(self, docs_by_path, full_docs_by_id=None):
  141. self.doc_status = _ScanDocStatus(docs_by_path)
  142. # Default: every doc_status row has a corresponding full_docs entry,
  143. # i.e. the "resumable" FAILED case. Tests simulating extraction-error
  144. # stubs pass ``full_docs_by_id={}`` (or omit specific doc_ids) so the
  145. # scan classifies them as retry-as-new instead of resume. The mock
  146. # uses the doc_status path-as-doc_id convention from _ScanDocStatus.
  147. if full_docs_by_id is None:
  148. full_docs_by_id = {path: {"content": ""} for path in docs_by_path}
  149. self.full_docs = _ScanFullDocs(full_docs_by_id)
  150. self.process_calls = 0
  151. self.workspace = "scan-test"
  152. self.enqueued = []
  153. self.errors = []
  154. async def apipeline_enqueue_documents(
  155. self,
  156. input,
  157. ids=None,
  158. file_paths=None,
  159. track_id=None,
  160. docs_format=None,
  161. parse_engine=None,
  162. process_options=None,
  163. from_scan=False,
  164. ):
  165. item = {
  166. "input": input,
  167. "file_path": file_paths,
  168. "track_id": track_id,
  169. "docs_format": docs_format,
  170. "parse_engine": parse_engine,
  171. "process_options": process_options,
  172. "from_scan": from_scan,
  173. }
  174. self.enqueued.append(item)
  175. return track_id
  176. async def apipeline_enqueue_error_documents(self, error_files, track_id=None):
  177. self.errors.append((error_files, track_id))
  178. async def apipeline_process_enqueue_documents(self):
  179. self.process_calls += 1
  180. class _DuplicateUploadRag:
  181. def __init__(self, docs_by_path):
  182. self.doc_status = _ScanDocStatus(docs_by_path)
  183. self.workspace = f"upload-test-{uuid4().hex}"
  184. class _DeleteRag:
  185. def __init__(self, result):
  186. self.result = result
  187. self.workspace = f"delete-test-{uuid4().hex}"
  188. self.deleted_doc_ids = []
  189. async def adelete_by_doc_id(self, doc_id, delete_llm_cache=False):
  190. self.deleted_doc_ids.append((doc_id, delete_llm_cache))
  191. return self.result
  192. async def apipeline_process_enqueue_documents(self):
  193. return None
  194. class _ParseFullDocs:
  195. def __init__(self, source_path):
  196. self.source_path = source_path
  197. self.events = []
  198. self.data = {}
  199. async def get_by_id(self, doc_id):
  200. # ``_persist_parsed_full_docs`` merges with the existing pending_parse
  201. # record so metadata seeded at enqueue time (process_options,
  202. # canonical_basename, ...) survives the parse-result overwrite. These
  203. # tests only seed the row via the parser, so returning None is fine.
  204. record = self.data.get(doc_id)
  205. return dict(record) if record is not None else None
  206. async def upsert(self, data):
  207. self.events.append("upsert")
  208. self.data.update(data)
  209. async def index_done_callback(self):
  210. self.events.append("index_done")
  211. assert self.source_path.exists()
  212. class _ParseDocStatus:
  213. """Minimal doc_status double for the parse_* archive tests.
  214. ``_persist_parsed_full_docs`` reads the existing record and patches its
  215. ``content_hash``; with no record present the helper short-circuits, which
  216. is what these tests want — they only assert on full_docs side effects.
  217. """
  218. async def get_by_id(self, doc_id):
  219. return None
  220. async def upsert(self, data):
  221. return None
  222. class _ParseTokenizer(_utils.TokenizerInterface):
  223. def encode(self, content: str):
  224. return [ord(ch) for ch in content]
  225. def decode(self, tokens):
  226. return "".join(chr(t) for t in tokens)
  227. class _ParseRag:
  228. _persist_parsed_full_docs = LightRAG._persist_parsed_full_docs
  229. # parse_native now delegates to the LightRAG Document writer, which the
  230. # tests need to exercise to validate archive + full_docs side effects.
  231. _write_lightrag_document_from_content_list = (
  232. LightRAG._write_lightrag_document_from_content_list
  233. )
  234. def __init__(self, working_dir, source_path):
  235. self.working_dir = str(working_dir)
  236. self.full_docs = _ParseFullDocs(source_path)
  237. self.doc_status = _ParseDocStatus()
  238. self.tokenizer = _utils.Tokenizer(
  239. model_name="char", tokenizer=_ParseTokenizer()
  240. )
  241. def _resolve_source_file_for_parser(self, file_path):
  242. return file_path
  243. async def test_pipeline_index_file_leaves_lightrag_document_docx_for_parser_archive(
  244. tmp_path, monkeypatch
  245. ):
  246. monkeypatch.setenv("LIGHTRAG_PARSER", "docx:native")
  247. file_path = tmp_path / "sample.docx"
  248. file_path.write_bytes(b"docx bytes")
  249. rag = _FakeRag()
  250. await pipeline_index_file(rag, file_path, "track-docx")
  251. assert file_path.exists()
  252. assert not (tmp_path / PARSED_DIR_NAME / file_path.name).exists()
  253. assert rag.enqueued[0]["file_path"] == str(file_path)
  254. assert rag.enqueued[0]["docs_format"] == FULL_DOCS_FORMAT_PENDING_PARSE
  255. assert rag.enqueued[0]["parse_engine"] == "native"
  256. async def test_pipeline_enqueue_lightrag_document_docx_does_not_move_source(
  257. tmp_path, monkeypatch
  258. ):
  259. monkeypatch.setenv("LIGHTRAG_PARSER", "docx:native")
  260. file_path = tmp_path / "pending.docx"
  261. file_path.write_bytes(b"docx bytes")
  262. rag = _FakeRag()
  263. success, returned_track_id = await pipeline_enqueue_file(
  264. rag, file_path, "track-docx"
  265. )
  266. assert success is True
  267. assert returned_track_id == "track-docx"
  268. assert file_path.exists()
  269. assert not (tmp_path / PARSED_DIR_NAME / file_path.name).exists()
  270. assert rag.enqueued[0]["file_path"] == str(file_path)
  271. assert rag.enqueued[0]["docs_format"] == FULL_DOCS_FORMAT_PENDING_PARSE
  272. assert rag.enqueued[0]["parse_engine"] == "native"
  273. async def test_pipeline_enqueue_docx_plain_text_extracts_before_enqueue(
  274. tmp_path, monkeypatch
  275. ):
  276. monkeypatch.setenv("LIGHTRAG_PARSER", "docx:legacy")
  277. monkeypatch.setattr(
  278. _document_routes, "_extract_docx", lambda file_bytes: "plain docx content"
  279. )
  280. file_path = tmp_path / "plain.docx"
  281. file_path.write_bytes(b"docx bytes")
  282. rag = _FakeRag()
  283. success, returned_track_id = await pipeline_enqueue_file(
  284. rag, file_path, "track-docx"
  285. )
  286. assert success is True
  287. assert returned_track_id == "track-docx"
  288. assert rag.enqueued == [
  289. {
  290. "input": "plain docx content",
  291. "file_path": file_path.name,
  292. "track_id": "track-docx",
  293. "docs_format": None,
  294. "parse_engine": "legacy",
  295. "process_options": PROCESS_OPTION_CHUNK_FIXED,
  296. "chunk_options": None,
  297. "from_scan": False,
  298. }
  299. ]
  300. assert not file_path.exists()
  301. assert (tmp_path / PARSED_DIR_NAME / file_path.name).exists()
  302. async def test_pipeline_enqueue_md_moves_after_enqueue(tmp_path, monkeypatch):
  303. monkeypatch.delenv("LIGHTRAG_PARSER", raising=False)
  304. file_path = tmp_path / "notes.md"
  305. file_path.write_text("# Notes\n\nmarkdown content", encoding="utf-8")
  306. rag = _FakeRag()
  307. success, returned_track_id = await pipeline_enqueue_file(rag, file_path, "track-md")
  308. assert success is True
  309. assert returned_track_id == "track-md"
  310. assert rag.enqueued == [
  311. {
  312. "input": "# Notes\n\nmarkdown content",
  313. "file_path": file_path.name,
  314. "track_id": "track-md",
  315. "docs_format": None,
  316. "parse_engine": "legacy",
  317. "process_options": PROCESS_OPTION_CHUNK_FIXED,
  318. "chunk_options": None,
  319. "from_scan": False,
  320. }
  321. ]
  322. assert not file_path.exists()
  323. assert (tmp_path / PARSED_DIR_NAME / file_path.name).exists()
  324. async def test_pipeline_enqueue_legacy_duplicate_archives_with_unique_name(
  325. tmp_path, monkeypatch
  326. ):
  327. monkeypatch.delenv("LIGHTRAG_PARSER", raising=False)
  328. file_path = tmp_path / "duplicate.md"
  329. file_path.write_text("duplicate content", encoding="utf-8")
  330. parsed_dir = tmp_path / PARSED_DIR_NAME
  331. parsed_dir.mkdir()
  332. (parsed_dir / file_path.name).write_text("existing", encoding="utf-8")
  333. rag = _DuplicateEnqueueRag()
  334. success, returned_track_id = await pipeline_enqueue_file(
  335. rag, file_path, "track-dup"
  336. )
  337. assert success is False
  338. assert returned_track_id == "track-dup"
  339. assert not file_path.exists()
  340. assert (parsed_dir / file_path.name).read_text(encoding="utf-8") == "existing"
  341. assert (parsed_dir / "duplicate_001.md").read_text(
  342. encoding="utf-8"
  343. ) == "duplicate content"
  344. async def test_pipeline_enqueue_parser_routed_pdf_defers_without_extraction(
  345. tmp_path, monkeypatch
  346. ):
  347. monkeypatch.setenv("LIGHTRAG_PARSER", "pdf:mineru,*:legacy")
  348. monkeypatch.setenv("MINERU_API_MODE", "local")
  349. monkeypatch.setenv("MINERU_LOCAL_ENDPOINT", "http://fake-mineru")
  350. def _fail_pdf_extract(*args, **kwargs):
  351. raise AssertionError("parser-routed PDF should not be extracted before enqueue")
  352. monkeypatch.setattr(_document_routes, "_extract_pdf_pypdf", _fail_pdf_extract)
  353. file_path = tmp_path / "paper.pdf"
  354. file_path.write_bytes(b"fake-pdf")
  355. rag = _FakeRag()
  356. success, returned_track_id = await pipeline_enqueue_file(
  357. rag, file_path, "track-pdf"
  358. )
  359. assert success is True
  360. assert returned_track_id == "track-pdf"
  361. assert file_path.exists()
  362. assert rag.enqueued == [
  363. {
  364. "input": "",
  365. "file_path": str(file_path),
  366. "track_id": "track-pdf",
  367. "docs_format": FULL_DOCS_FORMAT_PENDING_PARSE,
  368. "parse_engine": "mineru",
  369. "process_options": PROCESS_OPTION_CHUNK_FIXED,
  370. "chunk_options": None,
  371. "from_scan": False,
  372. }
  373. ]
  374. async def test_pipeline_enqueue_passes_process_options_from_filename_hint(
  375. tmp_path, monkeypatch
  376. ):
  377. """Filename hint ``[native-iet]`` flows into apipeline_enqueue_documents."""
  378. monkeypatch.setenv("LIGHTRAG_PARSER", "docx:native")
  379. file_path = tmp_path / "report.[native-iet].docx"
  380. file_path.write_bytes(b"docx-bytes")
  381. rag = _FakeRag()
  382. success, returned_track_id = await pipeline_enqueue_file(
  383. rag, file_path, "track-options"
  384. )
  385. assert success is True
  386. assert returned_track_id == "track-options"
  387. assert rag.enqueued == [
  388. {
  389. "input": "",
  390. "file_path": str(file_path),
  391. "track_id": "track-options",
  392. "docs_format": FULL_DOCS_FORMAT_PENDING_PARSE,
  393. "parse_engine": "native",
  394. "process_options": "iet",
  395. "chunk_options": None,
  396. "from_scan": False,
  397. }
  398. ]
  399. # Native engine deferral keeps the source file in place for the parser.
  400. assert file_path.exists()
  401. async def test_pipeline_enqueue_rejects_invalid_filename_hint(tmp_path, monkeypatch):
  402. """Bad filename processing hints must become file-processing errors."""
  403. monkeypatch.setenv("LIGHTRAG_PARSER", "docx:native")
  404. file_path = tmp_path / "report.[abc].docx"
  405. file_path.write_bytes(b"docx-bytes")
  406. rag = _FakeRag()
  407. success, returned_track_id = await pipeline_enqueue_file(
  408. rag, file_path, "track-bad-hint"
  409. )
  410. assert success is False
  411. assert returned_track_id == "track-bad-hint"
  412. assert rag.enqueued == []
  413. assert len(rag.errors) == 1
  414. error_files, track_id = rag.errors[0]
  415. assert track_id == "track-bad-hint"
  416. assert error_files[0]["file_path"] == file_path.name
  417. assert error_files[0]["error_description"] == (
  418. "[File Extraction]Filename hint error"
  419. )
  420. assert "unsupported parser engine 'abc'" in error_files[0]["original_error"]
  421. assert file_path.exists()
  422. async def test_pipeline_enqueue_lightrag_parser_rule_provides_default_options(
  423. tmp_path, monkeypatch
  424. ):
  425. """LIGHTRAG_PARSER ``docx:native-iet`` becomes the default ``process_options``."""
  426. monkeypatch.setenv("LIGHTRAG_PARSER", "docx:native-iet,*:legacy")
  427. file_path = tmp_path / "rule_default.docx"
  428. file_path.write_bytes(b"docx-bytes")
  429. rag = _FakeRag()
  430. success, _ = await pipeline_enqueue_file(rag, file_path, "track-rule-default")
  431. assert success is True
  432. assert len(rag.enqueued) == 1
  433. enqueued = rag.enqueued[0]
  434. assert enqueued["parse_engine"] == "native"
  435. assert enqueued["process_options"] == "iet"
  436. async def test_pipeline_index_files_leaves_lightrag_document_docx_batch(
  437. tmp_path, monkeypatch
  438. ):
  439. monkeypatch.setenv("LIGHTRAG_PARSER", "docx:native")
  440. first = tmp_path / "first.docx"
  441. second = tmp_path / "second.[mineru].docx"
  442. first.write_bytes(b"first docx bytes")
  443. second.write_bytes(b"second docx bytes")
  444. rag = _FakeRag()
  445. await pipeline_index_files(rag, [second, first], "track-scan")
  446. assert first.exists()
  447. assert second.exists()
  448. assert not (tmp_path / PARSED_DIR_NAME / first.name).exists()
  449. assert not (tmp_path / PARSED_DIR_NAME / second.name).exists()
  450. assert all(
  451. item["docs_format"] == FULL_DOCS_FORMAT_PENDING_PARSE for item in rag.enqueued
  452. )
  453. assert all(item["parse_engine"] == "native" for item in rag.enqueued)
  454. assert all(
  455. item["process_options"] == PROCESS_OPTION_CHUNK_FIXED for item in rag.enqueued
  456. )
  457. async def test_pipeline_index_texts_sets_api_default_process_options():
  458. rag = _FakeRag()
  459. await pipeline_index_texts(
  460. rag,
  461. ["first text", "second text"],
  462. file_sources=["first.txt", "second.txt"],
  463. track_id="track-texts",
  464. )
  465. assert len(rag.enqueued) == 1
  466. item = rag.enqueued[0]
  467. assert item["input"] == ["first text", "second text"]
  468. assert item["file_path"] == ["first.txt", "second.txt"]
  469. assert item["track_id"] == "track-texts"
  470. assert item["docs_format"] is None
  471. assert item["parse_engine"] is None
  472. assert item["process_options"] == PROCESS_OPTION_CHUNK_FIXED
  473. assert item["from_scan"] is False
  474. # No chunking config -> default F snapshot is still passed through.
  475. assert isinstance(item["chunk_options"], dict)
  476. assert "fixed_token" in item["chunk_options"]
  477. async def test_scan_processed_same_name_archives_with_unique_name(
  478. tmp_path, monkeypatch
  479. ):
  480. file_path = tmp_path / "already-parsed.docx"
  481. file_path.write_bytes(b"docx bytes")
  482. parsed_dir = tmp_path / PARSED_DIR_NAME
  483. parsed_dir.mkdir()
  484. (parsed_dir / file_path.name).write_bytes(b"previous parsed file")
  485. doc_manager = DocumentManager(str(tmp_path))
  486. rag = _ScanRag(
  487. {
  488. str(file_path): {
  489. "status": DocStatus.PROCESSED.value,
  490. "file_path": str(file_path),
  491. "track_id": "track-existing",
  492. }
  493. }
  494. )
  495. async def fail_if_reenqueue(*args, **kwargs):
  496. raise AssertionError("existing docx should not be re-enqueued")
  497. monkeypatch.setattr(_document_routes, "pipeline_index_files", fail_if_reenqueue)
  498. await run_scanning_process(rag, doc_manager, "track-scan")
  499. assert not file_path.exists()
  500. assert (parsed_dir / file_path.name).read_bytes() == b"previous parsed file"
  501. assert (parsed_dir / "already-parsed_001.docx").read_bytes() == b"docx bytes"
  502. async def test_scan_processed_canonical_name_archives_hinted_file(
  503. tmp_path, monkeypatch
  504. ):
  505. file_path = tmp_path / "already-parsed.[native].docx"
  506. file_path.write_bytes(b"docx bytes")
  507. doc_manager = DocumentManager(str(tmp_path))
  508. rag = _ScanRag(
  509. {
  510. "already-parsed.docx": {
  511. "status": DocStatus.PROCESSED.value,
  512. "file_path": "already-parsed.docx",
  513. "track_id": "track-existing",
  514. }
  515. }
  516. )
  517. async def fail_if_reenqueue(*args, **kwargs):
  518. raise AssertionError("canonical duplicate should not be re-enqueued")
  519. monkeypatch.setattr(_document_routes, "pipeline_index_files", fail_if_reenqueue)
  520. await run_scanning_process(rag, doc_manager, "track-scan")
  521. assert not file_path.exists()
  522. assert (tmp_path / PARSED_DIR_NAME / file_path.name).read_bytes() == b"docx bytes"
  523. async def test_scan_archives_same_batch_canonical_duplicates(tmp_path, monkeypatch):
  524. plain_file = tmp_path / "same.docx"
  525. hinted_file = tmp_path / "same.[native].docx"
  526. plain_file.write_bytes(b"plain docx bytes")
  527. hinted_file.write_bytes(b"hinted docx bytes")
  528. doc_manager = DocumentManager(str(tmp_path))
  529. rag = _ScanRag({})
  530. calls = []
  531. async def capture_pipeline(rag_arg, file_paths, track_id, **kwargs):
  532. calls.append(
  533. {
  534. "rag": rag_arg,
  535. "file_paths": file_paths,
  536. "track_id": track_id,
  537. "kwargs": kwargs,
  538. }
  539. )
  540. monkeypatch.setattr(_document_routes, "pipeline_index_files", capture_pipeline)
  541. await run_scanning_process(rag, doc_manager, "track-scan")
  542. # Hinted variant is preferred so the user's explicit engine choice wins;
  543. # the plain variant is the one that gets archived.
  544. assert len(calls) == 1
  545. assert calls[0]["file_paths"] == [hinted_file]
  546. # The scan-owned background task forwards from_scan=True so per-file
  547. # enqueues bypass the scanning guard whose ``scanning`` flag the
  548. # scan task itself holds.
  549. assert calls[0]["kwargs"] == {"from_scan": True}
  550. archived_names = {
  551. path.name for path in (tmp_path / PARSED_DIR_NAME).iterdir() if path.is_file()
  552. }
  553. assert archived_names == {"same.docx"}
  554. assert hinted_file.exists()
  555. assert not plain_file.exists()
  556. async def test_scan_rejects_invalid_filename_hint(tmp_path, monkeypatch):
  557. monkeypatch.setenv("LIGHTRAG_PARSER", "docx:native")
  558. file_path = tmp_path / "bad-scan.[native-FR].docx"
  559. file_path.write_bytes(b"docx bytes")
  560. doc_manager = DocumentManager(str(tmp_path))
  561. rag = _ScanRag({})
  562. await run_scanning_process(rag, doc_manager, "track-scan")
  563. assert rag.enqueued == []
  564. assert len(rag.errors) == 1
  565. error_files, track_id = rag.errors[0]
  566. assert track_id == "track-scan"
  567. assert error_files[0]["file_path"] == file_path.name
  568. assert error_files[0]["error_description"] == (
  569. "[File Extraction]Filename hint error"
  570. )
  571. assert "multiple chunking modes" in error_files[0]["original_error"]
  572. assert rag.process_calls == 0
  573. assert file_path.exists()
  574. async def test_scan_existing_non_processed_reprocesses_file(tmp_path, monkeypatch):
  575. file_path = tmp_path / "retry.docx"
  576. file_path.write_bytes(b"docx bytes")
  577. doc_manager = DocumentManager(str(tmp_path))
  578. rag = _ScanRag(
  579. {
  580. str(file_path): {
  581. "status": DocStatus.PARSING.value,
  582. "file_path": str(file_path),
  583. "track_id": "track-existing",
  584. }
  585. }
  586. )
  587. calls = []
  588. async def capture_pipeline(rag_arg, file_paths, track_id, **kwargs):
  589. calls.append(
  590. {
  591. "rag": rag_arg,
  592. "file_paths": file_paths,
  593. "track_id": track_id,
  594. "kwargs": kwargs,
  595. }
  596. )
  597. monkeypatch.setattr(_document_routes, "pipeline_index_files", capture_pipeline)
  598. await run_scanning_process(rag, doc_manager, "track-scan")
  599. # Resume targets bypass pipeline_index_files entirely: routing them
  600. # through apipeline_enqueue_documents would treat the same canonical
  601. # basename as a duplicate (returning None), causing the source to be
  602. # archived as if it were a duplicate while leaving the unfinished
  603. # doc_status row untouched. Instead, the scan kicks off
  604. # apipeline_process_enqueue_documents directly so the existing PARSING
  605. # row is picked up by the pipeline's resume logic, and the source file
  606. # stays in place for any pending-parse engine that still needs it.
  607. assert calls == []
  608. assert rag.process_calls == 1
  609. assert file_path.exists()
  610. async def test_scan_mixed_new_and_resume_routes_only_new_through_enqueue(
  611. tmp_path, monkeypatch
  612. ):
  613. """When a scan finds both a new file and one matching an unfinished
  614. doc_status row, only the new file goes through pipeline_index_files;
  615. the resume target stays in place. run_scanning_process always
  616. triggers apipeline_process_enqueue_documents whenever resume targets
  617. exist — even when new files were also enqueued — because
  618. pipeline_index_files only runs that call after at least one new file
  619. successfully enqueues. Without the unconditional trigger, an all-
  620. failed batch of new files would silently strand the resume rows.
  621. """
  622. new_file = tmp_path / "fresh.docx"
  623. resume_file = tmp_path / "retry.docx"
  624. new_file.write_bytes(b"fresh docx bytes")
  625. resume_file.write_bytes(b"retry docx bytes")
  626. doc_manager = DocumentManager(str(tmp_path))
  627. rag = _ScanRag(
  628. {
  629. str(resume_file): {
  630. "status": DocStatus.FAILED.value,
  631. "file_path": str(resume_file),
  632. "track_id": "track-existing",
  633. }
  634. }
  635. )
  636. calls = []
  637. async def capture_pipeline(rag_arg, file_paths, track_id, **kwargs):
  638. calls.append(
  639. {
  640. "rag": rag_arg,
  641. "file_paths": file_paths,
  642. "track_id": track_id,
  643. "kwargs": kwargs,
  644. }
  645. )
  646. monkeypatch.setattr(_document_routes, "pipeline_index_files", capture_pipeline)
  647. await run_scanning_process(rag, doc_manager, "track-scan")
  648. # Only the new file goes through the enqueue path; the resume file
  649. # stays in input/ for any pending-parse engine that still needs the
  650. # source on disk.
  651. assert len(calls) == 1
  652. assert calls[0]["file_paths"] == [new_file]
  653. assert calls[0]["kwargs"] == {"from_scan": True}
  654. # The unconditional trigger fires once — guaranteeing the resume row
  655. # advances even if pipeline_index_files's internal trigger were to be
  656. # skipped (e.g. if every new file was rejected by enqueue).
  657. assert rag.process_calls == 1
  658. assert resume_file.exists()
  659. assert new_file.exists()
  660. async def test_scan_failed_extraction_record_without_full_docs_is_retried(
  661. tmp_path, monkeypatch
  662. ):
  663. """Stub doc_status rows recorded by apipeline_enqueue_error_documents
  664. have no full_docs entry — _validate_and_fix_document_consistency
  665. preserves them for manual review and excludes them from processing,
  666. so the resume path can never advance them. When the user fixes the
  667. file and re-scans we must drop the stale stub and route the file
  668. through the normal new-file enqueue, otherwise the fix never lands.
  669. """
  670. file_path = tmp_path / "fixed.docx"
  671. file_path.write_bytes(b"now-readable bytes")
  672. doc_manager = DocumentManager(str(tmp_path))
  673. rag = _ScanRag(
  674. {
  675. str(file_path): {
  676. "status": DocStatus.FAILED.value,
  677. "file_path": str(file_path),
  678. "track_id": "track-old",
  679. "metadata": {"error_type": "file_extraction_error"},
  680. }
  681. },
  682. full_docs_by_id={}, # Extraction error: no full_docs entry was ever written.
  683. )
  684. calls = []
  685. async def capture_pipeline(rag_arg, file_paths, track_id, **kwargs):
  686. calls.append(
  687. {
  688. "rag": rag_arg,
  689. "file_paths": file_paths,
  690. "track_id": track_id,
  691. "kwargs": kwargs,
  692. }
  693. )
  694. monkeypatch.setattr(_document_routes, "pipeline_index_files", capture_pipeline)
  695. await run_scanning_process(rag, doc_manager, "track-scan")
  696. # The stale FAILED stub is deleted and the file is routed as new so
  697. # the standard enqueue path can re-extract content. No resume
  698. # trigger fires because there are no resume targets.
  699. assert rag.doc_status.deleted_ids == [str(file_path)]
  700. assert len(calls) == 1
  701. assert calls[0]["file_paths"] == [file_path]
  702. assert calls[0]["kwargs"] == {"from_scan": True}
  703. assert rag.process_calls == 0
  704. assert file_path.exists()
  705. async def test_scan_failed_with_full_docs_resumes_normally(tmp_path, monkeypatch):
  706. """A FAILED row that DOES have a full_docs entry came from a downstream
  707. failure after content was successfully stored; the pipeline's resume
  708. logic resets it to PENDING and replays. The scan must not delete it.
  709. """
  710. file_path = tmp_path / "downstream-failed.docx"
  711. file_path.write_bytes(b"docx bytes")
  712. doc_manager = DocumentManager(str(tmp_path))
  713. rag = _ScanRag(
  714. {
  715. str(file_path): {
  716. "status": DocStatus.FAILED.value,
  717. "file_path": str(file_path),
  718. "track_id": "track-old",
  719. }
  720. }
  721. # full_docs default-seeded for the path → resumable FAILED.
  722. )
  723. calls = []
  724. async def capture_pipeline(rag_arg, file_paths, track_id, **kwargs):
  725. calls.append(file_paths)
  726. monkeypatch.setattr(_document_routes, "pipeline_index_files", capture_pipeline)
  727. await run_scanning_process(rag, doc_manager, "track-scan")
  728. # No stub deletion; resume path runs.
  729. assert rag.doc_status.deleted_ids == []
  730. assert calls == []
  731. assert rag.process_calls == 1
  732. assert file_path.exists()
  733. async def test_scan_resume_runs_when_all_new_files_fail_to_enqueue(
  734. tmp_path, monkeypatch
  735. ):
  736. """The exact P2 scenario: a scan batch contains a resume target plus
  737. new files that all fail / are rejected during enqueue.
  738. pipeline_index_files's internal process_enqueue is gated on at least
  739. one successful enqueue; without the unconditional resume trigger in
  740. run_scanning_process the PARSING/FAILED row would stay stuck.
  741. We simulate "all new files rejected" with a pipeline_index_files mock
  742. that records its invocation but does not call process_enqueue (mirroring
  743. the real helper's behaviour when every per-file enqueue returns False).
  744. """
  745. new_file = tmp_path / "fresh.docx"
  746. resume_file = tmp_path / "retry.docx"
  747. new_file.write_bytes(b"fresh docx bytes")
  748. resume_file.write_bytes(b"retry docx bytes")
  749. doc_manager = DocumentManager(str(tmp_path))
  750. rag = _ScanRag(
  751. {
  752. str(resume_file): {
  753. "status": DocStatus.PARSING.value,
  754. "file_path": str(resume_file),
  755. "track_id": "track-existing",
  756. }
  757. }
  758. )
  759. async def index_files_all_rejected(rag_arg, file_paths, track_id, **kwargs):
  760. # Real pipeline_index_files skips its internal
  761. # apipeline_process_enqueue_documents call when no per-file enqueue
  762. # succeeded; the mock omits the call entirely to mirror that.
  763. return None
  764. monkeypatch.setattr(
  765. _document_routes, "pipeline_index_files", index_files_all_rejected
  766. )
  767. await run_scanning_process(rag, doc_manager, "track-scan")
  768. # Even though pipeline_index_files's internal trigger never fired, the
  769. # scan still kicks process_enqueue once so the resume row advances.
  770. assert rag.process_calls == 1
  771. assert resume_file.exists()
  772. assert new_file.exists()
  773. async def test_upload_rejects_same_name_failed_doc_status_without_full_docs(
  774. tmp_path, monkeypatch
  775. ):
  776. # Other tests (e.g. test_auth.py) may replace global_args with a SimpleNamespace
  777. # that lacks max_upload_size; pin a known state so the upload endpoint runs.
  778. monkeypatch.setattr(
  779. _document_routes, "global_args", SimpleNamespace(max_upload_size=None)
  780. )
  781. doc_manager = DocumentManager(str(tmp_path))
  782. rag = _DuplicateUploadRag(
  783. {
  784. "failed.docx": {
  785. "status": DocStatus.FAILED.value,
  786. "file_path": "failed.docx",
  787. "track_id": "track-failed",
  788. "metadata": {"error_type": "file_extraction_error"},
  789. }
  790. }
  791. )
  792. router = create_document_routes(rag, doc_manager)
  793. upload_endpoint = [
  794. route.endpoint
  795. for route in router.routes
  796. if getattr(route, "name", "") == "upload_to_input_dir"
  797. ][-1]
  798. upload_file = _document_routes.UploadFile(
  799. filename="failed.docx",
  800. file=BytesIO(b"replacement docx bytes"),
  801. )
  802. # Strict name pre-check: same-canonical record in doc_status now raises 409
  803. # rather than returning a "duplicated" 200 response. Clients must delete
  804. # the existing record before re-uploading.
  805. with pytest.raises(_document_routes.HTTPException) as excinfo:
  806. await upload_endpoint(_document_routes.BackgroundTasks(), upload_file)
  807. assert excinfo.value.status_code == 409
  808. assert "failed.docx" in excinfo.value.detail
  809. assert "Status: failed" in excinfo.value.detail
  810. assert not (tmp_path / "failed.docx").exists()
  811. async def test_upload_rejects_parser_hinted_filesystem_duplicate(tmp_path, monkeypatch):
  812. monkeypatch.setattr(
  813. _document_routes, "global_args", SimpleNamespace(max_upload_size=None)
  814. )
  815. (tmp_path / "existing.docx").write_bytes(b"existing docx bytes")
  816. doc_manager = DocumentManager(str(tmp_path))
  817. rag = _DuplicateUploadRag({})
  818. router = create_document_routes(rag, doc_manager)
  819. upload_endpoint = [
  820. route.endpoint
  821. for route in router.routes
  822. if getattr(route, "name", "") == "upload_to_input_dir"
  823. ][-1]
  824. upload_file = _document_routes.UploadFile(
  825. filename="existing.[native].docx",
  826. file=BytesIO(b"replacement docx bytes"),
  827. )
  828. # Strict name pre-check: an INPUT directory file with the same canonical
  829. # basename now blocks the upload with 409.
  830. with pytest.raises(_document_routes.HTTPException) as excinfo:
  831. await upload_endpoint(_document_routes.BackgroundTasks(), upload_file)
  832. assert excinfo.value.status_code == 409
  833. assert "existing.docx" in excinfo.value.detail
  834. assert not (tmp_path / "existing.[native].docx").exists()
  835. async def test_upload_succeeds_concurrent_with_pipeline_busy(tmp_path, monkeypatch):
  836. """Under the new contract, ``busy=True`` no longer blocks uploads.
  837. The upload reserves a pending-enqueue slot, schedules its bg task,
  838. and returns success; the bg task's enqueue is permitted while the
  839. pipeline is busy and the running loop's request_pending mechanism
  840. will pick up the new doc after its current batch.
  841. """
  842. import importlib
  843. monkeypatch.setattr(
  844. _document_routes, "global_args", SimpleNamespace(max_upload_size=None)
  845. )
  846. doc_manager = DocumentManager(str(tmp_path))
  847. rag = _DuplicateUploadRag({})
  848. shared_storage = importlib.import_module("lightrag.kg.shared_storage")
  849. await shared_storage.initialize_pipeline_status(workspace=rag.workspace)
  850. pipeline_status = await shared_storage.get_namespace_data(
  851. "pipeline_status", workspace=rag.workspace
  852. )
  853. pipeline_status["scanning"] = False
  854. pipeline_status["pending_enqueues"] = 0
  855. pipeline_status["busy"] = True
  856. router = create_document_routes(rag, doc_manager)
  857. upload_endpoint = [
  858. route.endpoint
  859. for route in router.routes
  860. if getattr(route, "name", "") == "upload_to_input_dir"
  861. ][-1]
  862. upload_file = _document_routes.UploadFile(
  863. filename="while_busy.docx",
  864. file=BytesIO(b"docx bytes"),
  865. )
  866. bg = _document_routes.BackgroundTasks()
  867. response = await upload_endpoint(bg, upload_file)
  868. # Endpoint accepted the upload despite busy=True.
  869. assert response.status == "success"
  870. assert (tmp_path / "while_busy.docx").exists()
  871. # The slot has been transferred to the bg task; it will release on
  872. # completion. Until then pending_enqueues stays at 1 so a
  873. # concurrent /scan would refuse.
  874. assert pipeline_status["pending_enqueues"] == 1
  875. assert len(bg.tasks) == 1
  876. async def test_upload_returns_409_when_scanning_classification(tmp_path, monkeypatch):
  877. """Upload must refuse with 409 when scan is in its CLASSIFICATION
  878. phase (``scanning_exclusive=True``). Scan's processing phase
  879. (``scanning=True`` but ``scanning_exclusive=False``) is permissive
  880. — see ``test_upload_succeeds_during_scan_processing_phase`` below.
  881. """
  882. import importlib
  883. monkeypatch.setattr(
  884. _document_routes, "global_args", SimpleNamespace(max_upload_size=None)
  885. )
  886. doc_manager = DocumentManager(str(tmp_path))
  887. rag = _DuplicateUploadRag({})
  888. shared_storage = importlib.import_module("lightrag.kg.shared_storage")
  889. await shared_storage.initialize_pipeline_status(workspace=rag.workspace)
  890. pipeline_status = await shared_storage.get_namespace_data(
  891. "pipeline_status", workspace=rag.workspace
  892. )
  893. pipeline_status["scanning"] = True
  894. pipeline_status["scanning_exclusive"] = True
  895. router = create_document_routes(rag, doc_manager)
  896. upload_endpoint = [
  897. route.endpoint
  898. for route in router.routes
  899. if getattr(route, "name", "") == "upload_to_input_dir"
  900. ][-1]
  901. upload_file = _document_routes.UploadFile(
  902. filename="while_scanning.docx",
  903. file=BytesIO(b"docx bytes"),
  904. )
  905. with pytest.raises(_document_routes.HTTPException) as excinfo:
  906. await upload_endpoint(_document_routes.BackgroundTasks(), upload_file)
  907. assert excinfo.value.status_code == 409
  908. assert "classifying" in excinfo.value.detail.lower()
  909. assert not (tmp_path / "while_scanning.docx").exists()
  910. async def test_upload_succeeds_during_scan_processing_phase(tmp_path, monkeypatch):
  911. """User-reported scenario: while pipeline is doing scan-driven
  912. processing (``scanning=True`` but ``scanning_exclusive=False``),
  913. new uploads must be accepted. Scan's processing phase is
  914. behaviourally identical to busy=True — uploads coexist via
  915. request_pending.
  916. """
  917. import importlib
  918. monkeypatch.setattr(
  919. _document_routes, "global_args", SimpleNamespace(max_upload_size=None)
  920. )
  921. doc_manager = DocumentManager(str(tmp_path))
  922. rag = _DuplicateUploadRag({})
  923. shared_storage = importlib.import_module("lightrag.kg.shared_storage")
  924. await shared_storage.initialize_pipeline_status(workspace=rag.workspace)
  925. pipeline_status = await shared_storage.get_namespace_data(
  926. "pipeline_status", workspace=rag.workspace
  927. )
  928. # Classification done; scan is now driving the processing pipeline.
  929. pipeline_status["scanning"] = True
  930. pipeline_status["scanning_exclusive"] = False
  931. pipeline_status["busy"] = True
  932. pipeline_status["pending_enqueues"] = 0
  933. router = create_document_routes(rag, doc_manager)
  934. upload_endpoint = [
  935. route.endpoint
  936. for route in router.routes
  937. if getattr(route, "name", "") == "upload_to_input_dir"
  938. ][-1]
  939. upload_file = _document_routes.UploadFile(
  940. filename="upload_during_scan_processing.docx",
  941. file=BytesIO(b"docx bytes"),
  942. )
  943. bg = _document_routes.BackgroundTasks()
  944. response = await upload_endpoint(bg, upload_file)
  945. # Endpoint accepted the upload despite scan in progress.
  946. assert response.status == "success"
  947. assert (tmp_path / "upload_during_scan_processing.docx").exists()
  948. assert pipeline_status["pending_enqueues"] == 1
  949. assert len(bg.tasks) == 1
  950. async def test_scan_endpoint_returns_skipped_when_pipeline_busy(tmp_path):
  951. """Scan endpoint must return ``scanning_skipped_pipeline_busy`` and NOT
  952. schedule a background task while the pipeline is busy."""
  953. import importlib
  954. doc_manager = DocumentManager(str(tmp_path))
  955. rag = _ScanRag({})
  956. shared_storage = importlib.import_module("lightrag.kg.shared_storage")
  957. await shared_storage.initialize_pipeline_status(workspace=rag.workspace)
  958. pipeline_status = await shared_storage.get_namespace_data(
  959. "pipeline_status", workspace=rag.workspace
  960. )
  961. pipeline_status["busy"] = True
  962. router = create_document_routes(rag, doc_manager)
  963. scan_endpoint = [
  964. route.endpoint
  965. for route in router.routes
  966. if getattr(route, "name", "") == "scan_for_new_documents"
  967. ][-1]
  968. bg = _document_routes.BackgroundTasks()
  969. response = await scan_endpoint(bg)
  970. assert response.status == "scanning_skipped_pipeline_busy"
  971. # No background task should have been scheduled.
  972. assert len(bg.tasks) == 0
  973. # And ``scanning`` is left unchanged at False (we didn't acquire it).
  974. assert pipeline_status.get("scanning") is False
  975. async def test_scan_endpoint_returns_skipped_when_already_scanning(tmp_path):
  976. """Scan endpoint must reject overlapping scans by checking the
  977. ``scanning`` flag, not just ``busy``."""
  978. import importlib
  979. doc_manager = DocumentManager(str(tmp_path))
  980. rag = _ScanRag({})
  981. shared_storage = importlib.import_module("lightrag.kg.shared_storage")
  982. await shared_storage.initialize_pipeline_status(workspace=rag.workspace)
  983. pipeline_status = await shared_storage.get_namespace_data(
  984. "pipeline_status", workspace=rag.workspace
  985. )
  986. pipeline_status["scanning"] = True
  987. router = create_document_routes(rag, doc_manager)
  988. scan_endpoint = [
  989. route.endpoint
  990. for route in router.routes
  991. if getattr(route, "name", "") == "scan_for_new_documents"
  992. ][-1]
  993. bg = _document_routes.BackgroundTasks()
  994. response = await scan_endpoint(bg)
  995. assert response.status == "scanning_skipped_pipeline_busy"
  996. assert len(bg.tasks) == 0
  997. async def test_scan_endpoint_acquires_and_releases_scanning_flag(tmp_path, monkeypatch):
  998. """The scan endpoint must atomically set ``scanning=True`` and
  999. ``run_scanning_process`` must clear it in finally — even when the body
  1000. raises — so successive scans aren't permanently blocked.
  1001. """
  1002. import importlib
  1003. doc_manager = DocumentManager(str(tmp_path))
  1004. rag = _ScanRag({})
  1005. shared_storage = importlib.import_module("lightrag.kg.shared_storage")
  1006. await shared_storage.initialize_pipeline_status(workspace=rag.workspace)
  1007. pipeline_status = await shared_storage.get_namespace_data(
  1008. "pipeline_status", workspace=rag.workspace
  1009. )
  1010. pipeline_status["busy"] = False
  1011. pipeline_status["scanning"] = False
  1012. router = create_document_routes(rag, doc_manager)
  1013. scan_endpoint = [
  1014. route.endpoint
  1015. for route in router.routes
  1016. if getattr(route, "name", "") == "scan_for_new_documents"
  1017. ][-1]
  1018. bg = _document_routes.BackgroundTasks()
  1019. response = await scan_endpoint(bg)
  1020. # Endpoint scheduled the task and acquired the flag synchronously.
  1021. assert response.status == "scanning_started"
  1022. assert pipeline_status["scanning"] is True
  1023. assert len(bg.tasks) == 1
  1024. # Run the scheduled task; finally-block must clear the flag.
  1025. task = bg.tasks[0]
  1026. await task.func(*task.args, **task.kwargs)
  1027. assert pipeline_status["scanning"] is False
  1028. async def test_scan_endpoint_returns_skipped_when_enqueue_pending(tmp_path):
  1029. """The preflight-to-background race: an upload/insert endpoint may
  1030. have passed the idle check, reserved a pending-enqueue slot, and
  1031. returned success — but its bg task has not yet written to
  1032. doc_status. A scan that arrives in this window must refuse;
  1033. starting it would race scan's doc_status reads against the bg
  1034. task's still-pending writes.
  1035. """
  1036. import importlib
  1037. doc_manager = DocumentManager(str(tmp_path))
  1038. rag = _ScanRag({})
  1039. shared_storage = importlib.import_module("lightrag.kg.shared_storage")
  1040. await shared_storage.initialize_pipeline_status(workspace=rag.workspace)
  1041. pipeline_status = await shared_storage.get_namespace_data(
  1042. "pipeline_status", workspace=rag.workspace
  1043. )
  1044. # The "scan-test" workspace is shared across tests; reset all guarded
  1045. # flags so we start from a clean idle state.
  1046. pipeline_status["busy"] = False
  1047. pipeline_status["scanning"] = False
  1048. pipeline_status["pending_enqueues"] = 0
  1049. # Simulate a reservation made by /upload that has not yet released.
  1050. pipeline_status["pending_enqueues"] = 1
  1051. router = create_document_routes(rag, doc_manager)
  1052. scan_endpoint = [
  1053. route.endpoint
  1054. for route in router.routes
  1055. if getattr(route, "name", "") == "scan_for_new_documents"
  1056. ][-1]
  1057. bg = _document_routes.BackgroundTasks()
  1058. response = await scan_endpoint(bg)
  1059. assert response.status == "scanning_skipped_pipeline_busy"
  1060. # No background task scheduled; scanning flag untouched.
  1061. assert len(bg.tasks) == 0
  1062. assert pipeline_status.get("scanning") is False
  1063. # Reservation count is preserved — only the owning bg task may release it.
  1064. assert pipeline_status["pending_enqueues"] == 1
  1065. async def test_reserve_enqueue_slot_blocks_concurrent_scan_until_release(tmp_path):
  1066. """End-to-end on the reservation primitive: reserving a slot makes
  1067. the scan endpoint refuse; releasing it lets the next scan in. This
  1068. is the contract the upload/text endpoints rely on to close the
  1069. preflight-to-background race.
  1070. """
  1071. import importlib
  1072. doc_manager = DocumentManager(str(tmp_path))
  1073. rag = _ScanRag({})
  1074. shared_storage = importlib.import_module("lightrag.kg.shared_storage")
  1075. await shared_storage.initialize_pipeline_status(workspace=rag.workspace)
  1076. pipeline_status = await shared_storage.get_namespace_data(
  1077. "pipeline_status", workspace=rag.workspace
  1078. )
  1079. pipeline_status["busy"] = False
  1080. pipeline_status["scanning"] = False
  1081. pipeline_status["pending_enqueues"] = 0
  1082. # Reserve a slot — mirrors what /upload, /text and /texts do
  1083. # synchronously before scheduling their bg tasks.
  1084. reserved = await _document_routes._reserve_enqueue_slot(rag)
  1085. assert reserved is True
  1086. assert pipeline_status["pending_enqueues"] == 1
  1087. router = create_document_routes(rag, doc_manager)
  1088. scan_endpoint = [
  1089. route.endpoint
  1090. for route in router.routes
  1091. if getattr(route, "name", "") == "scan_for_new_documents"
  1092. ][-1]
  1093. bg = _document_routes.BackgroundTasks()
  1094. blocked = await scan_endpoint(bg)
  1095. assert blocked.status == "scanning_skipped_pipeline_busy"
  1096. # Release: bg task wrapper would do this in finally.
  1097. await _document_routes._release_enqueue_slot(rag)
  1098. assert pipeline_status["pending_enqueues"] == 0
  1099. bg2 = _document_routes.BackgroundTasks()
  1100. allowed = await scan_endpoint(bg2)
  1101. assert allowed.status == "scanning_started"
  1102. assert pipeline_status["scanning"] is True
  1103. async def test_release_enqueue_slot_decrements_per_call(tmp_path):
  1104. """Two-reservation cohort: each release is a pure decrement. Drain
  1105. coordination is no longer needed because the busy guard on enqueue
  1106. has been removed — concurrent enqueues are permitted while the
  1107. pipeline is busy and the running loop's request_pending mechanism
  1108. drains them.
  1109. """
  1110. import importlib
  1111. rag = _ScanRag({})
  1112. shared_storage = importlib.import_module("lightrag.kg.shared_storage")
  1113. await shared_storage.initialize_pipeline_status(workspace=rag.workspace)
  1114. pipeline_status = await shared_storage.get_namespace_data(
  1115. "pipeline_status", workspace=rag.workspace
  1116. )
  1117. pipeline_status["busy"] = False
  1118. pipeline_status["scanning"] = False
  1119. pipeline_status["scanning_exclusive"] = False
  1120. pipeline_status["pending_enqueues"] = 0
  1121. # Two concurrent reservations from /upload + /text — both pass the
  1122. # idle preflight because scanning_exclusive=F at reservation time
  1123. # (busy and the bare ``scanning`` flag are no longer gates;
  1124. # concurrent enqueue with the processing loop and scan's
  1125. # processing phase is explicitly allowed).
  1126. assert await _document_routes._reserve_enqueue_slot(rag) is True
  1127. assert await _document_routes._reserve_enqueue_slot(rag) is True
  1128. assert pipeline_status["pending_enqueues"] == 2
  1129. # Each release is a pure decrement; no drain coordination required
  1130. # because each bg task triggers process_enqueue independently and
  1131. # the running loop's request_pending mechanism collapses duplicate
  1132. # triggers safely.
  1133. await _document_routes._release_enqueue_slot(rag)
  1134. assert pipeline_status["pending_enqueues"] == 1
  1135. await _document_routes._release_enqueue_slot(rag)
  1136. assert pipeline_status["pending_enqueues"] == 0
  1137. async def test_two_concurrent_uploads_both_succeed_when_pipeline_busy(
  1138. tmp_path, monkeypatch
  1139. ):
  1140. """The scenario the original race report described, end-to-end:
  1141. two upload requests arrive while the pipeline is busy. Under the
  1142. new contract neither is rejected; both reserve slots, both schedule
  1143. bg tasks, and pending_enqueues stays at 2 until each bg task
  1144. releases. No reservation can be killed by the busy guard because
  1145. that guard has been removed from apipeline_enqueue_documents.
  1146. """
  1147. import importlib
  1148. monkeypatch.setattr(
  1149. _document_routes, "global_args", SimpleNamespace(max_upload_size=None)
  1150. )
  1151. doc_manager = DocumentManager(str(tmp_path))
  1152. rag = _DuplicateUploadRag({})
  1153. shared_storage = importlib.import_module("lightrag.kg.shared_storage")
  1154. await shared_storage.initialize_pipeline_status(workspace=rag.workspace)
  1155. pipeline_status = await shared_storage.get_namespace_data(
  1156. "pipeline_status", workspace=rag.workspace
  1157. )
  1158. pipeline_status["scanning"] = False
  1159. pipeline_status["pending_enqueues"] = 0
  1160. pipeline_status["busy"] = True
  1161. router = create_document_routes(rag, doc_manager)
  1162. upload_endpoint = [
  1163. route.endpoint
  1164. for route in router.routes
  1165. if getattr(route, "name", "") == "upload_to_input_dir"
  1166. ][-1]
  1167. bg_a = _document_routes.BackgroundTasks()
  1168. upload_a = _document_routes.UploadFile(filename="a.docx", file=BytesIO(b"a bytes"))
  1169. response_a = await upload_endpoint(bg_a, upload_a)
  1170. assert response_a.status == "success"
  1171. assert pipeline_status["pending_enqueues"] == 1
  1172. bg_b = _document_routes.BackgroundTasks()
  1173. upload_b = _document_routes.UploadFile(filename="b.docx", file=BytesIO(b"b bytes"))
  1174. response_b = await upload_endpoint(bg_b, upload_b)
  1175. assert response_b.status == "success"
  1176. # Both reservations coexist while bg tasks are pending.
  1177. assert pipeline_status["pending_enqueues"] == 2
  1178. # Both files were written to disk; both bg tasks scheduled.
  1179. assert (tmp_path / "a.docx").exists()
  1180. assert (tmp_path / "b.docx").exists()
  1181. assert len(bg_a.tasks) == 1
  1182. assert len(bg_b.tasks) == 1
  1183. async def test_reserve_enqueue_slot_allows_busy_and_scan_processing_phase(tmp_path):
  1184. """Reservation only blocks on ``scanning_exclusive`` (scan's
  1185. classification phase) and ``destructive_busy``. Plain ``busy=True``
  1186. (processing loop) and ``scanning=True`` with
  1187. ``scanning_exclusive=False`` (scan in its processing phase) are
  1188. BOTH permitted — that's what enables "upload while pipeline is
  1189. working".
  1190. """
  1191. import importlib
  1192. rag = _ScanRag({})
  1193. shared_storage = importlib.import_module("lightrag.kg.shared_storage")
  1194. await shared_storage.initialize_pipeline_status(workspace=rag.workspace)
  1195. pipeline_status = await shared_storage.get_namespace_data(
  1196. "pipeline_status", workspace=rag.workspace
  1197. )
  1198. pipeline_status["busy"] = False
  1199. pipeline_status["scanning"] = False
  1200. pipeline_status["scanning_exclusive"] = False
  1201. pipeline_status["pending_enqueues"] = 0
  1202. # busy=True alone does NOT block.
  1203. pipeline_status["busy"] = True
  1204. assert await _document_routes._reserve_enqueue_slot(rag) is True
  1205. await _document_routes._release_enqueue_slot(rag)
  1206. pipeline_status["busy"] = False
  1207. # scanning=True (scan processing phase) does NOT block — this is
  1208. # the user-reported case: upload during scan-driven processing
  1209. # must succeed.
  1210. pipeline_status["scanning"] = True
  1211. assert await _document_routes._reserve_enqueue_slot(rag) is True
  1212. await _document_routes._release_enqueue_slot(rag)
  1213. pipeline_status["scanning"] = False
  1214. # scanning_exclusive=True (scan classification phase) STILL rejects.
  1215. pipeline_status["scanning"] = True
  1216. pipeline_status["scanning_exclusive"] = True
  1217. with pytest.raises(_document_routes.HTTPException) as exc:
  1218. await _document_routes._reserve_enqueue_slot(rag)
  1219. assert exc.value.status_code == 409
  1220. assert "classifying" in exc.value.detail.lower()
  1221. assert pipeline_status["pending_enqueues"] == 0
  1222. async def test_reserve_enqueue_slot_rejects_destructive_busy(tmp_path):
  1223. """``destructive_busy`` (set by /documents/clear and per-doc delete)
  1224. must reject reservation: those jobs DROP storages and remove input
  1225. files, so any concurrent enqueue would write to a storage being
  1226. torn down and silently lose the document after the client saw 200.
  1227. """
  1228. import importlib
  1229. rag = _ScanRag({})
  1230. shared_storage = importlib.import_module("lightrag.kg.shared_storage")
  1231. await shared_storage.initialize_pipeline_status(workspace=rag.workspace)
  1232. pipeline_status = await shared_storage.get_namespace_data(
  1233. "pipeline_status", workspace=rag.workspace
  1234. )
  1235. pipeline_status["busy"] = False
  1236. pipeline_status["scanning"] = False
  1237. pipeline_status["scanning_exclusive"] = False
  1238. pipeline_status["pending_enqueues"] = 0
  1239. # destructive_busy=True (clear / delete in flight) → 409.
  1240. pipeline_status["busy"] = True
  1241. pipeline_status["destructive_busy"] = True
  1242. with pytest.raises(_document_routes.HTTPException) as exc:
  1243. await _document_routes._reserve_enqueue_slot(rag)
  1244. assert exc.value.status_code == 409
  1245. assert "clearing or deleting" in exc.value.detail.lower()
  1246. assert pipeline_status["pending_enqueues"] == 0
  1247. # Cleared once the destructive job finishes.
  1248. pipeline_status["destructive_busy"] = False
  1249. pipeline_status["busy"] = False
  1250. assert await _document_routes._reserve_enqueue_slot(rag) is True
  1251. await _document_routes._release_enqueue_slot(rag)
  1252. async def test_clear_documents_sets_and_clears_destructive_busy(tmp_path):
  1253. """``/documents/clear`` must set ``destructive_busy=True`` while it is
  1254. dropping storages (so concurrent uploads get 409, not silent loss)
  1255. and clear the flag on completion so the pipeline returns to idle.
  1256. """
  1257. import importlib
  1258. workspace = f"clear-test-{uuid4().hex}"
  1259. observed = {"destructive_busy": None}
  1260. class _DropSpy:
  1261. """Mid-drop probe: snapshots ``destructive_busy`` when the clear
  1262. endpoint calls our ``drop()``. Concurrent reservations during
  1263. this window MUST see destructive_busy=True.
  1264. """
  1265. def __init__(self, ws):
  1266. self.namespace = "spy"
  1267. self.workspace = ws
  1268. async def drop(self):
  1269. shared_storage_inner = importlib.import_module("lightrag.kg.shared_storage")
  1270. ns = await shared_storage_inner.get_namespace_data(
  1271. "pipeline_status", workspace=self.workspace
  1272. )
  1273. observed["destructive_busy"] = ns.get("destructive_busy")
  1274. return None
  1275. spy = _DropSpy(workspace)
  1276. class _ClearRag:
  1277. def __init__(self):
  1278. self.workspace = workspace
  1279. # Eleven storage attributes the clear endpoint iterates over.
  1280. # Reusing the same spy is fine — each gets ``.drop()`` called
  1281. # in turn, all observe the same destructive_busy flag.
  1282. self.text_chunks = spy
  1283. self.full_docs = spy
  1284. self.full_entities = spy
  1285. self.full_relations = spy
  1286. self.entity_chunks = spy
  1287. self.relation_chunks = spy
  1288. self.entities_vdb = spy
  1289. self.relationships_vdb = spy
  1290. self.chunks_vdb = spy
  1291. self.chunk_entity_relation_graph = spy
  1292. self.doc_status = spy
  1293. async def aclear_cache(self, modes=None):
  1294. return None
  1295. doc_manager = DocumentManager(str(tmp_path))
  1296. rag = _ClearRag()
  1297. shared_storage = importlib.import_module("lightrag.kg.shared_storage")
  1298. await shared_storage.initialize_pipeline_status(workspace=rag.workspace)
  1299. pipeline_status = await shared_storage.get_namespace_data(
  1300. "pipeline_status", workspace=rag.workspace
  1301. )
  1302. router = create_document_routes(rag, doc_manager)
  1303. clear_endpoint = [
  1304. route.endpoint
  1305. for route in router.routes
  1306. if getattr(route, "name", "") == "clear_documents"
  1307. ][-1]
  1308. response = await clear_endpoint()
  1309. assert response.status == "success"
  1310. # destructive_busy was True for the duration of the storage drop.
  1311. assert observed["destructive_busy"] is True
  1312. # And cleared back to False after completion.
  1313. assert pipeline_status.get("destructive_busy") is False
  1314. assert pipeline_status.get("busy") is False
  1315. async def test_clear_documents_refuses_when_scanning_or_pending_enqueues(tmp_path):
  1316. """``/documents/clear`` must refuse atomically when ANY exclusive
  1317. or in-flight writer is active — not just ``busy``. Previously
  1318. only ``busy`` was checked, so clear could begin dropping storages
  1319. while a /scan task was running or while an upload bg task had
  1320. reserved a slot but not yet written its doc to doc_status.
  1321. """
  1322. import importlib
  1323. workspace = f"clear-refuse-test-{uuid4().hex}"
  1324. class _StubRag:
  1325. def __init__(self):
  1326. self.workspace = workspace
  1327. rag = _StubRag()
  1328. doc_manager = DocumentManager(str(tmp_path))
  1329. shared_storage = importlib.import_module("lightrag.kg.shared_storage")
  1330. await shared_storage.initialize_pipeline_status(workspace=rag.workspace)
  1331. pipeline_status = await shared_storage.get_namespace_data(
  1332. "pipeline_status", workspace=rag.workspace
  1333. )
  1334. router = create_document_routes(rag, doc_manager)
  1335. clear_endpoint = [
  1336. route.endpoint
  1337. for route in router.routes
  1338. if getattr(route, "name", "") == "clear_documents"
  1339. ][-1]
  1340. # Case 1: scanning=True must refuse.
  1341. pipeline_status["busy"] = False
  1342. pipeline_status["scanning"] = True
  1343. pipeline_status["pending_enqueues"] = 0
  1344. response = await clear_endpoint()
  1345. assert response.status == "busy"
  1346. # Critical: no flag mutation occurred; scanning is still owned.
  1347. assert pipeline_status["scanning"] is True
  1348. assert pipeline_status.get("destructive_busy", False) is False
  1349. # Case 2: pending_enqueues>0 must refuse.
  1350. pipeline_status["scanning"] = False
  1351. pipeline_status["pending_enqueues"] = 1
  1352. response = await clear_endpoint()
  1353. assert response.status == "busy"
  1354. assert pipeline_status["pending_enqueues"] == 1
  1355. assert pipeline_status.get("destructive_busy", False) is False
  1356. # Case 3: busy=True (e.g. processing loop or another destructive
  1357. # job) must refuse — preserves existing behaviour.
  1358. pipeline_status["pending_enqueues"] = 0
  1359. pipeline_status["busy"] = True
  1360. response = await clear_endpoint()
  1361. assert response.status == "busy"
  1362. assert pipeline_status.get("destructive_busy", False) is False
  1363. async def test_delete_document_reserves_destructive_busy_synchronously(tmp_path):
  1364. """``/documents/delete_document`` must reserve the destructive slot
  1365. synchronously BEFORE returning ``deletion_started``. Otherwise
  1366. a /scan or /upload arriving between the response and the bg task
  1367. starting could race the destructive job.
  1368. Acceptance criteria: after the endpoint returns success,
  1369. pipeline_status reflects ``busy=True`` and ``destructive_busy=True``
  1370. even though the bg task hasn't run yet. Refusal cases for
  1371. scanning / pending_enqueues / busy must short-circuit and return
  1372. ``status="busy"`` without scheduling.
  1373. """
  1374. import importlib
  1375. rag = _DeleteRag(DeletionResult(status="success", message="ok", doc_id="doc-1"))
  1376. doc_manager = DocumentManager(str(tmp_path))
  1377. shared_storage = importlib.import_module("lightrag.kg.shared_storage")
  1378. await shared_storage.initialize_pipeline_status(workspace=rag.workspace)
  1379. pipeline_status = await shared_storage.get_namespace_data(
  1380. "pipeline_status", workspace=rag.workspace
  1381. )
  1382. router = create_document_routes(rag, doc_manager)
  1383. delete_endpoint = [
  1384. route.endpoint
  1385. for route in router.routes
  1386. if getattr(route, "name", "") == "delete_document"
  1387. ][-1]
  1388. # Build the request payload using the model class on the module.
  1389. DeleteDocRequest = _document_routes.DeleteDocRequest
  1390. # Case 1: reservation acquired synchronously, bg task scheduled.
  1391. pipeline_status["busy"] = False
  1392. pipeline_status["scanning"] = False
  1393. pipeline_status["pending_enqueues"] = 0
  1394. bg = _document_routes.BackgroundTasks()
  1395. response = await delete_endpoint(
  1396. DeleteDocRequest(doc_ids=["doc-1"]),
  1397. bg,
  1398. )
  1399. assert response.status == "deletion_started"
  1400. # Synchronously reserved BEFORE returning.
  1401. assert pipeline_status["busy"] is True
  1402. assert pipeline_status["destructive_busy"] is True
  1403. assert len(bg.tasks) == 1
  1404. # Reset for next case.
  1405. pipeline_status["busy"] = False
  1406. pipeline_status["destructive_busy"] = False
  1407. # Case 2: scanning=True must refuse without scheduling.
  1408. pipeline_status["scanning"] = True
  1409. bg = _document_routes.BackgroundTasks()
  1410. response = await delete_endpoint(
  1411. DeleteDocRequest(doc_ids=["doc-1"]),
  1412. bg,
  1413. )
  1414. assert response.status == "busy"
  1415. assert len(bg.tasks) == 0
  1416. assert pipeline_status.get("destructive_busy", False) is False
  1417. pipeline_status["scanning"] = False
  1418. # Case 3: pending_enqueues>0 must refuse without scheduling.
  1419. pipeline_status["pending_enqueues"] = 1
  1420. bg = _document_routes.BackgroundTasks()
  1421. response = await delete_endpoint(
  1422. DeleteDocRequest(doc_ids=["doc-1"]),
  1423. bg,
  1424. )
  1425. assert response.status == "busy"
  1426. assert len(bg.tasks) == 0
  1427. assert pipeline_status["pending_enqueues"] == 1
  1428. assert pipeline_status.get("destructive_busy", False) is False
  1429. pipeline_status["pending_enqueues"] = 0
  1430. def test_delete_file_variants_removes_canonical_hint_variants(tmp_path):
  1431. parsed_dir = tmp_path / PARSED_DIR_NAME
  1432. parsed_dir.mkdir()
  1433. files_to_delete = [
  1434. tmp_path / "report.docx",
  1435. tmp_path / "report.[native].docx",
  1436. parsed_dir / "report.[mineru].docx",
  1437. parsed_dir / "report.[native]_001.docx",
  1438. parsed_dir / "report_001.docx",
  1439. ]
  1440. for path in files_to_delete:
  1441. path.write_bytes(b"delete me")
  1442. unrelated_files = [
  1443. tmp_path / "report_001.docx",
  1444. tmp_path / "report_2024.docx",
  1445. parsed_dir / "other.[native].docx",
  1446. ]
  1447. for path in unrelated_files:
  1448. path.write_bytes(b"keep me")
  1449. artifact_dirs = [
  1450. parsed_dir / "report.docx.parsed",
  1451. parsed_dir / "report.docx.parsed_001",
  1452. ]
  1453. for artifact_dir in artifact_dirs:
  1454. artifact_dir.mkdir()
  1455. (artifact_dir / "report.blocks.jsonl").write_text("{}", encoding="utf-8")
  1456. unrelated_artifact_dir = parsed_dir / "other.docx.parsed"
  1457. unrelated_artifact_dir.mkdir()
  1458. (unrelated_artifact_dir / "other.blocks.jsonl").write_text("{}", encoding="utf-8")
  1459. deleted_files, errors = _document_routes.delete_file_variants_by_file_path(
  1460. tmp_path,
  1461. "report.docx",
  1462. )
  1463. assert errors == []
  1464. assert set(deleted_files) == {
  1465. "report.docx",
  1466. "report.[native].docx",
  1467. str(Path(PARSED_DIR_NAME) / "report.[mineru].docx"),
  1468. str(Path(PARSED_DIR_NAME) / "report.[native]_001.docx"),
  1469. str(Path(PARSED_DIR_NAME) / "report_001.docx"),
  1470. str(Path(PARSED_DIR_NAME) / "report.docx.parsed"),
  1471. str(Path(PARSED_DIR_NAME) / "report.docx.parsed_001"),
  1472. }
  1473. assert all(not path.exists() for path in files_to_delete)
  1474. assert all(path.exists() for path in unrelated_files)
  1475. assert all(not artifact_dir.exists() for artifact_dir in artifact_dirs)
  1476. assert unrelated_artifact_dir.is_dir()
  1477. async def test_background_delete_removes_parser_hint_file_variants(tmp_path):
  1478. shared_storage = importlib.import_module("lightrag.kg.shared_storage")
  1479. parsed_dir = tmp_path / PARSED_DIR_NAME
  1480. parsed_dir.mkdir()
  1481. source_file = tmp_path / "paper.[native].docx"
  1482. source_file.write_bytes(b"source")
  1483. parsed_variant = parsed_dir / "paper.[mineru]_001.docx"
  1484. parsed_variant.write_bytes(b"parsed")
  1485. unrelated_file = tmp_path / "other.[native].docx"
  1486. unrelated_file.write_bytes(b"keep")
  1487. doc_manager = DocumentManager(str(tmp_path))
  1488. rag = _DeleteRag(
  1489. DeletionResult(
  1490. status="success",
  1491. doc_id="doc-paper",
  1492. message="deleted",
  1493. file_path="paper.docx",
  1494. )
  1495. )
  1496. shared_storage.initialize_share_data()
  1497. await shared_storage.initialize_pipeline_status(workspace=rag.workspace)
  1498. await _document_routes.background_delete_documents(
  1499. rag,
  1500. doc_manager,
  1501. ["doc-paper"],
  1502. delete_file=True,
  1503. delete_llm_cache=True,
  1504. )
  1505. assert rag.deleted_doc_ids == [("doc-paper", True)]
  1506. assert not source_file.exists()
  1507. assert not parsed_variant.exists()
  1508. assert unrelated_file.exists()
  1509. async def test_docx_archive_failure_is_best_effort(tmp_path, monkeypatch):
  1510. file_path = tmp_path / "archive-failure.docx"
  1511. file_path.write_bytes(b"docx bytes")
  1512. async def _raise_archive_failure(*args, **kwargs):
  1513. raise OSError("simulated archive failure")
  1514. from lightrag.utils_pipeline import (
  1515. archive_docx_source_after_full_docs_sync,
  1516. )
  1517. import lightrag.utils_pipeline as _utils_pipeline
  1518. monkeypatch.setattr(
  1519. _utils_pipeline, "move_file_to_parsed_dir", _raise_archive_failure
  1520. )
  1521. archived_path = await archive_docx_source_after_full_docs_sync(str(file_path))
  1522. assert archived_path is None
  1523. assert file_path.exists()
  1524. async def test_parse_native_archives_docx_after_full_docs_sync(tmp_path, monkeypatch):
  1525. monkeypatch.setenv("INPUT_DIR", str(tmp_path))
  1526. source_path = tmp_path / "parsed-after-sync.docx"
  1527. source_path.write_bytes(b"docx bytes")
  1528. rag = _ParseRag(tmp_path / "work", source_path)
  1529. def _fake_extract(file_path, fixlevel=None, drawing_context=None, **kwargs):
  1530. # extract_docx_blocks returns a list of block dicts; a single text
  1531. # block is enough to exercise the archive + full_docs side-effects
  1532. # tested below — the adapter will turn it into one .blocks.jsonl
  1533. # content row.
  1534. return [
  1535. {
  1536. "uuid": "p1",
  1537. "uuid_end": "p1",
  1538. "heading": "",
  1539. "content": "parsed",
  1540. "type": "text",
  1541. "parent_headings": [],
  1542. "level": 0,
  1543. "table_chunk_role": "none",
  1544. }
  1545. ]
  1546. monkeypatch.setattr(
  1547. "lightrag.parser.docx.parse_document.extract_docx_blocks",
  1548. _fake_extract,
  1549. )
  1550. result = await LightRAG.parse_native(
  1551. rag,
  1552. "doc-test",
  1553. str(source_path),
  1554. {"parse_format": FULL_DOCS_FORMAT_PENDING_PARSE, "content": ""},
  1555. )
  1556. # parse_native now returns LIGHTRAG-format parsed_data with merged_text
  1557. # (not the {{LRdoc}} marker — that's only in the persisted full_docs row).
  1558. assert result["content"]
  1559. assert result["parse_format"] == "lightrag"
  1560. assert result["blocks_path"]
  1561. assert rag.full_docs.events == ["upsert", "index_done"]
  1562. assert not source_path.exists()
  1563. assert (tmp_path / PARSED_DIR_NAME / source_path.name).exists()
  1564. parsed_artifact_dir = tmp_path / PARSED_DIR_NAME / f"{source_path.name}.parsed"
  1565. assert parsed_artifact_dir.is_dir()
  1566. assert (parsed_artifact_dir / "parsed-after-sync.blocks.jsonl").is_file()
  1567. assert rag.full_docs.data["doc-test"]["parse_engine"] == "native"
  1568. assert rag.full_docs.data["doc-test"]["parse_format"] == "lightrag"
  1569. # Per docs/FileProcessingConfiguration-zh.md, content uses the {{LRdoc}}
  1570. # marker plus a leading-text summary derived from merged blocks.
  1571. assert rag.full_docs.data["doc-test"]["content"].startswith("{{LRdoc}}")
  1572. def test_parsed_artifact_dir_uses_unique_suffix_when_path_is_file(
  1573. tmp_path, monkeypatch
  1574. ):
  1575. from lightrag.utils_pipeline import parsed_artifact_dir_for
  1576. monkeypatch.setenv("INPUT_DIR", str(tmp_path))
  1577. parsed_dir = tmp_path / PARSED_DIR_NAME
  1578. parsed_dir.mkdir()
  1579. (parsed_dir / "demo.docx.parsed").write_text("legacy file", encoding="utf-8")
  1580. artifact_dir = parsed_artifact_dir_for("demo.docx")
  1581. assert artifact_dir == parsed_dir / "demo.docx.parsed_001"
  1582. def test_parsed_artifact_dir_reuses_existing_parsed_parent(tmp_path, monkeypatch):
  1583. from lightrag.utils_pipeline import parsed_artifact_dir_for
  1584. monkeypatch.setenv("INPUT_DIR", str(tmp_path))
  1585. parsed_dir = tmp_path / PARSED_DIR_NAME
  1586. parsed_dir.mkdir()
  1587. artifact_dir = parsed_artifact_dir_for("demo.docx")
  1588. assert artifact_dir == parsed_dir / "demo.docx.parsed"
  1589. async def test_parse_native_docx_content_list_failure_raises_without_fallback(
  1590. tmp_path, monkeypatch
  1591. ):
  1592. source_path = tmp_path / "content-list-failure.docx"
  1593. source_path.write_bytes(b"docx bytes")
  1594. rag = _ParseRag(tmp_path / "work", source_path)
  1595. def _raise_parser(file_path, fixlevel=None, drawing_context=None, **kwargs):
  1596. raise RuntimeError("content list boom")
  1597. def _fail_fallback(file_bytes):
  1598. raise AssertionError("plain text fallback should not run")
  1599. monkeypatch.setattr(
  1600. "lightrag.parser.docx.parse_document.extract_docx_blocks",
  1601. _raise_parser,
  1602. )
  1603. monkeypatch.setattr(_document_routes, "_extract_docx", _fail_fallback)
  1604. with pytest.raises(RuntimeError, match="content list boom"):
  1605. await LightRAG.parse_native(
  1606. rag,
  1607. "doc-test",
  1608. str(source_path),
  1609. {"parse_format": FULL_DOCS_FORMAT_PENDING_PARSE, "content": ""},
  1610. )
  1611. assert source_path.exists()
  1612. assert rag.full_docs.events == []
  1613. async def test_parse_native_docx_empty_content_list_result_raises_without_fallback(
  1614. tmp_path, monkeypatch
  1615. ):
  1616. source_path = tmp_path / "empty-content-list.docx"
  1617. source_path.write_bytes(b"docx bytes")
  1618. rag = _ParseRag(tmp_path / "work", source_path)
  1619. def _fail_fallback(file_bytes):
  1620. raise AssertionError("plain text fallback should not run")
  1621. monkeypatch.setattr(
  1622. "lightrag.parser.docx.parse_document.extract_docx_blocks",
  1623. lambda *args, **kwargs: [],
  1624. )
  1625. monkeypatch.setattr(_document_routes, "_extract_docx", _fail_fallback)
  1626. with pytest.raises(ValueError, match="empty content"):
  1627. await LightRAG.parse_native(
  1628. rag,
  1629. "doc-test",
  1630. str(source_path),
  1631. {"parse_format": FULL_DOCS_FORMAT_PENDING_PARSE, "content": ""},
  1632. )
  1633. assert source_path.exists()
  1634. assert rag.full_docs.events == []
  1635. def test_lightrag_document_reprocess_uses_full_docs_without_reparse():
  1636. engine = resolve_stored_document_parser_engine(
  1637. "report.[mineru].docx",
  1638. {
  1639. "parse_format": FULL_DOCS_FORMAT_LIGHTRAG,
  1640. "sidecar_location": "file:///tmp/report.docx.parsed/",
  1641. "parse_engine": "mineru",
  1642. },
  1643. )
  1644. assert engine == "native"