test_graph_keyed_locks.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340
"""
Pin the business-layer keyed-lock contracts on the entity-mutation paths.

`get_storage_keyed_lock(keys, namespace=...)` acquires one mutex per key in
the given namespace, so identical key strings share the same mutex across
callers. Locking `[entity_name]` is therefore already enough to mutually
exclude any concurrent edge write that names the same entity in
`sorted([src, tgt])` — no need to enumerate incident edges here.

These tests pin:

- `aedit_entity` locks {old, new} on rename, {entity_name} otherwise.
- `adelete_by_entity` locks {entity_name}.
- `ainsert_custom_kg` locks every entity name plus every relationship
  endpoint that the batch will write, sharing the doc-ingest namespace.
- An empty `ainsert_custom_kg` batch (no entities, no relationships) skips
  the lock entirely.
"""
  15. from contextlib import asynccontextmanager
  16. from unittest.mock import AsyncMock, MagicMock, patch
  17. import pytest
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
  21. def _make_keyed_lock_spy():
  22. """Return (spy_callable, captured_calls_list).
  23. Spy yields a no-op async context manager and records every invocation's
  24. `keys` / `namespace` arguments.
  25. """
  26. captured: list[dict] = []
  27. @asynccontextmanager
  28. async def _noop_lock():
  29. yield
  30. def spy(keys, namespace="default", enable_logging=False):
  31. captured.append({"keys": list(keys), "namespace": namespace})
  32. return _noop_lock()
  33. return spy, captured
  34. def _make_graph_mock(
  35. edges_for_entity: list[tuple[str, str]] | None = None,
  36. *,
  37. existing_entity: str = "X",
  38. ):
  39. """Minimal `chunk_entity_relation_graph` mock.
  40. `has_node` returns True only for `existing_entity` so a rename target
  41. (e.g. "Y") is treated as not-yet-existing — otherwise aedit_entity would
  42. short-circuit with "Entity name 'Y' already exists".
  43. """
  44. graph = MagicMock()
  45. graph.get_node_edges = AsyncMock(return_value=edges_for_entity or [])
  46. graph.has_node = AsyncMock(side_effect=lambda name: name == existing_entity)
  47. graph.get_node = AsyncMock(
  48. return_value={
  49. "entity_id": existing_entity,
  50. "description": "old description",
  51. "entity_type": "PERSON",
  52. "source_id": "chunk-1",
  53. "file_path": "test.txt",
  54. }
  55. )
  56. graph.upsert_node = AsyncMock(return_value=None)
  57. graph.upsert_edge = AsyncMock(return_value=None)
  58. graph.upsert_nodes_batch = AsyncMock(return_value=None)
  59. graph.upsert_edges_batch = AsyncMock(return_value=None)
  60. graph.has_nodes_batch = AsyncMock(return_value=set())
  61. graph.delete_node = AsyncMock(return_value=None)
  62. graph.get_edge = AsyncMock(
  63. return_value={
  64. "weight": 1.0,
  65. "description": "rel",
  66. "keywords": "k",
  67. "source_id": "chunk-1",
  68. "file_path": "test.txt",
  69. "created_at": 0,
  70. }
  71. )
  72. graph.index_done_callback = AsyncMock(return_value=None)
  73. return graph
  74. def _make_vdb_mock(workspace: str = ""):
  75. vdb = MagicMock()
  76. vdb.global_config = {"workspace": workspace}
  77. vdb.upsert = AsyncMock(return_value=None)
  78. vdb.delete = AsyncMock(return_value=None)
  79. vdb.delete_entity = AsyncMock(return_value=None)
  80. vdb.delete_entity_relation = AsyncMock(return_value=None)
  81. vdb.index_done_callback = AsyncMock(return_value=None)
  82. vdb.client_storage = MagicMock()
  83. return vdb
