| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
7037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183 |
- #!/usr/bin/env python
- """
- Test script for Workspace Isolation Feature
- Comprehensive test suite covering workspace isolation in LightRAG:
- 1. Pipeline Status Isolation - Data isolation between workspaces
- 2. Lock Mechanism - Parallel execution for different workspaces, serial for same workspace
- 3. Backward Compatibility - Legacy code without workspace parameters
- 4. Multi-Workspace Concurrency - Concurrent operations on different workspaces
- 5. NamespaceLock Re-entrance Protection - Prevents deadlocks
- 6. Different Namespace Lock Isolation - Locks isolated by namespace
- 7. Error Handling - Invalid workspace configurations
- 8. Update Flags Workspace Isolation - Update flags properly isolated
- 9. Empty Workspace Standardization - Empty workspace handling
- 10. JsonKVStorage Workspace Isolation - Integration test for KV storage
- 11. LightRAG End-to-End Workspace Isolation - Complete E2E test with two instances
- Total: 11 test scenarios
- """
- import asyncio
- import time
- import os
- import shutil
- import numpy as np
- import pytest
- from pathlib import Path
- from typing import List, Tuple, Dict
- from lightrag.kg.shared_storage import (
- get_final_namespace,
- get_namespace_lock,
- get_default_workspace,
- set_default_workspace,
- initialize_share_data,
- finalize_share_data,
- initialize_pipeline_status,
- get_namespace_data,
- set_all_update_flags,
- clear_all_update_flags,
- get_all_update_flags_status,
- get_update_flag,
- )
- # =============================================================================
- # Test Configuration
- # =============================================================================
- # Test configuration is handled via pytest fixtures in conftest.py
- # - Use CLI options: --keep-artifacts, --stress-test, --test-workers=N
- # - Or environment variables: LIGHTRAG_KEEP_ARTIFACTS, LIGHTRAG_STRESS_TEST, LIGHTRAG_TEST_WORKERS
- # Priority: CLI options > Environment variables > Default values
- # =============================================================================
- # Pytest Fixtures
- # =============================================================================
@pytest.fixture(autouse=True)
def setup_shared_data():
    """Wrap every test with shared-data setup and teardown.

    Because the fixture is ``autouse``, it applies to each test without being
    requested: ``initialize_share_data()`` is called before the test body and
    ``finalize_share_data()`` is called once the test has finished.
    """
    # Setup phase
    initialize_share_data()
    # Hand control to the test
    yield
    # Teardown phase
    finalize_share_data()
- async def _measure_lock_parallelism(
- workload: List[Tuple[str, str, str]], hold_time: float = 0.05
- ) -> Tuple[int, List[Tuple[str, str]], Dict[str, float]]:
- """Run lock acquisition workload and capture peak concurrency and timeline.
- Args:
- workload: List of (name, workspace, namespace) tuples
- hold_time: How long each worker holds the lock (seconds)
- Returns:
- Tuple of (max_parallel, timeline, metrics) where:
- - max_parallel: Peak number of concurrent lock holders
- - timeline: List of (name, event) tuples tracking execution order
- - metrics: Dict with performance metrics (total_duration, max_concurrency, etc.)
- """
- running = 0
- max_parallel = 0
- timeline: List[Tuple[str, str]] = []
- start_time = time.time()
- async def worker(name: str, workspace: str, namespace: str) -> None:
- nonlocal running, max_parallel
- lock = get_namespace_lock(namespace, workspace)
- async with lock:
- running += 1
- max_parallel = max(max_parallel, running)
- timeline.append((name, "start"))
- await asyncio.sleep(hold_time)
- timeline.append((name, "end"))
- running -= 1
- await asyncio.gather(*(worker(*args) for args in workload))
- metrics = {
- "total_duration": time.time() - start_time,
- "max_concurrency": max_parallel,
- "avg_hold_time": hold_time,
- "num_workers": len(workload),
- }
- return max_parallel, timeline, metrics
- def _assert_no_timeline_overlap(timeline: List[Tuple[str, str]]) -> None:
- """Ensure that timeline events never overlap for sequential execution.
- This function implements a finite state machine that validates:
- - No overlapping lock acquisitions (only one task active at a time)
- - Proper lock release order (task releases its own lock)
- - All locks are properly released
- Args:
- timeline: List of (name, event) tuples where event is "start" or "end"
- Raises:
- AssertionError: If timeline shows overlapping execution or improper locking
- """
- active_task = None
- for name, event in timeline:
- if event == "start":
- if active_task is not None:
- raise AssertionError(
- f"Task '{name}' started before '{active_task}' released the lock"
- )
- active_task = name
- else:
- if active_task != name:
- raise AssertionError(
- f"Task '{name}' finished while '{active_task}' was expected to hold the lock"
- )
- active_task = None
- if active_task is not None:
- raise AssertionError(f"Task '{active_task}' did not release the lock properly")
- # =============================================================================
- # Test 1: Pipeline Status Isolation Test
- # =============================================================================
@pytest.mark.offline
async def test_pipeline_status_isolation():
    """
    Check that each workspace gets its own pipeline status data.

    Two workspaces are initialized, a key is written into the first one's
    pipeline_status dict, and the second one is checked for leakage.
    """
    banner = "=" * 60
    print("\n" + banner)
    print("TEST 1: Pipeline Status Isolation")
    print(banner)

    # Shared storage has to exist before pipeline status can be created
    initialize_share_data()

    first_ws, second_ws = "test_workspace_1", "test_workspace_2"
    for ws_name in (first_ws, second_ws):
        await initialize_pipeline_status(ws_name)

    status_first = await get_namespace_data("pipeline_status", workspace=first_ws)
    status_second = await get_namespace_data("pipeline_status", workspace=second_ws)

    # Distinct workspaces must not hand out the same object
    assert (
        status_first is not status_second
    ), "Pipeline status data objects are the same (should be different)"

    # Write a marker into the first workspace only
    status_first["test_key"] = "workspace1_value"

    # Fetch both again so the assertions look at what the store returns now
    refetched_first = await get_namespace_data("pipeline_status", workspace=first_ws)
    refetched_second = await get_namespace_data(
        "pipeline_status", workspace=second_ws
    )

    assert "test_key" in refetched_first, "test_key not found in workspace1"
    assert (
        refetched_first["test_key"] == "workspace1_value"
    ), f"workspace1 test_key value incorrect: {refetched_first.get('test_key')}"
    assert (
        "test_key" not in refetched_second
    ), f"test_key leaked to workspace2: {refetched_second.get('test_key')}"

    print("✅ PASSED: Pipeline Status Isolation")
    print(" Different workspaces have isolated pipeline status")
