"""
Integration test suite for the PostgreSQL retry mechanism using a real database.

This test suite connects to a real PostgreSQL database using credentials from
.env and exercises the retry mechanism against it. Connection failures are
simulated by patching ``asyncpg.create_pool`` and by closing the pool.

Prerequisites:
1. PostgreSQL server running and accessible
2. .env file with POSTGRES_* configuration
3. asyncpg installed: pip install asyncpg
"""

import pytest
import asyncio
import os
import time
from dotenv import load_dotenv
from unittest.mock import patch
from lightrag.kg.postgres_impl import PostgreSQLDB

asyncpg = pytest.importorskip("asyncpg")

# Load environment variables
load_dotenv(dotenv_path=".env", override=False)


async def _close_pool(db):
    """Close ``db.pool`` if one is currently open; do nothing otherwise."""
    if db.pool is not None:
        await db.pool.close()


@pytest.mark.integration
@pytest.mark.requires_db
class TestPostgresRetryIntegration:
    """Integration tests for PostgreSQL retry mechanism with real database."""

    @pytest.fixture
    def db_config(self):
        """Load database configuration from environment variables.

        Uses new HA-optimized defaults that match postgres_impl.py
        ClientManager.get_config():
        - 10 retry attempts (up from 3)
        - 3.0s initial backoff (up from 0.5s)
        - 30.0s max backoff (up from 5.0s)

        Returns:
            A dict of connection settings passed to ``PostgreSQLDB``. The
            retry and timeout values are capped with ``min`` (100 attempts,
            300s backoff, 600s max backoff, 30s pool close timeout).
        """
        return {
            "host": os.getenv("POSTGRES_HOST", "localhost"),
            "port": int(os.getenv("POSTGRES_PORT", "5432")),
            "user": os.getenv("POSTGRES_USER", "postgres"),
            "password": os.getenv("POSTGRES_PASSWORD", ""),
            "database": os.getenv("POSTGRES_DATABASE", "postgres"),
            "workspace": os.getenv("POSTGRES_WORKSPACE", "test_retry"),
            "max_connections": int(os.getenv("POSTGRES_MAX_CONNECTIONS", "10")),
            # Connection retry configuration - mirrors postgres_impl.py
            # ClientManager.get_config()
            # NEW DEFAULTS optimized for HA deployments
            "connection_retry_attempts": min(
                100,
                int(os.getenv("POSTGRES_CONNECTION_RETRIES", "10")),  # 3 → 10
            ),
            "connection_retry_backoff": min(
                300.0,
                float(
                    os.getenv("POSTGRES_CONNECTION_RETRY_BACKOFF", "3.0")
                ),  # 0.5 → 3.0
            ),
            "connection_retry_backoff_max": min(
                600.0,
                float(
                    os.getenv("POSTGRES_CONNECTION_RETRY_BACKOFF_MAX", "30.0")
                ),  # 5.0 → 30.0
            ),
            "pool_close_timeout": min(
                30.0, float(os.getenv("POSTGRES_POOL_CLOSE_TIMEOUT", "5.0"))
            ),
        }

    @pytest.mark.asyncio
    async def test_real_connection_success(self, db_config):
        """
        Test successful connection to real PostgreSQL database.

        This validates that:
        1. Database credentials are correct
        2. Connection pool initializes properly
        3. Basic query works
        """
        print("\n" + "=" * 80)
        print("INTEGRATION TEST 1: Real Database Connection")
        print("=" * 80)
        print(
            f" → Connecting to {db_config['host']}:{db_config['port']}/{db_config['database']}"
        )

        db = PostgreSQLDB(db_config)
        try:
            # Initialize database connection
            await db.initdb()
            print(" ✓ Connection successful")

            # Test simple query
            result = await db.query("SELECT 1 as test", multirows=False)
            assert result is not None
            assert result.get("test") == 1
            print(" ✓ Query executed successfully")

            print("\n✅ Test passed: Real database connection works")
            print("=" * 80)
        finally:
            await _close_pool(db)

    @pytest.mark.asyncio
    async def test_simulated_transient_error_with_real_db(self, db_config):
        """
        Test retry mechanism with simulated transient errors on real database.

        Simulates connection failures on first 2 attempts, then succeeds.
        Uses new HA defaults (10 retries, 3s backoff).
        """
        print("\n" + "=" * 80)
        print("INTEGRATION TEST 2: Simulated Transient Errors")
        print("=" * 80)

        db = PostgreSQLDB(db_config)
        attempt_count = {"value": 0}

        # Keep a reference to the real function before it is patched
        original_create_pool = asyncpg.create_pool

        async def mock_create_pool_with_failures(*args, **kwargs):
            """Mock that fails first 2 times, then calls real create_pool."""
            attempt_count["value"] += 1
            print(f" → Connection attempt {attempt_count['value']}")

            if attempt_count["value"] <= 2:
                print(" ✗ Simulating connection failure")
                raise asyncpg.exceptions.ConnectionFailureError(
                    f"Simulated failure on attempt {attempt_count['value']}"
                )

            print(" ✓ Allowing real connection")
            return await original_create_pool(*args, **kwargs)

        try:
            # Patch create_pool to simulate failures
            with patch(
                "asyncpg.create_pool", side_effect=mock_create_pool_with_failures
            ):
                await db.initdb()

            assert (
                attempt_count["value"] == 3
            ), f"Expected 3 attempts, got {attempt_count['value']}"
            assert db.pool is not None, "Pool should be initialized after retries"

            # Verify database is actually working
            result = await db.query("SELECT 1 as test", multirows=False)
            assert result.get("test") == 1

            print(
                f"\n✅ Test passed: Retry mechanism worked, connected after {attempt_count['value']} attempts"
            )
            print("=" * 80)
        finally:
            await _close_pool(db)

    @pytest.mark.asyncio
    async def test_query_retry_with_real_db(self, db_config):
        """
        Test query-level retry with simulated connection issues.

        Tests that queries retry on transient failures by simulating a
        temporary database unavailability.
        Uses new HA defaults (10 retries, 3s backoff).
        """
        print("\n" + "=" * 80)
        print("INTEGRATION TEST 3: Query-Level Retry")
        print("=" * 80)

        db = PostgreSQLDB(db_config)
        try:
            # First initialize normally
            await db.initdb()
            print(" ✓ Database initialized")

            # Close the pool to simulate connection loss
            print(" → Simulating connection loss (closing pool)...")
            await db.pool.close()
            db.pool = None

            # Now query should trigger pool recreation and retry
            print(" → Attempting query (should auto-reconnect)...")
            result = await db.query("SELECT 1 as test", multirows=False)

            assert result.get("test") == 1, "Query should succeed after reconnection"
            assert db.pool is not None, "Pool should be recreated"
            print(" ✓ Query succeeded after automatic reconnection")

            print("\n✅ Test passed: Auto-reconnection works correctly")
            print("=" * 80)
        finally:
            await _close_pool(db)

    @pytest.mark.asyncio
    async def test_concurrent_queries_with_real_db(self, db_config):
        """
        Test concurrent queries to validate thread safety and connection pooling.

        Runs multiple concurrent queries to ensure no deadlocks or race
        conditions, and checks that every query returns its own id.
        Uses new HA defaults (10 retries, 3s backoff).
        """
        print("\n" + "=" * 80)
        print("INTEGRATION TEST 4: Concurrent Queries")
        print("=" * 80)

        db = PostgreSQLDB(db_config)
        try:
            await db.initdb()
            print(" ✓ Database initialized")

            # Launch 10 concurrent queries
            num_queries = 10
            print(f" → Launching {num_queries} concurrent queries...")

            async def run_query(query_id):
                result = await db.query(
                    f"SELECT {query_id} as id, pg_sleep(0.1)", multirows=False
                )
                return result.get("id")

            # perf_counter is monotonic, so the measurement cannot be skewed
            # by wall-clock adjustments during the run.
            start_time = time.perf_counter()
            results = await asyncio.gather(
                *(run_query(i) for i in range(num_queries)),
                return_exceptions=True,
            )
            elapsed = time.perf_counter() - start_time

            # Check results
            errors = [r for r in results if isinstance(r, Exception)]
            failed = len(errors)
            successful = len(results) - failed
            print(f" → Completed in {elapsed:.2f}s")
            print(f" → Results: {successful} successful, {failed} failed")

            # Surface the captured exceptions so a failure is diagnosable
            assert failed == 0, f"No queries should fail, got: {errors!r}"
            assert (
                successful == num_queries
            ), f"All {num_queries} queries should succeed"
            # gather() preserves task order, so result i belongs to query i
            assert results == list(
                range(num_queries)
            ), f"Each query should return its own id, got: {results!r}"

            print("\n✅ Test passed: All concurrent queries succeeded, no deadlocks")
            print("=" * 80)
        finally:
            await _close_pool(db)

    @pytest.mark.asyncio
    async def test_pool_close_timeout_real(self, db_config):
        """
        Test pool close timeout protection with real database.

        Uses new HA defaults (10 retries, 3s backoff).
        """
        print("\n" + "=" * 80)
        print("INTEGRATION TEST 5: Pool Close Timeout")
        print("=" * 80)

        db = PostgreSQLDB(db_config)
        try:
            await db.initdb()
            print(" ✓ Database initialized")

            # Trigger pool reset (which includes close)
            print(" → Triggering pool reset...")
            start_time = time.perf_counter()
            await db._reset_pool()
            elapsed = time.perf_counter() - start_time

            print(f" ✓ Pool reset completed in {elapsed:.2f}s")
            assert db.pool is None, "Pool should be None after reset"
            assert (
                elapsed < db.pool_close_timeout + 1
            ), "Reset should complete within timeout"

            print("\n✅ Test passed: Pool reset handled correctly")
            print("=" * 80)
        finally:
            # On success the pool is already None and this is a no-op. It
            # only does work when the reset raised or left the pool in place,
            # which would otherwise leak connections.
            await _close_pool(db)

    @pytest.mark.asyncio
    async def test_configuration_from_env(self, db_config):
        """
        Test that configuration is correctly loaded from environment variables.
        """
        print("\n" + "=" * 80)
        print("INTEGRATION TEST 6: Environment Configuration")
        print("=" * 80)

        db = PostgreSQLDB(db_config)

        print(" → Configuration loaded:")
        print(f" • Host: {db.host}")
        print(f" • Port: {db.port}")
        print(f" • Database: {db.database}")
        print(f" • User: {db.user}")
        print(f" • Workspace: {db.workspace}")
        print(f" • Max Connections: {db.max}")
        print(f" • Retry Attempts: {db.connection_retry_attempts}")
        print(f" • Retry Backoff: {db.connection_retry_backoff}s")
        print(f" • Max Backoff: {db.connection_retry_backoff_max}s")
        print(f" • Pool Close Timeout: {db.pool_close_timeout}s")

        # Verify required fields are present
        assert db.host, "Host should be configured"
        assert db.port, "Port should be configured"
        assert db.user, "User should be configured"
        assert db.database, "Database should be configured"

        print("\n✅ Test passed: All configuration loaded correctly from .env")
        print("=" * 80)


def run_integration_tests():
    """Run all integration tests with detailed output.

    Returns:
        The exit code from ``pytest.main``, or 0 when the run is skipped
        because ``POSTGRES_HOST`` is not set (a warning is printed instead).
    """
    print("\n" + "=" * 80)
    print("POSTGRESQL RETRY MECHANISM - INTEGRATION TESTS")
    print("Testing with REAL database from .env configuration")
    print("=" * 80)

    # Check if database configuration exists
    if not os.getenv("POSTGRES_HOST"):
        print("\n⚠️ WARNING: No POSTGRES_HOST in .env file")
        print("Please ensure .env file exists with PostgreSQL configuration.")
        return 0

    print("\nRunning integration tests...\n")

    # Run pytest with verbose output and hand its exit code back to the caller
    return pytest.main(
        [
            __file__,
            "-v",
            "-s",  # Don't capture output
            "--tb=short",  # Short traceback format
            "--color=yes",
            "-x",  # Stop on first failure
        ]
    )


if __name__ == "__main__":
    # Propagate the pytest result so a failing run exits non-zero
    raise SystemExit(run_integration_tests())