| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340 |
"""
Pin the business-layer keyed-lock contracts on the entity-mutation paths.

`get_storage_keyed_lock(keys, namespace=...)` acquires one mutex per key in
the given namespace, so identical key strings share the same mutex across
callers. Locking `[entity_name]` is therefore already enough to mutually
exclude any concurrent edge write that names the same entity in
`sorted([src, tgt])` — no need to enumerate incident edges here.

These tests pin:
- `aedit_entity` locks {old, new} on rename, {entity_name} otherwise.
- `adelete_by_entity` locks {entity_name}.
- `ainsert_custom_kg` locks every entity name plus every relationship
  endpoint that the batch will write, sharing the doc-ingest namespace.
- An empty `ainsert_custom_kg` batch skips the lock entirely.
"""
- from contextlib import asynccontextmanager
- from unittest.mock import AsyncMock, MagicMock, patch
- import pytest
- # ---------------------------------------------------------------------------
- # Helpers
- # ---------------------------------------------------------------------------
def _make_keyed_lock_spy():
    """Build a recording stand-in for ``get_storage_keyed_lock``.

    Returns:
        A ``(spy, calls)`` pair. ``spy`` accepts the same arguments the tests
        pass to the keyed-lock factory and hands back an async context
        manager that does nothing; ``calls`` is the list that receives one
        ``{"keys": [...], "namespace": ...}`` dict per invocation of ``spy``.
    """
    calls: list[dict] = []

    @asynccontextmanager
    async def _passthrough():
        # Nothing to acquire or release: the ``async with`` body simply runs.
        yield

    def spy(keys, namespace="default", enable_logging=False):
        # ``keys`` is copied into a list so the record does not alias the
        # caller's iterable; ``enable_logging`` is accepted but not recorded.
        record = {"keys": list(keys), "namespace": namespace}
        calls.append(record)
        return _passthrough()

    return spy, calls
- def _make_graph_mock(
- edges_for_entity: list[tuple[str, str]] | None = None,
- *,
- existing_entity: str = "X",
- ):
- """Minimal `chunk_entity_relation_graph` mock.
- `has_node` returns True only for `existing_entity` so a rename target
- (e.g. "Y") is treated as not-yet-existing — otherwise aedit_entity would
- short-circuit with "Entity name 'Y' already exists".
- """
- graph = MagicMock()
- graph.get_node_edges = AsyncMock(return_value=edges_for_entity or [])
- graph.has_node = AsyncMock(side_effect=lambda name: name == existing_entity)
- graph.get_node = AsyncMock(
- return_value={
- "entity_id": existing_entity,
- "description": "old description",
- "entity_type": "PERSON",
- "source_id": "chunk-1",
- "file_path": "test.txt",
- }
- )
- graph.upsert_node = AsyncMock(return_value=None)
- graph.upsert_edge = AsyncMock(return_value=None)
- graph.upsert_nodes_batch = AsyncMock(return_value=None)
- graph.upsert_edges_batch = AsyncMock(return_value=None)
- graph.has_nodes_batch = AsyncMock(return_value=set())
- graph.delete_node = AsyncMock(return_value=None)
- graph.get_edge = AsyncMock(
- return_value={
- "weight": 1.0,
- "description": "rel",
- "keywords": "k",
- "source_id": "chunk-1",
- "file_path": "test.txt",
- "created_at": 0,
- }
- )
- graph.index_done_callback = AsyncMock(return_value=None)
- return graph
def _make_vdb_mock(workspace: str = ""):
    """Create a stand-in for a vector/KV storage object.

    Args:
        workspace: Value exposed as ``global_config["workspace"]``.

    Returns:
        A ``MagicMock`` whose ``upsert``, ``delete``, ``delete_entity``,
        ``delete_entity_relation`` and ``index_done_callback`` attributes are
        awaitable no-ops, plus a plain ``MagicMock`` as ``client_storage``.
    """
    store = MagicMock()
    store.global_config = {"workspace": workspace}
    for async_method in (
        "upsert",
        "delete",
        "delete_entity",
        "delete_entity_relation",
        "index_done_callback",
    ):
        setattr(store, async_method, AsyncMock(return_value=None))
    store.client_storage = MagicMock()
    return store
