| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601 |
- import asyncio
- from datetime import datetime, timezone
- from types import MethodType
- from uuid import uuid4
- import numpy as np
- import pytest
- import lightrag.lightrag as lightrag_module
- import lightrag.pipeline as pipeline_module
- from lightrag.base import DocStatus
- from lightrag.constants import GRAPH_FIELD_SEP
- from lightrag.kg.shared_storage import get_namespace_data, get_namespace_lock
- from lightrag.lightrag import LightRAG
- from lightrag.utils import (
- EmbeddingFunc,
- Tokenizer,
- compute_mdhash_id,
- make_relation_chunk_key,
- )
# Apply the ``offline`` marker to every test in this module: all of them run
# against stub LLM / embedding callables (``_dummy_llm``, ``_dummy_embedding``)
# instead of real model services.
pytestmark = pytest.mark.offline
class _SimpleTokenizerImpl:
    """Trivial character-level tokenizer: one token per Unicode code point."""

    def encode(self, content: str) -> list[int]:
        """Return the code point of every character in *content*."""
        return list(map(ord, content))

    def decode(self, tokens: list[int]) -> str:
        """Rebuild the string whose code points are *tokens*."""
        return "".join(map(chr, tokens))
async def _dummy_embedding(texts: list[str]) -> np.ndarray:
    """Stub embedder: every text maps to the same 8-dimensional all-ones vector.

    Returns a float array of shape ``(len(texts), 8)``.
    """
    row_count = len(texts)
    return np.full((row_count, 8), 1.0, dtype=float)
async def _dummy_llm(*args, **kwargs) -> str:
    """Stub LLM callable: ignores every argument and always answers ``"ok"``."""
    del args, kwargs  # accepted only to match the LLM-function calling convention
    return "ok"
def _deterministic_chunking(
    tokenizer,
    content: str,
    split_by_character,
    split_by_character_only: bool,
    chunk_overlap_token_size: int,
    chunk_token_size: int,
) -> list[dict]:
    """Chunking stub that always yields exactly two chunks for *content*.

    Only ``content`` is used; the other parameters exist so the function can be
    passed as ``LightRAG(chunking_func=...)``. Each chunk reports one token and
    its text is *content* suffixed with ``::chunk1`` / ``::chunk2``.
    """
    return [
        {
            "tokens": 1,
            "content": f"{content}::chunk{order + 1}",
            "chunk_order_index": order,
        }
        for order in range(2)
    ]
def _failing_chunking(
    tokenizer,
    content: str,
    split_by_character,
    split_by_character_only: bool,
    chunk_overlap_token_size: int,
    chunk_token_size: int,
) -> list[dict]:
    """Chunking stub that always fails.

    Raises:
        RuntimeError: with the message ``"chunking fail sentinel"``, which the
            tests look for in the recorded doc_status ``error_msg``.
    """
    sentinel_message = "chunking fail sentinel"
    raise RuntimeError(sentinel_message)
def _status_to_text(status: object) -> str:
    """Normalise a doc-status value to lowercase text.

    ``DocStatus`` members yield their ``.value``. Anything else is stringified,
    every ``"DocStatus."`` occurrence is removed and the result is lower-cased
    (so ``"DocStatus.FAILED"`` becomes ``"failed"``).
    """
    if not isinstance(status, DocStatus):
        return str(status).replace("DocStatus.", "").lower()
    return status.value
async def _build_rag(
    tmp_path,
    test_name: str,
    chunking_func,
    *,
    max_parallel_insert: int = 1,
) -> LightRAG:
    """Create and initialise a LightRAG instance wired to the offline stubs.

    Args:
        tmp_path: pytest temporary directory; the working dir is
            ``tmp_path / test_name``.
        test_name: Used for the working dir and as the workspace prefix
            (suffixed with 8 random hex digits so workspaces never collide).
        chunking_func: Chunking callable handed to LightRAG.
        max_parallel_insert: Forwarded unchanged to LightRAG.

    Returns:
        The instance after ``initialize_storages()``; callers must call
        ``finalize_storages()`` when done.
    """
    unique_workspace = f"{test_name}_{uuid4().hex[:8]}"
    stub_embedding = EmbeddingFunc(
        embedding_dim=8,
        max_token_size=8192,
        func=_dummy_embedding,
    )
    char_tokenizer = Tokenizer("test-tokenizer", _SimpleTokenizerImpl())
    instance = LightRAG(
        working_dir=str(tmp_path / test_name),
        workspace=unique_workspace,
        llm_model_func=_dummy_llm,
        embedding_func=stub_embedding,
        tokenizer=char_tokenizer,
        chunking_func=chunking_func,
        max_parallel_insert=max_parallel_insert,
    )
    await instance.initialize_storages()
    return instance
async def _seed_chunk_cache_entries(
    rag: LightRAG, chunk_ids: list[str], prefix: str
) -> list[str]:
    """Attach one fake ``extract`` LLM-cache entry to each chunk in *chunk_ids*.

    For the i-th chunk, a cache record ``f"{prefix}-cache-{i}"`` is written to
    ``llm_response_cache``, and the chunk's ``llm_cache_list`` is set to that
    single id. Every chunk must already exist in ``text_chunks``.

    Returns:
        The created cache ids, in the same order as *chunk_ids*.
    """
    chunk_updates: dict = {}
    cache_payload: dict = {}
    for position, chunk_id in enumerate(chunk_ids):
        stored_chunk = await rag.text_chunks.get_by_id(chunk_id)
        assert stored_chunk is not None
        cache_key = f"{prefix}-cache-{position}"
        stored_chunk["llm_cache_list"] = [cache_key]
        chunk_updates[chunk_id] = stored_chunk
        cache_payload[cache_key] = {
            "cache_type": "extract",
            "return": f"cached-{position}",
        }
    seeded_ids = list(cache_payload)  # keys are unique and in insertion order
    await rag.text_chunks.upsert(chunk_updates)
    await rag.llm_response_cache.upsert(cache_payload)
    return seeded_ids
