test_postgres_retry_integration.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350
  1. """
  2. Integration test suite for PostgreSQL retry mechanism using real database.
  3. This test suite connects to a real PostgreSQL database using credentials from .env
  4. and tests the retry mechanism against it by simulating failures (patched pool creation, closed pools).
  5. Prerequisites:
  6. 1. PostgreSQL server running and accessible
  7. 2. .env file with POSTGRES_* configuration
  8. 3. asyncpg installed: pip install asyncpg
  9. """
  10. import pytest
  11. import asyncio
  12. import os
  13. import time
  14. from dotenv import load_dotenv
  15. from unittest.mock import patch
  16. from lightrag.kg.postgres_impl import PostgreSQLDB
  17. asyncpg = pytest.importorskip("asyncpg")
  18. # Load environment variables
  19. load_dotenv(dotenv_path=".env", override=False)
  20. @pytest.mark.integration
  21. @pytest.mark.requires_db
  22. class TestPostgresRetryIntegration:
  23. """Integration tests for PostgreSQL retry mechanism with real database."""
  24. @pytest.fixture
  25. def db_config(self):
  26. """Load database configuration from environment variables.
  27. Uses new HA-optimized defaults that match postgres_impl.py ClientManager.get_config():
  28. - 10 retry attempts (up from 3)
  29. - 3.0s initial backoff (up from 0.5s)
  30. - 30.0s max backoff (up from 5.0s)
  31. """
  32. return {
  33. "host": os.getenv("POSTGRES_HOST", "localhost"),
  34. "port": int(os.getenv("POSTGRES_PORT", "5432")),
  35. "user": os.getenv("POSTGRES_USER", "postgres"),
  36. "password": os.getenv("POSTGRES_PASSWORD", ""),
  37. "database": os.getenv("POSTGRES_DATABASE", "postgres"),
  38. "workspace": os.getenv("POSTGRES_WORKSPACE", "test_retry"),
  39. "max_connections": int(os.getenv("POSTGRES_MAX_CONNECTIONS", "10")),
  40. # Connection retry configuration - mirrors postgres_impl.py ClientManager.get_config()
  41. # NEW DEFAULTS optimized for HA deployments
  42. "connection_retry_attempts": min(
  43. 100,
  44. int(os.getenv("POSTGRES_CONNECTION_RETRIES", "10")), # 3 → 10
  45. ),
  46. "connection_retry_backoff": min(
  47. 300.0,
  48. float(
  49. os.getenv("POSTGRES_CONNECTION_RETRY_BACKOFF", "3.0")
  50. ), # 0.5 → 3.0
  51. ),
  52. "connection_retry_backoff_max": min(
  53. 600.0,
  54. float(
  55. os.getenv("POSTGRES_CONNECTION_RETRY_BACKOFF_MAX", "30.0")
  56. ), # 5.0 → 30.0
  57. ),
  58. "pool_close_timeout": min(
  59. 30.0, float(os.getenv("POSTGRES_POOL_CLOSE_TIMEOUT", "5.0"))
  60. ),
  61. }
  62. @pytest.mark.asyncio
  63. async def test_real_connection_success(self, db_config):
  64. """
  65. Test successful connection to real PostgreSQL database.
  66. This validates that:
  67. 1. Database credentials are correct
  68. 2. Connection pool initializes properly
  69. 3. Basic query works
  70. """
  71. print("\n" + "=" * 80)
  72. print("INTEGRATION TEST 1: Real Database Connection")
  73. print("=" * 80)
  74. print(
  75. f" → Connecting to {db_config['host']}:{db_config['port']}/{db_config['database']}"
  76. )
  77. db = PostgreSQLDB(db_config)
  78. try:
  79. # Initialize database connection
  80. await db.initdb()
  81. print(" ✓ Connection successful")
  82. # Test simple query
  83. result = await db.query("SELECT 1 as test", multirows=False)
  84. assert result is not None
  85. assert result.get("test") == 1
  86. print(" ✓ Query executed successfully")
  87. print("\n✅ Test passed: Real database connection works")
  88. print("=" * 80)
  89. finally:
  90. if db.pool:
  91. await db.pool.close()
  92. @pytest.mark.asyncio
  93. async def test_simulated_transient_error_with_real_db(self, db_config):
  94. """
  95. Test retry mechanism with simulated transient errors on real database.
  96. Simulates connection failures on first 2 attempts, then succeeds.
  97. Uses new HA defaults (10 retries, 3s backoff).
  98. """
  99. print("\n" + "=" * 80)
  100. print("INTEGRATION TEST 2: Simulated Transient Errors")
  101. print("=" * 80)
  102. db = PostgreSQLDB(db_config)
  103. attempt_count = {"value": 0}
  104. # Original create_pool function
  105. original_create_pool = asyncpg.create_pool
  106. async def mock_create_pool_with_failures(*args, **kwargs):
  107. """Mock that fails first 2 times, then calls real create_pool."""
  108. attempt_count["value"] += 1
  109. print(f" → Connection attempt {attempt_count['value']}")
  110. if attempt_count["value"] <= 2:
  111. print(" ✗ Simulating connection failure")
  112. raise asyncpg.exceptions.ConnectionFailureError(
  113. f"Simulated failure on attempt {attempt_count['value']}"
  114. )
  115. print(" ✓ Allowing real connection")
  116. return await original_create_pool(*args, **kwargs)
  117. try:
  118. # Patch create_pool to simulate failures
  119. with patch(
  120. "asyncpg.create_pool", side_effect=mock_create_pool_with_failures
  121. ):
  122. await db.initdb()
  123. assert (
  124. attempt_count["value"] == 3
  125. ), f"Expected 3 attempts, got {attempt_count['value']}"
  126. assert db.pool is not None, "Pool should be initialized after retries"
  127. # Verify database is actually working
  128. result = await db.query("SELECT 1 as test", multirows=False)
  129. assert result.get("test") == 1
  130. print(
  131. f"\n✅ Test passed: Retry mechanism worked, connected after {attempt_count['value']} attempts"
  132. )
  133. print("=" * 80)
  134. finally:
  135. if db.pool:
  136. await db.pool.close()
  137. @pytest.mark.asyncio
  138. async def test_query_retry_with_real_db(self, db_config):
  139. """
  140. Test query-level retry with simulated connection issues.
  141. Tests that queries retry on transient failures by simulating
  142. a temporary database unavailability.
  143. Uses new HA defaults (10 retries, 3s backoff).
  144. """
  145. print("\n" + "=" * 80)
  146. print("INTEGRATION TEST 3: Query-Level Retry")
  147. print("=" * 80)
  148. db = PostgreSQLDB(db_config)
  149. try:
  150. # First initialize normally
  151. await db.initdb()
  152. print(" ✓ Database initialized")
  153. # Close the pool to simulate connection loss
  154. print(" → Simulating connection loss (closing pool)...")
  155. await db.pool.close()
  156. db.pool = None
  157. # Now query should trigger pool recreation and retry
  158. print(" → Attempting query (should auto-reconnect)...")
  159. result = await db.query("SELECT 1 as test", multirows=False)
  160. assert result.get("test") == 1, "Query should succeed after reconnection"
  161. assert db.pool is not None, "Pool should be recreated"
  162. print(" ✓ Query succeeded after automatic reconnection")
  163. print("\n✅ Test passed: Auto-reconnection works correctly")
  164. print("=" * 80)
  165. finally:
  166. if db.pool:
  167. await db.pool.close()
  168. @pytest.mark.asyncio
  169. async def test_concurrent_queries_with_real_db(self, db_config):
  170. """
  171. Test concurrent queries to validate thread safety and connection pooling.
  172. Runs multiple concurrent queries to ensure no deadlocks or race conditions.
  173. Uses new HA defaults (10 retries, 3s backoff).
  174. """
  175. print("\n" + "=" * 80)
  176. print("INTEGRATION TEST 4: Concurrent Queries")
  177. print("=" * 80)
  178. db = PostgreSQLDB(db_config)
  179. try:
  180. await db.initdb()
  181. print(" ✓ Database initialized")
  182. # Launch 10 concurrent queries
  183. num_queries = 10
  184. print(f" → Launching {num_queries} concurrent queries...")
  185. async def run_query(query_id):
  186. result = await db.query(
  187. f"SELECT {query_id} as id, pg_sleep(0.1)", multirows=False
  188. )
  189. return result.get("id")
  190. start_time = time.time()
  191. tasks = [run_query(i) for i in range(num_queries)]
  192. results = await asyncio.gather(*tasks, return_exceptions=True)
  193. elapsed = time.time() - start_time
  194. # Check results
  195. successful = sum(1 for r in results if not isinstance(r, Exception))
  196. failed = sum(1 for r in results if isinstance(r, Exception))
  197. print(f" → Completed in {elapsed:.2f}s")
  198. print(f" → Results: {successful} successful, {failed} failed")
  199. assert (
  200. successful == num_queries
  201. ), f"All {num_queries} queries should succeed"
  202. assert failed == 0, "No queries should fail"
  203. print("\n✅ Test passed: All concurrent queries succeeded, no deadlocks")
  204. print("=" * 80)
  205. finally:
  206. if db.pool:
  207. await db.pool.close()
  208. @pytest.mark.asyncio
  209. async def test_pool_close_timeout_real(self, db_config):
  210. """
  211. Test pool close timeout protection with real database.
  212. Uses new HA defaults (10 retries, 3s backoff).
  213. """
  214. print("\n" + "=" * 80)
  215. print("INTEGRATION TEST 5: Pool Close Timeout")
  216. print("=" * 80)
  217. db = PostgreSQLDB(db_config)
  218. try:
  219. await db.initdb()
  220. print(" ✓ Database initialized")
  221. # Trigger pool reset (which includes close)
  222. print(" → Triggering pool reset...")
  223. start_time = time.time()
  224. await db._reset_pool()
  225. elapsed = time.time() - start_time
  226. print(f" ✓ Pool reset completed in {elapsed:.2f}s")
  227. assert db.pool is None, "Pool should be None after reset"
  228. assert (
  229. elapsed < db.pool_close_timeout + 1
  230. ), "Reset should complete within timeout"
  231. print("\n✅ Test passed: Pool reset handled correctly")
  232. print("=" * 80)
  233. finally:
  234. # Already closed in test
  235. pass
  236. @pytest.mark.asyncio
  237. async def test_configuration_from_env(self, db_config):
  238. """
  239. Test that configuration is correctly loaded from environment variables.
  240. """
  241. print("\n" + "=" * 80)
  242. print("INTEGRATION TEST 6: Environment Configuration")
  243. print("=" * 80)
  244. db = PostgreSQLDB(db_config)
  245. print(" → Configuration loaded:")
  246. print(f" • Host: {db.host}")
  247. print(f" • Port: {db.port}")
  248. print(f" • Database: {db.database}")
  249. print(f" • User: {db.user}")
  250. print(f" • Workspace: {db.workspace}")
  251. print(f" • Max Connections: {db.max}")
  252. print(f" • Retry Attempts: {db.connection_retry_attempts}")
  253. print(f" • Retry Backoff: {db.connection_retry_backoff}s")
  254. print(f" • Max Backoff: {db.connection_retry_backoff_max}s")
  255. print(f" • Pool Close Timeout: {db.pool_close_timeout}s")
  256. # Verify required fields are present
  257. assert db.host, "Host should be configured"
  258. assert db.port, "Port should be configured"
  259. assert db.user, "User should be configured"
  260. assert db.database, "Database should be configured"
  261. print("\n✅ Test passed: All configuration loaded correctly from .env")
  262. print("=" * 80)
  263. def run_integration_tests():
  264. """Run all integration tests with detailed output."""
  265. print("\n" + "=" * 80)
  266. print("POSTGRESQL RETRY MECHANISM - INTEGRATION TESTS")
  267. print("Testing with REAL database from .env configuration")
  268. print("=" * 80)
  269. # Check if database configuration exists
  270. if not os.getenv("POSTGRES_HOST"):
  271. print("\n⚠️ WARNING: No POSTGRES_HOST in .env file")
  272. print("Please ensure .env file exists with PostgreSQL configuration.")
  273. return
  274. print("\nRunning integration tests...\n")
  275. # Run pytest with verbose output
  276. pytest.main(
  277. [
  278. __file__,
  279. "-v",
  280. "-s", # Don't capture output
  281. "--tb=short", # Short traceback format
  282. "--color=yes",
  283. "-x", # Stop on first failure
  284. ]
  285. )
  286. if __name__ == "__main__":
  287. run_integration_tests()