| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318 |
- #!/usr/bin/env python
- """
- Test Neo4j full-text index functionality, specifically:
- 1. Workspace-specific index naming
- 2. Legacy index migration
- 3. search_labels functionality with workspace-specific indexes
- """
- import asyncio
- import os
- import sys
- import pytest
- import numpy as np
- # Add the project root directory to the Python path
- sys.path.append(
- os.path.dirname(
- os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- )
- )
- from lightrag.kg.shared_storage import initialize_share_data
- # Mock embedding function that returns random vectors
- async def mock_embedding_func(texts):
- return np.random.rand(len(texts), 10) # Return 10-dimensional random vectors
- @pytest.fixture
- async def neo4j_storage():
- """
- Initialize Neo4j storage for testing.
- Requires Neo4j to be running and configured via environment variables.
- """
- # Check if Neo4j is configured
- if not os.getenv("NEO4J_URI"):
- pytest.skip("Neo4j not configured (NEO4J_URI not set)")
- from lightrag.kg.neo4j_impl import Neo4JStorage
- # Initialize shared_storage for locks
- initialize_share_data()
- global_config = {
- "embedding_batch_num": 10,
- "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.5},
- "working_dir": os.environ.get("WORKING_DIR", "./rag_storage"),
- }
- storage = Neo4JStorage(
- namespace="test_fulltext_index",
- workspace="test_workspace",
- global_config=global_config,
- embedding_func=mock_embedding_func,
- )
- # Initialize the connection
- await storage.initialize()
- # Clean up any existing data
- await storage.drop()
- yield storage
- # Cleanup
- await storage.drop()
- await storage.finalize()
- @pytest.mark.integration
- @pytest.mark.requires_db
- async def test_fulltext_index_creation(neo4j_storage):
- """
- Test that the full-text index is created with the workspace-specific name.
- """
- storage = neo4j_storage
- workspace_label = storage._get_workspace_label()
- expected_index_name = f"entity_id_fulltext_idx_{workspace_label}"
- # Query Neo4j to check if the index exists
- async with storage._driver.session(database=storage._DATABASE) as session:
- result = await session.run("SHOW FULLTEXT INDEXES")
- indexes = await result.data()
- await result.consume()
- # Check if the workspace-specific index exists
- index_names = [idx["name"] for idx in indexes]
- assert (
- expected_index_name in index_names
- ), f"Expected index '{expected_index_name}' not found. Found indexes: {index_names}"
- # Check if the legacy index doesn't exist (should be migrated if it was there)
- legacy_index_name = "entity_id_fulltext_idx"
- if legacy_index_name in index_names:
- # If legacy index exists, it should be for a different workspace
- # or it means migration didn't happen
- print(
- f"Warning: Legacy index '{legacy_index_name}' still exists alongside '{expected_index_name}'"
- )
- @pytest.mark.integration
- @pytest.mark.requires_db
- async def test_search_labels_with_workspace_index(neo4j_storage):
- """
- Test that search_labels uses the workspace-specific index and returns results.
- """
- storage = neo4j_storage
- # Insert test nodes
- test_nodes = [
- {
- "entity_id": "Artificial Intelligence",
- "description": "AI field",
- "keywords": "AI,ML,DL",
- "entity_type": "Technology",
- },
- {
- "entity_id": "Machine Learning",
- "description": "ML subfield",
- "keywords": "supervised,unsupervised",
- "entity_type": "Technology",
- },
- {
- "entity_id": "Deep Learning",
- "description": "DL subfield",
- "keywords": "neural networks",
- "entity_type": "Technology",
- },
- {
- "entity_id": "Natural Language Processing",
- "description": "NLP field",
- "keywords": "text,language",
- "entity_type": "Technology",
- },
- ]
- for node_data in test_nodes:
- await storage.upsert_node(node_data["entity_id"], node_data)
- # Give the index time to become consistent (eventually consistent index)
- await asyncio.sleep(2)
- # Test search_labels
- results = await storage.search_labels("Learning", limit=10)
- # Should find nodes with "Learning" in them
- assert len(results) > 0, "search_labels should return results for 'Learning'"
- assert any(
- "Learning" in result for result in results
- ), "Results should contain 'Learning'"
- # Test case-insensitive search
- results_lower = await storage.search_labels("learning", limit=10)
- assert len(results_lower) > 0, "search_labels should be case-insensitive"
- # Test partial match
- results_partial = await storage.search_labels("Intelli", limit=10)
- assert (
- len(results_partial) > 0
- ), "search_labels should support partial matching with wildcard"
- assert any(
- "Intelligence" in result for result in results_partial
- ), "Should find 'Artificial Intelligence'"
- @pytest.mark.integration
- @pytest.mark.requires_db
- async def test_search_labels_chinese_text(neo4j_storage):
- """
- Test that search_labels works with Chinese text using the CJK analyzer.
- """
- storage = neo4j_storage
- # Insert Chinese test nodes
- chinese_nodes = [
- {
- "entity_id": "人工智能",
- "description": "人工智能领域",
- "keywords": "AI,机器学习",
- "entity_type": "技术",
- },
- {
- "entity_id": "机器学习",
- "description": "机器学习子领域",
- "keywords": "监督学习,无监督学习",
- "entity_type": "技术",
- },
- {
- "entity_id": "深度学习",
- "description": "深度学习子领域",
- "keywords": "神经网络",
- "entity_type": "技术",
- },
- ]
- for node_data in chinese_nodes:
- await storage.upsert_node(node_data["entity_id"], node_data)
- # Give the index time to become consistent
- await asyncio.sleep(2)
- # Test Chinese text search
- results = await storage.search_labels("学习", limit=10)
- # Should find nodes with "学习" in them
- assert len(results) > 0, "search_labels should return results for Chinese text"
- assert any(
- "学习" in result for result in results
- ), "Results should contain Chinese characters '学习'"
- @pytest.mark.integration
- @pytest.mark.requires_db
- async def test_search_labels_fallback_to_contains(neo4j_storage):
- """
- Test that search_labels falls back to CONTAINS search if the index fails.
- This can happen with older Neo4j versions or if the index is not yet available.
- """
- storage = neo4j_storage
- # Insert test nodes
- test_nodes = [
- {
- "entity_id": "Test Node Alpha",
- "description": "Test node",
- "keywords": "test",
- "entity_type": "Test",
- },
- {
- "entity_id": "Test Node Beta",
- "description": "Test node",
- "keywords": "test",
- "entity_type": "Test",
- },
- ]
- for node_data in test_nodes:
- await storage.upsert_node(node_data["entity_id"], node_data)
- # Even if the full-text index is not available, CONTAINS should work
- results = await storage.search_labels("Alpha", limit=10)
- # Should find the node using fallback CONTAINS search
- assert len(results) > 0, "Fallback CONTAINS search should return results"
- assert "Test Node Alpha" in results, "Should find 'Test Node Alpha'"
- @pytest.mark.integration
- @pytest.mark.requires_db
- async def test_multiple_workspaces_have_separate_indexes(neo4j_storage):
- """
- Test that different workspaces have their own separate indexes.
- """
- from lightrag.kg.neo4j_impl import Neo4JStorage
- # Create storage for workspace 1
- storage1 = neo4j_storage
- # Create storage for workspace 2
- global_config = {
- "embedding_batch_num": 10,
- "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.5},
- "working_dir": os.environ.get("WORKING_DIR", "./rag_storage"),
- }
- storage2 = Neo4JStorage(
- namespace="test_fulltext_index",
- workspace="test_workspace_2",
- global_config=global_config,
- embedding_func=mock_embedding_func,
- )
- await storage2.initialize()
- await storage2.drop()
- try:
- # Check that both workspaces have their own indexes
- async with storage1._driver.session(database=storage1._DATABASE) as session:
- result = await session.run("SHOW FULLTEXT INDEXES")
- indexes = await result.data()
- await result.consume()
- index_names = [idx["name"] for idx in indexes]
- workspace1_index = (
- f"entity_id_fulltext_idx_{storage1._get_workspace_label()}"
- )
- workspace2_index = (
- f"entity_id_fulltext_idx_{storage2._get_workspace_label()}"
- )
- assert (
- workspace1_index in index_names
- ), f"Workspace 1 index '{workspace1_index}' should exist"
- assert (
- workspace2_index in index_names
- ), f"Workspace 2 index '{workspace2_index}' should exist"
- finally:
- # Clean up: drop the fulltext index created for workspace 2 to prevent accumulation
- try:
- async with storage2._driver.session(database=storage2._DATABASE) as session:
- index_name = storage2._get_fulltext_index_name(
- storage2._get_workspace_label()
- )
- drop_query = f"DROP INDEX {index_name} IF EXISTS"
- result = await session.run(drop_query)
- await result.consume()
- except Exception:
- pass # Ignore errors during cleanup
- await storage2.drop()
- await storage2.finalize()
- if __name__ == "__main__":
- # Run tests with pytest
- pytest.main([__file__, "-v", "--run-integration"])
|