async def _seed_delete_retry_state(
    rag: LightRAG,
    *,
    doc_id: str,
    status_chunk_ids: list[str],
    tracking_chunk_ids: list[str],
    chunk_owners: dict[str, str],
    metadata: dict | None = None,
) -> dict[str, str]:
    """Populate the storages touched by document deletion with a small fixture.

    Seeds a PROCESSED document *doc_id* whose knowledge graph is two entities
    (``ENTITY-A``, ``ENTITY-B``) joined by one relation. The graph nodes and
    edge, the entity/relation vector records, and the entity/relation
    chunk-tracking records all reference *tracking_chunk_ids*.

    Args:
        rag: An initialised LightRAG instance.
        doc_id: Id of the document to seed.
        status_chunk_ids: Chunk ids stored in the doc_status ``chunks_list``.
            These may differ from *tracking_chunk_ids*, to model partial state.
        tracking_chunk_ids: Chunk ids used in graph ``source_id`` fields and
            in the entity/relation chunk-tracking storages.
        chunk_owners: ``chunk_id -> full_doc_id`` for the text chunks written
            to ``text_chunks`` and ``chunks_vdb``. Nothing is written when empty.
        metadata: Optional doc_status metadata (``{}`` when omitted).

    Returns:
        A dict with keys ``entity_a``, ``entity_b`` and ``relation_key``,
        naming the seeded graph items.
    """
    entity_a = "ENTITY-A"
    entity_b = "ENTITY-B"
    entity_names = (entity_a, entity_b)
    relation_key = make_relation_chunk_key(entity_a, entity_b)
    source_id = GRAPH_FIELD_SEP.join(tracking_chunk_ids)
    doc_content = "delete retry state doc"
    doc_file_path = "delete_retry.txt"
    now = datetime.now(timezone.utc).isoformat()

    await rag.full_docs.upsert(
        {doc_id: {"content": doc_content, "file_path": doc_file_path}}
    )
    await rag.doc_status.upsert(
        {
            doc_id: {
                "status": DocStatus.PROCESSED,
                "content_summary": "delete retry state",
                # Derived from the stored content instead of a hard-coded
                # literal, so the two cannot drift apart.
                "content_length": len(doc_content),
                "chunks_count": len(status_chunk_ids),
                "chunks_list": status_chunk_ids,
                "created_at": now,
                "updated_at": now,
                "file_path": doc_file_path,
                "track_id": f"track-{doc_id}",
                "error_msg": "",
                "metadata": metadata or {},
            }
        }
    )

    chunk_payload = {
        chunk_id: {
            "content": f"{chunk_id} content",
            "file_path": f"{chunk_id}.txt",
            "full_doc_id": owner_doc_id,
        }
        for chunk_id, owner_doc_id in chunk_owners.items()
    }
    if chunk_payload:
        await rag.text_chunks.upsert(chunk_payload)
        await rag.chunks_vdb.upsert(chunk_payload)

    await rag.full_entities.upsert({doc_id: {"entity_names": [entity_a, entity_b]}})
    await rag.full_relations.upsert(
        {doc_id: {"relation_pairs": [(entity_a, entity_b)]}}
    )

    # Each tracking record is a fresh dict that points at the same chunk list.
    await rag.entity_chunks.upsert(
        {
            name: {
                "chunk_ids": tracking_chunk_ids,
                "count": len(tracking_chunk_ids),
            }
            for name in entity_names
        }
    )
    await rag.relation_chunks.upsert(
        {
            relation_key: {
                "chunk_ids": tracking_chunk_ids,
                "count": len(tracking_chunk_ids),
            }
        }
    )

    created_at = int(datetime.now(timezone.utc).timestamp())
    for entity_name in entity_names:
        await rag.chunk_entity_relation_graph.upsert_node(
            entity_name,
            {
                "entity_id": entity_name,
                "source_id": source_id,
                "description": f"{entity_name} description",
                "entity_type": "test",
                "file_path": doc_file_path,
                "created_at": created_at,
                "truncate": "",
            },
        )
    await rag.chunk_entity_relation_graph.upsert_edge(
        entity_a,
        entity_b,
        {
            "source": entity_a,
            "target": entity_b,
            "source_id": source_id,
            "description": "related",
            "keywords": "test",
            "weight": 1.0,
            "file_path": doc_file_path,
        },
    )

    await rag.entities_vdb.upsert(
        {
            compute_mdhash_id(name, prefix="ent-"): {
                "content": f"{name}\n{name} description",
                "entity_name": name,
                "source_id": source_id,
                "description": f"{name} description",
                "entity_type": "test",
                "file_path": doc_file_path,
            }
            for name in entity_names
        }
    )
    await rag.relationships_vdb.upsert(
        {
            compute_mdhash_id(entity_a + entity_b, prefix="rel-"): {
                "content": f"test\t{entity_a}\n{entity_b}\nrelated",
                "src_id": entity_a,
                "tgt_id": entity_b,
                "source_id": source_id,
                "description": "related",
                "keywords": "test",
                "weight": 1.0,
                "file_path": doc_file_path,
            }
        }
    )
    return {
        "entity_a": entity_a,
        "entity_b": entity_b,
        "relation_key": relation_key,
    }
async def _succeed_rebuild_from_remaining_chunks(
    entities_to_rebuild,
    relationships_to_rebuild,
    knowledge_graph_inst,
    entities_vdb,
    relationships_vdb,
    **kwargs,
):
    """Stand-in for ``rebuild_knowledge_from_chunks`` that always succeeds.

    For every entity in *entities_to_rebuild* (``name -> remaining chunk
    ids``) and every ``(src, tgt)`` in *relationships_to_rebuild*, it rewrites
    only the ``source_id`` of the existing graph node/edge to the remaining
    chunk ids. It then re-upserts the matching vector record built from the
    updated graph data. Extra keyword arguments are accepted and ignored.
    """
    for name, chunk_ids in entities_to_rebuild.items():
        current_node = await knowledge_graph_inst.get_node(name)
        assert current_node is not None
        rebuilt_node = dict(current_node)
        rebuilt_node["source_id"] = GRAPH_FIELD_SEP.join(chunk_ids)
        await knowledge_graph_inst.upsert_node(name, rebuilt_node)
        entity_record = {
            "content": f"{name}\n{rebuilt_node['description']}",
            "entity_name": name,
            "source_id": rebuilt_node["source_id"],
            "description": rebuilt_node["description"],
            "entity_type": rebuilt_node["entity_type"],
            "file_path": rebuilt_node["file_path"],
        }
        await entities_vdb.upsert(
            {compute_mdhash_id(name, prefix="ent-"): entity_record}
        )

    for pair, chunk_ids in relationships_to_rebuild.items():
        src, tgt = pair
        current_edge = await knowledge_graph_inst.get_edge(src, tgt)
        assert current_edge is not None
        rebuilt_edge = dict(current_edge)
        rebuilt_edge["source_id"] = GRAPH_FIELD_SEP.join(chunk_ids)
        await knowledge_graph_inst.upsert_edge(src, tgt, rebuilt_edge)
        relation_record = {
            "content": f"{rebuilt_edge['keywords']}\t{src}\n{tgt}\n{rebuilt_edge['description']}",
            "src_id": src,
            "tgt_id": tgt,
            "source_id": rebuilt_edge["source_id"],
            "description": rebuilt_edge["description"],
            "keywords": rebuilt_edge["keywords"],
            "weight": rebuilt_edge["weight"],
            "file_path": rebuilt_edge["file_path"],
        }
        await relationships_vdb.upsert(
            {compute_mdhash_id(src + tgt, prefix="rel-"): relation_record}
        )
@pytest.mark.asyncio
async def test_extract_failure_preserves_chunks_and_allows_delete_with_cache_cleanup(
    tmp_path, monkeypatch
):
    """Extraction failure keeps both chunks, and deletion also purges the LLM cache.

    After extraction is forced to fail, the doc is FAILED and still lists
    both chunks. Deleting it with ``delete_llm_cache=True`` removes the
    chunks, their cache entries and the doc_status record.
    """
    rag = await _build_rag(tmp_path, "extract_failure_cleanup", _deterministic_chunking)
    try:
        file_path = "extract_failure.txt"
        doc_id = compute_mdhash_id(file_path, prefix="doc-")
        await rag.apipeline_enqueue_documents(
            input="extract failure document", file_paths=file_path
        )

        async def fail_extract(self, chunks, pipeline_status, pipeline_status_lock):
            raise RuntimeError("extract fail sentinel")

        # Override the bound method on this instance only.
        rag._process_extract_entities = MethodType(fail_extract, rag)
        await rag.apipeline_process_enqueue_documents()

        status_record = await rag.doc_status.get_by_id(doc_id)
        assert status_record is not None
        assert _status_to_text(status_record["status"]) == "failed"
        kept_chunk_ids = status_record.get("chunks_list", [])
        assert len(kept_chunk_ids) == 2
        assert status_record.get("chunks_count") == 2

        seeded_cache_ids = await _seed_chunk_cache_entries(
            rag, kept_chunk_ids, "extract"
        )
        outcome = await rag.adelete_by_doc_id(doc_id, delete_llm_cache=True)
        assert outcome.status == "success"

        chunks_after = await rag.text_chunks.get_by_ids(kept_chunk_ids)
        assert all(chunk is None for chunk in chunks_after)
        cache_after = [
            await rag.llm_response_cache.get_by_id(cache_id)
            for cache_id in seeded_cache_ids
        ]
        assert all(entry is None for entry in cache_after)
        assert await rag.doc_status.get_by_id(doc_id) is None
    finally:
        await rag.finalize_storages()
@pytest.mark.asyncio
async def test_extract_failure_before_chunking_clears_stale_chunk_snapshot(
    tmp_path,
):
    """A chunking failure on resume leaves an empty chunk snapshot.

    The resume branch of ``apipeline_process_enqueue_documents`` purges any
    stale ``chunks_list`` left by a previous interrupted run *before* chunking
    starts, so the new run never mixes old and new chunks. When chunking then
    fails on the retry, the failed doc_status reflects the post-purge state;
    the previous snapshot is intentionally no longer preserved.

    An earlier version of this test asserted the opposite ("preserve previous
    snapshot across failure"). That conflicted with the documented resume
    rule: "for already-extracted documents, always delete all text chunks and
    re-run multimodal analysis and entity/relation extraction".
    """
    rag = await _build_rag(tmp_path, "extract_failure_pre_chunking", _failing_chunking)
    try:
        file_path = "chunking_failure.txt"
        doc_id = compute_mdhash_id(file_path, prefix="doc-")
        await rag.apipeline_enqueue_documents(
            input="chunking failure document", file_paths=file_path
        )

        stale_chunks = ["chunk-old-1", "chunk-old-2", "chunk-old-3"]
        enqueued = await rag.doc_status.get_by_id(doc_id)
        assert enqueued is not None

        # Simulate an earlier failed run that left a chunk snapshot behind.
        stale_record = {
            "status": DocStatus.FAILED,
            "content_summary": enqueued["content_summary"],
            "content_length": enqueued["content_length"],
            "chunks_count": len(stale_chunks),
            "chunks_list": stale_chunks,
            "created_at": enqueued["created_at"],
            "updated_at": datetime.now(timezone.utc).isoformat(),
            "file_path": enqueued["file_path"],
            "track_id": enqueued["track_id"],
            "error_msg": "previous failure",
            "metadata": {"source": "test"},
        }
        await rag.doc_status.upsert({doc_id: stale_record})

        await rag.apipeline_process_enqueue_documents()

        after_retry = await rag.doc_status.get_by_id(doc_id)
        assert after_retry is not None
        assert _status_to_text(after_retry["status"]) == "failed"
        # The stale list was purged before chunking, so zero chunks are
        # recorded rather than the previous snapshot.
        assert after_retry.get("chunks_list") == []
        assert after_retry.get("chunks_count") == 0
        assert "chunking fail sentinel" in (after_retry.get("error_msg") or "")
    finally:
        await rag.finalize_storages()
