| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574 |
- import pytest
- from unittest.mock import MagicMock, patch, AsyncMock
- import numpy as np
- pytest.importorskip(
- "qdrant_client",
- reason="qdrant-client is required for Qdrant storage tests",
- )
- from qdrant_client import models # noqa: E402
- from lightrag.utils import EmbeddingFunc # noqa: E402
- from lightrag.kg.qdrant_impl import QdrantVectorDBStorage # noqa: E402
@pytest.fixture
def mock_qdrant_client():
    """Yield a mocked Qdrant client instance with empty-database defaults.

    ``lightrag.kg.qdrant_impl.QdrantClient`` is patched for the duration of
    the test; the yielded object is the ``return_value`` of the patched class.
    By default no collection exists, every count is 0, and ``get_collection``
    reports an empty payload schema with 768-dimensional vectors (the same
    dimension used by ``mock_embedding_func``).
    """
    # Prepare the get_collection() result up front: no payload schema, 768d vectors.
    info = MagicMock()
    info.payload_schema = {}
    info.config.params.vectors.size = 768

    client_patcher = patch("lightrag.kg.qdrant_impl.QdrantClient")
    with client_patcher as client_cls:
        fake_client = client_cls.return_value
        fake_client.collection_exists.return_value = False
        fake_client.count.return_value.count = 0
        fake_client.get_collection.return_value = info
        yield fake_client
@pytest.fixture(autouse=True)
def mock_data_init_lock():
    """Replace ``get_data_init_lock`` in ``lightrag.kg.qdrant_impl`` for every test.

    The patched factory returns an ``AsyncMock``, which avoids async lock
    issues while the tests run. The patched factory itself is yielded.
    """
    with patch("lightrag.kg.qdrant_impl.get_data_init_lock") as lock_factory:
        lock_factory.return_value = AsyncMock()
        yield lock_factory
@pytest.fixture
def mock_embedding_func():
    """Return an ``EmbeddingFunc`` named "test-model" that produces 768d vectors.

    The wrapped coroutine ignores extra keyword arguments and returns one
    constant row (all 0.1) per input text.
    """

    async def embed_func(texts, **kwargs):
        rows = [[0.1] * 768 for _ in texts]
        return np.array(rows)

    return EmbeddingFunc(embedding_dim=768, func=embed_func, model_name="test-model")
async def test_qdrant_collection_naming(mock_qdrant_client, mock_embedding_func):
    """Check that the collection name carries the embedding-model suffix."""
    storage = QdrantVectorDBStorage(
        namespace="chunks",
        global_config={
            "embedding_batch_num": 10,
            "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
        },
        embedding_func=mock_embedding_func,
        workspace="test_ws",
    )

    # "test-model" with 768 dimensions is expected to appear as this suffix.
    model_suffix = "test_model_768d"
    collection_name = storage.final_namespace

    assert model_suffix in collection_name
    assert collection_name == f"lightrag_vdb_chunks_{model_suffix}"
