test_doc_status_chunk_preservation.py 58 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601
  1. import asyncio
  2. from datetime import datetime, timezone
  3. from types import MethodType
  4. from uuid import uuid4
  5. import numpy as np
  6. import pytest
  7. import lightrag.lightrag as lightrag_module
  8. import lightrag.pipeline as pipeline_module
  9. from lightrag.base import DocStatus
  10. from lightrag.constants import GRAPH_FIELD_SEP
  11. from lightrag.kg.shared_storage import get_namespace_data, get_namespace_lock
  12. from lightrag.lightrag import LightRAG
  13. from lightrag.utils import (
  14. EmbeddingFunc,
  15. Tokenizer,
  16. compute_mdhash_id,
  17. make_relation_chunk_key,
  18. )
# Module-level pytest marker: every test collected from this file is tagged
# with the `offline` mark.
pytestmark = pytest.mark.offline
  20. class _SimpleTokenizerImpl:
  21. def encode(self, content: str) -> list[int]:
  22. return [ord(ch) for ch in content]
  23. def decode(self, tokens: list[int]) -> str:
  24. return "".join(chr(t) for t in tokens)
  25. async def _dummy_embedding(texts: list[str]) -> np.ndarray:
  26. return np.ones((len(texts), 8), dtype=float)
  27. async def _dummy_llm(*args, **kwargs) -> str:
  28. return "ok"
  29. def _deterministic_chunking(
  30. tokenizer,
  31. content: str,
  32. split_by_character,
  33. split_by_character_only: bool,
  34. chunk_overlap_token_size: int,
  35. chunk_token_size: int,
  36. ) -> list[dict]:
  37. return [
  38. {"tokens": 1, "content": f"{content}::chunk1", "chunk_order_index": 0},
  39. {"tokens": 1, "content": f"{content}::chunk2", "chunk_order_index": 1},
  40. ]
  41. def _failing_chunking(
  42. tokenizer,
  43. content: str,
  44. split_by_character,
  45. split_by_character_only: bool,
  46. chunk_overlap_token_size: int,
  47. chunk_token_size: int,
  48. ) -> list[dict]:
  49. raise RuntimeError("chunking fail sentinel")
  50. def _status_to_text(status: object) -> str:
  51. if isinstance(status, DocStatus):
  52. return status.value
  53. return str(status).replace("DocStatus.", "").lower()
  54. async def _build_rag(
  55. tmp_path,
  56. test_name: str,
  57. chunking_func,
  58. *,
  59. max_parallel_insert: int = 1,
  60. ) -> LightRAG:
  61. workspace = f"{test_name}_{uuid4().hex[:8]}"
  62. rag = LightRAG(
  63. working_dir=str(tmp_path / test_name),
  64. workspace=workspace,
  65. llm_model_func=_dummy_llm,
  66. embedding_func=EmbeddingFunc(
  67. embedding_dim=8,
  68. max_token_size=8192,
  69. func=_dummy_embedding,
  70. ),
  71. tokenizer=Tokenizer("test-tokenizer", _SimpleTokenizerImpl()),
  72. chunking_func=chunking_func,
  73. max_parallel_insert=max_parallel_insert,
  74. )
  75. await rag.initialize_storages()
  76. return rag
  77. async def _seed_chunk_cache_entries(
  78. rag: LightRAG, chunk_ids: list[str], prefix: str
  79. ) -> list[str]:
  80. updates = {}
  81. cache_records = {}
  82. cache_ids: list[str] = []
  83. for idx, chunk_id in enumerate(chunk_ids):
  84. chunk_data = await rag.text_chunks.get_by_id(chunk_id)
  85. assert chunk_data is not None
  86. cache_id = f"{prefix}-cache-{idx}"
  87. chunk_data["llm_cache_list"] = [cache_id]
  88. updates[chunk_id] = chunk_data
  89. cache_records[cache_id] = {"cache_type": "extract", "return": f"cached-{idx}"}
  90. cache_ids.append(cache_id)
  91. await rag.text_chunks.upsert(updates)
  92. await rag.llm_response_cache.upsert(cache_records)
  93. return cache_ids
  94. async def _seed_delete_retry_state(
  95. rag: LightRAG,
  96. *,
  97. doc_id: str,
  98. status_chunk_ids: list[str],
  99. tracking_chunk_ids: list[str],
  100. chunk_owners: dict[str, str],
  101. metadata: dict | None = None,
  102. ) -> dict[str, str]:
  103. entity_a = "ENTITY-A"
  104. entity_b = "ENTITY-B"
  105. relation_key = make_relation_chunk_key(entity_a, entity_b)
  106. source_id = GRAPH_FIELD_SEP.join(tracking_chunk_ids)
  107. now = datetime.now(timezone.utc).isoformat()
  108. await rag.full_docs.upsert(
  109. {doc_id: {"content": "delete retry state doc", "file_path": "delete_retry.txt"}}
  110. )
  111. await rag.doc_status.upsert(
  112. {
  113. doc_id: {
  114. "status": DocStatus.PROCESSED,
  115. "content_summary": "delete retry state",
  116. "content_length": 22,
  117. "chunks_count": len(status_chunk_ids),
  118. "chunks_list": status_chunk_ids,
  119. "created_at": now,
  120. "updated_at": now,
  121. "file_path": "delete_retry.txt",
  122. "track_id": f"track-{doc_id}",
  123. "error_msg": "",
  124. "metadata": metadata or {},
  125. }
  126. }
  127. )
  128. chunk_payload = {}
  129. for chunk_id, owner_doc_id in chunk_owners.items():
  130. chunk_payload[chunk_id] = {
  131. "content": f"{chunk_id} content",
  132. "file_path": f"{chunk_id}.txt",
  133. "full_doc_id": owner_doc_id,
  134. }
  135. if chunk_payload:
  136. await rag.text_chunks.upsert(chunk_payload)
  137. await rag.chunks_vdb.upsert(chunk_payload)
  138. await rag.full_entities.upsert({doc_id: {"entity_names": [entity_a, entity_b]}})
  139. await rag.full_relations.upsert(
  140. {doc_id: {"relation_pairs": [(entity_a, entity_b)]}}
  141. )
  142. await rag.entity_chunks.upsert(
  143. {
  144. entity_a: {
  145. "chunk_ids": tracking_chunk_ids,
  146. "count": len(tracking_chunk_ids),
  147. },
  148. entity_b: {
  149. "chunk_ids": tracking_chunk_ids,
  150. "count": len(tracking_chunk_ids),
  151. },
  152. }
  153. )
  154. await rag.relation_chunks.upsert(
  155. {
  156. relation_key: {
  157. "chunk_ids": tracking_chunk_ids,
  158. "count": len(tracking_chunk_ids),
  159. }
  160. }
  161. )
  162. created_at = int(datetime.now(timezone.utc).timestamp())
  163. for entity_name in [entity_a, entity_b]:
  164. await rag.chunk_entity_relation_graph.upsert_node(
  165. entity_name,
  166. {
  167. "entity_id": entity_name,
  168. "source_id": source_id,
  169. "description": f"{entity_name} description",
  170. "entity_type": "test",
  171. "file_path": "delete_retry.txt",
  172. "created_at": created_at,
  173. "truncate": "",
  174. },
  175. )
  176. await rag.chunk_entity_relation_graph.upsert_edge(
  177. entity_a,
  178. entity_b,
  179. {
  180. "source": entity_a,
  181. "target": entity_b,
  182. "source_id": source_id,
  183. "description": "related",
  184. "keywords": "test",
  185. "weight": 1.0,
  186. "file_path": "delete_retry.txt",
  187. },
  188. )
  189. await rag.entities_vdb.upsert(
  190. {
  191. compute_mdhash_id(entity_a, prefix="ent-"): {
  192. "content": f"{entity_a}\n{entity_a} description",
  193. "entity_name": entity_a,
  194. "source_id": source_id,
  195. "description": f"{entity_a} description",
  196. "entity_type": "test",
  197. "file_path": "delete_retry.txt",
  198. },
  199. compute_mdhash_id(entity_b, prefix="ent-"): {
  200. "content": f"{entity_b}\n{entity_b} description",
  201. "entity_name": entity_b,
  202. "source_id": source_id,
  203. "description": f"{entity_b} description",
  204. "entity_type": "test",
  205. "file_path": "delete_retry.txt",
  206. },
  207. }
  208. )
  209. await rag.relationships_vdb.upsert(
  210. {
  211. compute_mdhash_id(entity_a + entity_b, prefix="rel-"): {
  212. "content": f"test\t{entity_a}\n{entity_b}\nrelated",
  213. "src_id": entity_a,
  214. "tgt_id": entity_b,
  215. "source_id": source_id,
  216. "description": "related",
  217. "keywords": "test",
  218. "weight": 1.0,
  219. "file_path": "delete_retry.txt",
  220. }
  221. }
  222. )
  223. return {
  224. "entity_a": entity_a,
  225. "entity_b": entity_b,
  226. "relation_key": relation_key,
  227. }
  228. async def _succeed_rebuild_from_remaining_chunks(
  229. entities_to_rebuild,
  230. relationships_to_rebuild,
  231. knowledge_graph_inst,
  232. entities_vdb,
  233. relationships_vdb,
  234. **kwargs,
  235. ):
  236. for entity_name, remaining_chunk_ids in entities_to_rebuild.items():
  237. node = await knowledge_graph_inst.get_node(entity_name)
  238. assert node is not None
  239. updated_node = {
  240. **node,
  241. "source_id": GRAPH_FIELD_SEP.join(remaining_chunk_ids),
  242. }
  243. await knowledge_graph_inst.upsert_node(entity_name, updated_node)
  244. await entities_vdb.upsert(
  245. {
  246. compute_mdhash_id(entity_name, prefix="ent-"): {
  247. "content": f"{entity_name}\n{updated_node['description']}",
  248. "entity_name": entity_name,
  249. "source_id": updated_node["source_id"],
  250. "description": updated_node["description"],
  251. "entity_type": updated_node["entity_type"],
  252. "file_path": updated_node["file_path"],
  253. }
  254. }
  255. )
  256. for (src, tgt), remaining_chunk_ids in relationships_to_rebuild.items():
  257. edge = await knowledge_graph_inst.get_edge(src, tgt)
  258. assert edge is not None
  259. updated_edge = {
  260. **edge,
  261. "source_id": GRAPH_FIELD_SEP.join(remaining_chunk_ids),
  262. }
  263. await knowledge_graph_inst.upsert_edge(src, tgt, updated_edge)
  264. await relationships_vdb.upsert(
  265. {
  266. compute_mdhash_id(src + tgt, prefix="rel-"): {
  267. "content": f"{updated_edge['keywords']}\t{src}\n{tgt}\n{updated_edge['description']}",
  268. "src_id": src,
  269. "tgt_id": tgt,
  270. "source_id": updated_edge["source_id"],
  271. "description": updated_edge["description"],
  272. "keywords": updated_edge["keywords"],
  273. "weight": updated_edge["weight"],
  274. "file_path": updated_edge["file_path"],
  275. }
  276. }
  277. )
  278. @pytest.mark.asyncio
  279. async def test_extract_failure_preserves_chunks_and_allows_delete_with_cache_cleanup(
  280. tmp_path, monkeypatch
  281. ):
  282. rag = await _build_rag(tmp_path, "extract_failure_cleanup", _deterministic_chunking)
  283. try:
  284. content = "extract failure document"
  285. file_path = "extract_failure.txt"
  286. doc_id = compute_mdhash_id(file_path, prefix="doc-")
  287. await rag.apipeline_enqueue_documents(input=content, file_paths=file_path)
  288. async def fail_extract(self, chunks, pipeline_status, pipeline_status_lock):
  289. raise RuntimeError("extract fail sentinel")
  290. rag._process_extract_entities = MethodType(fail_extract, rag)
  291. await rag.apipeline_process_enqueue_documents()
  292. doc_status = await rag.doc_status.get_by_id(doc_id)
  293. assert doc_status is not None
  294. assert _status_to_text(doc_status["status"]) == "failed"
  295. chunk_ids = doc_status.get("chunks_list", [])
  296. assert len(chunk_ids) == 2
  297. assert doc_status.get("chunks_count") == 2
  298. cache_ids = await _seed_chunk_cache_entries(rag, chunk_ids, "extract")
  299. result = await rag.adelete_by_doc_id(doc_id, delete_llm_cache=True)
  300. assert result.status == "success"
  301. deleted_chunks = await rag.text_chunks.get_by_ids(chunk_ids)
  302. assert all(item is None for item in deleted_chunks)
  303. deleted_cache = [
  304. await rag.llm_response_cache.get_by_id(cid) for cid in cache_ids
  305. ]
  306. assert all(item is None for item in deleted_cache)
  307. assert await rag.doc_status.get_by_id(doc_id) is None
  308. finally:
  309. await rag.finalize_storages()
  310. @pytest.mark.asyncio
  311. async def test_extract_failure_before_chunking_clears_stale_chunk_snapshot(
  312. tmp_path,
  313. ):
  314. """The resume branch of ``apipeline_process_enqueue_documents`` purges
  315. any stale ``chunks_list`` from a previous interrupted run *before*
  316. chunking starts (so the new run does not mix old and new chunks).
  317. Therefore, when chunking subsequently fails on the retry, the failed
  318. doc_status reflects the post-purge state — the previous snapshot is
  319. intentionally not preserved any more.
  320. Earlier this test asserted the opposite ("preserve previous snapshot
  321. across failure"), which conflicted with the documented resume rule
  322. that "已抽取文档一律删掉所有的文本块,重新走多模态分析和实体关系提取".
  323. """
  324. rag = await _build_rag(tmp_path, "extract_failure_pre_chunking", _failing_chunking)
  325. try:
  326. content = "chunking failure document"
  327. file_path = "chunking_failure.txt"
  328. doc_id = compute_mdhash_id(file_path, prefix="doc-")
  329. await rag.apipeline_enqueue_documents(input=content, file_paths=file_path)
  330. previous_chunks = ["chunk-old-1", "chunk-old-2", "chunk-old-3"]
  331. existing = await rag.doc_status.get_by_id(doc_id)
  332. assert existing is not None
  333. await rag.doc_status.upsert(
  334. {
  335. doc_id: {
  336. "status": DocStatus.FAILED,
  337. "content_summary": existing["content_summary"],
  338. "content_length": existing["content_length"],
  339. "chunks_count": len(previous_chunks),
  340. "chunks_list": previous_chunks,
  341. "created_at": existing["created_at"],
  342. "updated_at": datetime.now(timezone.utc).isoformat(),
  343. "file_path": existing["file_path"],
  344. "track_id": existing["track_id"],
  345. "error_msg": "previous failure",
  346. "metadata": {"source": "test"},
  347. }
  348. }
  349. )
  350. await rag.apipeline_process_enqueue_documents()
  351. failed_status = await rag.doc_status.get_by_id(doc_id)
  352. assert failed_status is not None
  353. assert _status_to_text(failed_status["status"]) == "failed"
  354. # Resume purged the stale list before chunking; the failure record
  355. # therefore shows zero chunks rather than the previous snapshot.
  356. assert failed_status.get("chunks_list") == []
  357. assert failed_status.get("chunks_count") == 0
  358. assert "chunking fail sentinel" in (failed_status.get("error_msg") or "")
  359. finally:
  360. await rag.finalize_storages()
  361. @pytest.mark.asyncio
  362. async def test_merge_failure_preserves_chunks_and_skip_cache_cleanup_when_disabled(
  363. tmp_path, monkeypatch
  364. ):
  365. rag = await _build_rag(
  366. tmp_path, "merge_failure_keep_cache", _deterministic_chunking
  367. )
  368. try:
  369. content = "merge failure document"
  370. file_path = "merge_failure.txt"
  371. doc_id = compute_mdhash_id(file_path, prefix="doc-")
  372. await rag.apipeline_enqueue_documents(input=content, file_paths=file_path)
  373. async def ok_extract(self, chunks, pipeline_status, pipeline_status_lock):
  374. return {"chunk_count": len(chunks)}
  375. async def fail_merge(**kwargs):
  376. raise RuntimeError("merge fail sentinel")
  377. rag._process_extract_entities = MethodType(ok_extract, rag)
  378. monkeypatch.setattr(pipeline_module, "merge_nodes_and_edges", fail_merge)
  379. await rag.apipeline_process_enqueue_documents()
  380. doc_status = await rag.doc_status.get_by_id(doc_id)
  381. assert doc_status is not None
  382. assert _status_to_text(doc_status["status"]) == "failed"
  383. chunk_ids = doc_status.get("chunks_list", [])
  384. assert len(chunk_ids) == 2
  385. assert doc_status.get("chunks_count") == 2
  386. cache_ids = await _seed_chunk_cache_entries(rag, chunk_ids, "merge")
  387. result = await rag.adelete_by_doc_id(doc_id, delete_llm_cache=False)
  388. assert result.status == "success"
  389. remaining_cache = [
  390. await rag.llm_response_cache.get_by_id(cid) for cid in cache_ids
  391. ]
  392. assert all(item is not None for item in remaining_cache)
  393. finally:
  394. await rag.finalize_storages()
  395. @pytest.mark.asyncio
  396. async def test_delete_rebuild_failure_prunes_chunk_tracking_before_abort(
  397. tmp_path, monkeypatch
  398. ):
  399. rag = await _build_rag(
  400. tmp_path, "delete_rebuild_failure_chunk_tracking", _deterministic_chunking
  401. )
  402. try:
  403. doc_id = "doc-delete-rebuild-failure"
  404. keep_chunk_id = "chunk-keep"
  405. drop_chunk_id = "chunk-drop"
  406. seeded = await _seed_delete_retry_state(
  407. rag,
  408. doc_id=doc_id,
  409. status_chunk_ids=[drop_chunk_id],
  410. tracking_chunk_ids=[keep_chunk_id, drop_chunk_id],
  411. chunk_owners={
  412. keep_chunk_id: "doc-keep",
  413. drop_chunk_id: doc_id,
  414. },
  415. )
  416. entity_a = seeded["entity_a"]
  417. entity_b = seeded["entity_b"]
  418. relation_key = seeded["relation_key"]
  419. async def fail_rebuild(**kwargs):
  420. raise RuntimeError("rebuild fail sentinel")
  421. monkeypatch.setattr(
  422. lightrag_module, "rebuild_knowledge_from_chunks", fail_rebuild
  423. )
  424. result = await rag.adelete_by_doc_id(doc_id)
  425. entity_tracking = await rag.entity_chunks.get_by_id(entity_a)
  426. relation_tracking = await rag.relation_chunks.get_by_id(relation_key)
  427. failed_status = await rag.doc_status.get_by_id(doc_id)
  428. assert result.status == "fail"
  429. assert "rebuild fail sentinel" in result.message
  430. assert await rag.text_chunks.get_by_id(drop_chunk_id) is None
  431. assert await rag.text_chunks.get_by_id(keep_chunk_id) is not None
  432. assert failed_status is not None
  433. assert failed_status["chunks_list"] == [drop_chunk_id]
  434. assert failed_status["metadata"]["deletion_failed"] is True
  435. assert (
  436. failed_status["metadata"]["deletion_failure_stage"]
  437. == "rebuild_knowledge_graph"
  438. )
  439. assert "rebuild fail sentinel" in failed_status["error_msg"]
  440. assert entity_tracking is not None
  441. assert entity_tracking["chunk_ids"] == [keep_chunk_id]
  442. assert entity_tracking["count"] == 1
  443. assert relation_tracking is not None
  444. assert relation_tracking["chunk_ids"] == [keep_chunk_id]
  445. assert relation_tracking["count"] == 1
  446. assert (
  447. await rag.chunk_entity_relation_graph.get_edge(entity_a, entity_b)
  448. is not None
  449. )
  450. finally:
  451. await rag.finalize_storages()
  452. @pytest.mark.asyncio
  453. async def test_delete_retry_succeeds_after_rebuild_failure(tmp_path, monkeypatch):
  454. rag = await _build_rag(
  455. tmp_path, "delete_retry_after_failure", _deterministic_chunking
  456. )
  457. try:
  458. doc_id = "doc-delete-retry-success"
  459. keep_chunk_id = "chunk-keep"
  460. drop_chunk_id = "chunk-drop"
  461. seeded = await _seed_delete_retry_state(
  462. rag,
  463. doc_id=doc_id,
  464. status_chunk_ids=[drop_chunk_id],
  465. tracking_chunk_ids=[keep_chunk_id, drop_chunk_id],
  466. chunk_owners={
  467. keep_chunk_id: "doc-keep",
  468. drop_chunk_id: doc_id,
  469. },
  470. )
  471. entity_a = seeded["entity_a"]
  472. entity_b = seeded["entity_b"]
  473. relation_key = seeded["relation_key"]
  474. async def fail_rebuild(**kwargs):
  475. raise RuntimeError("rebuild fail sentinel")
  476. monkeypatch.setattr(
  477. lightrag_module, "rebuild_knowledge_from_chunks", fail_rebuild
  478. )
  479. first_result = await rag.adelete_by_doc_id(doc_id)
  480. assert first_result.status == "fail"
  481. monkeypatch.setattr(
  482. lightrag_module,
  483. "rebuild_knowledge_from_chunks",
  484. _succeed_rebuild_from_remaining_chunks,
  485. )
  486. second_result = await rag.adelete_by_doc_id(doc_id)
  487. assert second_result.status == "success"
  488. assert await rag.doc_status.get_by_id(doc_id) is None
  489. assert await rag.full_docs.get_by_id(doc_id) is None
  490. assert await rag.full_entities.get_by_id(doc_id) is None
  491. assert await rag.full_relations.get_by_id(doc_id) is None
  492. assert await rag.text_chunks.get_by_id(drop_chunk_id) is None
  493. assert await rag.text_chunks.get_by_id(keep_chunk_id) is not None
  494. entity_a_tracking = await rag.entity_chunks.get_by_id(entity_a)
  495. entity_b_tracking = await rag.entity_chunks.get_by_id(entity_b)
  496. relation_tracking = await rag.relation_chunks.get_by_id(relation_key)
  497. edge = await rag.chunk_entity_relation_graph.get_edge(entity_a, entity_b)
  498. assert entity_a_tracking is not None
  499. assert entity_a_tracking["chunk_ids"] == [keep_chunk_id]
  500. assert entity_b_tracking is not None
  501. assert entity_b_tracking["chunk_ids"] == [keep_chunk_id]
  502. assert relation_tracking is not None
  503. assert relation_tracking["chunk_ids"] == [keep_chunk_id]
  504. assert edge is not None
  505. assert edge["source_id"] == keep_chunk_id
  506. finally:
  507. await rag.finalize_storages()
  508. @pytest.mark.asyncio
  509. async def test_delete_retry_cleans_llm_cache_after_rebuild_failure(
  510. tmp_path, monkeypatch
  511. ):
  512. rag = await _build_rag(
  513. tmp_path, "delete_retry_cleans_llm_cache", _deterministic_chunking
  514. )
  515. try:
  516. doc_id = "doc-delete-retry-cache-cleanup"
  517. keep_chunk_id = "chunk-keep"
  518. drop_chunk_id = "chunk-drop"
  519. seeded = await _seed_delete_retry_state(
  520. rag,
  521. doc_id=doc_id,
  522. status_chunk_ids=[drop_chunk_id],
  523. tracking_chunk_ids=[keep_chunk_id, drop_chunk_id],
  524. chunk_owners={
  525. keep_chunk_id: "doc-keep",
  526. drop_chunk_id: doc_id,
  527. },
  528. )
  529. entity_a = seeded["entity_a"]
  530. entity_b = seeded["entity_b"]
  531. cache_ids = await _seed_chunk_cache_entries(rag, [drop_chunk_id], "retry")
  532. async def fail_rebuild(**kwargs):
  533. raise RuntimeError("rebuild fail sentinel")
  534. monkeypatch.setattr(
  535. lightrag_module, "rebuild_knowledge_from_chunks", fail_rebuild
  536. )
  537. first_result = await rag.adelete_by_doc_id(doc_id, delete_llm_cache=True)
  538. assert first_result.status == "fail"
  539. failed_status = await rag.doc_status.get_by_id(doc_id)
  540. assert failed_status is not None
  541. assert failed_status["metadata"]["deletion_llm_cache_ids"] == cache_ids
  542. assert await rag.text_chunks.get_by_id(drop_chunk_id) is None
  543. monkeypatch.setattr(
  544. lightrag_module,
  545. "rebuild_knowledge_from_chunks",
  546. _succeed_rebuild_from_remaining_chunks,
  547. )
  548. second_result = await rag.adelete_by_doc_id(doc_id, delete_llm_cache=True)
  549. assert second_result.status == "success"
  550. assert await rag.doc_status.get_by_id(doc_id) is None
  551. assert await rag.full_docs.get_by_id(doc_id) is None
  552. assert await rag.llm_response_cache.get_by_id(cache_ids[0]) is None
  553. edge = await rag.chunk_entity_relation_graph.get_edge(entity_a, entity_b)
  554. assert edge is not None
  555. assert edge["source_id"] == keep_chunk_id
  556. finally:
  557. await rag.finalize_storages()
  558. @pytest.mark.asyncio
  559. async def test_delete_retry_cleans_llm_cache_when_enabled_on_retry(
  560. tmp_path, monkeypatch
  561. ):
  562. rag = await _build_rag(
  563. tmp_path, "delete_retry_cache_cleanup_flag_change", _deterministic_chunking
  564. )
  565. try:
  566. doc_id = "doc-delete-retry-cache-flag-change"
  567. keep_chunk_id = "chunk-keep"
  568. drop_chunk_id = "chunk-drop"
  569. await _seed_delete_retry_state(
  570. rag,
  571. doc_id=doc_id,
  572. status_chunk_ids=[drop_chunk_id],
  573. tracking_chunk_ids=[keep_chunk_id, drop_chunk_id],
  574. chunk_owners={
  575. keep_chunk_id: "doc-keep",
  576. drop_chunk_id: doc_id,
  577. },
  578. )
  579. cache_ids = await _seed_chunk_cache_entries(rag, [drop_chunk_id], "retry-flag")
  580. async def fail_rebuild(**kwargs):
  581. raise RuntimeError("rebuild fail sentinel")
  582. monkeypatch.setattr(
  583. lightrag_module, "rebuild_knowledge_from_chunks", fail_rebuild
  584. )
  585. first_result = await rag.adelete_by_doc_id(doc_id, delete_llm_cache=False)
  586. assert first_result.status == "fail"
  587. failed_status = await rag.doc_status.get_by_id(doc_id)
  588. assert failed_status is not None
  589. assert failed_status["metadata"]["deletion_llm_cache_ids"] == cache_ids
  590. assert await rag.text_chunks.get_by_id(drop_chunk_id) is None
  591. monkeypatch.setattr(
  592. lightrag_module,
  593. "rebuild_knowledge_from_chunks",
  594. _succeed_rebuild_from_remaining_chunks,
  595. )
  596. second_result = await rag.adelete_by_doc_id(doc_id, delete_llm_cache=True)
  597. assert second_result.status == "success"
  598. assert await rag.doc_status.get_by_id(doc_id) is None
  599. assert await rag.full_docs.get_by_id(doc_id) is None
  600. assert await rag.llm_response_cache.get_by_id(cache_ids[0]) is None
  601. finally:
  602. await rag.finalize_storages()
  603. @pytest.mark.asyncio
  604. async def test_delete_retry_collects_cache_ids_without_cache_storage(
  605. tmp_path, monkeypatch
  606. ):
  607. rag = await _build_rag(
  608. tmp_path,
  609. "delete_retry_collect_cache_ids_without_storage",
  610. _deterministic_chunking,
  611. )
  612. try:
  613. doc_id = "doc-delete-retry-collect-cache-ids"
  614. keep_chunk_id = "chunk-keep"
  615. drop_chunk_id = "chunk-drop"
  616. await _seed_delete_retry_state(
  617. rag,
  618. doc_id=doc_id,
  619. status_chunk_ids=[drop_chunk_id],
  620. tracking_chunk_ids=[keep_chunk_id, drop_chunk_id],
  621. chunk_owners={
  622. keep_chunk_id: "doc-keep",
  623. drop_chunk_id: doc_id,
  624. },
  625. )
  626. cache_ids = await _seed_chunk_cache_entries(
  627. rag, [drop_chunk_id], "collect-without-storage"
  628. )
  629. async def fail_rebuild(**kwargs):
  630. raise RuntimeError("rebuild fail sentinel")
  631. cache_storage = rag.llm_response_cache
  632. rag.llm_response_cache = None
  633. monkeypatch.setattr(
  634. lightrag_module, "rebuild_knowledge_from_chunks", fail_rebuild
  635. )
  636. first_result = await rag.adelete_by_doc_id(doc_id, delete_llm_cache=False)
  637. assert first_result.status == "fail"
  638. failed_status = await rag.doc_status.get_by_id(doc_id)
  639. assert failed_status is not None
  640. assert failed_status["metadata"]["deletion_llm_cache_ids"] == cache_ids
  641. assert await rag.text_chunks.get_by_id(drop_chunk_id) is None
  642. rag.llm_response_cache = cache_storage
  643. monkeypatch.setattr(
  644. lightrag_module,
  645. "rebuild_knowledge_from_chunks",
  646. _succeed_rebuild_from_remaining_chunks,
  647. )
  648. second_result = await rag.adelete_by_doc_id(doc_id, delete_llm_cache=True)
  649. assert second_result.status == "success"
  650. assert await rag.doc_status.get_by_id(doc_id) is None
  651. assert await rag.full_docs.get_by_id(doc_id) is None
  652. assert await rag.llm_response_cache.get_by_id(cache_ids[0]) is None
  653. finally:
  654. await rag.finalize_storages()
  655. @pytest.mark.asyncio
  656. async def test_delete_retry_succeeds_after_llm_cache_cleanup_failure(
  657. tmp_path, monkeypatch
  658. ):
  659. rag = await _build_rag(
  660. tmp_path, "delete_retry_after_cache_cleanup_failure", _deterministic_chunking
  661. )
  662. try:
  663. doc_id = "doc-delete-cache-cleanup-failure"
  664. drop_chunk_id = "chunk-drop"
  665. await _seed_delete_retry_state(
  666. rag,
  667. doc_id=doc_id,
  668. status_chunk_ids=[drop_chunk_id],
  669. tracking_chunk_ids=[drop_chunk_id],
  670. chunk_owners={drop_chunk_id: doc_id},
  671. )
  672. cache_ids = await _seed_chunk_cache_entries(
  673. rag, [drop_chunk_id], "cache-cleanup-failure"
  674. )
  675. async def fail_cache_delete(self, ids):
  676. raise RuntimeError("llm cache delete fail sentinel")
  677. monkeypatch.setattr(
  678. rag.llm_response_cache,
  679. "delete",
  680. MethodType(fail_cache_delete, rag.llm_response_cache),
  681. )
  682. first_result = await rag.adelete_by_doc_id(doc_id, delete_llm_cache=True)
  683. assert first_result.status == "fail"
  684. assert "Failed to delete LLM cache" in first_result.message
  685. assert await rag.doc_status.get_by_id(doc_id) is not None
  686. assert await rag.full_docs.get_by_id(doc_id) is not None
  687. assert await rag.llm_response_cache.get_by_id(cache_ids[0]) is not None
  688. monkeypatch.undo()
  689. second_result = await rag.adelete_by_doc_id(doc_id, delete_llm_cache=True)
  690. assert second_result.status == "success"
  691. assert await rag.doc_status.get_by_id(doc_id) is None
  692. assert await rag.full_docs.get_by_id(doc_id) is None
  693. assert await rag.llm_response_cache.get_by_id(cache_ids[0]) is None
  694. finally:
  695. await rag.finalize_storages()
  696. @pytest.mark.asyncio
  697. async def test_delete_retry_succeeds_after_silent_llm_cache_cleanup_failure(
  698. tmp_path, monkeypatch
  699. ):
  700. rag = await _build_rag(
  701. tmp_path,
  702. "delete_retry_after_silent_cache_cleanup_failure",
  703. _deterministic_chunking,
  704. )
  705. try:
  706. doc_id = "doc-delete-silent-cache-cleanup-failure"
  707. drop_chunk_id = "chunk-drop"
  708. await _seed_delete_retry_state(
  709. rag,
  710. doc_id=doc_id,
  711. status_chunk_ids=[drop_chunk_id],
  712. tracking_chunk_ids=[drop_chunk_id],
  713. chunk_owners={drop_chunk_id: doc_id},
  714. )
  715. cache_ids = await _seed_chunk_cache_entries(
  716. rag, [drop_chunk_id], "silent-cache-cleanup-failure"
  717. )
  718. async def silently_fail_cache_delete(self, ids):
  719. return None
  720. monkeypatch.setattr(
  721. rag.llm_response_cache,
  722. "delete",
  723. MethodType(silently_fail_cache_delete, rag.llm_response_cache),
  724. )
  725. first_result = await rag.adelete_by_doc_id(doc_id, delete_llm_cache=True)
  726. assert first_result.status == "fail"
  727. assert "still exist after delete" in first_result.message
  728. assert await rag.doc_status.get_by_id(doc_id) is not None
  729. assert await rag.full_docs.get_by_id(doc_id) is not None
  730. assert await rag.llm_response_cache.get_by_id(cache_ids[0]) is not None
  731. monkeypatch.undo()
  732. second_result = await rag.adelete_by_doc_id(doc_id, delete_llm_cache=True)
  733. assert second_result.status == "success"
  734. assert await rag.doc_status.get_by_id(doc_id) is None
  735. assert await rag.full_docs.get_by_id(doc_id) is None
  736. assert await rag.llm_response_cache.get_by_id(cache_ids[0]) is None
  737. finally:
  738. await rag.finalize_storages()
  739. @pytest.mark.asyncio
  740. async def test_delete_retry_preserves_cache_cleanup_state_when_cache_storage_unavailable(
  741. tmp_path,
  742. ):
  743. rag = await _build_rag(
  744. tmp_path, "delete_retry_cache_storage_unavailable", _deterministic_chunking
  745. )
  746. try:
  747. doc_id = "doc-delete-cache-storage-unavailable"
  748. drop_chunk_id = "chunk-drop"
  749. await _seed_delete_retry_state(
  750. rag,
  751. doc_id=doc_id,
  752. status_chunk_ids=[drop_chunk_id],
  753. tracking_chunk_ids=[drop_chunk_id],
  754. chunk_owners={drop_chunk_id: doc_id},
  755. )
  756. cache_ids = await _seed_chunk_cache_entries(
  757. rag, [drop_chunk_id], "cache-storage-unavailable"
  758. )
  759. status_doc = await rag.doc_status.get_by_id(doc_id)
  760. assert status_doc is not None
  761. status_doc["metadata"] = {"deletion_llm_cache_ids": cache_ids}
  762. await rag.doc_status.upsert({doc_id: status_doc})
  763. cache_storage = rag.llm_response_cache
  764. rag.llm_response_cache = None
  765. first_result = await rag.adelete_by_doc_id(doc_id, delete_llm_cache=True)
  766. assert first_result.status == "fail"
  767. assert "cache storage is unavailable" in first_result.message
  768. failed_status = await rag.doc_status.get_by_id(doc_id)
  769. assert failed_status is not None
  770. assert failed_status["metadata"]["deletion_llm_cache_ids"] == cache_ids
  771. assert await rag.full_docs.get_by_id(doc_id) is not None
  772. rag.llm_response_cache = cache_storage
  773. second_result = await rag.adelete_by_doc_id(doc_id, delete_llm_cache=True)
  774. assert second_result.status == "success"
  775. assert await rag.doc_status.get_by_id(doc_id) is None
  776. assert await rag.full_docs.get_by_id(doc_id) is None
  777. assert await rag.llm_response_cache.get_by_id(cache_ids[0]) is None
  778. finally:
  779. await rag.finalize_storages()
  780. @pytest.mark.asyncio
  781. async def test_delete_succeeds_when_chunks_list_missing(tmp_path):
  782. rag = await _build_rag(
  783. tmp_path, "delete_missing_chunks_list_rejected", _deterministic_chunking
  784. )
  785. try:
  786. doc_id = "doc-delete-missing-chunks-list"
  787. drop_chunk_id = "chunk-drop-only"
  788. seeded = await _seed_delete_retry_state(
  789. rag,
  790. doc_id=doc_id,
  791. status_chunk_ids=[],
  792. tracking_chunk_ids=[drop_chunk_id],
  793. chunk_owners={drop_chunk_id: doc_id},
  794. )
  795. entity_a = seeded["entity_a"]
  796. entity_b = seeded["entity_b"]
  797. relation_key = seeded["relation_key"]
  798. result = await rag.adelete_by_doc_id(doc_id)
  799. assert result.status == "success"
  800. assert "without associated chunks" in result.message
  801. assert await rag.doc_status.get_by_id(doc_id) is None
  802. assert await rag.full_docs.get_by_id(doc_id) is None
  803. assert await rag.full_entities.get_by_id(doc_id) is not None
  804. assert await rag.full_relations.get_by_id(doc_id) is not None
  805. assert await rag.text_chunks.get_by_id(drop_chunk_id) is not None
  806. assert await rag.chunks_vdb.get_by_id(drop_chunk_id) is not None
  807. assert await rag.chunk_entity_relation_graph.get_node(entity_a) is not None
  808. assert await rag.chunk_entity_relation_graph.get_node(entity_b) is not None
  809. assert (
  810. await rag.chunk_entity_relation_graph.get_edge(entity_a, entity_b)
  811. is not None
  812. )
  813. assert await rag.entity_chunks.get_by_id(entity_a) is not None
  814. assert await rag.entity_chunks.get_by_id(entity_b) is not None
  815. assert await rag.relation_chunks.get_by_id(relation_key) is not None
  816. assert (
  817. await rag.entities_vdb.get_by_id(compute_mdhash_id(entity_a, prefix="ent-"))
  818. is not None
  819. )
  820. assert (
  821. await rag.entities_vdb.get_by_id(compute_mdhash_id(entity_b, prefix="ent-"))
  822. is not None
  823. )
  824. assert (
  825. await rag.relationships_vdb.get_by_id(
  826. compute_mdhash_id(entity_a + entity_b, prefix="rel-")
  827. )
  828. is not None
  829. )
  830. finally:
  831. await rag.finalize_storages()
  832. @pytest.mark.asyncio
  833. async def test_delete_ignores_stale_graph_source_ids_when_tracking_exists(tmp_path):
  834. rag = await _build_rag(
  835. tmp_path, "delete_ignore_stale_graph_sources", _deterministic_chunking
  836. )
  837. try:
  838. doc_id = "doc-delete-stale-graph-sources"
  839. drop_chunk_id = "chunk-drop"
  840. stale_chunk_id = "chunk-stale"
  841. seeded = await _seed_delete_retry_state(
  842. rag,
  843. doc_id=doc_id,
  844. status_chunk_ids=[drop_chunk_id],
  845. tracking_chunk_ids=[drop_chunk_id],
  846. chunk_owners={drop_chunk_id: doc_id},
  847. )
  848. entity_a = seeded["entity_a"]
  849. entity_b = seeded["entity_b"]
  850. relation_key = seeded["relation_key"]
  851. stale_source_id = GRAPH_FIELD_SEP.join([stale_chunk_id, drop_chunk_id])
  852. await rag.chunk_entity_relation_graph.upsert_node(
  853. entity_a,
  854. {
  855. "entity_id": entity_a,
  856. "source_id": stale_source_id,
  857. "description": f"{entity_a} description",
  858. "entity_type": "test",
  859. "file_path": "delete_retry.txt",
  860. "created_at": 1,
  861. "truncate": "",
  862. },
  863. )
  864. await rag.chunk_entity_relation_graph.upsert_node(
  865. entity_b,
  866. {
  867. "entity_id": entity_b,
  868. "source_id": stale_source_id,
  869. "description": f"{entity_b} description",
  870. "entity_type": "test",
  871. "file_path": "delete_retry.txt",
  872. "created_at": 1,
  873. "truncate": "",
  874. },
  875. )
  876. await rag.chunk_entity_relation_graph.upsert_edge(
  877. entity_a,
  878. entity_b,
  879. {
  880. "source": entity_a,
  881. "target": entity_b,
  882. "source_id": stale_source_id,
  883. "description": "related",
  884. "keywords": "test",
  885. "weight": 1.0,
  886. "file_path": "delete_retry.txt",
  887. },
  888. )
  889. result = await rag.adelete_by_doc_id(doc_id)
  890. assert result.status == "success"
  891. assert await rag.doc_status.get_by_id(doc_id) is None
  892. assert await rag.full_docs.get_by_id(doc_id) is None
  893. assert await rag.text_chunks.get_by_id(drop_chunk_id) is None
  894. assert await rag.chunk_entity_relation_graph.get_node(entity_a) is None
  895. assert await rag.chunk_entity_relation_graph.get_node(entity_b) is None
  896. assert (
  897. await rag.chunk_entity_relation_graph.get_edge(entity_a, entity_b) is None
  898. )
  899. assert await rag.entity_chunks.get_by_id(entity_a) is None
  900. assert await rag.entity_chunks.get_by_id(entity_b) is None
  901. assert await rag.relation_chunks.get_by_id(relation_key) is None
  902. finally:
  903. await rag.finalize_storages()
  904. @pytest.mark.asyncio
  905. async def test_validate_and_fix_consistency_preserves_chunks_on_reset(tmp_path):
  906. rag = await _build_rag(tmp_path, "reset_preserve_chunks", _deterministic_chunking)
  907. try:
  908. failed_doc_id = "doc-failed-reset"
  909. processing_doc_id = "doc-processing-reset"
  910. inferred_count_doc_id = "doc-inferred-count-reset"
  911. now = datetime.now(timezone.utc).isoformat()
  912. await rag.full_docs.upsert(
  913. {
  914. failed_doc_id: {"content": "failed doc", "file_path": "failed.txt"},
  915. processing_doc_id: {
  916. "content": "processing doc",
  917. "file_path": "processing.txt",
  918. },
  919. inferred_count_doc_id: {
  920. "content": "inferred count doc",
  921. "file_path": "inferred.txt",
  922. },
  923. }
  924. )
  925. await rag.doc_status.upsert(
  926. {
  927. failed_doc_id: {
  928. "status": DocStatus.FAILED,
  929. "content_summary": "failed",
  930. "content_length": 10,
  931. "chunks_count": 2,
  932. "chunks_list": ["f-1", "f-2"],
  933. "created_at": now,
  934. "updated_at": now,
  935. "file_path": "failed.txt",
  936. "track_id": "track-1",
  937. "error_msg": "old error",
  938. "metadata": {"old": True},
  939. },
  940. processing_doc_id: {
  941. "status": DocStatus.PROCESSING,
  942. "content_summary": "processing",
  943. "content_length": 12,
  944. "chunks_count": 1,
  945. "chunks_list": ["p-1"],
  946. "created_at": now,
  947. "updated_at": now,
  948. "file_path": "processing.txt",
  949. "track_id": "track-2",
  950. "error_msg": "old error",
  951. "metadata": {"old": True},
  952. },
  953. inferred_count_doc_id: {
  954. "status": DocStatus.FAILED,
  955. "content_summary": "inferred",
  956. "content_length": 14,
  957. "chunks_list": ["i-1", "i-2", "i-3"],
  958. "created_at": now,
  959. "updated_at": now,
  960. "file_path": "inferred.txt",
  961. "track_id": "track-3",
  962. "error_msg": "old error",
  963. "metadata": {"old": True},
  964. },
  965. }
  966. )
  967. failed_docs = await rag.doc_status.get_docs_by_status(DocStatus.FAILED)
  968. processing_docs = await rag.doc_status.get_docs_by_status(DocStatus.PROCESSING)
  969. to_process_docs = {**failed_docs, **processing_docs}
  970. pipeline_status = {"latest_message": "", "history_messages": []}
  971. await rag._validate_and_fix_document_consistency(
  972. to_process_docs=to_process_docs,
  973. pipeline_status=pipeline_status,
  974. pipeline_status_lock=asyncio.Lock(),
  975. )
  976. failed_reset = await rag.doc_status.get_by_id(failed_doc_id)
  977. assert failed_reset is not None
  978. assert _status_to_text(failed_reset["status"]) == "pending"
  979. assert failed_reset.get("chunks_list") == ["f-1", "f-2"]
  980. assert failed_reset.get("chunks_count") == 2
  981. processing_reset = await rag.doc_status.get_by_id(processing_doc_id)
  982. assert processing_reset is not None
  983. assert _status_to_text(processing_reset["status"]) == "pending"
  984. assert processing_reset.get("chunks_list") == ["p-1"]
  985. assert processing_reset.get("chunks_count") == 1
  986. inferred_count_reset = await rag.doc_status.get_by_id(inferred_count_doc_id)
  987. assert inferred_count_reset is not None
  988. assert _status_to_text(inferred_count_reset["status"]) == "pending"
  989. assert inferred_count_reset.get("chunks_list") == ["i-1", "i-2", "i-3"]
  990. assert inferred_count_reset.get("chunks_count") == 3
  991. finally:
  992. await rag.finalize_storages()
  993. @pytest.mark.asyncio
  994. async def test_validate_and_fix_consistency_repairs_unknown_file_path_from_full_docs(
  995. tmp_path,
  996. ):
  997. rag = await _build_rag(tmp_path, "repair_reset_file_path", _deterministic_chunking)
  998. try:
  999. doc_id = "doc-repair-reset"
  1000. now = datetime.now(timezone.utc).isoformat()
  1001. await rag.full_docs.upsert(
  1002. {
  1003. doc_id: {
  1004. "content": "repair path doc",
  1005. "file_path": "repaired-from-full-docs.md",
  1006. }
  1007. }
  1008. )
  1009. await rag.doc_status.upsert(
  1010. {
  1011. doc_id: {
  1012. "status": DocStatus.FAILED,
  1013. "content_summary": "repair path",
  1014. "content_length": 15,
  1015. "chunks_count": 0,
  1016. "chunks_list": [],
  1017. "created_at": now,
  1018. "updated_at": now,
  1019. "file_path": "unknown_source",
  1020. "track_id": "track-repair",
  1021. "error_msg": "old error",
  1022. "metadata": {"old": True},
  1023. }
  1024. }
  1025. )
  1026. failed_docs = await rag.doc_status.get_docs_by_status(DocStatus.FAILED)
  1027. pipeline_status = {"latest_message": "", "history_messages": []}
  1028. await rag._validate_and_fix_document_consistency(
  1029. to_process_docs=failed_docs,
  1030. pipeline_status=pipeline_status,
  1031. pipeline_status_lock=asyncio.Lock(),
  1032. )
  1033. repaired_status = await rag.doc_status.get_by_id(doc_id)
  1034. assert repaired_status is not None
  1035. assert _status_to_text(repaired_status["status"]) == "pending"
  1036. assert repaired_status["file_path"] == "repaired-from-full-docs.md"
  1037. finally:
  1038. await rag.finalize_storages()
  1039. @pytest.mark.asyncio
  1040. async def test_pipeline_cancellation_preserves_file_path_for_queued_docs(
  1041. tmp_path, monkeypatch
  1042. ):
  1043. rag = await _build_rag(
  1044. tmp_path,
  1045. "cancel_preserve_file_path",
  1046. _deterministic_chunking,
  1047. max_parallel_insert=1,
  1048. )
  1049. try:
  1050. contents = ["first queued content", "second queued content"]
  1051. file_paths = ["first.md", "second.md"]
  1052. await rag.apipeline_enqueue_documents(input=contents, file_paths=file_paths)
  1053. extraction_started = asyncio.Event()
  1054. release_first_doc = asyncio.Event()
  1055. async def _blocking_extract(
  1056. self, chunks, pipeline_status, pipeline_status_lock
  1057. ):
  1058. extraction_started.set()
  1059. await release_first_doc.wait()
  1060. return []
  1061. monkeypatch.setattr(
  1062. rag,
  1063. "_process_extract_entities",
  1064. MethodType(_blocking_extract, rag),
  1065. )
  1066. pipeline_task = asyncio.create_task(rag.apipeline_process_enqueue_documents())
  1067. await asyncio.wait_for(extraction_started.wait(), timeout=5)
  1068. pipeline_status = await get_namespace_data(
  1069. "pipeline_status", workspace=rag.workspace
  1070. )
  1071. pipeline_status_lock = get_namespace_lock(
  1072. "pipeline_status", workspace=rag.workspace
  1073. )
  1074. async with pipeline_status_lock:
  1075. pipeline_status["cancellation_requested"] = True
  1076. release_first_doc.set()
  1077. await asyncio.wait_for(pipeline_task, timeout=5)
  1078. second_doc_id = compute_mdhash_id(file_paths[1], prefix="doc-")
  1079. second_status = await rag.doc_status.get_by_id(second_doc_id)
  1080. assert second_status is not None
  1081. assert _status_to_text(second_status["status"]) == "failed"
  1082. assert second_status["file_path"] == "second.md"
  1083. assert second_status["error_msg"] == "User cancelled"
  1084. finally:
  1085. await rag.finalize_storages()
  1086. @pytest.mark.asyncio
  1087. async def test_pipeline_cancellation_repairs_placeholder_file_path_for_queued_docs(
  1088. tmp_path, monkeypatch
  1089. ):
  1090. rag = await _build_rag(
  1091. tmp_path,
  1092. "cancel_repair_placeholder_file_path",
  1093. _deterministic_chunking,
  1094. max_parallel_insert=1,
  1095. )
  1096. try:
  1097. contents = ["first queued content", "second queued content"]
  1098. file_paths = ["first.md", "second.md"]
  1099. await rag.apipeline_enqueue_documents(input=contents, file_paths=file_paths)
  1100. second_doc_id = compute_mdhash_id(file_paths[1], prefix="doc-")
  1101. second_status = await rag.doc_status.get_by_id(second_doc_id)
  1102. assert second_status is not None
  1103. second_status["file_path"] = "unknown_source"
  1104. await rag.doc_status.upsert({second_doc_id: second_status})
  1105. extraction_started = asyncio.Event()
  1106. release_first_doc = asyncio.Event()
  1107. async def _blocking_extract(
  1108. self, chunks, pipeline_status, pipeline_status_lock
  1109. ):
  1110. extraction_started.set()
  1111. await release_first_doc.wait()
  1112. return []
  1113. monkeypatch.setattr(
  1114. rag,
  1115. "_process_extract_entities",
  1116. MethodType(_blocking_extract, rag),
  1117. )
  1118. pipeline_task = asyncio.create_task(rag.apipeline_process_enqueue_documents())
  1119. await asyncio.wait_for(extraction_started.wait(), timeout=5)
  1120. pipeline_status = await get_namespace_data(
  1121. "pipeline_status", workspace=rag.workspace
  1122. )
  1123. pipeline_status_lock = get_namespace_lock(
  1124. "pipeline_status", workspace=rag.workspace
  1125. )
  1126. async with pipeline_status_lock:
  1127. pipeline_status["cancellation_requested"] = True
  1128. release_first_doc.set()
  1129. await asyncio.wait_for(pipeline_task, timeout=5)
  1130. repaired_status = await rag.doc_status.get_by_id(second_doc_id)
  1131. assert repaired_status is not None
  1132. assert _status_to_text(repaired_status["status"]) == "failed"
  1133. assert repaired_status["file_path"] == "second.md"
  1134. assert repaired_status["error_msg"] == "User cancelled"
  1135. finally:
  1136. await rag.finalize_storages()
  1137. @pytest.mark.asyncio
  1138. async def test_delete_doc_entries_guard_prevents_zombie_record(tmp_path, monkeypatch):
  1139. """When doc_status.delete fails, the guard must not re-create a zombie record.
  1140. The exception handler skips _update_delete_retry_state when deletion_stage is
  1141. "delete_doc_entries". doc_status.delete runs first, so full_docs is still intact
  1142. on failure. This test confirms: (a) the result is status="fail", and (b) the
  1143. exception handler does not upsert a new record with deletion_failed=True.
  1144. """
  1145. rag = await _build_rag(
  1146. tmp_path, "delete_doc_entries_guard", _deterministic_chunking
  1147. )
  1148. try:
  1149. doc_id = "doc-delete-entries-guard"
  1150. drop_chunk_id = "chunk-drop"
  1151. await _seed_delete_retry_state(
  1152. rag,
  1153. doc_id=doc_id,
  1154. status_chunk_ids=[drop_chunk_id],
  1155. tracking_chunk_ids=[drop_chunk_id],
  1156. chunk_owners={drop_chunk_id: doc_id},
  1157. )
  1158. original_delete = rag.doc_status.delete
  1159. call_count = 0
  1160. async def fail_doc_status_delete(ids):
  1161. nonlocal call_count
  1162. call_count += 1
  1163. raise RuntimeError("doc_status delete fail sentinel")
  1164. monkeypatch.setattr(rag.doc_status, "delete", fail_doc_status_delete)
  1165. result = await rag.adelete_by_doc_id(doc_id)
  1166. monkeypatch.setattr(rag.doc_status, "delete", original_delete)
  1167. assert result.status == "fail"
  1168. assert "doc_status delete fail sentinel" in result.message
  1169. # doc_status.delete is now called first (before full_docs.delete). The patch
  1170. # makes it fail, so doc_status still exists. The guard must not call
  1171. # _update_delete_retry_state (which would upsert the record with
  1172. # deletion_failed=True) when deletion_stage is "delete_doc_entries".
  1173. status_record = await rag.doc_status.get_by_id(doc_id)
  1174. assert (
  1175. status_record is not None
  1176. ), "doc_status should still exist (delete failed)"
  1177. metadata = status_record.get("metadata", {})
  1178. assert not metadata.get(
  1179. "deletion_failed"
  1180. ), "guard failed: zombie record written with deletion_failed=True"
  1181. finally:
  1182. await rag.finalize_storages()
  1183. @pytest.mark.asyncio
  1184. async def test_retry_state_write_failure_in_exception_handler_still_returns_fail(
  1185. tmp_path, monkeypatch
  1186. ):
  1187. """If _update_delete_retry_state itself fails inside the exception handler,
  1188. the caller must still receive a well-formed DeletionResult with status="fail"
  1189. and the original error message — not an unhandled exception.
  1190. """
  1191. rag = await _build_rag(
  1192. tmp_path, "retry_state_write_failure", _deterministic_chunking
  1193. )
  1194. try:
  1195. doc_id = "doc-retry-state-write-fail"
  1196. keep_chunk_id = "chunk-keep"
  1197. drop_chunk_id = "chunk-drop"
  1198. # Include a keep_chunk_id so entities survive and rebuild is triggered
  1199. await _seed_delete_retry_state(
  1200. rag,
  1201. doc_id=doc_id,
  1202. status_chunk_ids=[drop_chunk_id],
  1203. tracking_chunk_ids=[keep_chunk_id, drop_chunk_id],
  1204. chunk_owners={
  1205. keep_chunk_id: "doc-keep",
  1206. drop_chunk_id: doc_id,
  1207. },
  1208. )
  1209. async def fail_rebuild(**kwargs):
  1210. raise RuntimeError("rebuild fail sentinel")
  1211. async def fail_upsert(data):
  1212. raise RuntimeError("doc_status upsert fail sentinel")
  1213. monkeypatch.setattr(
  1214. lightrag_module, "rebuild_knowledge_from_chunks", fail_rebuild
  1215. )
  1216. monkeypatch.setattr(rag.doc_status, "upsert", fail_upsert)
  1217. result = await rag.adelete_by_doc_id(doc_id)
  1218. assert result.status == "fail"
  1219. # The original rebuild error must be present in the message; the compound
  1220. # "Additionally, failed to persist retry state" suffix may also be present.
  1221. assert "rebuild fail sentinel" in result.message
  1222. finally:
  1223. await rag.finalize_storages()
  1224. @pytest.mark.asyncio
  1225. async def test_persist_pre_rebuild_failure_records_stage_and_allows_retry(
  1226. tmp_path, monkeypatch
  1227. ):
  1228. """A failure in _insert_done (persist_pre_rebuild_changes stage) records the
  1229. correct stage in doc_status metadata, and a subsequent retry completes the
  1230. deletion successfully.
  1231. """
  1232. rag = await _build_rag(
  1233. tmp_path, "persist_pre_rebuild_failure", _deterministic_chunking
  1234. )
  1235. try:
  1236. doc_id = "doc-persist-pre-rebuild-failure"
  1237. keep_chunk_id = "chunk-keep"
  1238. drop_chunk_id = "chunk-drop"
  1239. seeded = await _seed_delete_retry_state(
  1240. rag,
  1241. doc_id=doc_id,
  1242. status_chunk_ids=[drop_chunk_id],
  1243. tracking_chunk_ids=[keep_chunk_id, drop_chunk_id],
  1244. chunk_owners={
  1245. keep_chunk_id: "doc-keep",
  1246. drop_chunk_id: doc_id,
  1247. },
  1248. )
  1249. entity_a = seeded["entity_a"]
  1250. # entity_b = seeded["entity_b"]
  1251. insert_done_calls = 0
  1252. original_insert_done = rag._insert_done
  1253. async def fail_first_insert_done():
  1254. nonlocal insert_done_calls
  1255. insert_done_calls += 1
  1256. if insert_done_calls == 1:
  1257. raise RuntimeError("insert_done fail sentinel")
  1258. await original_insert_done()
  1259. monkeypatch.setattr(rag, "_insert_done", fail_first_insert_done)
  1260. first_result = await rag.adelete_by_doc_id(doc_id)
  1261. failed_status = await rag.doc_status.get_by_id(doc_id)
  1262. assert first_result.status == "fail"
  1263. assert "insert_done fail sentinel" in first_result.message
  1264. assert failed_status is not None
  1265. assert failed_status["metadata"]["deletion_failed"] is True
  1266. assert (
  1267. failed_status["metadata"]["deletion_failure_stage"]
  1268. == "persist_pre_rebuild_changes"
  1269. )
  1270. monkeypatch.undo()
  1271. monkeypatch.setattr(
  1272. lightrag_module,
  1273. "rebuild_knowledge_from_chunks",
  1274. _succeed_rebuild_from_remaining_chunks,
  1275. )
  1276. second_result = await rag.adelete_by_doc_id(doc_id)
  1277. assert second_result.status == "success"
  1278. assert await rag.doc_status.get_by_id(doc_id) is None
  1279. assert await rag.full_docs.get_by_id(doc_id) is None
  1280. assert await rag.text_chunks.get_by_id(drop_chunk_id) is None
  1281. assert await rag.text_chunks.get_by_id(keep_chunk_id) is not None
  1282. entity_a_tracking = await rag.entity_chunks.get_by_id(entity_a)
  1283. assert entity_a_tracking is not None
  1284. assert entity_a_tracking["chunk_ids"] == [keep_chunk_id]
  1285. finally:
  1286. await rag.finalize_storages()
  1287. @pytest.mark.asyncio
  1288. async def test_delete_doc_graph_metadata_failure_records_stage_and_allows_retry(
  1289. tmp_path, monkeypatch
  1290. ):
  1291. """A failure in full_relations.delete (delete_doc_graph_metadata stage) records
  1292. the correct stage in retry metadata, and a subsequent retry completes the deletion.
  1293. """
  1294. rag = await _build_rag(
  1295. tmp_path, "delete_doc_graph_metadata_failure", _deterministic_chunking
  1296. )
  1297. try:
  1298. doc_id = "doc-graph-metadata-failure"
  1299. drop_chunk_id = "chunk-drop"
  1300. await _seed_delete_retry_state(
  1301. rag,
  1302. doc_id=doc_id,
  1303. status_chunk_ids=[drop_chunk_id],
  1304. tracking_chunk_ids=[drop_chunk_id],
  1305. chunk_owners={drop_chunk_id: doc_id},
  1306. )
  1307. relations_delete_calls = 0
  1308. original_relations_delete = rag.full_relations.delete
  1309. async def fail_first_relations_delete(ids):
  1310. nonlocal relations_delete_calls
  1311. relations_delete_calls += 1
  1312. if relations_delete_calls == 1:
  1313. raise RuntimeError("full_relations delete fail sentinel")
  1314. await original_relations_delete(ids)
  1315. monkeypatch.setattr(rag.full_relations, "delete", fail_first_relations_delete)
  1316. first_result = await rag.adelete_by_doc_id(doc_id)
  1317. failed_status = await rag.doc_status.get_by_id(doc_id)
  1318. assert first_result.status == "fail"
  1319. assert "full_relations delete fail sentinel" in first_result.message
  1320. assert failed_status is not None
  1321. assert failed_status["metadata"]["deletion_failed"] is True
  1322. assert (
  1323. failed_status["metadata"]["deletion_failure_stage"]
  1324. == "delete_doc_graph_metadata"
  1325. )
  1326. monkeypatch.undo()
  1327. second_result = await rag.adelete_by_doc_id(doc_id)
  1328. assert second_result.status == "success"
  1329. assert await rag.doc_status.get_by_id(doc_id) is None
  1330. assert await rag.full_docs.get_by_id(doc_id) is None
  1331. assert await rag.full_entities.get_by_id(doc_id) is None
  1332. assert await rag.full_relations.get_by_id(doc_id) is None
  1333. finally:
  1334. await rag.finalize_storages()
  1335. @pytest.mark.asyncio
  1336. async def test_deletion_fully_completed_prevents_success_override_in_finally(
  1337. tmp_path, monkeypatch
  1338. ):
  1339. """When deletion completes successfully but _insert_done fails in the finally
  1340. block, the already-returned success result must not be overridden with a failure.
  1341. This covers both the no-chunk path and the full deletion path.
  1342. """
  1343. for scenario in ("no_chunk", "full"):
  1344. rag = await _build_rag(
  1345. tmp_path,
  1346. f"deletion_fully_completed_{scenario}",
  1347. _deterministic_chunking,
  1348. )
  1349. try:
  1350. doc_id = f"doc-fully-completed-{scenario}"
  1351. if scenario == "no_chunk":
  1352. # Seed a doc with no chunks: doc_status.chunks_list is empty.
  1353. now = datetime.now(timezone.utc).isoformat()
  1354. await rag.full_docs.upsert(
  1355. {doc_id: {"content": "no chunk doc", "file_path": "nc.txt"}}
  1356. )
  1357. await rag.doc_status.upsert(
  1358. {
  1359. doc_id: {
  1360. "status": DocStatus.PROCESSED,
  1361. "content_summary": "no chunks",
  1362. "content_length": 11,
  1363. "chunks_count": 0,
  1364. "chunks_list": [],
  1365. "created_at": now,
  1366. "updated_at": now,
  1367. "file_path": "nc.txt",
  1368. "track_id": f"track-{doc_id}",
  1369. "error_msg": "",
  1370. "metadata": {},
  1371. }
  1372. }
  1373. )
  1374. else:
  1375. drop_chunk_id = "chunk-drop-fc"
  1376. await _seed_delete_retry_state(
  1377. rag,
  1378. doc_id=doc_id,
  1379. status_chunk_ids=[drop_chunk_id],
  1380. tracking_chunk_ids=[drop_chunk_id],
  1381. chunk_owners={drop_chunk_id: doc_id},
  1382. )
  1383. insert_done_calls = 0
  1384. original_insert_done = rag._insert_done
  1385. async def fail_later_insert_done():
  1386. nonlocal insert_done_calls
  1387. insert_done_calls += 1
  1388. # Let the first call (persist_pre_rebuild_changes) succeed for the
  1389. # full path; only fail the finally-block call.
  1390. if insert_done_calls <= (1 if scenario == "full" else 0):
  1391. await original_insert_done()
  1392. else:
  1393. raise RuntimeError("finally insert_done fail sentinel")
  1394. monkeypatch.setattr(rag, "_insert_done", fail_later_insert_done)
  1395. result = await rag.adelete_by_doc_id(doc_id)
  1396. assert (
  1397. result.status == "success"
  1398. ), f"[{scenario}] expected success but got {result.status}: {result.message}"
  1399. finally:
  1400. monkeypatch.undo()
  1401. await rag.finalize_storages()