- # ---------------------------------------------------------------------------
- # aedit_entity
- # ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_aedit_entity_rename_locks_old_and_new_names():
    """A rename X -> Y must take the keyed lock on exactly ["X", "Y"].

    The doc-ingest pipeline acquires per-key mutexes in the same namespace,
    so holding the two entity names already excludes any sorted([X, *]) or
    sorted([Y, *]) edge lock; incident edges do not need to be enumerated.
    """
    from lightrag import utils_graph

    lock_spy, lock_calls = _make_keyed_lock_spy()
    graph_mock = _make_graph_mock()
    # Only the lock arguments matter here, so make the first node upsert fail
    # and let the call unwind before the rename actually runs.
    graph_mock.upsert_node.side_effect = RuntimeError("stop after lock acquisition")

    edit_kwargs = {
        "chunk_entity_relation_graph": graph_mock,
        "entities_vdb": _make_vdb_mock(workspace="ws1"),
        "relationships_vdb": _make_vdb_mock(workspace="ws1"),
        "entity_name": "X",
        "updated_data": {"entity_name": "Y", "description": "renamed"},
        "allow_rename": True,
    }
    lock_patch = patch.object(utils_graph, "get_storage_keyed_lock", lock_spy)
    expect_abort = pytest.raises(RuntimeError, match="stop after lock acquisition")
    with lock_patch, expect_abort:
        await utils_graph.aedit_entity(**edit_kwargs)

    # One acquisition, covering both names, in the workspace-scoped namespace.
    assert lock_calls == [{"keys": ["X", "Y"], "namespace": "ws1:GraphDB"}]
    # Incident edges must not be pre-fetched; that would only add I/O.
    graph_mock.get_node_edges.assert_not_called()
@pytest.mark.asyncio
async def test_aedit_entity_non_rename_locks_single_entity_name():
    """An edit that keeps the name takes the keyed lock on that name only."""
    from lightrag import utils_graph

    lock_spy, lock_calls = _make_keyed_lock_spy()
    graph_mock = _make_graph_mock()
    # Abort at the first node upsert; the lock call has been recorded by then.
    graph_mock.upsert_node.side_effect = RuntimeError("stop after lock acquisition")

    edit_kwargs = {
        "chunk_entity_relation_graph": graph_mock,
        "entities_vdb": _make_vdb_mock(workspace=""),
        "relationships_vdb": _make_vdb_mock(workspace=""),
        "entity_name": "X",
        "updated_data": {"description": "updated"},
        "allow_rename": False,
    }
    lock_patch = patch.object(utils_graph, "get_storage_keyed_lock", lock_spy)
    expect_abort = pytest.raises(RuntimeError, match="stop after lock acquisition")
    with lock_patch, expect_abort:
        await utils_graph.aedit_entity(**edit_kwargs)

    # With an empty workspace the namespace is the bare "GraphDB" string.
    assert lock_calls == [{"keys": ["X"], "namespace": "GraphDB"}]
    graph_mock.get_node_edges.assert_not_called()
- # ---------------------------------------------------------------------------
- # adelete_by_entity
- # ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_adelete_by_entity_locks_single_entity_name():
    """Deleting an entity takes the keyed lock on that entity's name only."""
    from lightrag import utils_graph

    lock_spy, lock_calls = _make_keyed_lock_spy()
    graph_mock = _make_graph_mock(edges_for_entity=[("X", "Y"), ("Z", "X")])
    delete_kwargs = {
        "chunk_entity_relation_graph": graph_mock,
        "entities_vdb": _make_vdb_mock(workspace="ws1"),
        "relationships_vdb": _make_vdb_mock(workspace="ws1"),
        "entity_name": "X",
    }

    lock_patch = patch.object(utils_graph, "get_storage_keyed_lock", lock_spy)
    with lock_patch:
        outcome = await utils_graph.adelete_by_entity(**delete_kwargs)

    assert outcome.status == "success"
    assert lock_calls == [{"keys": ["X"], "namespace": "ws1:GraphDB"}]
    # The single edge lookup drives cleanup inside the lock; it is not a
    # pre-fetch used to widen the lock set.
    assert graph_mock.get_node_edges.await_count == 1
- # ---------------------------------------------------------------------------
- # ainsert_custom_kg
- # ---------------------------------------------------------------------------
class _AbortOnEnterLock:
    """Keyed-lock replacement that records its arguments, then refuses entry.

    Calling the instance stores ``{"keys": [...], "namespace": ...}`` in
    ``captured`` and returns the instance itself as the async context
    manager. Entering that context raises ``_LockCaptured``, so a test can
    inspect the lock arguments without mocking every storage operation that
    would otherwise run inside the ``async with`` block.
    """

    def __init__(self):
        # One dict per call, in call order.
        self.captured: list[dict] = []

    def __call__(self, keys, namespace="default", enable_logging=False):
        # ``enable_logging`` is accepted for signature compatibility only.
        record = {"keys": list(keys), "namespace": namespace}
        self.captured.append(record)
        return self

    async def __aenter__(self):
        # Bail out before the guarded body gets a chance to run.
        raise _LockCaptured("captured")

    async def __aexit__(self, exc_type, exc, tb):
        # A falsy return never suppresses an in-flight exception.
        return False