- # =============================================================================
- # Test 2: Lock Mechanism Test (No Deadlocks)
- # =============================================================================
@pytest.mark.offline
async def test_lock_mechanism(stress_test_mode, parallel_workers):
    """
    Exercise the keyed namespace locks in two scenarios.

    2.1: locks requested for distinct workspaces are expected to be held
         concurrently (peak concurrency of at least 2).
    2.2: locks requested for one workspace/namespace pair are expected to be
         held one at a time, with a non-overlapping timeline.

    Args:
        stress_test_mode: When truthy, ``parallel_workers`` sets the number of
            distinct workspaces used in 2.1; otherwise 3 are used
        parallel_workers: Worker count for stress mode
    """
    banner = "=" * 60
    print("\n" + banner)
    print("TEST 2: Lock Mechanism (No Deadlocks)")
    print(banner)

    # --- 2.1: one lock per workspace, all requested at once ---
    print("\nTest 2.1: Different workspaces locks should be parallel")

    worker_count = 3
    if stress_test_mode:
        worker_count = parallel_workers

    # Workspace names double as worker names: ws_a, ws_b, ws_c, ...
    ws_names = [f"ws_{chr(ord('a') + idx)}" for idx in range(worker_count)]
    parallel_workload = [(ws, ws, "test_namespace") for ws in ws_names]

    parallel_outcome = await _measure_lock_parallelism(parallel_workload)
    peak_parallel, events_parallel, stats_parallel = parallel_outcome

    assert peak_parallel >= 2, (
        "Locks for distinct workspaces should overlap; "
        f"observed max concurrency: {peak_parallel}, timeline={events_parallel}"
    )

    parallel_duration = stats_parallel["total_duration"]
    parallel_count = stats_parallel["num_workers"]
    print("✅ PASSED: Lock Mechanism - Parallel (Different Workspaces)")
    print(
        f" Locks overlapped for different workspaces (max concurrency={peak_parallel})"
    )
    print(f" Performance: {parallel_duration:.3f}s for {parallel_count} workers")

    # --- 2.2: two requests for the very same workspace/namespace ---
    print("\nTest 2.2: Same workspace locks should serialize")

    serial_workload = [
        (f"serial_run_{run}", "ws_same", "test_namespace") for run in (1, 2)
    ]
    serial_outcome = await _measure_lock_parallelism(serial_workload)
    peak_serial, events_serial, stats_serial = serial_outcome

    assert peak_serial == 1, (
        "Same workspace locks should not overlap; "
        f"observed {peak_serial} with timeline {events_serial}"
    )
    # The peak alone is not enough; also check the event ordering
    _assert_no_timeline_overlap(events_serial)

    serial_duration = stats_serial["total_duration"]
    serial_count = stats_serial["num_workers"]
    print("✅ PASSED: Lock Mechanism - Serial (Same Workspace)")
    print(" Same workspace operations executed sequentially with no overlap")
    print(f" Performance: {serial_duration:.3f}s for {serial_count} tasks")