# ---------------------------------------------------------------------------
# aedit_entity
# ---------------------------------------------------------------------------
  87. @pytest.mark.asyncio
  88. async def test_aedit_entity_rename_locks_old_and_new_names():
  89. """Renaming X -> Y locks only {X, Y}. The doc-ingest pipeline uses the
  90. same namespace and acquires per-key mutexes, so locking the entity name
  91. already excludes any sorted([X, *]) or sorted([Y, *]) edge lock — no
  92. need to enumerate incident edges here."""
  93. from lightrag import utils_graph
  94. spy, captured = _make_keyed_lock_spy()
  95. graph = _make_graph_mock()
  96. entities_vdb = _make_vdb_mock(workspace="ws1")
  97. relationships_vdb = _make_vdb_mock(workspace="ws1")
  98. # Short-circuit before the rename actually runs — we only care about the
  99. # lock arguments.
  100. graph.upsert_node.side_effect = RuntimeError("stop after lock acquisition")
  101. with patch.object(utils_graph, "get_storage_keyed_lock", spy):
  102. with pytest.raises(RuntimeError, match="stop after lock acquisition"):
  103. await utils_graph.aedit_entity(
  104. chunk_entity_relation_graph=graph,
  105. entities_vdb=entities_vdb,
  106. relationships_vdb=relationships_vdb,
  107. entity_name="X",
  108. updated_data={"entity_name": "Y", "description": "renamed"},
  109. allow_rename=True,
  110. )
  111. assert len(captured) == 1
  112. assert captured[0]["keys"] == ["X", "Y"]
  113. assert captured[0]["namespace"] == "ws1:GraphDB"
  114. # No pre-fetch of incident edges — that would only add I/O.
  115. graph.get_node_edges.assert_not_called()
  116. @pytest.mark.asyncio
  117. async def test_aedit_entity_non_rename_locks_single_entity_name():
  118. """Non-rename edits lock just the entity name."""
  119. from lightrag import utils_graph
  120. spy, captured = _make_keyed_lock_spy()
  121. graph = _make_graph_mock()
  122. entities_vdb = _make_vdb_mock(workspace="")
  123. relationships_vdb = _make_vdb_mock(workspace="")
  124. graph.upsert_node.side_effect = RuntimeError("stop after lock acquisition")
  125. with patch.object(utils_graph, "get_storage_keyed_lock", spy):
  126. with pytest.raises(RuntimeError, match="stop after lock acquisition"):
  127. await utils_graph.aedit_entity(
  128. chunk_entity_relation_graph=graph,
  129. entities_vdb=entities_vdb,
  130. relationships_vdb=relationships_vdb,
  131. entity_name="X",
  132. updated_data={"description": "updated"},
  133. allow_rename=False,
  134. )
  135. assert len(captured) == 1
  136. assert captured[0]["keys"] == ["X"]
  137. # Empty workspace falls back to the bare "GraphDB" namespace.
  138. assert captured[0]["namespace"] == "GraphDB"
  139. graph.get_node_edges.assert_not_called()
# ---------------------------------------------------------------------------
# adelete_by_entity
# ---------------------------------------------------------------------------
  143. @pytest.mark.asyncio
  144. async def test_adelete_by_entity_locks_single_entity_name():
  145. """Entity delete locks just the entity name."""
  146. from lightrag import utils_graph
  147. spy, captured = _make_keyed_lock_spy()
  148. graph = _make_graph_mock(edges_for_entity=[("X", "Y"), ("Z", "X")])
  149. entities_vdb = _make_vdb_mock(workspace="ws1")
  150. relationships_vdb = _make_vdb_mock(workspace="ws1")
  151. with patch.object(utils_graph, "get_storage_keyed_lock", spy):
  152. result = await utils_graph.adelete_by_entity(
  153. chunk_entity_relation_graph=graph,
  154. entities_vdb=entities_vdb,
  155. relationships_vdb=relationships_vdb,
  156. entity_name="X",
  157. )
  158. assert result.status == "success"
  159. assert len(captured) == 1
  160. assert captured[0]["keys"] == ["X"]
  161. assert captured[0]["namespace"] == "ws1:GraphDB"
  162. # get_node_edges runs exactly once, inside the lock, to drive cleanup —
  163. # not as a pre-fetch for lock-set extension.
  164. assert graph.get_node_edges.await_count == 1