@pytest.mark.asyncio
async def test_merge_failure_preserves_chunks_and_skip_cache_cleanup_when_disabled(
    tmp_path, monkeypatch
):
    """Merge failure keeps both chunks, and deletion without cache cleanup keeps the cache.

    After ``merge_nodes_and_edges`` is forced to fail, the doc is FAILED and
    still lists both chunks. Deleting it with ``delete_llm_cache=False``
    succeeds and leaves every seeded LLM-cache entry in place.
    """
    rag = await _build_rag(tmp_path, "merge_failure_keep_cache", _deterministic_chunking)
    try:
        file_path = "merge_failure.txt"
        doc_id = compute_mdhash_id(file_path, prefix="doc-")
        await rag.apipeline_enqueue_documents(
            input="merge failure document", file_paths=file_path
        )

        async def ok_extract(self, chunks, pipeline_status, pipeline_status_lock):
            return {"chunk_count": len(chunks)}

        async def fail_merge(**kwargs):
            raise RuntimeError("merge fail sentinel")

        # Let extraction "succeed" so the run can reach the patched merge step.
        rag._process_extract_entities = MethodType(ok_extract, rag)
        monkeypatch.setattr(pipeline_module, "merge_nodes_and_edges", fail_merge)
        await rag.apipeline_process_enqueue_documents()

        status_record = await rag.doc_status.get_by_id(doc_id)
        assert status_record is not None
        assert _status_to_text(status_record["status"]) == "failed"
        kept_chunk_ids = status_record.get("chunks_list", [])
        assert len(kept_chunk_ids) == 2
        assert status_record.get("chunks_count") == 2

        seeded_cache_ids = await _seed_chunk_cache_entries(rag, kept_chunk_ids, "merge")
        outcome = await rag.adelete_by_doc_id(doc_id, delete_llm_cache=False)
        assert outcome.status == "success"

        surviving = [
            await rag.llm_response_cache.get_by_id(cache_id)
            for cache_id in seeded_cache_ids
        ]
        assert all(entry is not None for entry in surviving)
    finally:
        await rag.finalize_storages()
@pytest.mark.asyncio
async def test_delete_rebuild_failure_prunes_chunk_tracking_before_abort(
    tmp_path, monkeypatch
):
    """A rebuild failure aborts deletion only after chunk tracking is pruned.

    Tracking is seeded with ``chunk-keep`` (owned by another doc) and
    ``chunk-drop`` (owned by the doc being deleted). With the graph rebuild
    patched to raise, deletion reports failure. By then ``chunk-drop`` is
    already gone from ``text_chunks`` and from the tracking records, and the
    graph edge still exists. The doc_status record is kept, carrying
    deletion-failure metadata that names the ``rebuild_knowledge_graph`` stage.
    """
    rag = await _build_rag(
        tmp_path, "delete_rebuild_failure_chunk_tracking", _deterministic_chunking
    )
    try:
        doc_id = "doc-delete-rebuild-failure"
        keep_chunk_id, drop_chunk_id = "chunk-keep", "chunk-drop"
        seeded = await _seed_delete_retry_state(
            rag,
            doc_id=doc_id,
            status_chunk_ids=[drop_chunk_id],
            tracking_chunk_ids=[keep_chunk_id, drop_chunk_id],
            chunk_owners={keep_chunk_id: "doc-keep", drop_chunk_id: doc_id},
        )
        entity_a = seeded["entity_a"]
        entity_b = seeded["entity_b"]
        relation_key = seeded["relation_key"]

        async def fail_rebuild(**kwargs):
            raise RuntimeError("rebuild fail sentinel")

        monkeypatch.setattr(
            lightrag_module, "rebuild_knowledge_from_chunks", fail_rebuild
        )

        outcome = await rag.adelete_by_doc_id(doc_id)
        entity_tracking = await rag.entity_chunks.get_by_id(entity_a)
        relation_tracking = await rag.relation_chunks.get_by_id(relation_key)
        status_after = await rag.doc_status.get_by_id(doc_id)

        assert outcome.status == "fail"
        assert "rebuild fail sentinel" in outcome.message

        # Only the chunk owned by the deleted doc has been removed.
        assert await rag.text_chunks.get_by_id(drop_chunk_id) is None
        assert await rag.text_chunks.get_by_id(keep_chunk_id) is not None

        # doc_status survives and records where deletion stopped.
        assert status_after is not None
        assert status_after["chunks_list"] == [drop_chunk_id]
        failure_meta = status_after["metadata"]
        assert failure_meta["deletion_failed"] is True
        assert failure_meta["deletion_failure_stage"] == "rebuild_knowledge_graph"
        assert "rebuild fail sentinel" in status_after["error_msg"]

        # Tracking now references only the surviving chunk.
        for tracking in (entity_tracking, relation_tracking):
            assert tracking is not None
            assert tracking["chunk_ids"] == [keep_chunk_id]
            assert tracking["count"] == 1

        surviving_edge = await rag.chunk_entity_relation_graph.get_edge(
            entity_a, entity_b
        )
        assert surviving_edge is not None
    finally:
        await rag.finalize_storages()
@pytest.mark.asyncio
async def test_delete_retry_succeeds_after_rebuild_failure(tmp_path, monkeypatch):
    """Retrying deletion after a rebuild failure completes it.

    The first ``adelete_by_doc_id`` fails in the patched rebuild step. The
    second runs with a rebuild stub that succeeds, and must:

    * remove every per-document record and the doc's own chunk;
    * keep the chunk owned by another doc;
    * leave the tracking records and the edge pointing only at that chunk.
    """
    rag = await _build_rag(
        tmp_path, "delete_retry_after_failure", _deterministic_chunking
    )
    try:
        doc_id = "doc-delete-retry-success"
        keep_chunk_id, drop_chunk_id = "chunk-keep", "chunk-drop"
        seeded = await _seed_delete_retry_state(
            rag,
            doc_id=doc_id,
            status_chunk_ids=[drop_chunk_id],
            tracking_chunk_ids=[keep_chunk_id, drop_chunk_id],
            chunk_owners={keep_chunk_id: "doc-keep", drop_chunk_id: doc_id},
        )
        entity_a = seeded["entity_a"]
        entity_b = seeded["entity_b"]
        relation_key = seeded["relation_key"]

        async def fail_rebuild(**kwargs):
            raise RuntimeError("rebuild fail sentinel")

        monkeypatch.setattr(
            lightrag_module, "rebuild_knowledge_from_chunks", fail_rebuild
        )
        first_attempt = await rag.adelete_by_doc_id(doc_id)
        assert first_attempt.status == "fail"

        monkeypatch.setattr(
            lightrag_module,
            "rebuild_knowledge_from_chunks",
            _succeed_rebuild_from_remaining_chunks,
        )
        second_attempt = await rag.adelete_by_doc_id(doc_id)
        assert second_attempt.status == "success"

        # Every per-document record is gone.
        for per_doc_storage in (
            rag.doc_status,
            rag.full_docs,
            rag.full_entities,
            rag.full_relations,
        ):
            assert await per_doc_storage.get_by_id(doc_id) is None
        assert await rag.text_chunks.get_by_id(drop_chunk_id) is None
        assert await rag.text_chunks.get_by_id(keep_chunk_id) is not None

        entity_a_tracking = await rag.entity_chunks.get_by_id(entity_a)
        entity_b_tracking = await rag.entity_chunks.get_by_id(entity_b)
        relation_tracking = await rag.relation_chunks.get_by_id(relation_key)
        rebuilt_edge = await rag.chunk_entity_relation_graph.get_edge(
            entity_a, entity_b
        )
        for tracking in (entity_a_tracking, entity_b_tracking, relation_tracking):
            assert tracking is not None
            assert tracking["chunk_ids"] == [keep_chunk_id]
        assert rebuilt_edge is not None
        assert rebuilt_edge["source_id"] == keep_chunk_id
    finally:
        await rag.finalize_storages()
