test_postgres_migration.py 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856
  1. import pytest
  2. from unittest.mock import patch, AsyncMock
  3. import numpy as np
  4. from lightrag.utils import EmbeddingFunc
  5. from lightrag.kg.postgres_impl import (
  6. PGVectorStorage,
  7. )
  8. from lightrag.namespace import NameSpace
  9. # Mock PostgreSQLDB
  10. @pytest.fixture
  11. def mock_pg_db():
  12. """Mock PostgreSQL database connection"""
  13. db = AsyncMock()
  14. db.workspace = "test_workspace"
  15. # Mock query responses with multirows support
  16. async def mock_query(sql, params=None, multirows=False, **kwargs):
  17. # Default return value
  18. if multirows:
  19. return [] # Return empty list for multirows
  20. return {"exists": False, "count": 0}
  21. # Mock for execute that mimics PostgreSQLDB.execute() behavior
  22. async def mock_execute(sql, data=None, **kwargs):
  23. """
  24. Mock that mimics PostgreSQLDB.execute() behavior:
  25. - Accepts data as dict[str, Any] | None (second parameter)
  26. - Internally converts dict.values() to tuple for AsyncPG
  27. """
  28. # Mimic real execute() which accepts dict and converts to tuple
  29. if data is not None and not isinstance(data, dict):
  30. raise TypeError(
  31. f"PostgreSQLDB.execute() expects data as dict, got {type(data).__name__}"
  32. )
  33. return None
  34. db.query = AsyncMock(side_effect=mock_query)
  35. db.execute = AsyncMock(side_effect=mock_execute)
  36. return db
  37. # Mock get_data_init_lock to avoid async lock issues in tests
  38. @pytest.fixture(autouse=True)
  39. def mock_data_init_lock():
  40. with patch("lightrag.kg.postgres_impl.get_data_init_lock") as mock_lock:
  41. mock_lock_ctx = AsyncMock()
  42. mock_lock.return_value = mock_lock_ctx
  43. yield mock_lock
  44. # Mock ClientManager
  45. @pytest.fixture
  46. def mock_client_manager(mock_pg_db):
  47. with patch("lightrag.kg.postgres_impl.ClientManager") as mock_manager:
  48. mock_manager.get_client = AsyncMock(return_value=mock_pg_db)
  49. mock_manager.release_client = AsyncMock()
  50. yield mock_manager
  51. # Mock Embedding function
  52. @pytest.fixture
  53. def mock_embedding_func():
  54. async def embed_func(texts, **kwargs):
  55. return np.array([[0.1] * 768 for _ in texts])
  56. func = EmbeddingFunc(embedding_dim=768, func=embed_func, model_name="test_model")
  57. return func
  58. async def test_postgres_table_naming(
  59. mock_client_manager, mock_pg_db, mock_embedding_func
  60. ):
  61. """Test if table name is correctly generated with model suffix"""
  62. config = {
  63. "embedding_batch_num": 10,
  64. "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
  65. }
  66. storage = PGVectorStorage(
  67. namespace=NameSpace.VECTOR_STORE_CHUNKS,
  68. global_config=config,
  69. embedding_func=mock_embedding_func,
  70. workspace="test_ws",
  71. )
  72. # Verify table name contains model suffix
  73. expected_suffix = "test_model_768d"
  74. assert expected_suffix in storage.table_name
  75. assert storage.table_name == f"LIGHTRAG_VDB_CHUNKS_{expected_suffix}"
  76. # Verify legacy table name
  77. assert storage.legacy_table_name == "LIGHTRAG_VDB_CHUNKS"
  78. async def test_postgres_migration_trigger(
  79. mock_client_manager, mock_pg_db, mock_embedding_func
  80. ):
  81. """Test if migration logic is triggered correctly"""
  82. config = {
  83. "embedding_batch_num": 10,
  84. "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
  85. }
  86. storage = PGVectorStorage(
  87. namespace=NameSpace.VECTOR_STORE_CHUNKS,
  88. global_config=config,
  89. embedding_func=mock_embedding_func,
  90. workspace="test_ws",
  91. )
  92. # Setup mocks for migration scenario
  93. # 1. New table does not exist, legacy table exists
  94. async def mock_check_table_exists(table_name):
  95. return table_name == storage.legacy_table_name
  96. mock_pg_db.check_table_exists = AsyncMock(side_effect=mock_check_table_exists)
  97. # 2. Legacy table has 100 records
  98. mock_rows = [
  99. {"id": f"test_id_{i}", "content": f"content_{i}", "workspace": "test_ws"}
  100. for i in range(100)
  101. ]
  102. migration_state = {"new_table_count": 0}
  103. async def mock_query(sql, params=None, multirows=False, **kwargs):
  104. if "COUNT(*)" in sql:
  105. sql_upper = sql.upper()
  106. legacy_table = storage.legacy_table_name.upper()
  107. new_table = storage.table_name.upper()
  108. is_new_table = new_table in sql_upper
  109. is_legacy_table = legacy_table in sql_upper and not is_new_table
  110. if is_new_table:
  111. return {"count": migration_state["new_table_count"]}
  112. if is_legacy_table:
  113. return {"count": 100}
  114. return {"count": 0}
  115. elif multirows and "SELECT *" in sql:
  116. # Mock batch fetch for migration using keyset pagination
  117. # New pattern: WHERE workspace = $1 AND id > $2 ORDER BY id LIMIT $3
  118. # or first batch: WHERE workspace = $1 ORDER BY id LIMIT $2
  119. if "WHERE workspace" in sql:
  120. if "id >" in sql:
  121. # Keyset pagination: params = [workspace, last_id, limit]
  122. last_id = params[1] if len(params) > 1 else None
  123. # Find rows after last_id
  124. start_idx = 0
  125. for i, row in enumerate(mock_rows):
  126. if row["id"] == last_id:
  127. start_idx = i + 1
  128. break
  129. limit = params[2] if len(params) > 2 else 500
  130. else:
  131. # First batch (no last_id): params = [workspace, limit]
  132. start_idx = 0
  133. limit = params[1] if len(params) > 1 else 500
  134. else:
  135. # No workspace filter with keyset
  136. if "id >" in sql:
  137. last_id = params[0] if params else None
  138. start_idx = 0
  139. for i, row in enumerate(mock_rows):
  140. if row["id"] == last_id:
  141. start_idx = i + 1
  142. break
  143. limit = params[1] if len(params) > 1 else 500
  144. else:
  145. start_idx = 0
  146. limit = params[0] if params else 500
  147. end = min(start_idx + limit, len(mock_rows))
  148. return mock_rows[start_idx:end]
  149. return {}
  150. mock_pg_db.query = AsyncMock(side_effect=mock_query)
  151. # Track migration through _run_with_retry calls
  152. migration_executed = []
  153. async def mock_run_with_retry(operation, **kwargs):
  154. # Track that migration batch operation was called
  155. migration_executed.append(True)
  156. migration_state["new_table_count"] = 100
  157. return None
  158. mock_pg_db._run_with_retry = AsyncMock(side_effect=mock_run_with_retry)
  159. with patch(
  160. "lightrag.kg.postgres_impl.PGVectorStorage._pg_create_table", AsyncMock()
  161. ):
  162. # Initialize storage (should trigger migration)
  163. await storage.initialize()
  164. # Verify migration was executed by checking _run_with_retry was called
  165. # (batch migration uses _run_with_retry with executemany)
  166. assert len(migration_executed) > 0, "Migration should have been executed"
  167. async def test_postgres_no_migration_needed(
  168. mock_client_manager, mock_pg_db, mock_embedding_func
  169. ):
  170. """Test scenario where new table already exists (no migration needed)"""
  171. config = {
  172. "embedding_batch_num": 10,
  173. "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
  174. }
  175. storage = PGVectorStorage(
  176. namespace=NameSpace.VECTOR_STORE_CHUNKS,
  177. global_config=config,
  178. embedding_func=mock_embedding_func,
  179. workspace="test_ws",
  180. )
  181. # Mock: new table already exists
  182. async def mock_check_table_exists(table_name):
  183. return table_name == storage.table_name
  184. mock_pg_db.check_table_exists = AsyncMock(side_effect=mock_check_table_exists)
  185. with patch(
  186. "lightrag.kg.postgres_impl.PGVectorStorage._pg_create_table", AsyncMock()
  187. ) as mock_create:
  188. await storage.initialize()
  189. # Verify no table creation was attempted
  190. mock_create.assert_not_called()
  191. async def test_scenario_1_new_workspace_creation(
  192. mock_client_manager, mock_pg_db, mock_embedding_func
  193. ):
  194. """
  195. Scenario 1: New workspace creation
  196. Expected behavior:
  197. - No legacy table exists
  198. - Directly create new table with model suffix
  199. - No migration needed
  200. """
  201. config = {
  202. "embedding_batch_num": 10,
  203. "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
  204. }
  205. embedding_func = EmbeddingFunc(
  206. embedding_dim=3072,
  207. func=mock_embedding_func.func,
  208. model_name="text-embedding-3-large",
  209. )
  210. storage = PGVectorStorage(
  211. namespace=NameSpace.VECTOR_STORE_CHUNKS,
  212. global_config=config,
  213. embedding_func=embedding_func,
  214. workspace="new_workspace",
  215. )
  216. # Mock: neither table exists
  217. async def mock_check_table_exists(table_name):
  218. return False
  219. mock_pg_db.check_table_exists = AsyncMock(side_effect=mock_check_table_exists)
  220. with patch(
  221. "lightrag.kg.postgres_impl.PGVectorStorage._pg_create_table", AsyncMock()
  222. ) as mock_create:
  223. await storage.initialize()
  224. # Verify table name format
  225. assert "text_embedding_3_large_3072d" in storage.table_name
  226. # Verify new table creation was called
  227. mock_create.assert_called_once()
  228. call_args = mock_create.call_args
  229. assert (
  230. call_args[0][1] == storage.table_name
  231. ) # table_name is second positional arg
  232. async def test_scenario_2_legacy_upgrade_migration(
  233. mock_client_manager, mock_pg_db, mock_embedding_func
  234. ):
  235. """
  236. Scenario 2: Upgrade from legacy version
  237. Expected behavior:
  238. - Legacy table exists (without model suffix)
  239. - New table doesn't exist
  240. - Automatically migrate data to new table with suffix
  241. """
  242. config = {
  243. "embedding_batch_num": 10,
  244. "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
  245. }
  246. embedding_func = EmbeddingFunc(
  247. embedding_dim=1536,
  248. func=mock_embedding_func.func,
  249. model_name="text-embedding-ada-002",
  250. )
  251. storage = PGVectorStorage(
  252. namespace=NameSpace.VECTOR_STORE_CHUNKS,
  253. global_config=config,
  254. embedding_func=embedding_func,
  255. workspace="legacy_workspace",
  256. )
  257. # Mock: only legacy table exists
  258. async def mock_check_table_exists(table_name):
  259. return table_name == storage.legacy_table_name
  260. mock_pg_db.check_table_exists = AsyncMock(side_effect=mock_check_table_exists)
  261. # Mock: legacy table has 50 records
  262. mock_rows = [
  263. {
  264. "id": f"legacy_id_{i}",
  265. "content": f"legacy_content_{i}",
  266. "workspace": "legacy_workspace",
  267. }
  268. for i in range(50)
  269. ]
  270. # Track which queries have been made for proper response
  271. query_history = []
  272. migration_state = {"new_table_count": 0}
  273. async def mock_query(sql, params=None, multirows=False, **kwargs):
  274. query_history.append(sql)
  275. if "COUNT(*)" in sql:
  276. # Determine table type:
  277. # - Legacy: contains base name but NOT model suffix
  278. # - New: contains model suffix (e.g., text_embedding_ada_002_1536d)
  279. sql_upper = sql.upper()
  280. base_name = storage.legacy_table_name.upper()
  281. # Check if this is querying the new table (has model suffix)
  282. has_model_suffix = storage.table_name.upper() in sql_upper
  283. is_legacy_table = base_name in sql_upper and not has_model_suffix
  284. has_workspace_filter = "WHERE workspace" in sql
  285. if is_legacy_table and has_workspace_filter:
  286. # Count for legacy table with workspace filter (before migration)
  287. return {"count": 50}
  288. elif is_legacy_table and not has_workspace_filter:
  289. # Total count for legacy table
  290. return {"count": 50}
  291. else:
  292. # New table count (before/after migration)
  293. return {"count": migration_state["new_table_count"]}
  294. elif multirows and "SELECT *" in sql:
  295. # Mock batch fetch for migration using keyset pagination
  296. # New pattern: WHERE workspace = $1 AND id > $2 ORDER BY id LIMIT $3
  297. # or first batch: WHERE workspace = $1 ORDER BY id LIMIT $2
  298. if "WHERE workspace" in sql:
  299. if "id >" in sql:
  300. # Keyset pagination: params = [workspace, last_id, limit]
  301. last_id = params[1] if len(params) > 1 else None
  302. # Find rows after last_id
  303. start_idx = 0
  304. for i, row in enumerate(mock_rows):
  305. if row["id"] == last_id:
  306. start_idx = i + 1
  307. break
  308. limit = params[2] if len(params) > 2 else 500
  309. else:
  310. # First batch (no last_id): params = [workspace, limit]
  311. start_idx = 0
  312. limit = params[1] if len(params) > 1 else 500
  313. else:
  314. # No workspace filter with keyset
  315. if "id >" in sql:
  316. last_id = params[0] if params else None
  317. start_idx = 0
  318. for i, row in enumerate(mock_rows):
  319. if row["id"] == last_id:
  320. start_idx = i + 1
  321. break
  322. limit = params[1] if len(params) > 1 else 500
  323. else:
  324. start_idx = 0
  325. limit = params[0] if params else 500
  326. end = min(start_idx + limit, len(mock_rows))
  327. return mock_rows[start_idx:end]
  328. return {}
  329. mock_pg_db.query = AsyncMock(side_effect=mock_query)
  330. # Track migration through _run_with_retry calls
  331. migration_executed = []
  332. async def mock_run_with_retry(operation, **kwargs):
  333. # Track that migration batch operation was called
  334. migration_executed.append(True)
  335. migration_state["new_table_count"] = 50
  336. return None
  337. mock_pg_db._run_with_retry = AsyncMock(side_effect=mock_run_with_retry)
  338. with patch(
  339. "lightrag.kg.postgres_impl.PGVectorStorage._pg_create_table", AsyncMock()
  340. ) as mock_create:
  341. await storage.initialize()
  342. # Verify table name contains ada-002
  343. assert "text_embedding_ada_002_1536d" in storage.table_name
  344. # Verify migration was executed (batch migration uses _run_with_retry)
  345. assert len(migration_executed) > 0, "Migration should have been executed"
  346. mock_create.assert_called_once()
  347. async def test_scenario_3_multi_model_coexistence(
  348. mock_client_manager, mock_pg_db, mock_embedding_func
  349. ):
  350. """
  351. Scenario 3: Multiple embedding models coexist
  352. Expected behavior:
  353. - Different embedding models create separate tables
  354. - Tables are isolated by model suffix
  355. - No interference between different models
  356. """
  357. config = {
  358. "embedding_batch_num": 10,
  359. "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
  360. }
  361. # Workspace A: uses bge-small (768d)
  362. embedding_func_a = EmbeddingFunc(
  363. embedding_dim=768, func=mock_embedding_func.func, model_name="bge-small"
  364. )
  365. storage_a = PGVectorStorage(
  366. namespace=NameSpace.VECTOR_STORE_CHUNKS,
  367. global_config=config,
  368. embedding_func=embedding_func_a,
  369. workspace="workspace_a",
  370. )
  371. # Workspace B: uses bge-large (1024d)
  372. async def embed_func_b(texts, **kwargs):
  373. return np.array([[0.1] * 1024 for _ in texts])
  374. embedding_func_b = EmbeddingFunc(
  375. embedding_dim=1024, func=embed_func_b, model_name="bge-large"
  376. )
  377. storage_b = PGVectorStorage(
  378. namespace=NameSpace.VECTOR_STORE_CHUNKS,
  379. global_config=config,
  380. embedding_func=embedding_func_b,
  381. workspace="workspace_b",
  382. )
  383. # Verify different table names
  384. assert storage_a.table_name != storage_b.table_name
  385. assert "bge_small_768d" in storage_a.table_name
  386. assert "bge_large_1024d" in storage_b.table_name
  387. # Mock: both tables don't exist yet
  388. async def mock_check_table_exists(table_name):
  389. return False
  390. mock_pg_db.check_table_exists = AsyncMock(side_effect=mock_check_table_exists)
  391. with patch(
  392. "lightrag.kg.postgres_impl.PGVectorStorage._pg_create_table", AsyncMock()
  393. ) as mock_create:
  394. # Initialize both storages
  395. await storage_a.initialize()
  396. await storage_b.initialize()
  397. # Verify two separate tables were created
  398. assert mock_create.call_count == 2
  399. # Verify table names are different
  400. call_args_list = mock_create.call_args_list
  401. table_names = [call[0][1] for call in call_args_list] # Second positional arg
  402. assert len(set(table_names)) == 2 # Two unique table names
  403. assert storage_a.table_name in table_names
  404. assert storage_b.table_name in table_names
  405. async def test_case1_empty_legacy_auto_cleanup(
  406. mock_client_manager, mock_pg_db, mock_embedding_func
  407. ):
  408. """
  409. Case 1a: Both new and legacy tables exist, but legacy is EMPTY
  410. Expected: Automatically delete empty legacy table (safe cleanup)
  411. """
  412. config = {
  413. "embedding_batch_num": 10,
  414. "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
  415. }
  416. embedding_func = EmbeddingFunc(
  417. embedding_dim=1536,
  418. func=mock_embedding_func.func,
  419. model_name="test-model",
  420. )
  421. storage = PGVectorStorage(
  422. namespace=NameSpace.VECTOR_STORE_CHUNKS,
  423. global_config=config,
  424. embedding_func=embedding_func,
  425. workspace="test_ws",
  426. )
  427. # Mock: Both tables exist
  428. async def mock_check_table_exists(table_name):
  429. return True # Both new and legacy exist
  430. mock_pg_db.check_table_exists = AsyncMock(side_effect=mock_check_table_exists)
  431. # Mock: Legacy table is empty (0 records)
  432. async def mock_query(sql, params=None, multirows=False, **kwargs):
  433. if "COUNT(*)" in sql:
  434. if storage.legacy_table_name in sql:
  435. return {"count": 0} # Empty legacy table
  436. else:
  437. return {"count": 100} # New table has data
  438. return {}
  439. mock_pg_db.query = AsyncMock(side_effect=mock_query)
  440. with patch("lightrag.kg.postgres_impl.logger"):
  441. await storage.initialize()
  442. # Verify: Empty legacy table should be automatically cleaned up
  443. # Empty tables are safe to delete without data loss risk
  444. delete_calls = [
  445. call
  446. for call in mock_pg_db.execute.call_args_list
  447. if call[0][0] and "DROP TABLE" in call[0][0]
  448. ]
  449. assert len(delete_calls) >= 1, "Empty legacy table should be auto-deleted"
  450. # Check if legacy table was dropped
  451. dropped_table = storage.legacy_table_name
  452. assert any(
  453. dropped_table in str(call) for call in delete_calls
  454. ), f"Expected to drop empty legacy table '{dropped_table}'"
  455. print(
  456. f"✅ Case 1a: Empty legacy table '{dropped_table}' auto-deleted successfully"
  457. )
  458. async def test_case1_nonempty_legacy_warning(
  459. mock_client_manager, mock_pg_db, mock_embedding_func
  460. ):
  461. """
  462. Case 1b: Both new and legacy tables exist, and legacy HAS DATA
  463. Expected: Log warning, do not delete legacy (preserve data)
  464. """
  465. config = {
  466. "embedding_batch_num": 10,
  467. "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
  468. }
  469. embedding_func = EmbeddingFunc(
  470. embedding_dim=1536,
  471. func=mock_embedding_func.func,
  472. model_name="test-model",
  473. )
  474. storage = PGVectorStorage(
  475. namespace=NameSpace.VECTOR_STORE_CHUNKS,
  476. global_config=config,
  477. embedding_func=embedding_func,
  478. workspace="test_ws",
  479. )
  480. # Mock: Both tables exist
  481. async def mock_check_table_exists(table_name):
  482. return True # Both new and legacy exist
  483. mock_pg_db.check_table_exists = AsyncMock(side_effect=mock_check_table_exists)
  484. # Mock: Legacy table has data (50 records)
  485. async def mock_query(sql, params=None, multirows=False, **kwargs):
  486. if "COUNT(*)" in sql:
  487. if storage.legacy_table_name in sql:
  488. return {"count": 50} # Legacy has data
  489. else:
  490. return {"count": 100} # New table has data
  491. return {}
  492. mock_pg_db.query = AsyncMock(side_effect=mock_query)
  493. with patch("lightrag.kg.postgres_impl.logger"):
  494. await storage.initialize()
  495. # Verify: Legacy table with data should be preserved
  496. # We never auto-delete tables that contain data to prevent accidental data loss
  497. delete_calls = [
  498. call
  499. for call in mock_pg_db.execute.call_args_list
  500. if call[0][0] and "DROP TABLE" in call[0][0]
  501. ]
  502. # Check if legacy table was deleted (it should not be)
  503. dropped_table = storage.legacy_table_name
  504. legacy_deleted = any(dropped_table in str(call) for call in delete_calls)
  505. assert not legacy_deleted, "Legacy table with data should NOT be auto-deleted"
  506. print(
  507. f"✅ Case 1b: Legacy table '{dropped_table}' with data preserved (warning only)"
  508. )
  509. async def test_case1_sequential_workspace_migration(
  510. mock_client_manager, mock_pg_db, mock_embedding_func
  511. ):
  512. """
  513. Case 1c: Sequential workspace migration (Multi-tenant scenario)
  514. Critical bug fix verification:
  515. Timeline:
  516. 1. Legacy table has workspace_a (3 records) + workspace_b (3 records)
  517. 2. Workspace A initializes first → Case 3 (only legacy exists) → migrates A's data
  518. 3. Workspace B initializes later → Case 3 (both tables exist, legacy has B's data) → should migrate B's data
  519. 4. Verify workspace B's data is correctly migrated to new table
  520. This test verifies the migration logic correctly handles multi-tenant scenarios
  521. where different workspaces migrate sequentially.
  522. """
  523. config = {
  524. "embedding_batch_num": 10,
  525. "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
  526. }
  527. embedding_func = EmbeddingFunc(
  528. embedding_dim=1536,
  529. func=mock_embedding_func.func,
  530. model_name="test-model",
  531. )
  532. # Mock data: Legacy table has 6 records total (3 from workspace_a, 3 from workspace_b)
  533. mock_rows_a = [
  534. {"id": f"a_{i}", "content": f"A content {i}", "workspace": "workspace_a"}
  535. for i in range(3)
  536. ]
  537. mock_rows_b = [
  538. {"id": f"b_{i}", "content": f"B content {i}", "workspace": "workspace_b"}
  539. for i in range(3)
  540. ]
  541. # Track migration state
  542. migration_state = {
  543. "new_table_exists": False,
  544. "workspace_a_migrated": False,
  545. "workspace_a_migration_count": 0,
  546. "workspace_b_migration_count": 0,
  547. }
  548. # Step 1: Simulate workspace_a initialization (Case 3 - only legacy exists)
  549. # CRITICAL: Set db.workspace to workspace_a
  550. mock_pg_db.workspace = "workspace_a"
  551. storage_a = PGVectorStorage(
  552. namespace=NameSpace.VECTOR_STORE_CHUNKS,
  553. global_config=config,
  554. embedding_func=embedding_func,
  555. workspace="workspace_a",
  556. )
  557. # Mock table_exists for workspace_a
  558. async def mock_check_table_exists_a(table_name):
  559. if table_name == storage_a.legacy_table_name:
  560. return True
  561. if table_name == storage_a.table_name:
  562. return migration_state["new_table_exists"]
  563. return False
  564. mock_pg_db.check_table_exists = AsyncMock(side_effect=mock_check_table_exists_a)
  565. # Mock query for workspace_a (Case 3)
  566. async def mock_query_a(sql, params=None, multirows=False, **kwargs):
  567. sql_upper = sql.upper()
  568. base_name = storage_a.legacy_table_name.upper()
  569. if "COUNT(*)" in sql:
  570. has_model_suffix = "TEST_MODEL_1536D" in sql_upper
  571. is_legacy = base_name in sql_upper and not has_model_suffix
  572. has_workspace_filter = "WHERE workspace" in sql
  573. if is_legacy and has_workspace_filter:
  574. workspace = params[0] if params and len(params) > 0 else None
  575. if workspace == "workspace_a":
  576. return {"count": 3}
  577. elif workspace == "workspace_b":
  578. return {"count": 3}
  579. elif is_legacy and not has_workspace_filter:
  580. # Global count in legacy table
  581. return {"count": 6}
  582. elif has_model_suffix:
  583. if has_workspace_filter:
  584. workspace = params[0] if params and len(params) > 0 else None
  585. if workspace == "workspace_a":
  586. return {"count": migration_state["workspace_a_migration_count"]}
  587. if workspace == "workspace_b":
  588. return {"count": migration_state["workspace_b_migration_count"]}
  589. return {
  590. "count": migration_state["workspace_a_migration_count"]
  591. + migration_state["workspace_b_migration_count"]
  592. }
  593. elif multirows and "SELECT *" in sql:
  594. if "WHERE workspace" in sql:
  595. workspace = params[0] if params and len(params) > 0 else None
  596. if workspace == "workspace_a":
  597. # Handle keyset pagination
  598. if "id >" in sql:
  599. # params = [workspace, last_id, limit]
  600. last_id = params[1] if len(params) > 1 else None
  601. start_idx = 0
  602. for i, row in enumerate(mock_rows_a):
  603. if row["id"] == last_id:
  604. start_idx = i + 1
  605. break
  606. limit = params[2] if len(params) > 2 else 500
  607. else:
  608. # First batch: params = [workspace, limit]
  609. start_idx = 0
  610. limit = params[1] if len(params) > 1 else 500
  611. end = min(start_idx + limit, len(mock_rows_a))
  612. return mock_rows_a[start_idx:end]
  613. return {}
  614. mock_pg_db.query = AsyncMock(side_effect=mock_query_a)
  615. # Track migration via _run_with_retry (batch migration uses this)
  616. migration_a_executed = []
  617. async def mock_run_with_retry_a(operation, **kwargs):
  618. migration_a_executed.append(True)
  619. migration_state["workspace_a_migration_count"] = len(mock_rows_a)
  620. return None
  621. mock_pg_db._run_with_retry = AsyncMock(side_effect=mock_run_with_retry_a)
  622. # Initialize workspace_a (Case 3)
  623. with patch("lightrag.kg.postgres_impl.logger"):
  624. await storage_a.initialize()
  625. migration_state["new_table_exists"] = True
  626. migration_state["workspace_a_migrated"] = True
  627. print("✅ Step 1: Workspace A initialized")
  628. # Verify migration was executed via _run_with_retry (batch migration uses executemany)
  629. assert (
  630. len(migration_a_executed) > 0
  631. ), "Migration should have been executed for workspace_a"
  632. print(f"✅ Step 1: Migration executed {len(migration_a_executed)} batch(es)")
  633. # Step 2: Simulate workspace_b initialization (Case 3 - both exist, but legacy has B's data)
  634. # CRITICAL: Set db.workspace to workspace_b
  635. mock_pg_db.workspace = "workspace_b"
  636. storage_b = PGVectorStorage(
  637. namespace=NameSpace.VECTOR_STORE_CHUNKS,
  638. global_config=config,
  639. embedding_func=embedding_func,
  640. workspace="workspace_b",
  641. )
  642. mock_pg_db.reset_mock()
  643. # Mock table_exists for workspace_b (both exist)
  644. async def mock_check_table_exists_b(table_name):
  645. return True # Both tables exist
  646. mock_pg_db.check_table_exists = AsyncMock(side_effect=mock_check_table_exists_b)
  647. # Mock query for workspace_b (Case 3)
  648. async def mock_query_b(sql, params=None, multirows=False, **kwargs):
  649. sql_upper = sql.upper()
  650. base_name = storage_b.legacy_table_name.upper()
  651. if "COUNT(*)" in sql:
  652. has_model_suffix = "TEST_MODEL_1536D" in sql_upper
  653. is_legacy = base_name in sql_upper and not has_model_suffix
  654. has_workspace_filter = "WHERE workspace" in sql
  655. if is_legacy and has_workspace_filter:
  656. workspace = params[0] if params and len(params) > 0 else None
  657. if workspace == "workspace_b":
  658. return {"count": 3} # workspace_b still has data in legacy
  659. elif workspace == "workspace_a":
  660. return {"count": 0} # workspace_a already migrated
  661. elif is_legacy and not has_workspace_filter:
  662. # Global count: only workspace_b data remains
  663. return {"count": 3}
  664. elif has_model_suffix:
  665. if has_workspace_filter:
  666. workspace = params[0] if params and len(params) > 0 else None
  667. if workspace == "workspace_b":
  668. return {"count": migration_state["workspace_b_migration_count"]}
  669. elif workspace == "workspace_a":
  670. return {"count": 3}
  671. else:
  672. return {"count": 3 + migration_state["workspace_b_migration_count"]}
  673. elif multirows and "SELECT *" in sql:
  674. if "WHERE workspace" in sql:
  675. workspace = params[0] if params and len(params) > 0 else None
  676. if workspace == "workspace_b":
  677. # Handle keyset pagination
  678. if "id >" in sql:
  679. # params = [workspace, last_id, limit]
  680. last_id = params[1] if len(params) > 1 else None
  681. start_idx = 0
  682. for i, row in enumerate(mock_rows_b):
  683. if row["id"] == last_id:
  684. start_idx = i + 1
  685. break
  686. limit = params[2] if len(params) > 2 else 500
  687. else:
  688. # First batch: params = [workspace, limit]
  689. start_idx = 0
  690. limit = params[1] if len(params) > 1 else 500
  691. end = min(start_idx + limit, len(mock_rows_b))
  692. return mock_rows_b[start_idx:end]
  693. return {}
  694. mock_pg_db.query = AsyncMock(side_effect=mock_query_b)
  695. # Track migration via _run_with_retry for workspace_b
  696. migration_b_executed = []
  697. async def mock_run_with_retry_b(operation, **kwargs):
  698. migration_b_executed.append(True)
  699. migration_state["workspace_b_migration_count"] = len(mock_rows_b)
  700. return None
  701. mock_pg_db._run_with_retry = AsyncMock(side_effect=mock_run_with_retry_b)
  702. # Initialize workspace_b (Case 3 - both tables exist)
  703. with patch("lightrag.kg.postgres_impl.logger"):
  704. await storage_b.initialize()
  705. print("✅ Step 2: Workspace B initialized")
  706. # Verify workspace_b migration happens when new table has no workspace_b data
  707. # but legacy table still has workspace_b data.
  708. assert (
  709. len(migration_b_executed) > 0
  710. ), "Migration should have been executed for workspace_b"
  711. print("✅ Step 2: Migration executed for workspace_b")
  712. print("\n🎉 Case 1c: Sequential workspace migration verification complete!")
  713. print(" - Workspace A: Migrated successfully (only legacy existed)")
  714. print(" - Workspace B: Migrated successfully (new table empty for workspace_b)")