# ---------------------------------------------------------------------------
# ainsert_custom_kg
# ---------------------------------------------------------------------------
  168. class _AbortOnEnterLock:
  169. """Async context manager that captures lock args and aborts on __aenter__.
  170. Lets the test inspect the lock_keys argument without having to mock every
  171. downstream storage operation that would run inside the with-block.
  172. """
  173. def __init__(self):
  174. self.captured: list[dict] = []
  175. def __call__(self, keys, namespace="default", enable_logging=False):
  176. self.captured.append({"keys": list(keys), "namespace": namespace})
  177. return self
  178. async def __aenter__(self):
  179. raise _LockCaptured("captured")
  180. async def __aexit__(self, exc_type, exc, tb):
  181. return False
  182. class _LockCaptured(RuntimeError):
  183. """Sentinel raised from the captured lock context to short-circuit the
  184. enclosing async with block."""
  185. @pytest.mark.asyncio
  186. async def test_ainsert_custom_kg_locks_every_entity_and_endpoint():
  187. """ainsert_custom_kg must hold a single coarse-grained keyed lock whose
  188. key set covers every entity name plus every relationship endpoint in the
  189. batch — sharing the doc-ingest namespace so concurrent callers on
  190. overlapping entities serialise instead of racing.
  191. """
  192. from lightrag import lightrag as lightrag_module
  193. from lightrag.lightrag import LightRAG
  194. rag = LightRAG.__new__(LightRAG)
  195. rag.workspace = "ws1"
  196. rag.tokenizer = MagicMock()
  197. rag.tokenizer.encode = lambda _content: []
  198. rag.chunks_vdb = _make_vdb_mock(workspace="ws1")
  199. rag.text_chunks = _make_vdb_mock(workspace="ws1")
  200. rag.chunk_entity_relation_graph = _make_graph_mock()
  201. rag.entities_vdb = _make_vdb_mock(workspace="ws1")
  202. rag.relationships_vdb = _make_vdb_mock(workspace="ws1")
  203. rag._insert_done = AsyncMock(return_value=None)
  204. lock_spy = _AbortOnEnterLock()
  205. custom_kg = {
  206. "chunks": [],
  207. "entities": [
  208. {
  209. "entity_name": "Alice",
  210. "entity_type": "PERSON",
  211. "description": "x",
  212. "source_id": "chunk-1",
  213. "file_path": "f",
  214. },
  215. {
  216. "entity_name": "Bob",
  217. "entity_type": "PERSON",
  218. "description": "y",
  219. "source_id": "chunk-1",
  220. "file_path": "f",
  221. },
  222. ],
  223. "relationships": [
  224. {
  225. "src_id": "Alice",
  226. "tgt_id": "Bob",
  227. "description": "knows",
  228. "keywords": "k",
  229. "weight": 1.0,
  230. "source_id": "chunk-1",
  231. "file_path": "f",
  232. },
  233. {
  234. "src_id": "Bob",
  235. "tgt_id": "Carol",
  236. "description": "knows",
  237. "keywords": "k",
  238. "weight": 1.0,
  239. "source_id": "chunk-1",
  240. "file_path": "f",
  241. },
  242. ],
  243. }
  244. with patch.object(lightrag_module, "get_storage_keyed_lock", lock_spy):
  245. with pytest.raises(_LockCaptured):
  246. await rag.ainsert_custom_kg(custom_kg)
  247. assert len(lock_spy.captured) == 1
  248. call = lock_spy.captured[0]
  249. # Namespace matches the doc-ingest pipeline so the same key strings
  250. # mutually exclude across paths.
  251. assert call["namespace"] == "ws1:GraphDB"
  252. # Union of entity names ({Alice, Bob}) and every relationship endpoint
  253. # ({Alice, Bob, Carol}), sorted.
  254. assert call["keys"] == ["Alice", "Bob", "Carol"]
  255. @pytest.mark.asyncio
  256. async def test_ainsert_custom_kg_empty_batch_skips_keyed_lock():
  257. """A custom_kg with no entities or relationships has nothing for the
  258. business-layer keyed lock to serialise on — no lock is acquired and the
  259. chunk-only path still completes."""
  260. from lightrag import lightrag as lightrag_module
  261. from lightrag.lightrag import LightRAG
  262. rag = LightRAG.__new__(LightRAG)
  263. rag.workspace = ""
  264. rag.tokenizer = MagicMock()
  265. rag.tokenizer.encode = lambda _content: []
  266. rag.chunks_vdb = _make_vdb_mock(workspace="")
  267. rag.text_chunks = _make_vdb_mock(workspace="")
  268. rag.chunk_entity_relation_graph = _make_graph_mock()
  269. rag.entities_vdb = _make_vdb_mock(workspace="")
  270. rag.relationships_vdb = _make_vdb_mock(workspace="")
  271. rag._insert_done = AsyncMock(return_value=None)
  272. lock_spy = _AbortOnEnterLock()
  273. with patch.object(lightrag_module, "get_storage_keyed_lock", lock_spy):
  274. await rag.ainsert_custom_kg({"chunks": [], "entities": [], "relationships": []})
  275. assert lock_spy.captured == []