@pytest.mark.asyncio
async def test_delete_retry_cleans_llm_cache_after_rebuild_failure(
    tmp_path, monkeypatch
):
    """Cache ids recorded during a failed delete are purged on a successful retry.

    The dropped chunk is removed during the first (failing) attempt, so its
    ``llm_cache_list`` is no longer readable on retry. The test therefore
    checks that the cache ids were saved in doc_status metadata, and that the
    retry deletes them.
    """
    rag = await _build_rag(
        tmp_path, "delete_retry_cleans_llm_cache", _deterministic_chunking
    )
    try:
        doc_id = "doc-delete-retry-cache-cleanup"
        keep_chunk_id, drop_chunk_id = "chunk-keep", "chunk-drop"
        seeded = await _seed_delete_retry_state(
            rag,
            doc_id=doc_id,
            status_chunk_ids=[drop_chunk_id],
            tracking_chunk_ids=[keep_chunk_id, drop_chunk_id],
            chunk_owners={keep_chunk_id: "doc-keep", drop_chunk_id: doc_id},
        )
        entity_a = seeded["entity_a"]
        entity_b = seeded["entity_b"]
        seeded_cache_ids = await _seed_chunk_cache_entries(
            rag, [drop_chunk_id], "retry"
        )

        async def fail_rebuild(**kwargs):
            raise RuntimeError("rebuild fail sentinel")

        monkeypatch.setattr(
            lightrag_module, "rebuild_knowledge_from_chunks", fail_rebuild
        )
        first_attempt = await rag.adelete_by_doc_id(doc_id, delete_llm_cache=True)
        assert first_attempt.status == "fail"
        pending_status = await rag.doc_status.get_by_id(doc_id)
        assert pending_status is not None
        assert pending_status["metadata"]["deletion_llm_cache_ids"] == seeded_cache_ids
        assert await rag.text_chunks.get_by_id(drop_chunk_id) is None

        monkeypatch.setattr(
            lightrag_module,
            "rebuild_knowledge_from_chunks",
            _succeed_rebuild_from_remaining_chunks,
        )
        second_attempt = await rag.adelete_by_doc_id(doc_id, delete_llm_cache=True)
        assert second_attempt.status == "success"
        assert await rag.doc_status.get_by_id(doc_id) is None
        assert await rag.full_docs.get_by_id(doc_id) is None
        assert await rag.llm_response_cache.get_by_id(seeded_cache_ids[0]) is None

        rebuilt_edge = await rag.chunk_entity_relation_graph.get_edge(
            entity_a, entity_b
        )
        assert rebuilt_edge is not None
        assert rebuilt_edge["source_id"] == keep_chunk_id
    finally:
        await rag.finalize_storages()
@pytest.mark.asyncio
async def test_delete_retry_cleans_llm_cache_when_enabled_on_retry(
    tmp_path, monkeypatch
):
    """Cache ids are recorded even when the first attempt disables cache cleanup.

    The first (failing) attempt uses ``delete_llm_cache=False`` but must still
    save the cache ids in doc_status metadata. A retry that turns cleanup on
    can then delete them.
    """
    rag = await _build_rag(
        tmp_path, "delete_retry_cache_cleanup_flag_change", _deterministic_chunking
    )
    try:
        doc_id = "doc-delete-retry-cache-flag-change"
        keep_chunk_id, drop_chunk_id = "chunk-keep", "chunk-drop"
        await _seed_delete_retry_state(
            rag,
            doc_id=doc_id,
            status_chunk_ids=[drop_chunk_id],
            tracking_chunk_ids=[keep_chunk_id, drop_chunk_id],
            chunk_owners={keep_chunk_id: "doc-keep", drop_chunk_id: doc_id},
        )
        seeded_cache_ids = await _seed_chunk_cache_entries(
            rag, [drop_chunk_id], "retry-flag"
        )

        async def fail_rebuild(**kwargs):
            raise RuntimeError("rebuild fail sentinel")

        monkeypatch.setattr(
            lightrag_module, "rebuild_knowledge_from_chunks", fail_rebuild
        )
        first_attempt = await rag.adelete_by_doc_id(doc_id, delete_llm_cache=False)
        assert first_attempt.status == "fail"
        pending_status = await rag.doc_status.get_by_id(doc_id)
        assert pending_status is not None
        assert pending_status["metadata"]["deletion_llm_cache_ids"] == seeded_cache_ids
        assert await rag.text_chunks.get_by_id(drop_chunk_id) is None

        monkeypatch.setattr(
            lightrag_module,
            "rebuild_knowledge_from_chunks",
            _succeed_rebuild_from_remaining_chunks,
        )
        second_attempt = await rag.adelete_by_doc_id(doc_id, delete_llm_cache=True)
        assert second_attempt.status == "success"
        assert await rag.doc_status.get_by_id(doc_id) is None
        assert await rag.full_docs.get_by_id(doc_id) is None
        assert await rag.llm_response_cache.get_by_id(seeded_cache_ids[0]) is None
    finally:
        await rag.finalize_storages()
@pytest.mark.asyncio
async def test_delete_retry_collects_cache_ids_without_cache_storage(
    tmp_path, monkeypatch
):
    """Cache ids are recorded even while the LLM cache storage is detached.

    The first (failing) attempt runs with ``rag.llm_response_cache = None``.
    It must still save the dropped chunk's cache ids in doc_status metadata,
    so a retry with the storage reattached can delete them.
    """
    rag = await _build_rag(
        tmp_path,
        "delete_retry_collect_cache_ids_without_storage",
        _deterministic_chunking,
    )
    try:
        doc_id = "doc-delete-retry-collect-cache-ids"
        keep_chunk_id = "chunk-keep"
        drop_chunk_id = "chunk-drop"
        await _seed_delete_retry_state(
            rag,
            doc_id=doc_id,
            status_chunk_ids=[drop_chunk_id],
            tracking_chunk_ids=[keep_chunk_id, drop_chunk_id],
            chunk_owners={
                keep_chunk_id: "doc-keep",
                drop_chunk_id: doc_id,
            },
        )
        cache_ids = await _seed_chunk_cache_entries(
            rag, [drop_chunk_id], "collect-without-storage"
        )

        async def fail_rebuild(**kwargs):
            raise RuntimeError("rebuild fail sentinel")

        cache_storage = rag.llm_response_cache
        rag.llm_response_cache = None
        try:
            monkeypatch.setattr(
                lightrag_module, "rebuild_knowledge_from_chunks", fail_rebuild
            )
            first_result = await rag.adelete_by_doc_id(doc_id, delete_llm_cache=False)
            assert first_result.status == "fail"
            failed_status = await rag.doc_status.get_by_id(doc_id)
            assert failed_status is not None
            assert failed_status["metadata"]["deletion_llm_cache_ids"] == cache_ids
            assert await rag.text_chunks.get_by_id(drop_chunk_id) is None
        finally:
            # Reattach the real storage even if an assertion above fails, so
            # finalize_storages() below runs on the instance as it was built.
            rag.llm_response_cache = cache_storage

        monkeypatch.setattr(
            lightrag_module,
            "rebuild_knowledge_from_chunks",
            _succeed_rebuild_from_remaining_chunks,
        )
        second_result = await rag.adelete_by_doc_id(doc_id, delete_llm_cache=True)
        assert second_result.status == "success"
        assert await rag.doc_status.get_by_id(doc_id) is None
        assert await rag.full_docs.get_by_id(doc_id) is None
        assert await rag.llm_response_cache.get_by_id(cache_ids[0]) is None
    finally:
        await rag.finalize_storages()