async def test_qdrant_migration_trigger(mock_qdrant_client, mock_embedding_func):
    """Check that ``initialize()`` migrates data when only the legacy collection exists.

    The mocked client reports a legacy collection (no model suffix) holding
    100 records and no suffixed collection. After initialization the test
    expects the legacy count to be read, the new collection to be created,
    the legacy data to be scrolled (sampling first, then a migration batch),
    the data to be upserted, and a payload index to be created.
    """
    storage = QdrantVectorDBStorage(
        namespace="chunks",
        global_config={
            "embedding_batch_num": 10,
            "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
        },
        embedding_func=mock_embedding_func,
        workspace="test_ws",
    )

    # Collection name used before the model suffix was introduced.
    legacy_collection = "lightrag_vdb_chunks"

    # 1. Only the legacy collection exists; the suffixed one does not.
    def only_legacy_exists(name):
        return name == legacy_collection

    mock_qdrant_client.collection_exists.side_effect = only_legacy_exists

    # 2. The legacy collection holds data; the new one is empty until upsert runs.
    progress = {"new_workspace_count": 0}

    def count_mock(collection_name, exact=True, count_filter=None):
        if collection_name == legacy_collection:
            total = 100
        elif collection_name == storage.final_namespace:
            total = progress["new_workspace_count"]
        else:
            total = 0
        outcome = MagicMock()
        outcome.count = total
        return outcome

    mock_qdrant_client.count.side_effect = count_mock

    # 3. A single legacy point whose payload carries no workspace_id.
    legacy_point = MagicMock()
    legacy_point.id = "old_id"
    legacy_point.vector = [0.1] * 768
    legacy_point.payload = {"content": "test"}

    # With an empty payload_schema the code first samples payloads to detect
    # workspace_id, then runs the migration batches.
    mock_qdrant_client.scroll.side_effect = [
        ([legacy_point], "_"),  # sampling scroll - no workspace_id found
        ([legacy_point], "next_offset"),  # migration batch
        ([], None),  # end of migration
    ]

    def upsert_mock(*args, **kwargs):
        # Once data is upserted, the new collection reports the migrated records.
        progress["new_workspace_count"] = 100

    mock_qdrant_client.upsert.side_effect = upsert_mock

    # Initializing the storage is what triggers the migration.
    await storage.initialize()

    # Legacy count was checked.
    mock_qdrant_client.count.assert_any_call(
        collection_name=legacy_collection, exact=True
    )

    # New collection was created.
    mock_qdrant_client.create_collection.assert_called()

    # Legacy data was scrolled: call 0 is the sampling scroll (limit=10),
    # call 1 is the migration batch (limit=500).
    assert mock_qdrant_client.scroll.call_count >= 2
    recorded_scrolls = mock_qdrant_client.scroll.call_args_list
    for recorded, expected_limit in zip(recorded_scrolls[:2], (10, 500)):
        assert recorded.kwargs["collection_name"] == legacy_collection
        assert recorded.kwargs["limit"] == expected_limit

    # Data was upserted into the new collection and the payload index created.
    mock_qdrant_client.upsert.assert_called()
    mock_qdrant_client.create_payload_index.assert_called()
async def test_qdrant_no_migration_needed(mock_qdrant_client, mock_embedding_func):
    """Check the path where only the new (suffixed) collection exists.

    With no legacy collection present, initialization is expected to:

    1. Ensure the ``workspace_id`` payload index exists on the new collection.
    2. Skip data migration entirely (no scroll calls).
    """
    storage = QdrantVectorDBStorage(
        namespace="chunks",
        global_config={
            "embedding_batch_num": 10,
            "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
        },
        embedding_func=mock_embedding_func,
        workspace="test_ws",
    )

    # Report the suffixed collection as the only one that exists.
    def only_new_exists(name):
        return name == storage.final_namespace

    mock_qdrant_client.collection_exists.side_effect = only_new_exists

    await storage.initialize()

    # The tenant keyword index on workspace_id must be ensured on the new collection.
    tenant_index_schema = models.KeywordIndexParams(
        type=models.KeywordIndexType.KEYWORD,
        is_tenant=True,
    )
    mock_qdrant_client.create_payload_index.assert_called_with(
        collection_name=storage.final_namespace,
        field_name="workspace_id",
        field_schema=tenant_index_schema,
    )

    # No legacy collection means nothing is scrolled for migration.
    mock_qdrant_client.scroll.assert_not_called()