- # =============================================================================
- # Test 3: Backward Compatibility Test
- # =============================================================================
@pytest.mark.offline
async def test_backward_compatibility():
    """
    Check the behavior of calls that omit the workspace argument.

    Covers get_final_namespace with the default workspace, the
    set/get_default_workspace round trip, an empty default workspace, and
    initialize_pipeline_status(workspace=None).
    """
    banner = "=" * 60
    print("\n" + banner)
    print("TEST 3: Backward Compatibility")
    print(banner)

    # --- 3.1: the default workspace is used as prefix ---
    print("\nTest 3.1: get_final_namespace with workspace=None")
    set_default_workspace("my_default_workspace")
    resolved_ns = get_final_namespace("pipeline_status")
    wanted_ns = "my_default_workspace:pipeline_status"
    assert resolved_ns == wanted_ns, f"Expected {wanted_ns}, got {resolved_ns}"
    print("✅ PASSED: Backward Compatibility - get_final_namespace")
    print(f" Correctly uses default workspace: {resolved_ns}")

    # --- 3.2: setter/getter round trip ---
    print("\nTest 3.2: get/set default workspace")
    set_default_workspace("test_default")
    current_default = get_default_workspace()
    assert (
        current_default == "test_default"
    ), f"Expected 'test_default', got {current_default}"
    print("✅ PASSED: Backward Compatibility - default workspace")
    print(f" Default workspace set/get correctly: {current_default}")

    # --- 3.3: empty default workspace yields the bare namespace ---
    print("\nTest 3.3: Empty workspace handling")
    set_default_workspace("")
    resolved_empty = get_final_namespace("pipeline_status", workspace=None)
    wanted_empty = "pipeline_status"  # no workspace prefix and no ':' separator
    assert (
        resolved_empty == wanted_empty
    ), f"Expected '{wanted_empty}', got '{resolved_empty}'"
    print("✅ PASSED: Backward Compatibility - empty workspace")
    print(f" Empty workspace handled correctly: '{resolved_empty}'")

    # --- 3.4: pipeline status initialized without an explicit workspace ---
    print("\nTest 3.4: initialize_pipeline_status with workspace=None")
    set_default_workspace("compat_test_workspace")
    initialize_share_data()
    await initialize_pipeline_status(workspace=None)  # expected to use the default

    # Read it back by naming the default workspace explicitly
    status = await get_namespace_data(
        "pipeline_status", workspace="compat_test_workspace"
    )
    assert (
        status is not None
    ), "Failed to initialize pipeline status with default workspace"
    print("✅ PASSED: Backward Compatibility - pipeline init with None")
    print(" Pipeline status initialized with default workspace")
- # =============================================================================
- # Test 4: Multi-Workspace Concurrency Test
- # =============================================================================
@pytest.mark.offline
async def test_multi_workspace_concurrency():
    """
    Run the same operation sequence on three workspaces at once.

    Each task initializes its pipeline status, takes its own
    "test_operations" lock, writes workspace-specific keys and sleeps
    briefly. Afterwards every workspace is checked for its own key/value.
    """
    banner = "=" * 60
    print("\n" + banner)
    print("TEST 4: Multi-Workspace Concurrency")
    print(banner)

    initialize_share_data()

    async def run_workspace(ws_id):
        """Initialize one workspace and mutate its pipeline status under its lock."""
        print(f"\n [{ws_id}] Starting operations")

        await initialize_pipeline_status(ws_id)

        async with get_namespace_lock("test_operations", ws_id):
            status = await get_namespace_data("pipeline_status", workspace=ws_id)

            # Leave a workspace-specific marker plus a wall-clock timestamp
            status[f"{ws_id}_key"] = f"{ws_id}_value"
            status["timestamp"] = time.time()

            # Hold the lock for a moment to simulate work
            await asyncio.sleep(0.1)

        print(f" [{ws_id}] Completed operations")
        return ws_id

    workspaces = ["concurrent_ws_1", "concurrent_ws_2", "concurrent_ws_3"]

    began = time.time()
    finished = await asyncio.gather(*(run_workspace(ws) for ws in workspaces))
    elapsed = time.time() - began

    print(f"\n All workspaces completed in {elapsed:.2f}s")

    # Every task must have returned its workspace id
    assert set(finished) == set(workspaces), "Not all workspaces completed"

    print("✅ PASSED: Multi-Workspace Concurrency - Execution")
    print(
        f" All {len(workspaces)} workspaces completed successfully in {elapsed:.2f}s"
    )

    # Each workspace must contain the marker written by its own task
    print("\n Verifying data isolation...")
    for ws in workspaces:
        status = await get_namespace_data("pipeline_status", workspace=ws)
        marker_key = f"{ws}_key"
        marker_value = f"{ws}_value"

        assert (
            marker_key in status
        ), f"Data not properly isolated for {ws}: missing {marker_key}"
        assert (
            status[marker_key] == marker_value
        ), f"Data not properly isolated for {ws}: {marker_key}={status[marker_key]} (expected {marker_value})"
        print(f" [{ws}] Data correctly isolated: {marker_key}={status[marker_key]}")

    print("✅ PASSED: Multi-Workspace Concurrency - Data Isolation")
    print(" All workspaces have properly isolated data")
- # =============================================================================
- # Test 5: NamespaceLock Re-entrance Protection
- # =============================================================================
@pytest.mark.offline
async def test_namespace_lock_reentrance():
    """
    Test that NamespaceLock prevents re-entrance in the same coroutine
    and allows concurrent use in different coroutines.

    5.1: entering the same lock object twice in one coroutine is expected to
         raise a RuntimeError whose message contains "already acquired"
         (case-insensitive).
    5.2: two coroutines sharing one lock object are both expected to run to
         completion without raising.
    """
    print("\n" + "=" * 60)
    print("TEST 5: NamespaceLock Re-entrance Protection")
    print("=" * 60)

    # Test 5.1: Same coroutine re-entrance should fail
    print("\nTest 5.1: Same coroutine re-entrance should raise RuntimeError")

    lock = get_namespace_lock("test_reentrance", "test_ws")

    # pytest.raises fails the test by itself when nothing is raised or when
    # the message does not match, so no manual flag bookkeeping is needed.
    with pytest.raises(RuntimeError, match=r"(?i)already acquired") as exc_info:
        async with lock:
            print(" Acquired lock first time")
            # Try to acquire the same lock again in the same coroutine
            async with lock:
                print(" ERROR: Should not reach here - re-entrance succeeded!")

    print(f" ✓ Re-entrance correctly blocked: {exc_info.value}")
    print("✅ PASSED: NamespaceLock Re-entrance Protection")
    print(" Re-entrance correctly raises RuntimeError")

    # Test 5.2: Same NamespaceLock instance in different coroutines should succeed
    print("\nTest 5.2: Same NamespaceLock instance in different coroutines")

    shared_lock = get_namespace_lock("test_concurrent", "test_ws")
    concurrent_results = []

    async def use_shared_lock(coroutine_id):
        """Use the same NamespaceLock instance"""
        async with shared_lock:
            concurrent_results.append(f"coroutine_{coroutine_id}_start")
            await asyncio.sleep(0.1)
            concurrent_results.append(f"coroutine_{coroutine_id}_end")

    # Neither coroutine should trip the re-entrance check.
    # NOTE(review): the original comment attributes this to per-coroutine
    # ContextVar state inside NamespaceLock - confirm against its implementation.
    await asyncio.gather(
        use_shared_lock(1),
        use_shared_lock(2),
    )

    # Both coroutines should have completed
    expected_entries = 4  # 2 starts + 2 ends
    assert (
        len(concurrent_results) == expected_entries
    ), f"Expected {expected_entries} entries, got {len(concurrent_results)}"

    print("✅ PASSED: NamespaceLock Concurrent Reuse")
    print(
        f" Same NamespaceLock instance used successfully in {expected_entries // 2} concurrent coroutines"
    )