@pytest.mark.asyncio
async def test_delete_retry_succeeds_after_llm_cache_cleanup_failure(
    tmp_path, monkeypatch
):
    """A raising LLM-cache delete fails the deletion; a retry then succeeds.

    While the cache storage's ``delete`` raises, the deletion fails with a
    "Failed to delete LLM cache" message. The doc_status record, the full doc
    and the cache entry all remain. After the patch is undone, the retry
    removes all of them.
    """
    rag = await _build_rag(
        tmp_path, "delete_retry_after_cache_cleanup_failure", _deterministic_chunking
    )
    try:
        doc_id = "doc-delete-cache-cleanup-failure"
        drop_chunk_id = "chunk-drop"
        await _seed_delete_retry_state(
            rag,
            doc_id=doc_id,
            status_chunk_ids=[drop_chunk_id],
            tracking_chunk_ids=[drop_chunk_id],
            chunk_owners={drop_chunk_id: doc_id},
        )
        seeded_cache_ids = await _seed_chunk_cache_entries(
            rag, [drop_chunk_id], "cache-cleanup-failure"
        )

        async def fail_cache_delete(self, ids):
            raise RuntimeError("llm cache delete fail sentinel")

        cache = rag.llm_response_cache
        monkeypatch.setattr(cache, "delete", MethodType(fail_cache_delete, cache))

        first_attempt = await rag.adelete_by_doc_id(doc_id, delete_llm_cache=True)
        assert first_attempt.status == "fail"
        assert "Failed to delete LLM cache" in first_attempt.message
        # The doc records and the cache entry are all still present.
        assert await rag.doc_status.get_by_id(doc_id) is not None
        assert await rag.full_docs.get_by_id(doc_id) is not None
        assert await rag.llm_response_cache.get_by_id(seeded_cache_ids[0]) is not None

        monkeypatch.undo()  # restore the real ``delete``
        second_attempt = await rag.adelete_by_doc_id(doc_id, delete_llm_cache=True)
        assert second_attempt.status == "success"
        assert await rag.doc_status.get_by_id(doc_id) is None
        assert await rag.full_docs.get_by_id(doc_id) is None
        assert await rag.llm_response_cache.get_by_id(seeded_cache_ids[0]) is None
    finally:
        await rag.finalize_storages()
@pytest.mark.asyncio
async def test_delete_retry_succeeds_after_silent_llm_cache_cleanup_failure(
    tmp_path, monkeypatch
):
    """A cache delete that silently does nothing is detected, and a retry succeeds.

    The cache storage's ``delete`` is replaced by a no-op. The deletion must
    notice that the entries "still exist after delete" and fail, leaving the
    doc and the cache entry in place. After the patch is undone, the retry
    removes everything.
    """
    rag = await _build_rag(
        tmp_path,
        "delete_retry_after_silent_cache_cleanup_failure",
        _deterministic_chunking,
    )
    try:
        doc_id = "doc-delete-silent-cache-cleanup-failure"
        drop_chunk_id = "chunk-drop"
        await _seed_delete_retry_state(
            rag,
            doc_id=doc_id,
            status_chunk_ids=[drop_chunk_id],
            tracking_chunk_ids=[drop_chunk_id],
            chunk_owners={drop_chunk_id: doc_id},
        )
        seeded_cache_ids = await _seed_chunk_cache_entries(
            rag, [drop_chunk_id], "silent-cache-cleanup-failure"
        )

        async def silently_fail_cache_delete(self, ids):
            return None

        cache = rag.llm_response_cache
        monkeypatch.setattr(
            cache, "delete", MethodType(silently_fail_cache_delete, cache)
        )

        first_attempt = await rag.adelete_by_doc_id(doc_id, delete_llm_cache=True)
        assert first_attempt.status == "fail"
        assert "still exist after delete" in first_attempt.message
        assert await rag.doc_status.get_by_id(doc_id) is not None
        assert await rag.full_docs.get_by_id(doc_id) is not None
        assert await rag.llm_response_cache.get_by_id(seeded_cache_ids[0]) is not None

        monkeypatch.undo()  # restore the real ``delete``
        second_attempt = await rag.adelete_by_doc_id(doc_id, delete_llm_cache=True)
        assert second_attempt.status == "success"
        assert await rag.doc_status.get_by_id(doc_id) is None
        assert await rag.full_docs.get_by_id(doc_id) is None
        assert await rag.llm_response_cache.get_by_id(seeded_cache_ids[0]) is None
    finally:
        await rag.finalize_storages()
@pytest.mark.asyncio
async def test_delete_retry_preserves_cache_cleanup_state_when_cache_storage_unavailable(
    tmp_path,
):
    """Pending cache-cleanup state survives a delete that has no cache storage.

    The doc_status metadata is pre-seeded with ``deletion_llm_cache_ids``, as
    if an earlier attempt had recorded them. With the LLM cache storage
    detached, deletion fails with "cache storage is unavailable". The recorded
    ids and the full doc must both be kept, so a retry with the storage
    reattached can finish the cleanup.
    """
    rag = await _build_rag(
        tmp_path, "delete_retry_cache_storage_unavailable", _deterministic_chunking
    )
    try:
        doc_id = "doc-delete-cache-storage-unavailable"
        drop_chunk_id = "chunk-drop"
        await _seed_delete_retry_state(
            rag,
            doc_id=doc_id,
            status_chunk_ids=[drop_chunk_id],
            tracking_chunk_ids=[drop_chunk_id],
            chunk_owners={drop_chunk_id: doc_id},
        )
        cache_ids = await _seed_chunk_cache_entries(
            rag, [drop_chunk_id], "cache-storage-unavailable"
        )
        status_doc = await rag.doc_status.get_by_id(doc_id)
        assert status_doc is not None
        status_doc["metadata"] = {"deletion_llm_cache_ids": cache_ids}
        await rag.doc_status.upsert({doc_id: status_doc})

        cache_storage = rag.llm_response_cache
        rag.llm_response_cache = None
        try:
            first_result = await rag.adelete_by_doc_id(doc_id, delete_llm_cache=True)
            assert first_result.status == "fail"
            assert "cache storage is unavailable" in first_result.message
            failed_status = await rag.doc_status.get_by_id(doc_id)
            assert failed_status is not None
            assert failed_status["metadata"]["deletion_llm_cache_ids"] == cache_ids
            assert await rag.full_docs.get_by_id(doc_id) is not None
        finally:
            # Reattach the real storage even if an assertion above fails, so
            # finalize_storages() below runs on the instance as it was built.
            rag.llm_response_cache = cache_storage

        second_result = await rag.adelete_by_doc_id(doc_id, delete_llm_cache=True)
        assert second_result.status == "success"
        assert await rag.doc_status.get_by_id(doc_id) is None
        assert await rag.full_docs.get_by_id(doc_id) is None
        assert await rag.llm_response_cache.get_by_id(cache_ids[0]) is None
    finally:
        await rag.finalize_storages()
@pytest.mark.asyncio
async def test_delete_succeeds_when_chunks_list_missing(tmp_path):
    """Deleting a document whose status record has an empty ``chunks_list``
    succeeds but removes only the document-level records.

    Chunk tracking is seeded with ``chunk-drop-only`` owned by this document. The
    deletion must still report "without associated chunks" and leave the chunk,
    graph, chunk-tracking, per-document entity/relation and vector entries in place.
    """
    rag = await _build_rag(
        tmp_path, "delete_missing_chunks_list_succeeds", _deterministic_chunking
    )
    try:
        doc_id = "doc-delete-missing-chunks-list"
        drop_chunk_id = "chunk-drop-only"
        seeded = await _seed_delete_retry_state(
            rag,
            doc_id=doc_id,
            status_chunk_ids=[],
            tracking_chunk_ids=[drop_chunk_id],
            chunk_owners={drop_chunk_id: doc_id},
        )
        entity_a = seeded["entity_a"]
        entity_b = seeded["entity_b"]
        relation_key = seeded["relation_key"]

        result = await rag.adelete_by_doc_id(doc_id)
        assert result.status == "success"
        assert "without associated chunks" in result.message

        # Only the document-level records are removed...
        assert await rag.doc_status.get_by_id(doc_id) is None
        assert await rag.full_docs.get_by_id(doc_id) is None

        # ...everything else seeded for the document is left in place.
        assert await rag.full_entities.get_by_id(doc_id) is not None
        assert await rag.full_relations.get_by_id(doc_id) is not None
        assert await rag.text_chunks.get_by_id(drop_chunk_id) is not None
        assert await rag.chunks_vdb.get_by_id(drop_chunk_id) is not None
        for entity in (entity_a, entity_b):
            assert await rag.chunk_entity_relation_graph.get_node(entity) is not None
            assert await rag.entity_chunks.get_by_id(entity) is not None
            entity_vdb_id = compute_mdhash_id(entity, prefix="ent-")
            assert await rag.entities_vdb.get_by_id(entity_vdb_id) is not None
        assert (
            await rag.chunk_entity_relation_graph.get_edge(entity_a, entity_b)
            is not None
        )
        assert await rag.relation_chunks.get_by_id(relation_key) is not None
        relation_vdb_id = compute_mdhash_id(entity_a + entity_b, prefix="rel-")
        assert await rag.relationships_vdb.get_by_id(relation_vdb_id) is not None
    finally:
        await rag.finalize_storages()
