test_graph_routes_pipeline_busy.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298
  1. """Pipeline-busy guard tests for graph mutation endpoints.
  2. These tests verify that all 7 graph-mutation endpoints refuse to operate
  3. with HTTP 409 while the document pipeline is busy:
  4. - POST /graph/entity/edit (graph_routes)
  5. - POST /graph/relation/edit (graph_routes)
  6. - POST /graph/entity/create (graph_routes)
  7. - POST /graph/relation/create (graph_routes)
  8. - POST /graph/entities/merge (graph_routes)
  9. - DELETE /documents/delete_entity (document_routes)
  10. - DELETE /documents/delete_relation (document_routes)
  11. The guard logic itself lives in
  12. ``lightrag.api.routers.document_routes.check_pipeline_busy_or_raise`` and is
  13. exercised both at the endpoint integration layer (via monkeypatch, no
  14. shared-storage dependency) and at the unit layer (against a real
  15. ``pipeline_status`` namespace).
  16. """
  17. import importlib
  18. import sys
  19. from types import SimpleNamespace
  20. from unittest.mock import AsyncMock
  21. import pytest
  22. from fastapi import FastAPI, HTTPException
  23. from fastapi.testclient import TestClient
  24. # Importing routers loads ``lightrag.api.config`` which parses ``sys.argv`` via
  25. # argparse. Stash argv so pytest's CLI flags don't trip the parser.
  26. _original_argv = sys.argv[:]
  27. sys.argv = [sys.argv[0]]
  28. _graph_routes = importlib.import_module("lightrag.api.routers.graph_routes")
  29. _document_routes = importlib.import_module("lightrag.api.routers.document_routes")
  30. sys.argv = _original_argv
  31. create_graph_routes = _graph_routes.create_graph_routes
  32. create_document_routes = _document_routes.create_document_routes
  33. check_pipeline_busy_or_raise = _document_routes.check_pipeline_busy_or_raise
  34. pytestmark = pytest.mark.offline
  35. _API_KEY = "test-key"
  36. _HEADERS = {"X-API-Key": _API_KEY}
  37. # ---------------------------------------------------------------------------
  38. # Test scaffolding
  39. # ---------------------------------------------------------------------------
  40. def _make_mock_rag() -> SimpleNamespace:
  41. """Build a minimal LightRAG stand-in with the 7 mutation methods stubbed.
  42. Each ``AsyncMock`` returns a payload shaped enough to satisfy the
  43. endpoint's response model so the idle pass-through test can verify the
  44. full request path. Busy tests don't rely on these return values; the
  45. guard short-circuits before they're reached.
  46. """
  47. return SimpleNamespace(
  48. workspace="",
  49. aedit_entity=AsyncMock(
  50. return_value={
  51. "entity_name": "Alice",
  52. "description": "updated",
  53. "operation_summary": {
  54. "merged": False,
  55. "merge_status": "not_attempted",
  56. "merge_error": None,
  57. "operation_status": "success",
  58. "target_entity": None,
  59. "final_entity": "Alice",
  60. "renamed": False,
  61. },
  62. }
  63. ),
  64. aedit_relation=AsyncMock(return_value={"description": "updated"}),
  65. acreate_entity=AsyncMock(return_value={"entity_name": "Alice"}),
  66. acreate_relation=AsyncMock(return_value={"src_id": "a", "tgt_id": "b"}),
  67. amerge_entities=AsyncMock(return_value={"merged_entity": "Alice"}),
  68. adelete_by_entity=AsyncMock(
  69. return_value=SimpleNamespace(
  70. status="success", message="deleted", doc_id="ignored"
  71. )
  72. ),
  73. adelete_by_relation=AsyncMock(
  74. return_value=SimpleNamespace(
  75. status="success", message="deleted", doc_id="ignored"
  76. )
  77. ),
  78. )
  79. def _build_client(rag: SimpleNamespace) -> TestClient:
  80. app = FastAPI()
  81. app.include_router(create_graph_routes(rag, api_key=_API_KEY))
  82. app.include_router(create_document_routes(rag, SimpleNamespace(), api_key=_API_KEY))
  83. return TestClient(app)
  84. async def _force_busy_guard(_rag) -> None:
  85. """Stand-in for ``check_pipeline_busy_or_raise`` that always refuses."""
  86. raise HTTPException(
  87. status_code=409,
  88. detail=(
  89. "Pipeline is busy with another operation. "
  90. "Wait for the running job to finish before editing "
  91. "the knowledge graph."
  92. ),
  93. )
  94. async def _noop_guard(_rag) -> None:
  95. """Stand-in for ``check_pipeline_busy_or_raise`` that always permits."""
  96. return None
  97. def _patch_guard(monkeypatch, replacement) -> None:
  98. """Replace the guard reference in BOTH consumer modules.
  99. ``graph_routes`` re-binds the name via ``from .document_routes import ...``
  100. so patching only ``document_routes`` would miss the graph endpoints.
  101. """
  102. monkeypatch.setattr(_graph_routes, "check_pipeline_busy_or_raise", replacement)
  103. monkeypatch.setattr(_document_routes, "check_pipeline_busy_or_raise", replacement)
  104. # ---------------------------------------------------------------------------
  105. # Part A: endpoint integration -- guard refuses with 409
  106. # ---------------------------------------------------------------------------
  107. _ENDPOINTS = [
  108. pytest.param(
  109. "POST",
  110. "/graph/entity/edit",
  111. {"entity_name": "Alice", "updated_data": {"description": "x"}},
  112. id="update_entity",
  113. ),
  114. pytest.param(
  115. "POST",
  116. "/graph/relation/edit",
  117. {
  118. "source_id": "Alice",
  119. "target_id": "Bob",
  120. "updated_data": {"description": "x"},
  121. },
  122. id="update_relation",
  123. ),
  124. pytest.param(
  125. "POST",
  126. "/graph/entity/create",
  127. {"entity_name": "Alice", "entity_data": {"description": "x"}},
  128. id="create_entity",
  129. ),
  130. pytest.param(
  131. "POST",
  132. "/graph/relation/create",
  133. {
  134. "source_entity": "Alice",
  135. "target_entity": "Bob",
  136. "relation_data": {"description": "x"},
  137. },
  138. id="create_relation",
  139. ),
  140. pytest.param(
  141. "POST",
  142. "/graph/entities/merge",
  143. {"entities_to_change": ["Alic"], "entity_to_change_into": "Alice"},
  144. id="merge_entities",
  145. ),
  146. pytest.param(
  147. "DELETE",
  148. "/documents/delete_entity",
  149. {"entity_name": "Alice"},
  150. id="delete_entity",
  151. ),
  152. pytest.param(
  153. "DELETE",
  154. "/documents/delete_relation",
  155. {"source_entity": "Alice", "target_entity": "Bob"},
  156. id="delete_relation",
  157. ),
  158. ]
  159. @pytest.mark.parametrize("method, path, body", _ENDPOINTS)
  160. def test_endpoint_refuses_with_409_when_pipeline_busy(method, path, body, monkeypatch):
  161. rag = _make_mock_rag()
  162. client = _build_client(rag)
  163. _patch_guard(monkeypatch, _force_busy_guard)
  164. response = client.request(method, path, json=body, headers=_HEADERS)
  165. assert response.status_code == 409, response.text
  166. payload = response.json()
  167. assert "Pipeline is busy" in payload["detail"]
  168. # Guard must short-circuit before the underlying mutation runs.
  169. for attr in (
  170. "aedit_entity",
  171. "aedit_relation",
  172. "acreate_entity",
  173. "acreate_relation",
  174. "amerge_entities",
  175. "adelete_by_entity",
  176. "adelete_by_relation",
  177. ):
  178. getattr(rag, attr).assert_not_awaited()
  179. def test_endpoint_passes_through_when_pipeline_idle(monkeypatch):
  180. """Sanity check: with an idle guard, the request reaches ``rag.aedit_entity``."""
  181. rag = _make_mock_rag()
  182. client = _build_client(rag)
  183. _patch_guard(monkeypatch, _noop_guard)
  184. response = client.post(
  185. "/graph/entity/edit",
  186. json={"entity_name": "Alice", "updated_data": {"description": "x"}},
  187. headers=_HEADERS,
  188. )
  189. assert response.status_code == 200, response.text
  190. rag.aedit_entity.assert_awaited_once()
  191. # ---------------------------------------------------------------------------
  192. # Part B: helper unit -- against real pipeline_status namespace
  193. # ---------------------------------------------------------------------------
  194. async def _with_pipeline_status(action):
  195. """Bootstrap pipeline_status, run ``action(pipeline_status)``, then tear down.
  196. ``initialize_share_data`` is idempotent within a process but
  197. ``finalize_share_data`` is required to release the Manager/lock state so
  198. repeated calls in subsequent tests start clean.
  199. """
  200. from lightrag.kg.shared_storage import (
  201. finalize_share_data,
  202. get_namespace_data,
  203. initialize_pipeline_status,
  204. initialize_share_data,
  205. )
  206. initialize_share_data()
  207. try:
  208. await initialize_pipeline_status(workspace="")
  209. pipeline_status = await get_namespace_data("pipeline_status", workspace="")
  210. await action(pipeline_status)
  211. finally:
  212. finalize_share_data()
  213. async def test_helper_raises_409_when_busy_flag_set():
  214. async def _do(pipeline_status):
  215. pipeline_status["busy"] = True
  216. rag = SimpleNamespace(workspace="")
  217. with pytest.raises(HTTPException) as exc_info:
  218. await check_pipeline_busy_or_raise(rag)
  219. assert exc_info.value.status_code == 409
  220. assert "Pipeline is busy" in exc_info.value.detail
  221. await _with_pipeline_status(_do)
  222. async def test_helper_returns_silently_when_pipeline_idle():
  223. async def _do(pipeline_status):
  224. pipeline_status["busy"] = False
  225. rag = SimpleNamespace(workspace="")
  226. # Should not raise.
  227. await check_pipeline_busy_or_raise(rag)
  228. await _with_pipeline_status(_do)
  229. async def test_helper_is_noop_when_pipeline_status_uninitialized():
  230. """When pipeline_status namespace was never bootstrapped the helper must pass.
  231. ``get_namespace_data`` raises ``PipelineNotInitializedError`` when the
  232. pipeline_status namespace is missing (share data initialized but the
  233. pipeline namespace never created); the helper swallows that error so test
  234. rigs without an end-to-end RAG bootstrap stay green. Mirrors the existing
  235. contract of ``_acquire_destructive_busy``.
  236. """
  237. from lightrag.kg.shared_storage import (
  238. finalize_share_data,
  239. initialize_share_data,
  240. )
  241. initialize_share_data()
  242. try:
  243. rag = SimpleNamespace(workspace="__never_bootstrapped__")
  244. # Intentionally skip ``initialize_pipeline_status``: helper should
  245. # catch ``PipelineNotInitializedError`` and return silently.
  246. await check_pipeline_busy_or_raise(rag)
  247. finally:
  248. finalize_share_data()