- # =============================================================================
- # Test 6: Different Namespace Lock Isolation
- # =============================================================================
@pytest.mark.offline
async def test_different_namespace_lock_isolation():
    """
    Check that locks of different namespaces do not block each other.

    Three workers use the same workspace but three different namespaces; the
    measured peak concurrency is expected to be at least 2.
    """
    banner = "=" * 60
    print("\n" + banner)
    print("TEST 6: Different Namespace Lock Isolation")
    print(banner)

    print("\nTesting locks with same workspace but different namespaces")

    # One worker per namespace suffix, all inside "same_ws"
    workload = [(f"ns_{tag}", "same_ws", f"namespace_{tag}") for tag in "abc"]

    outcome = await _measure_lock_parallelism(workload)
    peak, events, stats = outcome

    assert peak >= 2, (
        "Different namespaces within the same workspace should run concurrently; "
        f"observed max concurrency {peak} with timeline {events}"
    )

    duration = stats["total_duration"]
    worker_count = stats["num_workers"]
    print("✅ PASSED: Different Namespace Lock Isolation")
    print(f" Different namespace locks ran in parallel (max concurrency={peak})")
    print(f" Performance: {duration:.3f}s for {worker_count} namespaces")
- # =============================================================================
- # Test 7: Error Handling
- # =============================================================================
@pytest.mark.offline
async def test_error_handling():
    """
    Check workspace edge cases around None and the empty string.

    7.0: get_final_namespace(workspace=None) is expected to raise ValueError
         at the start of the test (before any default workspace is set here).
    7.1: set_default_workspace(None) is expected to store "".
    7.2: an explicit empty workspace is expected to yield the bare namespace.
    """
    banner = "=" * 60
    print("\n" + banner)
    print("TEST 7: Error Handling")
    print(banner)

    # --- 7.0: no workspace given and none configured ---
    print("\nTest 7.0: Missing workspace raises ValueError")
    with pytest.raises(ValueError):
        get_final_namespace("test_namespace", workspace=None)

    # --- 7.1: None is normalized by the setter ---
    print("\nTest 7.1: set_default_workspace(None) converts to empty string")
    set_default_workspace(None)
    stored_default = get_default_workspace()
    assert stored_default == "", f"Expected empty string, got: '{stored_default}'"
    print("✅ PASSED: Error Handling - None to Empty String")
    print(
        f" set_default_workspace(None) correctly converts to empty string: '{stored_default}'"
    )

    # --- 7.2: explicit empty workspace gives a namespace without prefix ---
    print("\nTest 7.2: Empty string workspace creates valid namespace")
    resolved_ns = get_final_namespace("test_namespace", workspace="")
    assert resolved_ns == "test_namespace", f"Unexpected namespace: '{resolved_ns}'"
    print("✅ PASSED: Error Handling - Empty Workspace Namespace")
    print(f" Empty workspace creates valid namespace: '{resolved_ns}'")

    # Leave an empty default workspace behind for the tests that follow
    set_default_workspace("")