@pytest.mark.asyncio
async def test_delete_ignores_stale_graph_source_ids_when_tracking_exists(tmp_path):
    """Graph ``source_id`` strings that name unknown chunks do not block deletion.

    Both seeded entities and their edge are rewritten so that ``source_id`` also
    lists ``chunk-stale``, which is not among the seeded tracking chunk ids. The
    delete must still drop the document, its chunk, both nodes, the edge and the
    entity/relation tracking records.
    """
    rag = await _build_rag(
        tmp_path, "delete_ignore_stale_graph_sources", _deterministic_chunking
    )
    try:
        doc_id = "doc-delete-stale-graph-sources"
        drop_chunk_id = "chunk-drop"
        stale_chunk_id = "chunk-stale"
        seeded = await _seed_delete_retry_state(
            rag,
            doc_id=doc_id,
            status_chunk_ids=[drop_chunk_id],
            tracking_chunk_ids=[drop_chunk_id],
            chunk_owners={drop_chunk_id: doc_id},
        )
        entity_a, entity_b = seeded["entity_a"], seeded["entity_b"]
        relation_key = seeded["relation_key"]
        polluted_source = GRAPH_FIELD_SEP.join([stale_chunk_id, drop_chunk_id])

        # Overwrite both nodes, then the edge, with the polluted source list.
        for node_name in (entity_a, entity_b):
            node_payload = {
                "entity_id": node_name,
                "source_id": polluted_source,
                "description": f"{node_name} description",
                "entity_type": "test",
                "file_path": "delete_retry.txt",
                "created_at": 1,
                "truncate": "",
            }
            await rag.chunk_entity_relation_graph.upsert_node(node_name, node_payload)
        edge_payload = {
            "source": entity_a,
            "target": entity_b,
            "source_id": polluted_source,
            "description": "related",
            "keywords": "test",
            "weight": 1.0,
            "file_path": "delete_retry.txt",
        }
        await rag.chunk_entity_relation_graph.upsert_edge(
            entity_a, entity_b, edge_payload
        )

        outcome = await rag.adelete_by_doc_id(doc_id)
        assert outcome.status == "success"

        for doc_store in (rag.doc_status, rag.full_docs):
            assert await doc_store.get_by_id(doc_id) is None
        assert await rag.text_chunks.get_by_id(drop_chunk_id) is None
        for node_name in (entity_a, entity_b):
            assert await rag.chunk_entity_relation_graph.get_node(node_name) is None
        assert (
            await rag.chunk_entity_relation_graph.get_edge(entity_a, entity_b) is None
        )
        for node_name in (entity_a, entity_b):
            assert await rag.entity_chunks.get_by_id(node_name) is None
        assert await rag.relation_chunks.get_by_id(relation_key) is None
    finally:
        await rag.finalize_storages()
@pytest.mark.asyncio
async def test_validate_and_fix_consistency_preserves_chunks_on_reset(tmp_path):
    """Resetting FAILED and PROCESSING documents to pending keeps their chunk lists.

    Three documents are covered: a failed one, a processing one, and a failed one
    whose status has a ``chunks_list`` but no ``chunks_count``. After the
    consistency pass each must be ``pending`` with its ``chunks_list`` unchanged
    and ``chunks_count`` equal to the length of that list.
    """
    rag = await _build_rag(tmp_path, "reset_preserve_chunks", _deterministic_chunking)
    try:
        failed_doc_id = "doc-failed-reset"
        processing_doc_id = "doc-processing-reset"
        inferred_count_doc_id = "doc-inferred-count-reset"
        stamp = datetime.now(timezone.utc).isoformat()

        full_doc_rows = {
            failed_doc_id: {"content": "failed doc", "file_path": "failed.txt"},
            processing_doc_id: {
                "content": "processing doc",
                "file_path": "processing.txt",
            },
            inferred_count_doc_id: {
                "content": "inferred count doc",
                "file_path": "inferred.txt",
            },
        }
        await rag.full_docs.upsert(full_doc_rows)

        status_rows = {
            failed_doc_id: {
                "status": DocStatus.FAILED,
                "content_summary": "failed",
                "content_length": 10,
                "chunks_count": 2,
                "chunks_list": ["f-1", "f-2"],
                "created_at": stamp,
                "updated_at": stamp,
                "file_path": "failed.txt",
                "track_id": "track-1",
                "error_msg": "old error",
                "metadata": {"old": True},
            },
            processing_doc_id: {
                "status": DocStatus.PROCESSING,
                "content_summary": "processing",
                "content_length": 12,
                "chunks_count": 1,
                "chunks_list": ["p-1"],
                "created_at": stamp,
                "updated_at": stamp,
                "file_path": "processing.txt",
                "track_id": "track-2",
                "error_msg": "old error",
                "metadata": {"old": True},
            },
            # Deliberately has no "chunks_count"; it must be inferred on reset.
            inferred_count_doc_id: {
                "status": DocStatus.FAILED,
                "content_summary": "inferred",
                "content_length": 14,
                "chunks_list": ["i-1", "i-2", "i-3"],
                "created_at": stamp,
                "updated_at": stamp,
                "file_path": "inferred.txt",
                "track_id": "track-3",
                "error_msg": "old error",
                "metadata": {"old": True},
            },
        }
        await rag.doc_status.upsert(status_rows)

        docs_to_reset = {}
        for status_filter in (DocStatus.FAILED, DocStatus.PROCESSING):
            docs_to_reset.update(await rag.doc_status.get_docs_by_status(status_filter))
        await rag._validate_and_fix_document_consistency(
            to_process_docs=docs_to_reset,
            pipeline_status={"latest_message": "", "history_messages": []},
            pipeline_status_lock=asyncio.Lock(),
        )

        expected_chunk_lists = {
            failed_doc_id: ["f-1", "f-2"],
            processing_doc_id: ["p-1"],
            inferred_count_doc_id: ["i-1", "i-2", "i-3"],
        }
        for reset_doc_id, chunk_ids in expected_chunk_lists.items():
            reset_record = await rag.doc_status.get_by_id(reset_doc_id)
            assert reset_record is not None
            assert _status_to_text(reset_record["status"]) == "pending"
            assert reset_record.get("chunks_list") == chunk_ids
            assert reset_record.get("chunks_count") == len(chunk_ids)
    finally:
        await rag.finalize_storages()
@pytest.mark.asyncio
async def test_validate_and_fix_consistency_repairs_unknown_file_path_from_full_docs(
    tmp_path,
):
    """A failed document whose status carries the ``unknown_source`` placeholder
    path is reset to pending with the real ``file_path`` taken from ``full_docs``.
    """
    rag = await _build_rag(tmp_path, "repair_reset_file_path", _deterministic_chunking)
    try:
        doc_id = "doc-repair-reset"
        real_path = "repaired-from-full-docs.md"
        stamp = datetime.now(timezone.utc).isoformat()

        await rag.full_docs.upsert(
            {doc_id: {"content": "repair path doc", "file_path": real_path}}
        )
        placeholder_status = {
            "status": DocStatus.FAILED,
            "content_summary": "repair path",
            "content_length": 15,
            "chunks_count": 0,
            "chunks_list": [],
            "created_at": stamp,
            "updated_at": stamp,
            "file_path": "unknown_source",
            "track_id": "track-repair",
            "error_msg": "old error",
            "metadata": {"old": True},
        }
        await rag.doc_status.upsert({doc_id: placeholder_status})

        await rag._validate_and_fix_document_consistency(
            to_process_docs=await rag.doc_status.get_docs_by_status(DocStatus.FAILED),
            pipeline_status={"latest_message": "", "history_messages": []},
            pipeline_status_lock=asyncio.Lock(),
        )

        repaired = await rag.doc_status.get_by_id(doc_id)
        assert repaired is not None
        assert _status_to_text(repaired["status"]) == "pending"
        assert repaired["file_path"] == real_path
    finally:
        await rag.finalize_storages()
