test_workspace_isolation.py 45 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183
  1. #!/usr/bin/env python
  2. """
  3. Test script for Workspace Isolation Feature
  4. Comprehensive test suite covering workspace isolation in LightRAG:
  5. 1. Pipeline Status Isolation - Data isolation between workspaces
  6. 2. Lock Mechanism - Parallel execution for different workspaces, serial for same workspace
  7. 3. Backward Compatibility - Legacy code without workspace parameters
  8. 4. Multi-Workspace Concurrency - Concurrent operations on different workspaces
  9. 5. NamespaceLock Re-entrance Protection - Prevents deadlocks
  10. 6. Different Namespace Lock Isolation - Locks isolated by namespace
  11. 7. Error Handling - Invalid workspace configurations
  12. 8. Update Flags Workspace Isolation - Update flags properly isolated
  13. 9. Empty Workspace Standardization - Empty workspace handling
  14. 10. JsonKVStorage Workspace Isolation - Integration test for KV storage
  15. 11. LightRAG End-to-End Workspace Isolation - Complete E2E test with two instances
  16. Total: 11 test scenarios
  17. """
  18. import asyncio
  19. import time
  20. import os
  21. import shutil
  22. import numpy as np
  23. import pytest
  24. from pathlib import Path
  25. from typing import List, Tuple, Dict
  26. from lightrag.kg.shared_storage import (
  27. get_final_namespace,
  28. get_namespace_lock,
  29. get_default_workspace,
  30. set_default_workspace,
  31. initialize_share_data,
  32. finalize_share_data,
  33. initialize_pipeline_status,
  34. get_namespace_data,
  35. set_all_update_flags,
  36. clear_all_update_flags,
  37. get_all_update_flags_status,
  38. get_update_flag,
  39. )
  40. # =============================================================================
  41. # Test Configuration
  42. # =============================================================================
  43. # Test configuration is handled via pytest fixtures in conftest.py
  44. # - Use CLI options: --keep-artifacts, --stress-test, --test-workers=N
  45. # - Or environment variables: LIGHTRAG_KEEP_ARTIFACTS, LIGHTRAG_STRESS_TEST, LIGHTRAG_TEST_WORKERS
  46. # Priority: CLI options > Environment variables > Default values
  47. # =============================================================================
  48. # Pytest Fixtures
  49. # =============================================================================
  50. @pytest.fixture(autouse=True)
  51. def setup_shared_data():
  52. """Initialize shared data before each test"""
  53. initialize_share_data()
  54. yield
  55. finalize_share_data()
  56. async def _measure_lock_parallelism(
  57. workload: List[Tuple[str, str, str]], hold_time: float = 0.05
  58. ) -> Tuple[int, List[Tuple[str, str]], Dict[str, float]]:
  59. """Run lock acquisition workload and capture peak concurrency and timeline.
  60. Args:
  61. workload: List of (name, workspace, namespace) tuples
  62. hold_time: How long each worker holds the lock (seconds)
  63. Returns:
  64. Tuple of (max_parallel, timeline, metrics) where:
  65. - max_parallel: Peak number of concurrent lock holders
  66. - timeline: List of (name, event) tuples tracking execution order
  67. - metrics: Dict with performance metrics (total_duration, max_concurrency, etc.)
  68. """
  69. running = 0
  70. max_parallel = 0
  71. timeline: List[Tuple[str, str]] = []
  72. start_time = time.time()
  73. async def worker(name: str, workspace: str, namespace: str) -> None:
  74. nonlocal running, max_parallel
  75. lock = get_namespace_lock(namespace, workspace)
  76. async with lock:
  77. running += 1
  78. max_parallel = max(max_parallel, running)
  79. timeline.append((name, "start"))
  80. await asyncio.sleep(hold_time)
  81. timeline.append((name, "end"))
  82. running -= 1
  83. await asyncio.gather(*(worker(*args) for args in workload))
  84. metrics = {
  85. "total_duration": time.time() - start_time,
  86. "max_concurrency": max_parallel,
  87. "avg_hold_time": hold_time,
  88. "num_workers": len(workload),
  89. }
  90. return max_parallel, timeline, metrics
  91. def _assert_no_timeline_overlap(timeline: List[Tuple[str, str]]) -> None:
  92. """Ensure that timeline events never overlap for sequential execution.
  93. This function implements a finite state machine that validates:
  94. - No overlapping lock acquisitions (only one task active at a time)
  95. - Proper lock release order (task releases its own lock)
  96. - All locks are properly released
  97. Args:
  98. timeline: List of (name, event) tuples where event is "start" or "end"
  99. Raises:
  100. AssertionError: If timeline shows overlapping execution or improper locking
  101. """
  102. active_task = None
  103. for name, event in timeline:
  104. if event == "start":
  105. if active_task is not None:
  106. raise AssertionError(
  107. f"Task '{name}' started before '{active_task}' released the lock"
  108. )
  109. active_task = name
  110. else:
  111. if active_task != name:
  112. raise AssertionError(
  113. f"Task '{name}' finished while '{active_task}' was expected to hold the lock"
  114. )
  115. active_task = None
  116. if active_task is not None:
  117. raise AssertionError(f"Task '{active_task}' did not release the lock properly")
  118. # =============================================================================
  119. # Test 1: Pipeline Status Isolation Test
  120. # =============================================================================
  121. @pytest.mark.offline
  122. async def test_pipeline_status_isolation():
  123. """
  124. Test that pipeline status is isolated between different workspaces.
  125. """
  126. # Purpose: Ensure pipeline_status shared data remains unique per workspace.
  127. # Scope: initialize_pipeline_status and get_namespace_data interactions.
  128. print("\n" + "=" * 60)
  129. print("TEST 1: Pipeline Status Isolation")
  130. print("=" * 60)
  131. # Initialize shared storage
  132. initialize_share_data()
  133. # Initialize pipeline status for two different workspaces
  134. workspace1 = "test_workspace_1"
  135. workspace2 = "test_workspace_2"
  136. await initialize_pipeline_status(workspace1)
  137. await initialize_pipeline_status(workspace2)
  138. # Get pipeline status data for both workspaces
  139. data1 = await get_namespace_data("pipeline_status", workspace=workspace1)
  140. data2 = await get_namespace_data("pipeline_status", workspace=workspace2)
  141. # Verify they are independent objects
  142. assert (
  143. data1 is not data2
  144. ), "Pipeline status data objects are the same (should be different)"
  145. # Modify workspace1's data and verify workspace2 is not affected
  146. data1["test_key"] = "workspace1_value"
  147. # Re-fetch to ensure we get the latest data
  148. data1_check = await get_namespace_data("pipeline_status", workspace=workspace1)
  149. data2_check = await get_namespace_data("pipeline_status", workspace=workspace2)
  150. assert "test_key" in data1_check, "test_key not found in workspace1"
  151. assert (
  152. data1_check["test_key"] == "workspace1_value"
  153. ), f"workspace1 test_key value incorrect: {data1_check.get('test_key')}"
  154. assert (
  155. "test_key" not in data2_check
  156. ), f"test_key leaked to workspace2: {data2_check.get('test_key')}"
  157. print("✅ PASSED: Pipeline Status Isolation")
  158. print(" Different workspaces have isolated pipeline status")
  159. # =============================================================================
  160. # Test 2: Lock Mechanism Test (No Deadlocks)
  161. # =============================================================================
  162. @pytest.mark.offline
  163. async def test_lock_mechanism(stress_test_mode, parallel_workers):
  164. """
  165. Test that the new keyed lock mechanism works correctly without deadlocks.
  166. Tests both parallel execution for different workspaces and serialization
  167. for the same workspace.
  168. """
  169. # Purpose: Validate that keyed locks isolate workspaces while serializing
  170. # requests within the same workspace. Scope: get_namespace_lock scheduling
  171. # semantics for both cross-workspace and single-workspace cases.
  172. print("\n" + "=" * 60)
  173. print("TEST 2: Lock Mechanism (No Deadlocks)")
  174. print("=" * 60)
  175. # Test 2.1: Different workspaces should run in parallel
  176. print("\nTest 2.1: Different workspaces locks should be parallel")
  177. # Support stress testing with configurable number of workers
  178. num_workers = parallel_workers if stress_test_mode else 3
  179. parallel_workload = [
  180. (f"ws_{chr(97 + i)}", f"ws_{chr(97 + i)}", "test_namespace")
  181. for i in range(num_workers)
  182. ]
  183. max_parallel, timeline_parallel, metrics = await _measure_lock_parallelism(
  184. parallel_workload
  185. )
  186. assert max_parallel >= 2, (
  187. "Locks for distinct workspaces should overlap; "
  188. f"observed max concurrency: {max_parallel}, timeline={timeline_parallel}"
  189. )
  190. print("✅ PASSED: Lock Mechanism - Parallel (Different Workspaces)")
  191. print(
  192. f" Locks overlapped for different workspaces (max concurrency={max_parallel})"
  193. )
  194. print(
  195. f" Performance: {metrics['total_duration']:.3f}s for {metrics['num_workers']} workers"
  196. )
  197. # Test 2.2: Same workspace should serialize
  198. print("\nTest 2.2: Same workspace locks should serialize")
  199. serial_workload = [
  200. ("serial_run_1", "ws_same", "test_namespace"),
  201. ("serial_run_2", "ws_same", "test_namespace"),
  202. ]
  203. (
  204. max_parallel_serial,
  205. timeline_serial,
  206. metrics_serial,
  207. ) = await _measure_lock_parallelism(serial_workload)
  208. assert max_parallel_serial == 1, (
  209. "Same workspace locks should not overlap; "
  210. f"observed {max_parallel_serial} with timeline {timeline_serial}"
  211. )
  212. _assert_no_timeline_overlap(timeline_serial)
  213. print("✅ PASSED: Lock Mechanism - Serial (Same Workspace)")
  214. print(" Same workspace operations executed sequentially with no overlap")
  215. print(
  216. f" Performance: {metrics_serial['total_duration']:.3f}s for {metrics_serial['num_workers']} tasks"
  217. )
  218. # =============================================================================
  219. # Test 3: Backward Compatibility Test
  220. # =============================================================================
  221. @pytest.mark.offline
  222. async def test_backward_compatibility():
  223. """
  224. Test that legacy code without workspace parameter still works correctly.
  225. """
  226. # Purpose: Validate backward-compatible defaults when workspace arguments
  227. # are omitted. Scope: get_final_namespace, set/get_default_workspace and
  228. # initialize_pipeline_status fallback behavior.
  229. print("\n" + "=" * 60)
  230. print("TEST 3: Backward Compatibility")
  231. print("=" * 60)
  232. # Test 3.1: get_final_namespace with None should use default workspace
  233. print("\nTest 3.1: get_final_namespace with workspace=None")
  234. set_default_workspace("my_default_workspace")
  235. final_ns = get_final_namespace("pipeline_status")
  236. expected = "my_default_workspace:pipeline_status"
  237. assert final_ns == expected, f"Expected {expected}, got {final_ns}"
  238. print("✅ PASSED: Backward Compatibility - get_final_namespace")
  239. print(f" Correctly uses default workspace: {final_ns}")
  240. # Test 3.2: get_default_workspace
  241. print("\nTest 3.2: get/set default workspace")
  242. set_default_workspace("test_default")
  243. retrieved = get_default_workspace()
  244. assert retrieved == "test_default", f"Expected 'test_default', got {retrieved}"
  245. print("✅ PASSED: Backward Compatibility - default workspace")
  246. print(f" Default workspace set/get correctly: {retrieved}")
  247. # Test 3.3: Empty workspace handling
  248. print("\nTest 3.3: Empty workspace handling")
  249. set_default_workspace("")
  250. final_ns_empty = get_final_namespace("pipeline_status", workspace=None)
  251. expected_empty = "pipeline_status" # Should be just the namespace without ':'
  252. assert (
  253. final_ns_empty == expected_empty
  254. ), f"Expected '{expected_empty}', got '{final_ns_empty}'"
  255. print("✅ PASSED: Backward Compatibility - empty workspace")
  256. print(f" Empty workspace handled correctly: '{final_ns_empty}'")
  257. # Test 3.4: None workspace with default set
  258. print("\nTest 3.4: initialize_pipeline_status with workspace=None")
  259. set_default_workspace("compat_test_workspace")
  260. initialize_share_data()
  261. await initialize_pipeline_status(workspace=None) # Should use default
  262. # Try to get data using the default workspace explicitly
  263. data = await get_namespace_data(
  264. "pipeline_status", workspace="compat_test_workspace"
  265. )
  266. assert (
  267. data is not None
  268. ), "Failed to initialize pipeline status with default workspace"
  269. print("✅ PASSED: Backward Compatibility - pipeline init with None")
  270. print(" Pipeline status initialized with default workspace")
  271. # =============================================================================
  272. # Test 4: Multi-Workspace Concurrency Test
  273. # =============================================================================
  274. @pytest.mark.offline
  275. async def test_multi_workspace_concurrency():
  276. """
  277. Test that multiple workspaces can operate concurrently without interference.
  278. Simulates concurrent operations on different workspaces.
  279. """
  280. # Purpose: Simulate concurrent workloads touching pipeline_status across
  281. # workspaces. Scope: initialize_pipeline_status, get_namespace_lock, and
  282. # shared dictionary mutation while ensuring isolation.
  283. print("\n" + "=" * 60)
  284. print("TEST 4: Multi-Workspace Concurrency")
  285. print("=" * 60)
  286. initialize_share_data()
  287. async def workspace_operations(workspace_id):
  288. """Simulate operations on a specific workspace"""
  289. print(f"\n [{workspace_id}] Starting operations")
  290. # Initialize pipeline status
  291. await initialize_pipeline_status(workspace_id)
  292. # Get lock and perform operations
  293. lock = get_namespace_lock("test_operations", workspace_id)
  294. async with lock:
  295. # Get workspace data
  296. data = await get_namespace_data("pipeline_status", workspace=workspace_id)
  297. # Modify data
  298. data[f"{workspace_id}_key"] = f"{workspace_id}_value"
  299. data["timestamp"] = time.time()
  300. # Simulate some work
  301. await asyncio.sleep(0.1)
  302. print(f" [{workspace_id}] Completed operations")
  303. return workspace_id
  304. # Run multiple workspaces concurrently
  305. workspaces = ["concurrent_ws_1", "concurrent_ws_2", "concurrent_ws_3"]
  306. start = time.time()
  307. results_list = await asyncio.gather(
  308. *[workspace_operations(ws) for ws in workspaces]
  309. )
  310. elapsed = time.time() - start
  311. print(f"\n All workspaces completed in {elapsed:.2f}s")
  312. # Verify all workspaces completed
  313. assert set(results_list) == set(workspaces), "Not all workspaces completed"
  314. print("✅ PASSED: Multi-Workspace Concurrency - Execution")
  315. print(
  316. f" All {len(workspaces)} workspaces completed successfully in {elapsed:.2f}s"
  317. )
  318. # Verify data isolation - each workspace should have its own data
  319. print("\n Verifying data isolation...")
  320. for ws in workspaces:
  321. data = await get_namespace_data("pipeline_status", workspace=ws)
  322. expected_key = f"{ws}_key"
  323. expected_value = f"{ws}_value"
  324. assert (
  325. expected_key in data
  326. ), f"Data not properly isolated for {ws}: missing {expected_key}"
  327. assert (
  328. data[expected_key] == expected_value
  329. ), f"Data not properly isolated for {ws}: {expected_key}={data[expected_key]} (expected {expected_value})"
  330. print(f" [{ws}] Data correctly isolated: {expected_key}={data[expected_key]}")
  331. print("✅ PASSED: Multi-Workspace Concurrency - Data Isolation")
  332. print(" All workspaces have properly isolated data")
  333. # =============================================================================
  334. # Test 5: NamespaceLock Re-entrance Protection
  335. # =============================================================================
  336. @pytest.mark.offline
  337. async def test_namespace_lock_reentrance():
  338. """
  339. Test that NamespaceLock prevents re-entrance in the same coroutine
  340. and allows concurrent use in different coroutines.
  341. """
  342. # Purpose: Ensure NamespaceLock enforces single entry per coroutine while
  343. # allowing concurrent reuse through ContextVar isolation. Scope: lock
  344. # re-entrance checks and concurrent gather semantics.
  345. print("\n" + "=" * 60)
  346. print("TEST 5: NamespaceLock Re-entrance Protection")
  347. print("=" * 60)
  348. # Test 5.1: Same coroutine re-entrance should fail
  349. print("\nTest 5.1: Same coroutine re-entrance should raise RuntimeError")
  350. lock = get_namespace_lock("test_reentrance", "test_ws")
  351. reentrance_failed_correctly = False
  352. try:
  353. async with lock:
  354. print(" Acquired lock first time")
  355. # Try to acquire the same lock again in the same coroutine
  356. async with lock:
  357. print(" ERROR: Should not reach here - re-entrance succeeded!")
  358. except RuntimeError as e:
  359. if "already acquired" in str(e).lower():
  360. print(f" ✓ Re-entrance correctly blocked: {e}")
  361. reentrance_failed_correctly = True
  362. else:
  363. raise
  364. assert reentrance_failed_correctly, "Re-entrance protection not working"
  365. print("✅ PASSED: NamespaceLock Re-entrance Protection")
  366. print(" Re-entrance correctly raises RuntimeError")
  367. # Test 5.2: Same NamespaceLock instance in different coroutines should succeed
  368. print("\nTest 5.2: Same NamespaceLock instance in different coroutines")
  369. shared_lock = get_namespace_lock("test_concurrent", "test_ws")
  370. concurrent_results = []
  371. async def use_shared_lock(coroutine_id):
  372. """Use the same NamespaceLock instance"""
  373. async with shared_lock:
  374. concurrent_results.append(f"coroutine_{coroutine_id}_start")
  375. await asyncio.sleep(0.1)
  376. concurrent_results.append(f"coroutine_{coroutine_id}_end")
  377. # This should work because each coroutine gets its own ContextVar
  378. await asyncio.gather(
  379. use_shared_lock(1),
  380. use_shared_lock(2),
  381. )
  382. # Both coroutines should have completed
  383. expected_entries = 4 # 2 starts + 2 ends
  384. assert (
  385. len(concurrent_results) == expected_entries
  386. ), f"Expected {expected_entries} entries, got {len(concurrent_results)}"
  387. print("✅ PASSED: NamespaceLock Concurrent Reuse")
  388. print(
  389. f" Same NamespaceLock instance used successfully in {expected_entries // 2} concurrent coroutines"
  390. )
  391. # =============================================================================
  392. # Test 6: Different Namespace Lock Isolation
  393. # =============================================================================
  394. @pytest.mark.offline
  395. async def test_different_namespace_lock_isolation():
  396. """
  397. Test that locks for different namespaces (same workspace) are independent.
  398. """
  399. # Purpose: Confirm that namespace isolation is enforced even when workspace
  400. # is the same. Scope: get_namespace_lock behavior when namespaces differ.
  401. print("\n" + "=" * 60)
  402. print("TEST 6: Different Namespace Lock Isolation")
  403. print("=" * 60)
  404. print("\nTesting locks with same workspace but different namespaces")
  405. workload = [
  406. ("ns_a", "same_ws", "namespace_a"),
  407. ("ns_b", "same_ws", "namespace_b"),
  408. ("ns_c", "same_ws", "namespace_c"),
  409. ]
  410. max_parallel, timeline, metrics = await _measure_lock_parallelism(workload)
  411. assert max_parallel >= 2, (
  412. "Different namespaces within the same workspace should run concurrently; "
  413. f"observed max concurrency {max_parallel} with timeline {timeline}"
  414. )
  415. print("✅ PASSED: Different Namespace Lock Isolation")
  416. print(
  417. f" Different namespace locks ran in parallel (max concurrency={max_parallel})"
  418. )
  419. print(
  420. f" Performance: {metrics['total_duration']:.3f}s for {metrics['num_workers']} namespaces"
  421. )
  422. # =============================================================================
  423. # Test 7: Error Handling
  424. # =============================================================================
  425. @pytest.mark.offline
  426. async def test_error_handling():
  427. """
  428. Test error handling for invalid workspace configurations.
  429. """
  430. # Purpose: Validate guardrails for workspace normalization and namespace
  431. # derivation. Scope: set_default_workspace conversions and get_final_namespace
  432. # failure paths when configuration is invalid.
  433. print("\n" + "=" * 60)
  434. print("TEST 7: Error Handling")
  435. print("=" * 60)
  436. # Test 7.0: Missing default workspace should raise ValueError
  437. print("\nTest 7.0: Missing workspace raises ValueError")
  438. with pytest.raises(ValueError):
  439. get_final_namespace("test_namespace", workspace=None)
  440. # Test 7.1: set_default_workspace(None) converts to empty string
  441. print("\nTest 7.1: set_default_workspace(None) converts to empty string")
  442. set_default_workspace(None)
  443. default_ws = get_default_workspace()
  444. # Should convert None to "" automatically
  445. assert default_ws == "", f"Expected empty string, got: '{default_ws}'"
  446. print("✅ PASSED: Error Handling - None to Empty String")
  447. print(
  448. f" set_default_workspace(None) correctly converts to empty string: '{default_ws}'"
  449. )
  450. # Test 7.2: Empty string workspace behavior
  451. print("\nTest 7.2: Empty string workspace creates valid namespace")
  452. # With empty workspace, should create namespace without colon
  453. final_ns = get_final_namespace("test_namespace", workspace="")
  454. assert final_ns == "test_namespace", f"Unexpected namespace: '{final_ns}'"
  455. print("✅ PASSED: Error Handling - Empty Workspace Namespace")
  456. print(f" Empty workspace creates valid namespace: '{final_ns}'")
  457. # Restore default workspace for other tests
  458. set_default_workspace("")
  459. # =============================================================================
  460. # Test 8: Update Flags Workspace Isolation
  461. # =============================================================================
  462. @pytest.mark.offline
  463. async def test_update_flags_workspace_isolation():
  464. """
  465. Test that update flags are properly isolated between workspaces.
  466. """
  467. # Purpose: Confirm update flag setters/readers respect workspace scoping.
  468. # Scope: set_all_update_flags, clear_all_update_flags, get_all_update_flags_status,
  469. # and get_update_flag interactions across namespaces.
  470. print("\n" + "=" * 60)
  471. print("TEST 8: Update Flags Workspace Isolation")
  472. print("=" * 60)
  473. initialize_share_data()
  474. workspace1 = "update_flags_ws1"
  475. workspace2 = "update_flags_ws2"
  476. test_namespace = "test_update_flags_ns"
  477. # Initialize namespaces for both workspaces
  478. await initialize_pipeline_status(workspace1)
  479. await initialize_pipeline_status(workspace2)
  480. # Test 8.1: set_all_update_flags isolation
  481. print("\nTest 8.1: set_all_update_flags workspace isolation")
  482. # Create flags for both workspaces (simulating workers)
  483. flag1_obj = await get_update_flag(test_namespace, workspace=workspace1)
  484. flag2_obj = await get_update_flag(test_namespace, workspace=workspace2)
  485. # Initial state should be False
  486. assert flag1_obj.value is False, "Flag1 initial value should be False"
  487. assert flag2_obj.value is False, "Flag2 initial value should be False"
  488. # Set all flags for workspace1
  489. await set_all_update_flags(test_namespace, workspace=workspace1)
  490. # Check that only workspace1's flags are set
  491. assert (
  492. flag1_obj.value is True
  493. ), f"Flag1 should be True after set_all_update_flags, got {flag1_obj.value}"
  494. assert (
  495. flag2_obj.value is False
  496. ), f"Flag2 should still be False, got {flag2_obj.value}"
  497. print("✅ PASSED: Update Flags - set_all_update_flags Isolation")
  498. print(
  499. f" set_all_update_flags isolated: ws1={flag1_obj.value}, ws2={flag2_obj.value}"
  500. )
  501. # Test 8.2: clear_all_update_flags isolation
  502. print("\nTest 8.2: clear_all_update_flags workspace isolation")
  503. # Set flags for both workspaces
  504. await set_all_update_flags(test_namespace, workspace=workspace1)
  505. await set_all_update_flags(test_namespace, workspace=workspace2)
  506. # Verify both are set
  507. assert flag1_obj.value is True, "Flag1 should be True"
  508. assert flag2_obj.value is True, "Flag2 should be True"
  509. # Clear only workspace1
  510. await clear_all_update_flags(test_namespace, workspace=workspace1)
  511. # Check that only workspace1's flags are cleared
  512. assert (
  513. flag1_obj.value is False
  514. ), f"Flag1 should be False after clear, got {flag1_obj.value}"
  515. assert flag2_obj.value is True, f"Flag2 should still be True, got {flag2_obj.value}"
  516. print("✅ PASSED: Update Flags - clear_all_update_flags Isolation")
  517. print(
  518. f" clear_all_update_flags isolated: ws1={flag1_obj.value}, ws2={flag2_obj.value}"
  519. )
  520. # Test 8.3: get_all_update_flags_status workspace filtering
  521. print("\nTest 8.3: get_all_update_flags_status workspace filtering")
  522. # Initialize more namespaces for testing
  523. await get_update_flag("ns_a", workspace=workspace1)
  524. await get_update_flag("ns_b", workspace=workspace1)
  525. await get_update_flag("ns_c", workspace=workspace2)
  526. # Set flags for workspace1
  527. await set_all_update_flags("ns_a", workspace=workspace1)
  528. await set_all_update_flags("ns_b", workspace=workspace1)
  529. # Set flags for workspace2
  530. await set_all_update_flags("ns_c", workspace=workspace2)
  531. # Get status for workspace1 only
  532. status1 = await get_all_update_flags_status(workspace=workspace1)
  533. # Check that workspace1's namespaces are present
  534. # The keys should include workspace1's namespaces but not workspace2's
  535. workspace1_keys = [k for k in status1.keys() if workspace1 in k]
  536. workspace2_keys = [k for k in status1.keys() if workspace2 in k]
  537. assert (
  538. len(workspace1_keys) > 0
  539. ), f"workspace1 keys should be present, got {len(workspace1_keys)}"
  540. assert (
  541. len(workspace2_keys) == 0
  542. ), f"workspace2 keys should not be present, got {len(workspace2_keys)}"
  543. for key, values in status1.items():
  544. assert all(values), f"All flags in {key} should be True, got {values}"
  545. # Workspace2 query should only surface workspace2 namespaces
  546. status2 = await get_all_update_flags_status(workspace=workspace2)
  547. expected_ws2_keys = {
  548. f"{workspace2}:{test_namespace}",
  549. f"{workspace2}:ns_c",
  550. }
  551. assert (
  552. set(status2.keys()) == expected_ws2_keys
  553. ), f"Unexpected namespaces for workspace2: {status2.keys()}"
  554. for key, values in status2.items():
  555. assert all(values), f"All flags in {key} should be True, got {values}"
  556. print("✅ PASSED: Update Flags - get_all_update_flags_status Filtering")
  557. print(
  558. f" Status correctly filtered: ws1 keys={len(workspace1_keys)}, ws2 keys={len(workspace2_keys)}"
  559. )
  560. # =============================================================================
  561. # Test 9: Empty Workspace Standardization
  562. # =============================================================================
  563. @pytest.mark.offline
  564. async def test_empty_workspace_standardization():
  565. """
  566. Test that empty workspace is properly standardized to "" instead of "_".
  567. """
  568. # Purpose: Verify namespace formatting when workspace is an empty string.
  569. # Scope: get_final_namespace output and initialize_pipeline_status behavior
  570. # between empty and non-empty workspaces.
  571. print("\n" + "=" * 60)
  572. print("TEST 9: Empty Workspace Standardization")
  573. print("=" * 60)
  574. # Test 9.1: Empty string workspace creates namespace without colon
  575. print("\nTest 9.1: Empty string workspace namespace format")
  576. set_default_workspace("")
  577. final_ns = get_final_namespace("test_namespace", workspace=None)
  578. # Should be just "test_namespace" without colon prefix
  579. assert (
  580. final_ns == "test_namespace"
  581. ), f"Unexpected namespace format: '{final_ns}' (expected 'test_namespace')"
  582. print("✅ PASSED: Empty Workspace Standardization - Format")
  583. print(f" Empty workspace creates correct namespace: '{final_ns}'")
  584. # Test 9.2: Empty workspace vs non-empty workspace behavior
  585. print("\nTest 9.2: Empty vs non-empty workspace behavior")
  586. initialize_share_data()
  587. # Initialize with empty workspace
  588. await initialize_pipeline_status(workspace="")
  589. data_empty = await get_namespace_data("pipeline_status", workspace="")
  590. # Initialize with non-empty workspace
  591. await initialize_pipeline_status(workspace="test_ws")
  592. data_nonempty = await get_namespace_data("pipeline_status", workspace="test_ws")
  593. # They should be different objects
  594. assert (
  595. data_empty is not data_nonempty
  596. ), "Empty and non-empty workspaces share data (should be independent)"
  597. print("✅ PASSED: Empty Workspace Standardization - Behavior")
  598. print(" Empty and non-empty workspaces have independent data")
  599. # =============================================================================
  600. # Test 10: JsonKVStorage Workspace Isolation (Integration Test)
  601. # =============================================================================
  602. @pytest.mark.offline
  603. async def test_json_kv_storage_workspace_isolation(keep_test_artifacts):
  604. """
  605. Integration test: Verify JsonKVStorage properly isolates data between workspaces.
  606. Creates two JsonKVStorage instances with different workspaces, writes different data,
  607. and verifies they don't mix.
  608. """
  609. # Purpose: Ensure JsonKVStorage respects workspace-specific directories and data.
  610. # Scope: storage initialization, upsert/get_by_id operations, and filesystem layout
  611. # inside the temporary working directory.
  612. print("\n" + "=" * 60)
  613. print("TEST 10: JsonKVStorage Workspace Isolation (Integration)")
  614. print("=" * 60)
  615. # Create temporary test directory under project temp/
  616. test_dir = str(
  617. Path(__file__).parent.parent.parent
  618. / "temp/test_json_kv_storage_workspace_isolation"
  619. )
  620. if os.path.exists(test_dir):
  621. shutil.rmtree(test_dir)
  622. os.makedirs(test_dir, exist_ok=True)
  623. print(f"\n Using test directory: {test_dir}")
  624. try:
  625. initialize_share_data()
  626. # Mock embedding function
  627. async def mock_embedding_func(texts: list[str]) -> np.ndarray:
  628. return np.random.rand(len(texts), 384) # 384-dimensional vectors
  629. # Global config
  630. global_config = {
  631. "working_dir": test_dir,
  632. "embedding_batch_num": 10,
  633. }
  634. # Test 10.1: Create two JsonKVStorage instances with different workspaces
  635. print(
  636. "\nTest 10.1: Create two JsonKVStorage instances with different workspaces"
  637. )
  638. from lightrag.kg.json_kv_impl import JsonKVStorage
  639. storage1 = JsonKVStorage(
  640. namespace="entities",
  641. workspace="workspace1",
  642. global_config=global_config,
  643. embedding_func=mock_embedding_func,
  644. )
  645. storage2 = JsonKVStorage(
  646. namespace="entities",
  647. workspace="workspace2",
  648. global_config=global_config,
  649. embedding_func=mock_embedding_func,
  650. )
  651. # Initialize both storages
  652. await storage1.initialize()
  653. await storage2.initialize()
  654. print(" Storage1 created: workspace=workspace1, namespace=entities")
  655. print(" Storage2 created: workspace=workspace2, namespace=entities")
  656. # Test 10.2: Write different data to each storage
  657. print("\nTest 10.2: Write different data to each storage")
  658. # Write to storage1 (upsert expects dict[str, dict])
  659. await storage1.upsert(
  660. {
  661. "entity1": {
  662. "content": "Data from workspace1 - AI Research",
  663. "type": "entity",
  664. },
  665. "entity2": {
  666. "content": "Data from workspace1 - Machine Learning",
  667. "type": "entity",
  668. },
  669. }
  670. )
  671. print(" Written to storage1: entity1, entity2")
  672. # Persist data to disk
  673. await storage1.index_done_callback()
  674. print(" Persisted storage1 data to disk")
  675. # Write to storage2
  676. await storage2.upsert(
  677. {
  678. "entity1": {
  679. "content": "Data from workspace2 - Deep Learning",
  680. "type": "entity",
  681. },
  682. "entity2": {
  683. "content": "Data from workspace2 - Neural Networks",
  684. "type": "entity",
  685. },
  686. }
  687. )
  688. print(" Written to storage2: entity1, entity2")
  689. # Persist data to disk
  690. await storage2.index_done_callback()
  691. print(" Persisted storage2 data to disk")
  692. # Test 10.3: Read data from each storage and verify isolation
  693. print("\nTest 10.3: Read data and verify isolation")
  694. # Read from storage1
  695. result1_entity1 = await storage1.get_by_id("entity1")
  696. result1_entity2 = await storage1.get_by_id("entity2")
  697. # Read from storage2
  698. result2_entity1 = await storage2.get_by_id("entity1")
  699. result2_entity2 = await storage2.get_by_id("entity2")
  700. print(f" Storage1 entity1: {result1_entity1}")
  701. print(f" Storage1 entity2: {result1_entity2}")
  702. print(f" Storage2 entity1: {result2_entity1}")
  703. print(f" Storage2 entity2: {result2_entity2}")
  704. # Verify isolation (get_by_id returns dict)
  705. assert result1_entity1 is not None, "Storage1 entity1 should not be None"
  706. assert result1_entity2 is not None, "Storage1 entity2 should not be None"
  707. assert result2_entity1 is not None, "Storage2 entity1 should not be None"
  708. assert result2_entity2 is not None, "Storage2 entity2 should not be None"
  709. assert (
  710. result1_entity1.get("content") == "Data from workspace1 - AI Research"
  711. ), "Storage1 entity1 content mismatch"
  712. assert (
  713. result1_entity2.get("content") == "Data from workspace1 - Machine Learning"
  714. ), "Storage1 entity2 content mismatch"
  715. assert (
  716. result2_entity1.get("content") == "Data from workspace2 - Deep Learning"
  717. ), "Storage2 entity1 content mismatch"
  718. assert (
  719. result2_entity2.get("content") == "Data from workspace2 - Neural Networks"
  720. ), "Storage2 entity2 content mismatch"
  721. assert result1_entity1.get("content") != result2_entity1.get(
  722. "content"
  723. ), "Storage1 and Storage2 entity1 should have different content"
  724. assert result1_entity2.get("content") != result2_entity2.get(
  725. "content"
  726. ), "Storage1 and Storage2 entity2 should have different content"
  727. print("✅ PASSED: JsonKVStorage - Data Isolation")
  728. print(
  729. " Two storage instances correctly isolated: ws1 and ws2 have different data"
  730. )
  731. # Test 10.4: Verify file structure
  732. print("\nTest 10.4: Verify file structure")
  733. ws1_dir = Path(test_dir) / "workspace1"
  734. ws2_dir = Path(test_dir) / "workspace2"
  735. ws1_exists = ws1_dir.exists()
  736. ws2_exists = ws2_dir.exists()
  737. print(f" workspace1 directory exists: {ws1_exists}")
  738. print(f" workspace2 directory exists: {ws2_exists}")
  739. assert ws1_exists, "workspace1 directory should exist"
  740. assert ws2_exists, "workspace2 directory should exist"
  741. print("✅ PASSED: JsonKVStorage - File Structure")
  742. print(f" Workspace directories correctly created: {ws1_dir} and {ws2_dir}")
  743. finally:
  744. # Cleanup test directory (unless keep_test_artifacts is set)
  745. if os.path.exists(test_dir) and not keep_test_artifacts:
  746. shutil.rmtree(test_dir)
  747. print(f"\n Cleaned up test directory: {test_dir}")
  748. elif keep_test_artifacts:
  749. print(f"\n Kept test directory for inspection: {test_dir}")
  750. # =============================================================================
  751. # Test 11: LightRAG End-to-End Integration Test
  752. # =============================================================================
  753. @pytest.mark.offline
  754. async def test_lightrag_end_to_end_workspace_isolation(keep_test_artifacts):
  755. """
  756. End-to-end test: Create two LightRAG instances with different workspaces,
  757. insert different data, and verify file separation.
  758. Uses mock LLM and embedding functions to avoid external API calls.
  759. """
  760. # Purpose: Validate that full LightRAG flows keep artifacts scoped per workspace.
  761. # Scope: LightRAG.initialize_storages + ainsert side effects plus filesystem
  762. # verification for generated storage files.
  763. print("\n" + "=" * 60)
  764. print("TEST 11: LightRAG End-to-End Workspace Isolation")
  765. print("=" * 60)
  766. # Create temporary test directory under project temp/
  767. test_dir = str(
  768. Path(__file__).parent.parent.parent
  769. / "temp/test_lightrag_end_to_end_workspace_isolation"
  770. )
  771. if os.path.exists(test_dir):
  772. shutil.rmtree(test_dir)
  773. os.makedirs(test_dir, exist_ok=True)
  774. print(f"\n Using test directory: {test_dir}")
  775. try:
  776. # Factory function to create different mock LLM functions for each workspace
  777. def create_mock_llm_func(workspace_name):
  778. """Create a mock LLM function that returns different content based on workspace"""
  779. async def mock_llm_func(
  780. prompt, system_prompt=None, history_messages=[], **kwargs
  781. ) -> str:
  782. # Add coroutine switching to simulate async I/O and allow concurrent execution
  783. await asyncio.sleep(0)
  784. # Return different responses based on workspace
  785. # Format: entity<|#|>entity_name<|#|>entity_type<|#|>entity_description
  786. # Format: relation<|#|>source_entity<|#|>target_entity<|#|>keywords<|#|>description
  787. if workspace_name == "project_a":
  788. return """entity<|#|>Artificial Intelligence<|#|>concept<|#|>AI is a field of computer science focused on creating intelligent machines.
  789. entity<|#|>Machine Learning<|#|>concept<|#|>Machine Learning is a subset of AI that enables systems to learn from data.
  790. relation<|#|>Machine Learning<|#|>Artificial Intelligence<|#|>subset, related field<|#|>Machine Learning is a key component and subset of Artificial Intelligence.
  791. <|COMPLETE|>"""
  792. else: # project_b
  793. return """entity<|#|>Deep Learning<|#|>concept<|#|>Deep Learning is a subset of machine learning using neural networks with multiple layers.
  794. entity<|#|>Neural Networks<|#|>concept<|#|>Neural Networks are computing systems inspired by biological neural networks.
  795. relation<|#|>Deep Learning<|#|>Neural Networks<|#|>uses, composed of<|#|>Deep Learning uses multiple layers of Neural Networks to learn representations.
  796. <|COMPLETE|>"""
  797. return mock_llm_func
  798. # Mock embedding function
  799. async def mock_embedding_func(texts: list[str]) -> np.ndarray:
  800. # Add coroutine switching to simulate async I/O and allow concurrent execution
  801. await asyncio.sleep(0)
  802. return np.random.rand(len(texts), 384) # 384-dimensional vectors
  803. # Test 11.1: Create two LightRAG instances with different workspaces
  804. print("\nTest 11.1: Create two LightRAG instances with different workspaces")
  805. from lightrag import LightRAG
  806. from lightrag.utils import EmbeddingFunc, Tokenizer
  807. # Create different mock LLM functions for each workspace
  808. mock_llm_func_a = create_mock_llm_func("project_a")
  809. mock_llm_func_b = create_mock_llm_func("project_b")
  810. class _SimpleTokenizerImpl:
  811. def encode(self, content: str) -> list[int]:
  812. return [ord(ch) for ch in content]
  813. def decode(self, tokens: list[int]) -> str:
  814. return "".join(chr(t) for t in tokens)
  815. tokenizer = Tokenizer("mock-tokenizer", _SimpleTokenizerImpl())
  816. rag1 = LightRAG(
  817. working_dir=test_dir,
  818. workspace="project_a",
  819. llm_model_func=mock_llm_func_a,
  820. embedding_func=EmbeddingFunc(
  821. embedding_dim=384,
  822. max_token_size=8192,
  823. func=mock_embedding_func,
  824. ),
  825. tokenizer=tokenizer,
  826. )
  827. rag2 = LightRAG(
  828. working_dir=test_dir,
  829. workspace="project_b",
  830. llm_model_func=mock_llm_func_b,
  831. embedding_func=EmbeddingFunc(
  832. embedding_dim=384,
  833. max_token_size=8192,
  834. func=mock_embedding_func,
  835. ),
  836. tokenizer=tokenizer,
  837. )
  838. # Initialize storages
  839. await rag1.initialize_storages()
  840. await rag2.initialize_storages()
  841. print(" RAG1 created: workspace=project_a")
  842. print(" RAG2 created: workspace=project_b")
  843. # Test 11.2: Insert different data to each RAG instance (CONCURRENTLY)
  844. print("\nTest 11.2: Insert different data to each RAG instance (concurrently)")
  845. text_for_project_a = "This document is about Artificial Intelligence and Machine Learning. AI is transforming the world."
  846. text_for_project_b = "This document is about Deep Learning and Neural Networks. Deep learning uses multiple layers."
  847. # Insert to both projects concurrently to test workspace isolation under concurrent load
  848. print(" Starting concurrent insert operations...")
  849. start_time = time.time()
  850. await asyncio.gather(
  851. rag1.ainsert(text_for_project_a), rag2.ainsert(text_for_project_b)
  852. )
  853. elapsed_time = time.time() - start_time
  854. print(f" Inserted to project_a: {len(text_for_project_a)} chars (concurrent)")
  855. print(f" Inserted to project_b: {len(text_for_project_b)} chars (concurrent)")
  856. print(f" Total concurrent execution time: {elapsed_time:.3f}s")
  857. # Test 11.3: Verify file structure
  858. print("\nTest 11.3: Verify workspace directory structure")
  859. project_a_dir = Path(test_dir) / "project_a"
  860. project_b_dir = Path(test_dir) / "project_b"
  861. project_a_exists = project_a_dir.exists()
  862. project_b_exists = project_b_dir.exists()
  863. print(f" project_a directory: {project_a_dir}")
  864. print(f" project_a exists: {project_a_exists}")
  865. print(f" project_b directory: {project_b_dir}")
  866. print(f" project_b exists: {project_b_exists}")
  867. assert project_a_exists, "project_a directory should exist"
  868. assert project_b_exists, "project_b directory should exist"
  869. # List files in each directory
  870. print("\n Files in project_a/:")
  871. for file in sorted(project_a_dir.glob("*")):
  872. if file.is_file():
  873. size = file.stat().st_size
  874. print(f" - {file.name} ({size} bytes)")
  875. print("\n Files in project_b/:")
  876. for file in sorted(project_b_dir.glob("*")):
  877. if file.is_file():
  878. size = file.stat().st_size
  879. print(f" - {file.name} ({size} bytes)")
  880. print("✅ PASSED: LightRAG E2E - File Structure")
  881. print(" Workspace directories correctly created and separated")
  882. # Test 11.4: Verify data isolation by checking file contents
  883. print("\nTest 11.4: Verify data isolation (check file contents)")
  884. # Check if full_docs storage files exist and contain different content
  885. docs_a_file = project_a_dir / "kv_store_full_docs.json"
  886. docs_b_file = project_b_dir / "kv_store_full_docs.json"
  887. if docs_a_file.exists() and docs_b_file.exists():
  888. import json
  889. with open(docs_a_file, "r") as f:
  890. docs_a_content = json.load(f)
  891. with open(docs_b_file, "r") as f:
  892. docs_b_content = json.load(f)
  893. print(f" project_a doc count: {len(docs_a_content)}")
  894. print(f" project_b doc count: {len(docs_b_content)}")
  895. # Verify they contain different data
  896. assert (
  897. docs_a_content != docs_b_content
  898. ), "Document storage not properly isolated"
  899. # Verify each workspace contains its own text content
  900. docs_a_str = json.dumps(docs_a_content)
  901. docs_b_str = json.dumps(docs_b_content)
  902. # Check project_a contains its text and NOT project_b's text
  903. assert (
  904. "Artificial Intelligence" in docs_a_str
  905. ), "project_a should contain 'Artificial Intelligence'"
  906. assert (
  907. "Machine Learning" in docs_a_str
  908. ), "project_a should contain 'Machine Learning'"
  909. assert (
  910. "Deep Learning" not in docs_a_str
  911. ), "project_a should NOT contain 'Deep Learning' from project_b"
  912. assert (
  913. "Neural Networks" not in docs_a_str
  914. ), "project_a should NOT contain 'Neural Networks' from project_b"
  915. # Check project_b contains its text and NOT project_a's text
  916. assert (
  917. "Deep Learning" in docs_b_str
  918. ), "project_b should contain 'Deep Learning'"
  919. assert (
  920. "Neural Networks" in docs_b_str
  921. ), "project_b should contain 'Neural Networks'"
  922. assert (
  923. "Artificial Intelligence" not in docs_b_str
  924. ), "project_b should NOT contain 'Artificial Intelligence' from project_a"
  925. # Note: "Machine Learning" might appear in project_b's text, so we skip that check
  926. print("✅ PASSED: LightRAG E2E - Data Isolation")
  927. print(" Document storage correctly isolated between workspaces")
  928. print(" project_a contains only its own data")
  929. print(" project_b contains only its own data")
  930. else:
  931. print(" Document storage files not found (may not be created yet)")
  932. print("✅ PASSED: LightRAG E2E - Data Isolation")
  933. print(" Skipped file content check (files not created)")
  934. print("\n ✓ Test complete - workspace isolation verified at E2E level")
  935. finally:
  936. # Cleanup test directory (unless keep_test_artifacts is set)
  937. if os.path.exists(test_dir) and not keep_test_artifacts:
  938. shutil.rmtree(test_dir)
  939. print(f"\n Cleaned up test directory: {test_dir}")
  940. elif keep_test_artifacts:
  941. print(f"\n Kept test directory for inspection: {test_dir}")