- # =============================================================================
- # Test 8: Update Flags Workspace Isolation
- # =============================================================================
@pytest.mark.offline
async def test_update_flags_workspace_isolation():
    """
    Check that update flags are scoped to their workspace.

    8.1: set_all_update_flags on one workspace leaves the other's flag False.
    8.2: clear_all_update_flags on one workspace leaves the other's flag True.
    8.3: get_all_update_flags_status(workspace=...) only reports entries of
         the requested workspace.
    """
    banner = "=" * 60
    print("\n" + banner)
    print("TEST 8: Update Flags Workspace Isolation")
    print(banner)

    initialize_share_data()

    ws_one = "update_flags_ws1"
    ws_two = "update_flags_ws2"
    flags_ns = "test_update_flags_ns"

    for ws_name in (ws_one, ws_two):
        await initialize_pipeline_status(ws_name)

    # --- 8.1: setting flags touches one workspace only ---
    print("\nTest 8.1: set_all_update_flags workspace isolation")

    # One flag object per workspace for the same namespace
    flag_one = await get_update_flag(flags_ns, workspace=ws_one)
    flag_two = await get_update_flag(flags_ns, workspace=ws_two)

    assert flag_one.value is False, "Flag1 initial value should be False"
    assert flag_two.value is False, "Flag2 initial value should be False"

    await set_all_update_flags(flags_ns, workspace=ws_one)

    assert (
        flag_one.value is True
    ), f"Flag1 should be True after set_all_update_flags, got {flag_one.value}"
    assert (
        flag_two.value is False
    ), f"Flag2 should still be False, got {flag_two.value}"

    print("✅ PASSED: Update Flags - set_all_update_flags Isolation")
    print(
        f" set_all_update_flags isolated: ws1={flag_one.value}, ws2={flag_two.value}"
    )

    # --- 8.2: clearing flags touches one workspace only ---
    print("\nTest 8.2: clear_all_update_flags workspace isolation")

    # Start from "both set"
    for ws_name in (ws_one, ws_two):
        await set_all_update_flags(flags_ns, workspace=ws_name)

    assert flag_one.value is True, "Flag1 should be True"
    assert flag_two.value is True, "Flag2 should be True"

    await clear_all_update_flags(flags_ns, workspace=ws_one)

    assert (
        flag_one.value is False
    ), f"Flag1 should be False after clear, got {flag_one.value}"
    assert flag_two.value is True, f"Flag2 should still be True, got {flag_two.value}"

    print("✅ PASSED: Update Flags - clear_all_update_flags Isolation")
    print(
        f" clear_all_update_flags isolated: ws1={flag_one.value}, ws2={flag_two.value}"
    )

    # --- 8.3: the status query is filtered by workspace ---
    print("\nTest 8.3: get_all_update_flags_status workspace filtering")

    # Extra namespaces: two in the first workspace, one in the second
    extra_namespaces = [("ns_a", ws_one), ("ns_b", ws_one), ("ns_c", ws_two)]
    for ns_name, ws_name in extra_namespaces:
        await get_update_flag(ns_name, workspace=ws_name)
    for ns_name, ws_name in extra_namespaces:
        await set_all_update_flags(ns_name, workspace=ws_name)

    status_one = await get_all_update_flags_status(workspace=ws_one)

    # Split the returned keys by which workspace name they contain
    workspace1_keys = [key for key in status_one if ws_one in key]
    workspace2_keys = [key for key in status_one if ws_two in key]

    assert (
        len(workspace1_keys) > 0
    ), f"workspace1 keys should be present, got {len(workspace1_keys)}"
    assert (
        len(workspace2_keys) == 0
    ), f"workspace2 keys should not be present, got {len(workspace2_keys)}"
    for key, values in status_one.items():
        assert all(values), f"All flags in {key} should be True, got {values}"

    # The second workspace must report exactly its own two namespaces
    status_two = await get_all_update_flags_status(workspace=ws_two)
    expected_ws2_keys = {f"{ws_two}:{flags_ns}", f"{ws_two}:ns_c"}
    assert (
        set(status_two.keys()) == expected_ws2_keys
    ), f"Unexpected namespaces for workspace2: {status_two.keys()}"
    for key, values in status_two.items():
        assert all(values), f"All flags in {key} should be True, got {values}"

    print("✅ PASSED: Update Flags - get_all_update_flags_status Filtering")
    print(
        f" Status correctly filtered: ws1 keys={len(workspace1_keys)}, ws2 keys={len(workspace2_keys)}"
    )
- # =============================================================================
- # Test 9: Empty Workspace Standardization
- # =============================================================================
@pytest.mark.offline
async def test_empty_workspace_standardization():
    """
    Check how an empty-string workspace is represented.

    9.1: with "" as default workspace, get_final_namespace is expected to
         return the bare namespace without a prefix.
    9.2: pipeline status of the "" workspace and of a named workspace are
         expected to be different objects.
    """
    banner = "=" * 60
    print("\n" + banner)
    print("TEST 9: Empty Workspace Standardization")
    print(banner)

    # --- 9.1: namespace format ---
    print("\nTest 9.1: Empty string workspace namespace format")

    set_default_workspace("")
    resolved_ns = get_final_namespace("test_namespace", workspace=None)

    # No workspace prefix and no separator expected
    assert (
        resolved_ns == "test_namespace"
    ), f"Unexpected namespace format: '{resolved_ns}' (expected 'test_namespace')"

    print("✅ PASSED: Empty Workspace Standardization - Format")
    print(f" Empty workspace creates correct namespace: '{resolved_ns}'")

    # --- 9.2: empty and named workspace side by side ---
    print("\nTest 9.2: Empty vs non-empty workspace behavior")

    initialize_share_data()

    await initialize_pipeline_status(workspace="")
    status_unnamed = await get_namespace_data("pipeline_status", workspace="")

    await initialize_pipeline_status(workspace="test_ws")
    status_named = await get_namespace_data("pipeline_status", workspace="test_ws")

    assert (
        status_unnamed is not status_named
    ), "Empty and non-empty workspaces share data (should be independent)"

    print("✅ PASSED: Empty Workspace Standardization - Behavior")
    print(" Empty and non-empty workspaces have independent data")