@pytest.mark.asyncio
async def test_pipeline_cancellation_preserves_file_path_for_queued_docs(
    tmp_path, monkeypatch
):
    """Cancelling the pipeline fails still-queued documents but keeps their
    ``file_path``.

    With ``max_parallel_insert=1`` one document is held inside a patched
    ``_process_extract_entities`` while cancellation is requested. Once it is
    released, the second enqueued document must be ``failed`` with
    ``error_msg == "User cancelled"`` and its original ``file_path``.
    """
    rag = await _build_rag(
        tmp_path,
        "cancel_preserve_file_path",
        _deterministic_chunking,
        max_parallel_insert=1,
    )
    extraction_started = asyncio.Event()
    release_first_doc = asyncio.Event()
    pipeline_task = None
    try:
        contents = ["first queued content", "second queued content"]
        file_paths = ["first.md", "second.md"]
        await rag.apipeline_enqueue_documents(input=contents, file_paths=file_paths)

        async def _blocking_extract(
            self, chunks, pipeline_status, pipeline_status_lock
        ):
            # Signal that a document reached extraction, then hold it there.
            extraction_started.set()
            await release_first_doc.wait()
            return []

        monkeypatch.setattr(
            rag,
            "_process_extract_entities",
            MethodType(_blocking_extract, rag),
        )
        pipeline_task = asyncio.create_task(rag.apipeline_process_enqueue_documents())
        await asyncio.wait_for(extraction_started.wait(), timeout=5)

        pipeline_status = await get_namespace_data(
            "pipeline_status", workspace=rag.workspace
        )
        pipeline_status_lock = get_namespace_lock(
            "pipeline_status", workspace=rag.workspace
        )
        async with pipeline_status_lock:
            pipeline_status["cancellation_requested"] = True
        release_first_doc.set()
        await asyncio.wait_for(pipeline_task, timeout=5)

        second_doc_id = compute_mdhash_id(file_paths[1], prefix="doc-")
        second_status = await rag.doc_status.get_by_id(second_doc_id)
        assert second_status is not None
        assert _status_to_text(second_status["status"]) == "failed"
        assert second_status["file_path"] == "second.md"
        assert second_status["error_msg"] == "User cancelled"
    finally:
        # After an early failure (assertion or timeout) the pipeline may still be
        # blocked in _blocking_extract. Release it, stop it and collect its outcome
        # so it neither outlives the storages nor leaks an unretrieved exception.
        release_first_doc.set()
        if pipeline_task is not None:
            if not pipeline_task.done():
                pipeline_task.cancel()
            await asyncio.gather(pipeline_task, return_exceptions=True)
        await rag.finalize_storages()
@pytest.mark.asyncio
async def test_pipeline_cancellation_repairs_placeholder_file_path_for_queued_docs(
    tmp_path, monkeypatch
):
    """Cancelling the pipeline repairs a placeholder ``file_path`` on queued docs.

    The second enqueued document's status is overwritten with ``unknown_source``
    before processing starts. One document is held in a patched
    ``_process_extract_entities`` while cancellation is requested. The second
    document must end up ``failed`` with ``error_msg == "User cancelled"`` and
    its real path ``second.md`` restored.
    """
    rag = await _build_rag(
        tmp_path,
        "cancel_repair_placeholder_file_path",
        _deterministic_chunking,
        max_parallel_insert=1,
    )
    extraction_started = asyncio.Event()
    release_first_doc = asyncio.Event()
    pipeline_task = None
    try:
        contents = ["first queued content", "second queued content"]
        file_paths = ["first.md", "second.md"]
        await rag.apipeline_enqueue_documents(input=contents, file_paths=file_paths)

        # Corrupt the queued doc's path with the placeholder value.
        second_doc_id = compute_mdhash_id(file_paths[1], prefix="doc-")
        second_status = await rag.doc_status.get_by_id(second_doc_id)
        assert second_status is not None
        second_status["file_path"] = "unknown_source"
        await rag.doc_status.upsert({second_doc_id: second_status})

        async def _blocking_extract(
            self, chunks, pipeline_status, pipeline_status_lock
        ):
            # Signal that a document reached extraction, then hold it there.
            extraction_started.set()
            await release_first_doc.wait()
            return []

        monkeypatch.setattr(
            rag,
            "_process_extract_entities",
            MethodType(_blocking_extract, rag),
        )
        pipeline_task = asyncio.create_task(rag.apipeline_process_enqueue_documents())
        await asyncio.wait_for(extraction_started.wait(), timeout=5)

        pipeline_status = await get_namespace_data(
            "pipeline_status", workspace=rag.workspace
        )
        pipeline_status_lock = get_namespace_lock(
            "pipeline_status", workspace=rag.workspace
        )
        async with pipeline_status_lock:
            pipeline_status["cancellation_requested"] = True
        release_first_doc.set()
        await asyncio.wait_for(pipeline_task, timeout=5)

        repaired_status = await rag.doc_status.get_by_id(second_doc_id)
        assert repaired_status is not None
        assert _status_to_text(repaired_status["status"]) == "failed"
        assert repaired_status["file_path"] == "second.md"
        assert repaired_status["error_msg"] == "User cancelled"
    finally:
        # After an early failure (assertion or timeout) the pipeline may still be
        # blocked in _blocking_extract. Release it, stop it and collect its outcome
        # so it neither outlives the storages nor leaks an unretrieved exception.
        release_first_doc.set()
        if pipeline_task is not None:
            if not pipeline_task.done():
                pipeline_task.cancel()
            await asyncio.gather(pipeline_task, return_exceptions=True)
        await rag.finalize_storages()
@pytest.mark.asyncio
async def test_delete_doc_entries_guard_prevents_zombie_record(tmp_path, monkeypatch):
    """When doc_status.delete fails, the guard must not re-create a zombie record.

    The exception handler skips _update_delete_retry_state when deletion_stage is
    "delete_doc_entries". doc_status.delete runs before full_docs.delete, so
    full_docs is still intact on failure. This test confirms that:

    (a) the result is status="fail" and carries the original error;
    (b) the exception handler does not upsert the record with deletion_failed=True;
    (c) full_docs is left untouched.
    """
    rag = await _build_rag(
        tmp_path, "delete_doc_entries_guard", _deterministic_chunking
    )
    try:
        doc_id = "doc-delete-entries-guard"
        drop_chunk_id = "chunk-drop"
        await _seed_delete_retry_state(
            rag,
            doc_id=doc_id,
            status_chunk_ids=[drop_chunk_id],
            tracking_chunk_ids=[drop_chunk_id],
            chunk_owners={drop_chunk_id: doc_id},
        )

        async def fail_doc_status_delete(ids):
            raise RuntimeError("doc_status delete fail sentinel")

        monkeypatch.setattr(rag.doc_status, "delete", fail_doc_status_delete)
        try:
            result = await rag.adelete_by_doc_id(doc_id)
        finally:
            # Restore the real delete even if the call itself raises.
            monkeypatch.undo()

        assert result.status == "fail"
        assert "doc_status delete fail sentinel" in result.message

        # The patched delete failed, so the doc_status record still exists. The
        # guard must not have called _update_delete_retry_state, which would have
        # upserted the record with deletion_failed=True.
        status_record = await rag.doc_status.get_by_id(doc_id)
        assert (
            status_record is not None
        ), "doc_status should still exist (delete failed)"
        metadata = status_record.get("metadata") or {}
        assert not metadata.get(
            "deletion_failed"
        ), "guard failed: zombie record written with deletion_failed=True"
        assert (
            await rag.full_docs.get_by_id(doc_id) is not None
        ), "full_docs should be untouched when doc_status.delete fails first"
    finally:
        await rag.finalize_storages()