- # ============================================================================
- # Tests for scenarios described in design document (Lines 606-649)
- # ============================================================================
async def test_scenario_1_new_workspace_creation(
    mock_qdrant_client, mock_embedding_func
):
    """Scenario 1: creating a brand-new workspace.

    Expected: the collection
    ``lightrag_vdb_chunks_text_embedding_3_large_3072d`` is created directly,
    and no migration is attempted.
    """
    # Use a large embedding model
    large_model_func = EmbeddingFunc(
        embedding_dim=3072,
        func=mock_embedding_func.func,
        model_name="text-embedding-3-large",
    )

    config = {
        "embedding_batch_num": 10,
        "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
    }

    storage = QdrantVectorDBStorage(
        namespace="chunks",
        global_config=config,
        embedding_func=large_model_func,
        workspace="test_new",
    )

    # Neither the legacy nor the new collection exists
    mock_qdrant_client.collection_exists.return_value = False

    # Initialize storage
    await storage.initialize()

    # Verify: the collection name carries the model suffix
    expected_collection = "lightrag_vdb_chunks_text_embedding_3_large_3072d"
    assert storage.final_namespace == expected_collection

    # Verify create_collection was called with the correct name
    create_calls = mock_qdrant_client.create_collection.call_args_list
    assert len(create_calls) > 0

    # The name may be passed positionally or as ``collection_name=``. Slicing
    # the positional args (instead of indexing) keeps the keyword fallback
    # reachable when the call was keyword-only and ``args`` is empty.
    first_call = create_calls[0]
    candidate_names = first_call.args[:1] + (
        first_call.kwargs.get("collection_name"),
    )
    assert expected_collection in candidate_names

    # Verify no migration was attempted
    mock_qdrant_client.scroll.assert_not_called()

    print(
        f"✅ Scenario 1: New workspace created with collection '{expected_collection}'"
    )
async def test_scenario_2_legacy_upgrade_migration(
    mock_qdrant_client, mock_embedding_func
):
    """Scenario 2: upgrading from a legacy version.

    ``lightrag_vdb_chunks`` (no suffix) already exists.
    Expected: its data is migrated automatically to
    ``lightrag_vdb_chunks_text_embedding_ada_002_1536d``.
    Note: the legacy collection is no longer deleted automatically after the
    migration; it has to be removed manually.
    """
    # Embedding function describing the ada-002 model (1536 dimensions).
    ada_func = EmbeddingFunc(
        embedding_dim=1536,
        func=mock_embedding_func.func,
        model_name="text-embedding-ada-002",
    )

    storage = QdrantVectorDBStorage(
        namespace="chunks",
        global_config={
            "embedding_batch_num": 10,
            "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
        },
        embedding_func=ada_func,
        workspace="test_legacy",
    )

    # Collection names: legacy (no model suffix) and the suffixed target.
    legacy_collection = "lightrag_vdb_chunks"
    new_collection = storage.final_namespace

    # Only the legacy collection exists.
    def only_legacy_exists(name):
        return name == legacy_collection

    mock_qdrant_client.collection_exists.side_effect = only_legacy_exists

    # get_collection reports 1536d vectors and an empty payload schema.
    legacy_info = MagicMock()
    legacy_info.payload_schema = {}
    legacy_info.config.params.vectors.size = 1536
    mock_qdrant_client.get_collection.return_value = legacy_info

    progress = {"new_workspace_count": 0}

    def count_mock(collection_name, exact=True, count_filter=None):
        if collection_name == legacy_collection:
            total = 150
        elif collection_name == new_collection:
            total = progress["new_workspace_count"]
        else:
            total = 0
        outcome = MagicMock()
        outcome.count = total
        return outcome

    mock_qdrant_client.count.side_effect = count_mock

    def make_legacy_point(index):
        # Payload has no workspace_id, which simulates legacy data.
        record = MagicMock()
        record.id = f"legacy-{index}"
        record.vector = [0.1] * 1536
        record.payload = {"content": f"Legacy document {index}", "id": f"doc-{index}"}
        return record

    mock_points = [make_legacy_point(index) for index in range(10)]

    # With an empty payload_schema the code first samples payloads to detect
    # workspace_id, then runs the migration batches.
    mock_qdrant_client.scroll.side_effect = [
        (mock_points, "_"),  # sampling scroll - no workspace_id in payloads
        (mock_points, "offset1"),  # migration batch
        ([], None),  # end of migration
    ]

    def upsert_mock(*args, **kwargs):
        # After the upsert the new collection reports all migrated records.
        progress["new_workspace_count"] = 150

    mock_qdrant_client.upsert.side_effect = upsert_mock

    # Initializing the storage triggers the migration.
    await storage.initialize()

    # The target collection carries the model suffix.
    expected_new_collection = "lightrag_vdb_chunks_text_embedding_ada_002_1536d"
    assert storage.final_namespace == expected_new_collection

    # 1. The legacy count was checked.
    mock_qdrant_client.count.assert_any_call(
        collection_name=legacy_collection, exact=True
    )

    # 2. The new collection was created.
    mock_qdrant_client.create_collection.assert_called()

    # 3. Legacy data was scrolled.
    recorded_scrolls = mock_qdrant_client.scroll.call_args_list
    assert len(recorded_scrolls) >= 1
    assert recorded_scrolls[0].kwargs["collection_name"] == legacy_collection

    # 4. Data was upserted into the new collection.
    recorded_upserts = mock_qdrant_client.upsert.call_args_list
    assert len(recorded_upserts) >= 1
    assert recorded_upserts[0].kwargs["collection_name"] == new_collection

    # The legacy collection is NOT deleted automatically after migration;
    # manual deletion is required once the migrated data has been verified.
    print(
        f"✅ Scenario 2: Legacy data migrated from '{legacy_collection}' to '{expected_new_collection}'"
    )