class _LockCaptured(RuntimeError):
    """Sentinel exception raised when the capturing lock context is entered.

    Its only job is to short-circuit the enclosing ``async with`` block.
    """
@pytest.mark.asyncio
async def test_ainsert_custom_kg_locks_every_entity_and_endpoint():
    """ainsert_custom_kg takes one coarse keyed lock for the whole batch.

    The key set must cover every entity name plus every relationship
    endpoint in the batch, in the same namespace the doc-ingest pipeline
    uses, so concurrent callers touching overlapping entities serialise
    instead of racing.
    """
    from lightrag import lightrag as lightrag_module
    from lightrag.lightrag import LightRAG

    # Bare instance: skip __init__ and attach only the collaborators needed.
    rag = LightRAG.__new__(LightRAG)
    rag.workspace = "ws1"
    rag.tokenizer = MagicMock()
    rag.tokenizer.encode = lambda _content: []
    for storage_attr in ("chunks_vdb", "text_chunks", "entities_vdb", "relationships_vdb"):
        setattr(rag, storage_attr, _make_vdb_mock(workspace="ws1"))
    rag.chunk_entity_relation_graph = _make_graph_mock()
    rag._insert_done = AsyncMock(return_value=None)

    # Provenance fields shared by every entity and relationship record.
    provenance = {"source_id": "chunk-1", "file_path": "f"}
    entities = [
        {
            "entity_name": name,
            "entity_type": "PERSON",
            "description": description,
            **provenance,
        }
        for name, description in (("Alice", "x"), ("Bob", "y"))
    ]
    # "Carol" appears only as a relationship endpoint, never as an entity.
    relationships = [
        {
            "src_id": source,
            "tgt_id": target,
            "description": "knows",
            "keywords": "k",
            "weight": 1.0,
            **provenance,
        }
        for source, target in (("Alice", "Bob"), ("Bob", "Carol"))
    ]
    custom_kg = {"chunks": [], "entities": entities, "relationships": relationships}

    lock_spy = _AbortOnEnterLock()
    lock_patch = patch.object(lightrag_module, "get_storage_keyed_lock", lock_spy)
    with lock_patch, pytest.raises(_LockCaptured):
        await rag.ainsert_custom_kg(custom_kg)

    assert len(lock_spy.captured) == 1
    recorded = lock_spy.captured[0]
    # Same namespace as doc-ingest, so identical key strings exclude each
    # other across both paths.
    assert recorded["namespace"] == "ws1:GraphDB"
    # Sorted union of entity names {Alice, Bob} and relationship endpoints
    # {Alice, Bob, Carol}.
    assert recorded["keys"] == ["Alice", "Bob", "Carol"]
@pytest.mark.asyncio
async def test_ainsert_custom_kg_empty_batch_skips_keyed_lock():
    """An empty custom_kg acquires no keyed lock and still completes.

    With no entities and no relationships there is nothing for the
    business-layer keyed lock to serialise on, so the chunk-only path must
    run to the end without ever calling the lock factory.
    """
    from lightrag import lightrag as lightrag_module
    from lightrag.lightrag import LightRAG

    # Bare instance: skip __init__ and attach only the collaborators needed.
    rag = LightRAG.__new__(LightRAG)
    rag.workspace = ""
    rag.tokenizer = MagicMock()
    rag.tokenizer.encode = lambda _content: []
    for storage_attr in ("chunks_vdb", "text_chunks", "entities_vdb", "relationships_vdb"):
        setattr(rag, storage_attr, _make_vdb_mock(workspace=""))
    rag.chunk_entity_relation_graph = _make_graph_mock()
    rag._insert_done = AsyncMock(return_value=None)

    empty_kg = {"chunks": [], "entities": [], "relationships": []}
    # Entering this lock would raise, so a clean return also shows that the
    # lock context was never entered.
    lock_spy = _AbortOnEnterLock()
    lock_patch = patch.object(lightrag_module, "get_storage_keyed_lock", lock_spy)
    with lock_patch:
        await rag.ainsert_custom_kg(empty_kg)

    assert not lock_spy.captured
|