@pytest.mark.asyncio
async def test_retry_state_write_failure_in_exception_handler_still_returns_fail(
    tmp_path, monkeypatch
):
    """If _update_delete_retry_state itself fails inside the exception handler,
    the caller must still receive a well-formed DeletionResult with status="fail"
    and the original error message, not an unhandled exception.

    The rebuild step is forced to raise. Every ``doc_status.upsert`` is also
    patched to raise, so persisting the retry state in the handler fails too.
    """
    rag = await _build_rag(
        tmp_path, "retry_state_write_failure", _deterministic_chunking
    )
    try:
        doc_id = "doc-retry-state-write-fail"
        keep_chunk_id = "chunk-keep"
        drop_chunk_id = "chunk-drop"
        # Include a keep_chunk_id so entities survive and rebuild is triggered.
        await _seed_delete_retry_state(
            rag,
            doc_id=doc_id,
            status_chunk_ids=[drop_chunk_id],
            tracking_chunk_ids=[keep_chunk_id, drop_chunk_id],
            chunk_owners={
                keep_chunk_id: "doc-keep",
                drop_chunk_id: doc_id,
            },
        )

        async def fail_rebuild(*args, **kwargs):
            # Accept any call shape so the sentinel, not a TypeError, is raised.
            raise RuntimeError("rebuild fail sentinel")

        async def fail_upsert(data):
            raise RuntimeError("doc_status upsert fail sentinel")

        monkeypatch.setattr(
            lightrag_module, "rebuild_knowledge_from_chunks", fail_rebuild
        )
        monkeypatch.setattr(rag.doc_status, "upsert", fail_upsert)
        result = await rag.adelete_by_doc_id(doc_id)

        assert result.status == "fail"
        # The original rebuild error must be present in the message. The compound
        # "Additionally, failed to persist retry state" suffix may also be present.
        assert "rebuild fail sentinel" in result.message
    finally:
        # Put back the real upsert and rebuild before shutting storages down.
        monkeypatch.undo()
        await rag.finalize_storages()
@pytest.mark.asyncio
async def test_persist_pre_rebuild_failure_records_stage_and_allows_retry(
    tmp_path, monkeypatch
):
    """A failure in ``_insert_done`` during the ``persist_pre_rebuild_changes``
    stage is recorded in the doc_status metadata, and a retry then completes the
    deletion.

    ``chunk-keep`` is owned by another document (``doc-keep``). After the retry,
    that chunk must survive and ``entity_a`` must be tracked only by it.
    """
    rag = await _build_rag(
        tmp_path, "persist_pre_rebuild_failure", _deterministic_chunking
    )
    try:
        doc_id = "doc-persist-pre-rebuild-failure"
        keep_chunk_id = "chunk-keep"
        drop_chunk_id = "chunk-drop"
        seeded = await _seed_delete_retry_state(
            rag,
            doc_id=doc_id,
            status_chunk_ids=[drop_chunk_id],
            tracking_chunk_ids=[keep_chunk_id, drop_chunk_id],
            chunk_owners={
                keep_chunk_id: "doc-keep",
                drop_chunk_id: doc_id,
            },
        )
        entity_a = seeded["entity_a"]

        insert_done_calls = 0
        original_insert_done = rag._insert_done

        async def fail_first_insert_done():
            # Only the first call fails; later calls delegate to the real method.
            nonlocal insert_done_calls
            insert_done_calls += 1
            if insert_done_calls == 1:
                raise RuntimeError("insert_done fail sentinel")
            await original_insert_done()

        monkeypatch.setattr(rag, "_insert_done", fail_first_insert_done)
        first_result = await rag.adelete_by_doc_id(doc_id)
        failed_status = await rag.doc_status.get_by_id(doc_id)
        assert first_result.status == "fail"
        assert "insert_done fail sentinel" in first_result.message
        assert failed_status is not None
        assert failed_status["metadata"]["deletion_failed"] is True
        assert (
            failed_status["metadata"]["deletion_failure_stage"]
            == "persist_pre_rebuild_changes"
        )

        # Retry with the real _insert_done and the succeeding rebuild stub.
        monkeypatch.undo()
        monkeypatch.setattr(
            lightrag_module,
            "rebuild_knowledge_from_chunks",
            _succeed_rebuild_from_remaining_chunks,
        )
        second_result = await rag.adelete_by_doc_id(doc_id)
        assert second_result.status == "success"
        assert await rag.doc_status.get_by_id(doc_id) is None
        assert await rag.full_docs.get_by_id(doc_id) is None
        assert await rag.text_chunks.get_by_id(drop_chunk_id) is None
        assert await rag.text_chunks.get_by_id(keep_chunk_id) is not None
        entity_a_tracking = await rag.entity_chunks.get_by_id(entity_a)
        assert entity_a_tracking is not None
        assert entity_a_tracking["chunk_ids"] == [keep_chunk_id]
    finally:
        await rag.finalize_storages()
@pytest.mark.asyncio
async def test_delete_doc_graph_metadata_failure_records_stage_and_allows_retry(
    tmp_path, monkeypatch
):
    """A one-off failure in ``full_relations.delete`` is recorded as the
    ``delete_doc_graph_metadata`` stage in the retry metadata. A retry then
    removes the doc_status, full_docs, full_entities and full_relations entries.
    """
    rag = await _build_rag(
        tmp_path, "delete_doc_graph_metadata_failure", _deterministic_chunking
    )
    try:
        doc_id = "doc-graph-metadata-failure"
        drop_chunk_id = "chunk-drop"
        await _seed_delete_retry_state(
            rag,
            doc_id=doc_id,
            status_chunk_ids=[drop_chunk_id],
            tracking_chunk_ids=[drop_chunk_id],
            chunk_owners={drop_chunk_id: doc_id},
        )

        real_relations_delete = rag.full_relations.delete
        already_failed = False

        async def flaky_relations_delete(ids):
            # Raise exactly once, then behave like the real storage method.
            nonlocal already_failed
            if not already_failed:
                already_failed = True
                raise RuntimeError("full_relations delete fail sentinel")
            await real_relations_delete(ids)

        monkeypatch.setattr(rag.full_relations, "delete", flaky_relations_delete)

        attempt = await rag.adelete_by_doc_id(doc_id)
        recorded = await rag.doc_status.get_by_id(doc_id)
        assert attempt.status == "fail"
        assert "full_relations delete fail sentinel" in attempt.message
        assert recorded is not None
        retry_meta = recorded["metadata"]
        assert retry_meta["deletion_failed"] is True
        assert retry_meta["deletion_failure_stage"] == "delete_doc_graph_metadata"

        monkeypatch.undo()
        retry = await rag.adelete_by_doc_id(doc_id)
        assert retry.status == "success"
        for store in (
            rag.doc_status,
            rag.full_docs,
            rag.full_entities,
            rag.full_relations,
        ):
            assert await store.get_by_id(doc_id) is None
    finally:
        await rag.finalize_storages()
@pytest.mark.asyncio
async def test_deletion_fully_completed_prevents_success_override_in_finally(
    tmp_path, monkeypatch
):
    """A completed deletion keeps its success result even when the ``_insert_done``
    call from the ``finally`` block fails.

    Two scenarios run in sequence, each against its own workspace:

    * ``no_chunk``: the doc_status record lists no chunks, and every
      ``_insert_done`` call fails.
    * ``full``: a single owned chunk is seeded; the first ``_insert_done``
      (persist_pre_rebuild_changes) succeeds and later ones fail.
    """
    for scenario in ("no_chunk", "full"):
        rag = await _build_rag(
            tmp_path,
            f"deletion_fully_completed_{scenario}",
            _deterministic_chunking,
        )
        try:
            doc_id = f"doc-fully-completed-{scenario}"
            if scenario == "full":
                drop_chunk_id = "chunk-drop-fc"
                await _seed_delete_retry_state(
                    rag,
                    doc_id=doc_id,
                    status_chunk_ids=[drop_chunk_id],
                    tracking_chunk_ids=[drop_chunk_id],
                    chunk_owners={drop_chunk_id: doc_id},
                )
            else:
                stamp = datetime.now(timezone.utc).isoformat()
                await rag.full_docs.upsert(
                    {doc_id: {"content": "no chunk doc", "file_path": "nc.txt"}}
                )
                empty_status = {
                    "status": DocStatus.PROCESSED,
                    "content_summary": "no chunks",
                    "content_length": 11,
                    "chunks_count": 0,
                    "chunks_list": [],
                    "created_at": stamp,
                    "updated_at": stamp,
                    "file_path": "nc.txt",
                    "track_id": f"track-{doc_id}",
                    "error_msg": "",
                    "metadata": {},
                }
                await rag.doc_status.upsert({doc_id: empty_status})

            # Number of leading _insert_done calls allowed to reach the real method.
            passthrough_calls = 1 if scenario == "full" else 0
            seen_calls = 0
            real_insert_done = rag._insert_done

            async def fail_later_insert_done():
                nonlocal seen_calls
                seen_calls += 1
                if seen_calls > passthrough_calls:
                    raise RuntimeError("finally insert_done fail sentinel")
                await real_insert_done()

            monkeypatch.setattr(rag, "_insert_done", fail_later_insert_done)
            outcome = await rag.adelete_by_doc_id(doc_id)
            assert (
                outcome.status == "success"
            ), f"[{scenario}] expected success but got {outcome.status}: {outcome.message}"
        finally:
            monkeypatch.undo()
            await rag.finalize_storages()
|