- # =============================================================================
- # Test 10: JsonKVStorage Workspace Isolation (Integration Test)
- # =============================================================================
@pytest.mark.offline
async def test_json_kv_storage_workspace_isolation(keep_test_artifacts):
    """
    Integration check that JsonKVStorage keeps per-workspace data apart.

    Two storages share the namespace "entities" but are bound to different
    workspaces. Each receives different records under identical keys; each must
    read back only its own records, and each workspace must end up with its own
    directory below the shared working directory.

    Args:
        keep_test_artifacts: When truthy, the temporary working directory is
            left on disk after the test instead of being removed.
    """
    rule = "=" * 60
    print("\n" + rule)
    print("TEST 10: JsonKVStorage Workspace Isolation (Integration)")
    print(rule)

    # Scratch directory lives under <three levels above this file>/temp/.
    test_dir = str(
        Path(__file__).parent.parent.parent
        / "temp/test_json_kv_storage_workspace_isolation"
    )
    # Always start from an empty directory so earlier runs cannot interfere.
    if os.path.exists(test_dir):
        shutil.rmtree(test_dir)
    os.makedirs(test_dir, exist_ok=True)
    print(f"\n Using test directory: {test_dir}")

    try:
        initialize_share_data()

        async def mock_embedding_func(texts: list[str]) -> np.ndarray:
            # One random 384-dimensional row per input text.
            row_count = len(texts)
            return np.random.rand(row_count, 384)

        global_config = {"working_dir": test_dir, "embedding_batch_num": 10}

        # ---- Test 10.1: two storages, same namespace, different workspaces ----
        print(
            "\nTest 10.1: Create two JsonKVStorage instances with different workspaces"
        )
        from lightrag.kg.json_kv_impl import JsonKVStorage

        def build_storage(workspace_name):
            # Only the workspace differs between the two instances.
            return JsonKVStorage(
                namespace="entities",
                workspace=workspace_name,
                global_config=global_config,
                embedding_func=mock_embedding_func,
            )

        storage1 = build_storage("workspace1")
        storage2 = build_storage("workspace2")

        await storage1.initialize()
        await storage2.initialize()
        print(" Storage1 created: workspace=workspace1, namespace=entities")
        print(" Storage2 created: workspace=workspace2, namespace=entities")

        # ---- Test 10.2: same keys, different payloads ----
        print("\nTest 10.2: Write different data to each storage")
        # upsert takes a mapping of record id -> record dict.
        ws1_records = {
            "entity1": {
                "content": "Data from workspace1 - AI Research",
                "type": "entity",
            },
            "entity2": {
                "content": "Data from workspace1 - Machine Learning",
                "type": "entity",
            },
        }
        ws2_records = {
            "entity1": {
                "content": "Data from workspace2 - Deep Learning",
                "type": "entity",
            },
            "entity2": {
                "content": "Data from workspace2 - Neural Networks",
                "type": "entity",
            },
        }

        await storage1.upsert(ws1_records)
        print(" Written to storage1: entity1, entity2")
        # Flush to disk so the directory-layout check below has something to find.
        await storage1.index_done_callback()
        print(" Persisted storage1 data to disk")

        await storage2.upsert(ws2_records)
        print(" Written to storage2: entity1, entity2")
        await storage2.index_done_callback()
        print(" Persisted storage2 data to disk")

        # ---- Test 10.3: read back and compare ----
        print("\nTest 10.3: Read data and verify isolation")
        ws1_first = await storage1.get_by_id("entity1")
        ws1_second = await storage1.get_by_id("entity2")
        ws2_first = await storage2.get_by_id("entity1")
        ws2_second = await storage2.get_by_id("entity2")

        print(f" Storage1 entity1: {ws1_first}")
        print(f" Storage1 entity2: {ws1_second}")
        print(f" Storage2 entity1: {ws2_first}")
        print(f" Storage2 entity2: {ws2_second}")

        # Every record must be present before its content is inspected.
        assert ws1_first is not None, "Storage1 entity1 should not be None"
        assert ws1_second is not None, "Storage1 entity2 should not be None"
        assert ws2_first is not None, "Storage2 entity1 should not be None"
        assert ws2_second is not None, "Storage2 entity2 should not be None"

        # Each storage returns exactly what was written to it...
        assert (
            ws1_first.get("content") == "Data from workspace1 - AI Research"
        ), "Storage1 entity1 content mismatch"
        assert (
            ws1_second.get("content") == "Data from workspace1 - Machine Learning"
        ), "Storage1 entity2 content mismatch"
        assert (
            ws2_first.get("content") == "Data from workspace2 - Deep Learning"
        ), "Storage2 entity1 content mismatch"
        assert (
            ws2_second.get("content") == "Data from workspace2 - Neural Networks"
        ), "Storage2 entity2 content mismatch"

        # ...and identical keys never resolve to the same content across workspaces.
        assert ws1_first.get("content") != ws2_first.get(
            "content"
        ), "Storage1 and Storage2 entity1 should have different content"
        assert ws1_second.get("content") != ws2_second.get(
            "content"
        ), "Storage1 and Storage2 entity2 should have different content"

        print("✅ PASSED: JsonKVStorage - Data Isolation")
        print(
            " Two storage instances correctly isolated: ws1 and ws2 have different data"
        )

        # ---- Test 10.4: one directory per workspace ----
        print("\nTest 10.4: Verify file structure")
        working_root = Path(test_dir)
        ws1_dir = working_root / "workspace1"
        ws2_dir = working_root / "workspace2"
        ws1_present = ws1_dir.exists()
        ws2_present = ws2_dir.exists()
        print(f" workspace1 directory exists: {ws1_present}")
        print(f" workspace2 directory exists: {ws2_present}")
        assert ws1_present, "workspace1 directory should exist"
        assert ws2_present, "workspace2 directory should exist"
        print("✅ PASSED: JsonKVStorage - File Structure")
        print(f" Workspace directories correctly created: {ws1_dir} and {ws2_dir}")
    finally:
        # Remove the scratch directory unless the caller asked to keep it.
        if keep_test_artifacts:
            print(f"\n Kept test directory for inspection: {test_dir}")
        elif os.path.exists(test_dir):
            shutil.rmtree(test_dir)
            print(f"\n Cleaned up test directory: {test_dir}")