async def test_scenario_3_multi_model_coexistence(mock_qdrant_client):
    """Scenario 3: several embedding models side by side.

    Expected: two independent collections that do not interfere with each other.
    """

    def make_embedder(fill_value, dim):
        # Build a coroutine that returns one constant row of length `dim` per text.
        async def embed(texts, **kwargs):
            return np.array([[fill_value] * dim for _ in texts])

        return embed

    # Model A: bge-small with 768d; Model B: bge-large with 1024d.
    model_a_func = EmbeddingFunc(
        embedding_dim=768, func=make_embedder(0.1, 768), model_name="bge-small"
    )
    model_b_func = EmbeddingFunc(
        embedding_dim=1024, func=make_embedder(0.2, 1024), model_name="bge-large"
    )

    shared_config = {
        "embedding_batch_num": 10,
        "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
    }

    # One storage per workspace, each bound to its own model.
    storage_a = QdrantVectorDBStorage(
        namespace="chunks",
        global_config=shared_config,
        embedding_func=model_a_func,
        workspace="workspace_a",
    )
    storage_b = QdrantVectorDBStorage(
        namespace="chunks",
        global_config=shared_config,
        embedding_func=model_b_func,
        workspace="workspace_b",
    )

    # The two storages must resolve to different collections.
    assert storage_a.final_namespace != storage_b.final_namespace

    # Each collection name carries its own model suffix.
    expected_collection_a = "lightrag_vdb_chunks_bge_small_768d"
    assert storage_a.final_namespace == expected_collection_a

    expected_collection_b = "lightrag_vdb_chunks_bge_large_1024d"
    assert storage_b.final_namespace == expected_collection_b

    # The embedding dimensions are kept per storage.
    assert storage_a.embedding_func.embedding_dim == 768
    assert storage_b.embedding_func.embedding_dim == 1024

    print("✅ Scenario 3: Multi-model coexistence verified")
    print(f" - Workspace A: {expected_collection_a} (768d)")
    print(f" - Workspace B: {expected_collection_b} (1024d)")
    print(" - Collections are independent")
