test_qdrant_migration.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574
  1. import pytest
  2. from unittest.mock import MagicMock, patch, AsyncMock
  3. import numpy as np
  4. pytest.importorskip(
  5. "qdrant_client",
  6. reason="qdrant-client is required for Qdrant storage tests",
  7. )
  8. from qdrant_client import models # noqa: E402
  9. from lightrag.utils import EmbeddingFunc # noqa: E402
  10. from lightrag.kg.qdrant_impl import QdrantVectorDBStorage # noqa: E402
  11. # Mock QdrantClient
  12. @pytest.fixture
  13. def mock_qdrant_client():
  14. with patch("lightrag.kg.qdrant_impl.QdrantClient") as mock_client_cls:
  15. client = mock_client_cls.return_value
  16. client.collection_exists.return_value = False
  17. client.count.return_value.count = 0
  18. # Mock payload schema and vector config for get_collection
  19. collection_info = MagicMock()
  20. collection_info.payload_schema = {}
  21. # Mock vector dimension to match mock_embedding_func (768d)
  22. collection_info.config.params.vectors.size = 768
  23. client.get_collection.return_value = collection_info
  24. yield client
  25. # Mock get_data_init_lock to avoid async lock issues in tests
  26. @pytest.fixture(autouse=True)
  27. def mock_data_init_lock():
  28. with patch("lightrag.kg.qdrant_impl.get_data_init_lock") as mock_lock:
  29. mock_lock_ctx = AsyncMock()
  30. mock_lock.return_value = mock_lock_ctx
  31. yield mock_lock
  32. # Mock Embedding function
  33. @pytest.fixture
  34. def mock_embedding_func():
  35. async def embed_func(texts, **kwargs):
  36. return np.array([[0.1] * 768 for _ in texts])
  37. func = EmbeddingFunc(embedding_dim=768, func=embed_func, model_name="test-model")
  38. return func
  39. async def test_qdrant_collection_naming(mock_qdrant_client, mock_embedding_func):
  40. """Test if collection name is correctly generated with model suffix"""
  41. config = {
  42. "embedding_batch_num": 10,
  43. "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
  44. }
  45. storage = QdrantVectorDBStorage(
  46. namespace="chunks",
  47. global_config=config,
  48. embedding_func=mock_embedding_func,
  49. workspace="test_ws",
  50. )
  51. # Verify collection name contains model suffix
  52. expected_suffix = "test_model_768d"
  53. assert expected_suffix in storage.final_namespace
  54. assert storage.final_namespace == f"lightrag_vdb_chunks_{expected_suffix}"
  55. async def test_qdrant_migration_trigger(mock_qdrant_client, mock_embedding_func):
  56. """Test if migration logic is triggered correctly"""
  57. config = {
  58. "embedding_batch_num": 10,
  59. "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
  60. }
  61. storage = QdrantVectorDBStorage(
  62. namespace="chunks",
  63. global_config=config,
  64. embedding_func=mock_embedding_func,
  65. workspace="test_ws",
  66. )
  67. # Legacy collection name (without model suffix)
  68. legacy_collection = "lightrag_vdb_chunks"
  69. # Setup mocks for migration scenario
  70. # 1. New collection does not exist, only legacy exists
  71. mock_qdrant_client.collection_exists.side_effect = lambda name: (
  72. name == legacy_collection
  73. )
  74. # 2. Legacy collection exists and has data
  75. migration_state = {"new_workspace_count": 0}
  76. def count_mock(collection_name, exact=True, count_filter=None):
  77. mock_result = MagicMock()
  78. if collection_name == legacy_collection:
  79. mock_result.count = 100
  80. elif collection_name == storage.final_namespace:
  81. mock_result.count = migration_state["new_workspace_count"]
  82. else:
  83. mock_result.count = 0
  84. return mock_result
  85. mock_qdrant_client.count.side_effect = count_mock
  86. # 3. Mock scroll for data migration
  87. mock_point = MagicMock()
  88. mock_point.id = "old_id"
  89. mock_point.vector = [0.1] * 768
  90. mock_point.payload = {"content": "test"} # No workspace_id in payload
  91. # When payload_schema is empty, the code first samples payloads to detect workspace_id
  92. # Then proceeds with migration batches
  93. # Scroll calls: 1) Sampling (limit=10), 2) Migration batch, 3) End of migration
  94. mock_qdrant_client.scroll.side_effect = [
  95. ([mock_point], "_"), # Sampling scroll - no workspace_id found
  96. ([mock_point], "next_offset"), # Migration batch
  97. ([], None), # End of migration
  98. ]
  99. def upsert_mock(*args, **kwargs):
  100. migration_state["new_workspace_count"] = 100
  101. return None
  102. mock_qdrant_client.upsert.side_effect = upsert_mock
  103. # Initialize storage (triggers migration)
  104. await storage.initialize()
  105. # Verify migration steps
  106. # 1. Legacy count checked
  107. mock_qdrant_client.count.assert_any_call(
  108. collection_name=legacy_collection, exact=True
  109. )
  110. # 2. New collection created
  111. mock_qdrant_client.create_collection.assert_called()
  112. # 3. Data scrolled from legacy
  113. # First call (index 0) is sampling scroll with limit=10
  114. # Second call (index 1) is migration batch with limit=500
  115. assert mock_qdrant_client.scroll.call_count >= 2
  116. # Check sampling scroll
  117. sampling_call = mock_qdrant_client.scroll.call_args_list[0]
  118. assert sampling_call.kwargs["collection_name"] == legacy_collection
  119. assert sampling_call.kwargs["limit"] == 10
  120. # Check migration batch scroll
  121. migration_call = mock_qdrant_client.scroll.call_args_list[1]
  122. assert migration_call.kwargs["collection_name"] == legacy_collection
  123. assert migration_call.kwargs["limit"] == 500
  124. # 4. Data upserted to new
  125. mock_qdrant_client.upsert.assert_called()
  126. # 5. Payload index created
  127. mock_qdrant_client.create_payload_index.assert_called()
  128. async def test_qdrant_no_migration_needed(mock_qdrant_client, mock_embedding_func):
  129. """Test scenario where new collection already exists (Case 1 in setup_collection)
  130. When only the new collection exists and no legacy collection is found,
  131. the implementation should:
  132. 1. Create payload index on the new collection (ensure index exists)
  133. 2. NOT attempt any data migration (no scroll calls)
  134. """
  135. config = {
  136. "embedding_batch_num": 10,
  137. "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
  138. }
  139. storage = QdrantVectorDBStorage(
  140. namespace="chunks",
  141. global_config=config,
  142. embedding_func=mock_embedding_func,
  143. workspace="test_ws",
  144. )
  145. # Only new collection exists (no legacy collection found)
  146. mock_qdrant_client.collection_exists.side_effect = lambda name: (
  147. name == storage.final_namespace
  148. )
  149. # Initialize
  150. await storage.initialize()
  151. # Should create payload index on the new collection (ensure index)
  152. mock_qdrant_client.create_payload_index.assert_called_with(
  153. collection_name=storage.final_namespace,
  154. field_name="workspace_id",
  155. field_schema=models.KeywordIndexParams(
  156. type=models.KeywordIndexType.KEYWORD,
  157. is_tenant=True,
  158. ),
  159. )
  160. # Should NOT migrate (no scroll calls since no legacy collection exists)
  161. mock_qdrant_client.scroll.assert_not_called()