- # =============================================================================
- # Test 11: LightRAG End-to-End Integration Test
- # =============================================================================
@pytest.mark.offline
async def test_lightrag_end_to_end_workspace_isolation(keep_test_artifacts):
    """
    End-to-end test: create two LightRAG instances with different workspaces,
    insert different documents concurrently, and verify file separation.

    Mock LLM and embedding functions are used so no external API is called.

    Args:
        keep_test_artifacts: When truthy, the temporary working directory is
            left on disk after the test instead of being removed.
    """
    # Local import: only this test needs json, so no file-level import is assumed.
    import json

    # Purpose: Validate that full LightRAG flows keep artifacts scoped per workspace.
    # Scope: LightRAG.initialize_storages + ainsert side effects plus filesystem
    #        verification for generated storage files.
    print("\n" + "=" * 60)
    print("TEST 11: LightRAG End-to-End Workspace Isolation")
    print("=" * 60)

    # Create temporary test directory under project temp/
    test_dir = str(
        Path(__file__).parent.parent.parent
        / "temp/test_lightrag_end_to_end_workspace_isolation"
    )
    # Start from an empty directory so earlier runs cannot interfere.
    if os.path.exists(test_dir):
        shutil.rmtree(test_dir)
    os.makedirs(test_dir, exist_ok=True)
    print(f"\n Using test directory: {test_dir}")

    try:
        # Canned extraction output, one record per line.
        # Format: entity<|#|>entity_name<|#|>entity_type<|#|>entity_description
        # Format: relation<|#|>source_entity<|#|>target_entity<|#|>keywords<|#|>description
        # Joined explicitly so the payload does not depend on source indentation.
        project_a_response = "\n".join(
            (
                "entity<|#|>Artificial Intelligence<|#|>concept<|#|>AI is a field of computer science focused on creating intelligent machines.",
                "entity<|#|>Machine Learning<|#|>concept<|#|>Machine Learning is a subset of AI that enables systems to learn from data.",
                "relation<|#|>Machine Learning<|#|>Artificial Intelligence<|#|>subset, related field<|#|>Machine Learning is a key component and subset of Artificial Intelligence.",
                "<|COMPLETE|>",
            )
        )
        project_b_response = "\n".join(
            (
                "entity<|#|>Deep Learning<|#|>concept<|#|>Deep Learning is a subset of machine learning using neural networks with multiple layers.",
                "entity<|#|>Neural Networks<|#|>concept<|#|>Neural Networks are computing systems inspired by biological neural networks.",
                "relation<|#|>Deep Learning<|#|>Neural Networks<|#|>uses, composed of<|#|>Deep Learning uses multiple layers of Neural Networks to learn representations.",
                "<|COMPLETE|>",
            )
        )

        # Factory function to create different mock LLM functions for each workspace
        def create_mock_llm_func(workspace_name):
            """Create a mock LLM function that returns different content based on workspace"""
            # Anything other than "project_a" gets the project_b response.
            response = (
                project_a_response
                if workspace_name == "project_a"
                else project_b_response
            )

            async def mock_llm_func(
                prompt, system_prompt=None, history_messages=None, **kwargs
            ) -> str:
                # history_messages defaults to None rather than a shared mutable
                # list; the mock ignores all of its arguments.
                # Yield to the event loop to simulate async I/O and allow the two
                # inserts to interleave.
                await asyncio.sleep(0)
                return response

            return mock_llm_func

        # Mock embedding function
        async def mock_embedding_func(texts: list[str]) -> np.ndarray:
            # Yield to the event loop to simulate async I/O.
            await asyncio.sleep(0)
            return np.random.rand(len(texts), 384)  # 384-dimensional vectors

        # Test 11.1: Create two LightRAG instances with different workspaces
        print("\nTest 11.1: Create two LightRAG instances with different workspaces")
        from lightrag import LightRAG
        from lightrag.utils import EmbeddingFunc, Tokenizer

        # Create different mock LLM functions for each workspace
        mock_llm_func_a = create_mock_llm_func("project_a")
        mock_llm_func_b = create_mock_llm_func("project_b")

        class _SimpleTokenizerImpl:
            # Character-level tokenizer: one token per code point.
            def encode(self, content: str) -> list[int]:
                return [ord(ch) for ch in content]

            def decode(self, tokens: list[int]) -> str:
                return "".join(chr(t) for t in tokens)

        tokenizer = Tokenizer("mock-tokenizer", _SimpleTokenizerImpl())

        rag1 = LightRAG(
            working_dir=test_dir,
            workspace="project_a",
            llm_model_func=mock_llm_func_a,
            embedding_func=EmbeddingFunc(
                embedding_dim=384,
                max_token_size=8192,
                func=mock_embedding_func,
            ),
            tokenizer=tokenizer,
        )
        rag2 = LightRAG(
            working_dir=test_dir,
            workspace="project_b",
            llm_model_func=mock_llm_func_b,
            embedding_func=EmbeddingFunc(
                embedding_dim=384,
                max_token_size=8192,
                func=mock_embedding_func,
            ),
            tokenizer=tokenizer,
        )

        # Initialize storages
        await rag1.initialize_storages()
        await rag2.initialize_storages()
        print(" RAG1 created: workspace=project_a")
        print(" RAG2 created: workspace=project_b")

        # Test 11.2: Insert different data to each RAG instance (CONCURRENTLY)
        print("\nTest 11.2: Insert different data to each RAG instance (concurrently)")
        text_for_project_a = "This document is about Artificial Intelligence and Machine Learning. AI is transforming the world."
        text_for_project_b = "This document is about Deep Learning and Neural Networks. Deep learning uses multiple layers."

        # Insert to both projects concurrently to test workspace isolation under concurrent load
        print(" Starting concurrent insert operations...")
        # perf_counter is monotonic, so the interval cannot be skewed by
        # wall-clock adjustments.
        start_time = time.perf_counter()
        await asyncio.gather(
            rag1.ainsert(text_for_project_a), rag2.ainsert(text_for_project_b)
        )
        elapsed_time = time.perf_counter() - start_time
        print(f" Inserted to project_a: {len(text_for_project_a)} chars (concurrent)")
        print(f" Inserted to project_b: {len(text_for_project_b)} chars (concurrent)")
        print(f" Total concurrent execution time: {elapsed_time:.3f}s")

        # Test 11.3: Verify file structure
        print("\nTest 11.3: Verify workspace directory structure")
        project_a_dir = Path(test_dir) / "project_a"
        project_b_dir = Path(test_dir) / "project_b"
        project_a_exists = project_a_dir.exists()
        project_b_exists = project_b_dir.exists()
        print(f" project_a directory: {project_a_dir}")
        print(f" project_a exists: {project_a_exists}")
        print(f" project_b directory: {project_b_dir}")
        print(f" project_b exists: {project_b_exists}")
        assert project_a_exists, "project_a directory should exist"
        assert project_b_exists, "project_b directory should exist"

        # List files in each directory (diagnostic output only)
        for label, workspace_dir in (
            ("project_a", project_a_dir),
            ("project_b", project_b_dir),
        ):
            print(f"\n Files in {label}/:")
            for file in sorted(workspace_dir.glob("*")):
                if file.is_file():
                    print(f" - {file.name} ({file.stat().st_size} bytes)")

        print("✅ PASSED: LightRAG E2E - File Structure")
        print(" Workspace directories correctly created and separated")

        # Test 11.4: Verify data isolation by checking file contents
        print("\nTest 11.4: Verify data isolation (check file contents)")
        # Check if full_docs storage files exist and contain different content
        docs_a_file = project_a_dir / "kv_store_full_docs.json"
        docs_b_file = project_b_dir / "kv_store_full_docs.json"

        if docs_a_file.exists() and docs_b_file.exists():
            # Read as UTF-8 explicitly so the result does not depend on the
            # platform's default locale encoding.
            docs_a_content = json.loads(docs_a_file.read_text(encoding="utf-8"))
            docs_b_content = json.loads(docs_b_file.read_text(encoding="utf-8"))
            print(f" project_a doc count: {len(docs_a_content)}")
            print(f" project_b doc count: {len(docs_b_content)}")

            # Verify they contain different data
            assert (
                docs_a_content != docs_b_content
            ), "Document storage not properly isolated"

            # Serialize back to text so substring checks cover every nested value.
            docs_a_str = json.dumps(docs_a_content)
            docs_b_str = json.dumps(docs_b_content)

            # Check project_a contains its text and NOT project_b's text
            assert (
                "Artificial Intelligence" in docs_a_str
            ), "project_a should contain 'Artificial Intelligence'"
            assert (
                "Machine Learning" in docs_a_str
            ), "project_a should contain 'Machine Learning'"
            assert (
                "Deep Learning" not in docs_a_str
            ), "project_a should NOT contain 'Deep Learning' from project_b"
            assert (
                "Neural Networks" not in docs_a_str
            ), "project_a should NOT contain 'Neural Networks' from project_b"

            # Check project_b contains its text and NOT project_a's text
            assert (
                "Deep Learning" in docs_b_str
            ), "project_b should contain 'Deep Learning'"
            assert (
                "Neural Networks" in docs_b_str
            ), "project_b should contain 'Neural Networks'"
            assert (
                "Artificial Intelligence" not in docs_b_str
            ), "project_b should NOT contain 'Artificial Intelligence' from project_a"
            # Note: "Machine Learning" might appear in project_b's text, so we skip that check

            print("✅ PASSED: LightRAG E2E - Data Isolation")
            print(" Document storage correctly isolated between workspaces")
            print(" project_a contains only its own data")
            print(" project_b contains only its own data")
        else:
            # Still non-fatal, but reported as skipped: nothing was verified
            # here, so claiming PASSED would be misleading.
            print(" Document storage files not found (may not be created yet)")
            print("⚠️ SKIPPED: LightRAG E2E - Data Isolation")
            print(" Skipped file content check (files not created)")

        print("\n ✓ Test complete - workspace isolation verified at E2E level")
    finally:
        # Cleanup test directory (unless keep_test_artifacts is set)
        if os.path.exists(test_dir) and not keep_test_artifacts:
            shutil.rmtree(test_dir)
            print(f"\n Cleaned up test directory: {test_dir}")
        elif keep_test_artifacts:
            print(f"\n Kept test directory for inspection: {test_dir}")
|