async def test_case1_empty_legacy_auto_cleanup(mock_qdrant_client, mock_embedding_func):
    """Case 1a: both the new and the legacy collection exist, and the legacy one is empty.

    Expected: the legacy collection is deleted automatically.
    """
    storage = QdrantVectorDBStorage(
        namespace="chunks",
        global_config={
            "embedding_batch_num": 10,
            "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
        },
        embedding_func=mock_embedding_func,
        workspace="test_ws",
    )

    # Collection names: legacy (no model suffix) and the suffixed one.
    legacy_collection = "lightrag_vdb_chunks"
    new_collection = storage.final_namespace
    existing_collections = (legacy_collection, new_collection)

    # Both collections are reported as existing.
    def both_exist(name):
        return name in existing_collections

    mock_qdrant_client.collection_exists.side_effect = both_exist

    def count_mock(collection_name, exact=True, count_filter=None):
        # The legacy collection is empty; anything else reports 100 records.
        outcome = MagicMock()
        outcome.count = 0 if collection_name == legacy_collection else 100
        return outcome

    mock_qdrant_client.count.side_effect = count_mock

    # get_collection reports a payload schema that already has workspace_id.
    indexed_info = MagicMock()
    indexed_info.payload_schema = {"workspace_id": True}
    mock_qdrant_client.get_collection.return_value = indexed_info

    await storage.initialize()

    # An empty legacy collection can be removed without any risk of data loss,
    # so it is expected to be cleaned up automatically.
    recorded_deletes = mock_qdrant_client.delete_collection.call_args_list
    assert len(recorded_deletes) >= 1, "Empty legacy collection should be auto-deleted"

    # The name may have been passed positionally or as collection_name=.
    first_delete = recorded_deletes[0]
    if first_delete[0]:
        deleted_collection = first_delete[0][0]
    else:
        deleted_collection = first_delete.kwargs.get("collection_name")

    assert (
        deleted_collection == legacy_collection
    ), f"Expected to delete '{legacy_collection}', but deleted '{deleted_collection}'"

    print(
        f"✅ Case 1a: Empty legacy collection '{legacy_collection}' auto-deleted successfully"
    )
async def test_case1_nonempty_legacy_warning(mock_qdrant_client, mock_embedding_func):
    """Case 1b: both the new and the legacy collection exist, and the legacy one has data.

    Expected: a warning only; the legacy collection is not deleted.
    """
    storage = QdrantVectorDBStorage(
        namespace="chunks",
        global_config={
            "embedding_batch_num": 10,
            "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
        },
        embedding_func=mock_embedding_func,
        workspace="test_ws",
    )

    # Collection names: legacy (no model suffix) and the suffixed one.
    legacy_collection = "lightrag_vdb_chunks"
    new_collection = storage.final_namespace
    existing_collections = (legacy_collection, new_collection)

    # Both collections are reported as existing.
    def both_exist(name):
        return name in existing_collections

    mock_qdrant_client.collection_exists.side_effect = both_exist

    def count_mock(collection_name, exact=True, count_filter=None):
        # The legacy collection holds 50 records; anything else reports 100.
        outcome = MagicMock()
        outcome.count = 50 if collection_name == legacy_collection else 100
        return outcome

    mock_qdrant_client.count.side_effect = count_mock

    # get_collection reports a payload schema that already has workspace_id.
    indexed_info = MagicMock()
    indexed_info.payload_schema = {"workspace_id": True}
    mock_qdrant_client.get_collection.return_value = indexed_info

    await storage.initialize()

    # Collections that contain data are never auto-deleted, to prevent
    # accidental data loss. Gather every deleted name, whether it was passed
    # positionally or as collection_name=.
    deleted_names = [
        recorded.args[0] if recorded.args else recorded.kwargs.get("collection_name")
        for recorded in mock_qdrant_client.delete_collection.call_args_list
    ]
    legacy_deleted = legacy_collection in deleted_names

    assert not legacy_deleted, "Legacy collection with data should NOT be auto-deleted"

    print(
        f"✅ Case 1b: Legacy collection '{legacy_collection}' with data preserved (warning only)"
    )
|