# ============================================================================
# Scenario tests mirroring the migration design document (lines 606-649)
# ============================================================================
  165. async def test_scenario_1_new_workspace_creation(
  166. mock_qdrant_client, mock_embedding_func
  167. ):
  168. """
  169. 场景1:新建workspace
  170. 预期:直接创建lightrag_vdb_chunks_text_embedding_3_large_3072d
  171. """
  172. # Use a large embedding model
  173. large_model_func = EmbeddingFunc(
  174. embedding_dim=3072,
  175. func=mock_embedding_func.func,
  176. model_name="text-embedding-3-large",
  177. )
  178. config = {
  179. "embedding_batch_num": 10,
  180. "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
  181. }
  182. storage = QdrantVectorDBStorage(
  183. namespace="chunks",
  184. global_config=config,
  185. embedding_func=large_model_func,
  186. workspace="test_new",
  187. )
  188. # Case 3: Neither legacy nor new collection exists
  189. mock_qdrant_client.collection_exists.return_value = False
  190. # Initialize storage
  191. await storage.initialize()
  192. # Verify: Should create new collection with model suffix
  193. expected_collection = "lightrag_vdb_chunks_text_embedding_3_large_3072d"
  194. assert storage.final_namespace == expected_collection
  195. # Verify create_collection was called with correct name
  196. create_calls = [
  197. call for call in mock_qdrant_client.create_collection.call_args_list
  198. ]
  199. assert len(create_calls) > 0
  200. assert (
  201. create_calls[0][0][0] == expected_collection
  202. or create_calls[0].kwargs.get("collection_name") == expected_collection
  203. )
  204. # Verify no migration was attempted
  205. mock_qdrant_client.scroll.assert_not_called()
  206. print(
  207. f"✅ Scenario 1: New workspace created with collection '{expected_collection}'"
  208. )
  209. async def test_scenario_2_legacy_upgrade_migration(
  210. mock_qdrant_client, mock_embedding_func
  211. ):
  212. """
  213. 场景2:从旧版本升级
  214. 已存在lightrag_vdb_chunks(无后缀)
  215. 预期:自动迁移数据到lightrag_vdb_chunks_text_embedding_ada_002_1536d
  216. 注意:迁移后不再自动删除遗留集合,需要手动删除
  217. """
  218. # Use ada-002 model
  219. ada_func = EmbeddingFunc(
  220. embedding_dim=1536,
  221. func=mock_embedding_func.func,
  222. model_name="text-embedding-ada-002",
  223. )
  224. config = {
  225. "embedding_batch_num": 10,
  226. "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
  227. }
  228. storage = QdrantVectorDBStorage(
  229. namespace="chunks",
  230. global_config=config,
  231. embedding_func=ada_func,
  232. workspace="test_legacy",
  233. )
  234. # Legacy collection name (without model suffix)
  235. legacy_collection = "lightrag_vdb_chunks"
  236. new_collection = storage.final_namespace
  237. # Case 4: Only legacy collection exists
  238. mock_qdrant_client.collection_exists.side_effect = lambda name: (
  239. name == legacy_collection
  240. )
  241. # Mock legacy collection info with 1536d vectors
  242. legacy_collection_info = MagicMock()
  243. legacy_collection_info.payload_schema = {}
  244. legacy_collection_info.config.params.vectors.size = 1536
  245. mock_qdrant_client.get_collection.return_value = legacy_collection_info
  246. migration_state = {"new_workspace_count": 0}
  247. def count_mock(collection_name, exact=True, count_filter=None):
  248. mock_result = MagicMock()
  249. if collection_name == legacy_collection:
  250. mock_result.count = 150
  251. elif collection_name == new_collection:
  252. mock_result.count = migration_state["new_workspace_count"]
  253. else:
  254. mock_result.count = 0
  255. return mock_result
  256. mock_qdrant_client.count.side_effect = count_mock
  257. # Mock scroll results (simulate migration in batches)
  258. mock_points = []
  259. for i in range(10):
  260. point = MagicMock()
  261. point.id = f"legacy-{i}"
  262. point.vector = [0.1] * 1536
  263. # No workspace_id in payload - simulates legacy data
  264. point.payload = {"content": f"Legacy document {i}", "id": f"doc-{i}"}
  265. mock_points.append(point)
  266. # When payload_schema is empty, the code first samples payloads to detect workspace_id
  267. # Then proceeds with migration batches
  268. # Scroll calls: 1) Sampling (limit=10), 2) Migration batch, 3) End of migration
  269. mock_qdrant_client.scroll.side_effect = [
  270. (mock_points, "_"), # Sampling scroll - no workspace_id found in payloads
  271. (mock_points, "offset1"), # Migration batch
  272. ([], None), # End of migration
  273. ]
  274. def upsert_mock(*args, **kwargs):
  275. migration_state["new_workspace_count"] = 150
  276. return None
  277. mock_qdrant_client.upsert.side_effect = upsert_mock
  278. # Initialize (triggers migration)
  279. await storage.initialize()
  280. # Verify: New collection should be created
  281. expected_new_collection = "lightrag_vdb_chunks_text_embedding_ada_002_1536d"
  282. assert storage.final_namespace == expected_new_collection
  283. # Verify migration steps
  284. # 1. Check legacy count
  285. mock_qdrant_client.count.assert_any_call(
  286. collection_name=legacy_collection, exact=True
  287. )
  288. # 2. Create new collection
  289. mock_qdrant_client.create_collection.assert_called()
  290. # 3. Scroll legacy data
  291. scroll_calls = [call for call in mock_qdrant_client.scroll.call_args_list]
  292. assert len(scroll_calls) >= 1
  293. assert scroll_calls[0].kwargs["collection_name"] == legacy_collection
  294. # 4. Upsert to new collection
  295. upsert_calls = [call for call in mock_qdrant_client.upsert.call_args_list]
  296. assert len(upsert_calls) >= 1
  297. assert upsert_calls[0].kwargs["collection_name"] == new_collection
  298. # Note: Legacy collection is NOT automatically deleted after migration
  299. # Manual deletion is required after data migration verification
  300. print(
  301. f"✅ Scenario 2: Legacy data migrated from '{legacy_collection}' to '{expected_new_collection}'"
  302. )
  303. async def test_scenario_3_multi_model_coexistence(mock_qdrant_client):
  304. """
  305. 场景3:多模型并存
  306. 预期:两个独立的collection,互不干扰
  307. """
  308. # Model A: bge-small with 768d
  309. async def embed_func_a(texts, **kwargs):
  310. return np.array([[0.1] * 768 for _ in texts])
  311. model_a_func = EmbeddingFunc(
  312. embedding_dim=768, func=embed_func_a, model_name="bge-small"
  313. )
  314. # Model B: bge-large with 1024d
  315. async def embed_func_b(texts, **kwargs):
  316. return np.array([[0.2] * 1024 for _ in texts])
  317. model_b_func = EmbeddingFunc(
  318. embedding_dim=1024, func=embed_func_b, model_name="bge-large"
  319. )
  320. config = {
  321. "embedding_batch_num": 10,
  322. "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
  323. }
  324. # Create storage for workspace A with model A
  325. storage_a = QdrantVectorDBStorage(
  326. namespace="chunks",
  327. global_config=config,
  328. embedding_func=model_a_func,
  329. workspace="workspace_a",
  330. )
  331. # Create storage for workspace B with model B
  332. storage_b = QdrantVectorDBStorage(
  333. namespace="chunks",
  334. global_config=config,
  335. embedding_func=model_b_func,
  336. workspace="workspace_b",
  337. )
  338. # Verify: Collection names are different
  339. assert storage_a.final_namespace != storage_b.final_namespace
  340. # Verify: Model A collection
  341. expected_collection_a = "lightrag_vdb_chunks_bge_small_768d"
  342. assert storage_a.final_namespace == expected_collection_a
  343. # Verify: Model B collection
  344. expected_collection_b = "lightrag_vdb_chunks_bge_large_1024d"
  345. assert storage_b.final_namespace == expected_collection_b
  346. # Verify: Different embedding dimensions are preserved
  347. assert storage_a.embedding_func.embedding_dim == 768
  348. assert storage_b.embedding_func.embedding_dim == 1024
  349. print("✅ Scenario 3: Multi-model coexistence verified")
  350. print(f" - Workspace A: {expected_collection_a} (768d)")
  351. print(f" - Workspace B: {expected_collection_b} (1024d)")
  352. print(" - Collections are independent")
  353. async def test_case1_empty_legacy_auto_cleanup(mock_qdrant_client, mock_embedding_func):
  354. """
  355. Case 1a: 新旧collection都存在,且旧库为空
  356. 预期:自动删除旧库
  357. """
  358. config = {
  359. "embedding_batch_num": 10,
  360. "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
  361. }
  362. storage = QdrantVectorDBStorage(
  363. namespace="chunks",
  364. global_config=config,
  365. embedding_func=mock_embedding_func,
  366. workspace="test_ws",
  367. )
  368. # Legacy collection name (without model suffix)
  369. legacy_collection = "lightrag_vdb_chunks"
  370. new_collection = storage.final_namespace
  371. # Mock: Both collections exist
  372. mock_qdrant_client.collection_exists.side_effect = lambda name: (
  373. name
  374. in [
  375. legacy_collection,
  376. new_collection,
  377. ]
  378. )
  379. # Mock: Legacy collection is empty (0 records)
  380. def count_mock(collection_name, exact=True, count_filter=None):
  381. mock_result = MagicMock()
  382. if collection_name == legacy_collection:
  383. mock_result.count = 0 # Empty legacy collection
  384. else:
  385. mock_result.count = 100 # New collection has data
  386. return mock_result
  387. mock_qdrant_client.count.side_effect = count_mock
  388. # Mock get_collection for Case 2 check
  389. collection_info = MagicMock()
  390. collection_info.payload_schema = {"workspace_id": True}
  391. mock_qdrant_client.get_collection.return_value = collection_info
  392. # Initialize storage
  393. await storage.initialize()
  394. # Verify: Empty legacy collection should be automatically cleaned up
  395. # Empty collections are safe to delete without data loss risk
  396. delete_calls = [
  397. call for call in mock_qdrant_client.delete_collection.call_args_list
  398. ]
  399. assert len(delete_calls) >= 1, "Empty legacy collection should be auto-deleted"
  400. deleted_collection = (
  401. delete_calls[0][0][0]
  402. if delete_calls[0][0]
  403. else delete_calls[0].kwargs.get("collection_name")
  404. )
  405. assert (
  406. deleted_collection == legacy_collection
  407. ), f"Expected to delete '{legacy_collection}', but deleted '{deleted_collection}'"
  408. print(
  409. f"✅ Case 1a: Empty legacy collection '{legacy_collection}' auto-deleted successfully"
  410. )
  411. async def test_case1_nonempty_legacy_warning(mock_qdrant_client, mock_embedding_func):
  412. """
  413. Case 1b: 新旧collection都存在,且旧库有数据
  414. 预期:警告但不删除
  415. """
  416. config = {
  417. "embedding_batch_num": 10,
  418. "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
  419. }
  420. storage = QdrantVectorDBStorage(
  421. namespace="chunks",
  422. global_config=config,
  423. embedding_func=mock_embedding_func,
  424. workspace="test_ws",
  425. )
  426. # Legacy collection name (without model suffix)
  427. legacy_collection = "lightrag_vdb_chunks"
  428. new_collection = storage.final_namespace
  429. # Mock: Both collections exist
  430. mock_qdrant_client.collection_exists.side_effect = lambda name: (
  431. name
  432. in [
  433. legacy_collection,
  434. new_collection,
  435. ]
  436. )
  437. # Mock: Legacy collection has data (50 records)
  438. def count_mock(collection_name, exact=True, count_filter=None):
  439. mock_result = MagicMock()
  440. if collection_name == legacy_collection:
  441. mock_result.count = 50 # Legacy has data
  442. else:
  443. mock_result.count = 100 # New collection has data
  444. return mock_result
  445. mock_qdrant_client.count.side_effect = count_mock
  446. # Mock get_collection for Case 2 check
  447. collection_info = MagicMock()
  448. collection_info.payload_schema = {"workspace_id": True}
  449. mock_qdrant_client.get_collection.return_value = collection_info
  450. # Initialize storage
  451. await storage.initialize()
  452. # Verify: Legacy collection with data should be preserved
  453. # We never auto-delete collections that contain data to prevent accidental data loss
  454. delete_calls = [
  455. call for call in mock_qdrant_client.delete_collection.call_args_list
  456. ]
  457. # Check if legacy collection was deleted (it should not be)
  458. legacy_deleted = any(
  459. (call[0][0] if call[0] else call.kwargs.get("collection_name"))
  460. == legacy_collection
  461. for call in delete_calls
  462. )
  463. assert not legacy_deleted, "Legacy collection with data should NOT be auto-deleted"
  464. print(
  465. f"✅ Case 1b: Legacy collection '{legacy_collection}' with data preserved (warning only)"
  466. )