test_neo4j_fulltext_index.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318
  1. #!/usr/bin/env python
  2. """
  3. Test Neo4j full-text index functionality, specifically:
  4. 1. Workspace-specific index naming
  5. 2. Legacy index migration
  6. 3. search_labels functionality with workspace-specific indexes
  7. """
  8. import asyncio
  9. import os
  10. import sys
  11. import pytest
  12. import numpy as np
  # Add the project root directory to the Python path.
  # The root is taken to be four directory levels above this file's absolute path.
  _project_root = os.path.abspath(__file__)
  for _ in range(4):
      _project_root = os.path.dirname(_project_root)
  sys.path.append(_project_root)
  19. from lightrag.kg.shared_storage import initialize_share_data
  # Mock embedding function that returns random vectors
  async def mock_embedding_func(texts):
      """
      Return one random 10-dimensional vector per input text.

      The values come from ``np.random.rand``, so they are not derived from
      the text content and differ between calls.
      """
      embedding_dim = 10
      num_texts = len(texts)
      return np.random.rand(num_texts, embedding_dim)
  @pytest.fixture
  async def neo4j_storage():
      """
      Initialize Neo4j storage for testing.

      Requires Neo4j to be running and configured via environment variables.
      The requesting test is skipped when NEO4J_URI is not set.

      Yields:
          An initialized Neo4JStorage for namespace "test_fulltext_index" and
          workspace "test_workspace", on which drop() has already been called.

      The connection is finalized even if one of the drop() calls raises, so a
      failing cleanup does not leak the connection.
      """
      # Check if Neo4j is configured
      if not os.getenv("NEO4J_URI"):
          pytest.skip("Neo4j not configured (NEO4J_URI not set)")

      # Imported lazily so the module can be collected without Neo4j configured
      from lightrag.kg.neo4j_impl import Neo4JStorage

      # Initialize shared_storage for locks
      initialize_share_data()

      global_config = {
          "embedding_batch_num": 10,
          "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.5},
          "working_dir": os.environ.get("WORKING_DIR", "./rag_storage"),
      }

      storage = Neo4JStorage(
          namespace="test_fulltext_index",
          workspace="test_workspace",
          global_config=global_config,
          embedding_func=mock_embedding_func,
      )

      # Initialize the connection
      await storage.initialize()

      try:
          # Clean up any existing data
          await storage.drop()

          yield storage

          # Cleanup
          await storage.drop()
      finally:
          # Always finalize, including when a drop() above failed
          await storage.finalize()
  @pytest.mark.integration
  @pytest.mark.requires_db
  async def test_fulltext_index_creation(neo4j_storage):
      """
      Test that the full-text index is created with the workspace-specific name.

      The expected name is "entity_id_fulltext_idx_" followed by the storage's
      workspace label. A leftover legacy index only produces a printed warning;
      it does not fail the test.
      """
      storage = neo4j_storage
      legacy_index_name = "entity_id_fulltext_idx"
      expected_index_name = f"{legacy_index_name}_{storage._get_workspace_label()}"

      # Query Neo4j to check if the index exists
      async with storage._driver.session(database=storage._DATABASE) as session:
          result = await session.run("SHOW FULLTEXT INDEXES")
          fulltext_indexes = await result.data()
          await result.consume()

          # Check if the workspace-specific index exists
          found_names = [record["name"] for record in fulltext_indexes]
          assert (
              expected_index_name in found_names
          ), f"Expected index '{expected_index_name}' not found. Found indexes: {found_names}"

          # Check if the legacy index doesn't exist (should be migrated if it was there).
          # If it is still present it either belongs to a different workspace or
          # the migration did not happen, so only warn about it.
          if legacy_index_name in found_names:
              print(
                  f"Warning: Legacy index '{legacy_index_name}' still exists alongside '{expected_index_name}'"
              )
  @pytest.mark.integration
  @pytest.mark.requires_db
  async def test_search_labels_with_workspace_index(neo4j_storage):
      """
      Test that search_labels uses the workspace-specific index and returns results.

      Four English technology nodes are inserted, then search_labels is
      exercised with an exact word, a lower-cased word and a word prefix.
      """
      storage = neo4j_storage

      # Insert test nodes: (entity_id, description, keywords); all share one entity_type
      node_specs = [
          ("Artificial Intelligence", "AI field", "AI,ML,DL"),
          ("Machine Learning", "ML subfield", "supervised,unsupervised"),
          ("Deep Learning", "DL subfield", "neural networks"),
          ("Natural Language Processing", "NLP field", "text,language"),
      ]
      for entity_id, description, keywords in node_specs:
          node_data = {
              "entity_id": entity_id,
              "description": description,
              "keywords": keywords,
              "entity_type": "Technology",
          }
          await storage.upsert_node(entity_id, node_data)

      # Give the index time to become consistent (eventually consistent index)
      await asyncio.sleep(2)

      # Test search_labels: should find nodes with "Learning" in them
      results = await storage.search_labels("Learning", limit=10)
      assert len(results) > 0, "search_labels should return results for 'Learning'"
      assert any(
          "Learning" in label for label in results
      ), "Results should contain 'Learning'"

      # Test case-insensitive search
      results_lower = await storage.search_labels("learning", limit=10)
      assert len(results_lower) > 0, "search_labels should be case-insensitive"

      # Test partial match
      results_partial = await storage.search_labels("Intelli", limit=10)
      assert (
          len(results_partial) > 0
      ), "search_labels should support partial matching with wildcard"
      assert any(
          "Intelligence" in label for label in results_partial
      ), "Should find 'Artificial Intelligence'"
  @pytest.mark.integration
  @pytest.mark.requires_db
  async def test_search_labels_chinese_text(neo4j_storage):
      """
      Test that search_labels works with Chinese text using the CJK analyzer.

      Three nodes with Chinese entity ids are inserted and a two-character
      Chinese query must return at least one label containing it.
      """
      storage = neo4j_storage

      # Insert Chinese test nodes: (entity_id, description, keywords); all share one entity_type
      node_specs = [
          ("人工智能", "人工智能领域", "AI,机器学习"),
          ("机器学习", "机器学习子领域", "监督学习,无监督学习"),
          ("深度学习", "深度学习子领域", "神经网络"),
      ]
      for entity_id, description, keywords in node_specs:
          node_data = {
              "entity_id": entity_id,
              "description": description,
              "keywords": keywords,
              "entity_type": "技术",
          }
          await storage.upsert_node(entity_id, node_data)

      # Give the index time to become consistent
      await asyncio.sleep(2)

      # Test Chinese text search: should find nodes with "学习" in them
      query = "学习"
      results = await storage.search_labels(query, limit=10)
      assert len(results) > 0, "search_labels should return results for Chinese text"
      assert any(
          query in label for label in results
      ), "Results should contain Chinese characters '学习'"
  @pytest.mark.integration
  @pytest.mark.requires_db
  async def test_search_labels_fallback_to_contains(neo4j_storage):
      """
      Test that search_labels falls back to CONTAINS search if the index fails.
      This can happen with older Neo4j versions or if the index is not yet available.

      NOTE(review): nothing in this test disables the full-text index, so it
      presumably only reaches the fallback path when the index happens to be
      unavailable; confirm against search_labels.
      """
      storage = neo4j_storage

      # Insert test nodes; both differ only in their entity_id
      for entity_id in ("Test Node Alpha", "Test Node Beta"):
          node_data = {
              "entity_id": entity_id,
              "description": "Test node",
              "keywords": "test",
              "entity_type": "Test",
          }
          await storage.upsert_node(entity_id, node_data)

      # Even if the full-text index is not available, CONTAINS should work.
      # No wait for index consistency is done here, unlike the other search tests.
      results = await storage.search_labels("Alpha", limit=10)

      # Should find the node using fallback CONTAINS search
      assert len(results) > 0, "Fallback CONTAINS search should return results"
      assert "Test Node Alpha" in results, "Should find 'Test Node Alpha'"
  @pytest.mark.integration
  @pytest.mark.requires_db
  async def test_multiple_workspaces_have_separate_indexes(neo4j_storage):
      """
      Test that different workspaces have their own separate indexes.

      A second storage for workspace "test_workspace_2" is created next to the
      fixture's storage; SHOW FULLTEXT INDEXES must then list one
      workspace-specific index for each of them.

      The second storage is always finalized, even when dropping its data or
      its index fails, so the test does not leak a connection.
      """
      from lightrag.kg.neo4j_impl import Neo4JStorage

      # Storage for workspace 1 comes from the fixture
      storage1 = neo4j_storage

      # Create storage for workspace 2
      global_config = {
          "embedding_batch_num": 10,
          "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.5},
          "working_dir": os.environ.get("WORKING_DIR", "./rag_storage"),
      }
      storage2 = Neo4JStorage(
          namespace="test_fulltext_index",
          workspace="test_workspace_2",
          global_config=global_config,
          embedding_func=mock_embedding_func,
      )
      await storage2.initialize()

      try:
          # Start from an empty workspace 2. Done inside the try block so that a
          # failure here still reaches the cleanup below.
          await storage2.drop()

          # Check that both workspaces have their own indexes
          async with storage1._driver.session(database=storage1._DATABASE) as session:
              result = await session.run("SHOW FULLTEXT INDEXES")
              indexes = await result.data()
              await result.consume()

              index_names = [idx["name"] for idx in indexes]
              workspace1_index = (
                  f"entity_id_fulltext_idx_{storage1._get_workspace_label()}"
              )
              workspace2_index = (
                  f"entity_id_fulltext_idx_{storage2._get_workspace_label()}"
              )

              assert (
                  workspace1_index in index_names
              ), f"Workspace 1 index '{workspace1_index}' should exist"
              assert (
                  workspace2_index in index_names
              ), f"Workspace 2 index '{workspace2_index}' should exist"
      finally:
          try:
              # Clean up: drop the fulltext index created for workspace 2 to prevent accumulation
              try:
                  async with storage2._driver.session(
                      database=storage2._DATABASE
                  ) as session:
                      index_name = storage2._get_fulltext_index_name(
                          storage2._get_workspace_label()
                      )
                      drop_query = f"DROP INDEX {index_name} IF EXISTS"
                      result = await session.run(drop_query)
                      await result.consume()
              except Exception as exc:
                  # Index cleanup is best-effort and must not mask the test
                  # outcome, but report the failure instead of hiding it.
                  print(
                      f"Warning: failed to drop full-text index for workspace 2: {exc!r}"
                  )
              await storage2.drop()
          finally:
              # Always release the connection, even if a drop above failed
              await storage2.finalize()
  if __name__ == "__main__":
      # Run tests with pytest and propagate its exit code, so that running this
      # file directly reports failure to the calling shell or CI job.
      sys.exit(pytest.main([__file__, "-v", "--run-integration"]))