| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350 |
- """
- Integration test suite for PostgreSQL retry mechanism using real database.
- This test suite connects to a real PostgreSQL database using credentials from .env
- and tests the retry mechanism with actual network failures.
- Prerequisites:
- 1. PostgreSQL server running and accessible
- 2. .env file with POSTGRES_* configuration
- 3. asyncpg installed: pip install asyncpg
- """
- import pytest
- import asyncio
- import os
- import time
- from dotenv import load_dotenv
- from unittest.mock import patch
- from lightrag.kg.postgres_impl import PostgreSQLDB
- asyncpg = pytest.importorskip("asyncpg")
# Pull POSTGRES_* settings from a local .env file; override=False leaves any
# variable that is already present in the process environment untouched.
load_dotenv(".env", override=False)
@pytest.mark.integration
@pytest.mark.requires_db
class TestPostgresRetryIntegration:
    """Integration tests for PostgreSQL retry mechanism with real database.

    Each test builds its own ``PostgreSQLDB`` from the ``db_config`` fixture
    and closes any pool it opened in a ``finally`` clause, so a failing
    assertion does not leave connections open on the server.
    """

    @staticmethod
    async def _close_pool(db):
        """Close ``db.pool`` if a pool is currently open.

        Shared cleanup for every test's ``finally`` clause.
        """
        if db.pool is not None:
            await db.pool.close()

    @pytest.fixture
    def db_config(self):
        """Load database configuration from environment variables.
        Uses new HA-optimized defaults that match postgres_impl.py ClientManager.get_config():
        - 10 retry attempts (up from 3)
        - 3.0s initial backoff (up from 0.5s)
        - 30.0s max backoff (up from 5.0s)
        """
        return {
            "host": os.getenv("POSTGRES_HOST", "localhost"),
            "port": int(os.getenv("POSTGRES_PORT", "5432")),
            "user": os.getenv("POSTGRES_USER", "postgres"),
            "password": os.getenv("POSTGRES_PASSWORD", ""),
            "database": os.getenv("POSTGRES_DATABASE", "postgres"),
            "workspace": os.getenv("POSTGRES_WORKSPACE", "test_retry"),
            "max_connections": int(os.getenv("POSTGRES_MAX_CONNECTIONS", "10")),
            # Connection retry configuration - mirrors postgres_impl.py
            # ClientManager.get_config(). Each value is capped with min() so an
            # env override cannot request an unbounded retry schedule.
            "connection_retry_attempts": min(
                100,
                int(os.getenv("POSTGRES_CONNECTION_RETRIES", "10")),
            ),
            "connection_retry_backoff": min(
                300.0,
                float(os.getenv("POSTGRES_CONNECTION_RETRY_BACKOFF", "3.0")),
            ),
            "connection_retry_backoff_max": min(
                600.0,
                float(os.getenv("POSTGRES_CONNECTION_RETRY_BACKOFF_MAX", "30.0")),
            ),
            "pool_close_timeout": min(
                30.0, float(os.getenv("POSTGRES_POOL_CLOSE_TIMEOUT", "5.0"))
            ),
        }

    @pytest.mark.asyncio
    async def test_real_connection_success(self, db_config):
        """
        Test successful connection to real PostgreSQL database.
        This validates that:
        1. Database credentials are correct
        2. Connection pool initializes properly
        3. Basic query works
        """
        print("\n" + "=" * 80)
        print("INTEGRATION TEST 1: Real Database Connection")
        print("=" * 80)
        print(
            f" → Connecting to {db_config['host']}:{db_config['port']}/{db_config['database']}"
        )
        db = PostgreSQLDB(db_config)
        try:
            # Initialize database connection
            await db.initdb()
            print(" ✓ Connection successful")
            # Test simple query
            result = await db.query("SELECT 1 as test", multirows=False)
            assert result is not None
            assert result.get("test") == 1
            print(" ✓ Query executed successfully")
            print("\n✅ Test passed: Real database connection works")
            print("=" * 80)
        finally:
            await self._close_pool(db)

    @pytest.mark.asyncio
    async def test_simulated_transient_error_with_real_db(self, db_config):
        """
        Test retry mechanism with simulated transient errors on real database.
        Simulates connection failures on first 2 attempts, then succeeds.
        Uses new HA defaults (10 retries, 3s backoff).
        """
        print("\n" + "=" * 80)
        print("INTEGRATION TEST 2: Simulated Transient Errors")
        print("=" * 80)
        # The test injects two failures and expects success on the third
        # attempt. Clamp the attempt budget so a low POSTGRES_CONNECTION_RETRIES
        # override cannot make the test fail for configuration reasons.
        # NOTE(review): whether the setting counts total attempts or retries
        # is defined in postgres_impl; 3 is sufficient under either reading.
        config = dict(db_config)
        config["connection_retry_attempts"] = max(
            3, config["connection_retry_attempts"]
        )
        db = PostgreSQLDB(config)
        attempt_count = {"value": 0}
        # Keep a handle on the real function before it is patched.
        original_create_pool = asyncpg.create_pool

        async def mock_create_pool_with_failures(*args, **kwargs):
            """Mock that fails first 2 times, then calls real create_pool."""
            attempt_count["value"] += 1
            print(f" → Connection attempt {attempt_count['value']}")
            if attempt_count["value"] <= 2:
                print(" ✗ Simulating connection failure")
                raise asyncpg.exceptions.ConnectionFailureError(
                    f"Simulated failure on attempt {attempt_count['value']}"
                )
            print(" ✓ Allowing real connection")
            return await original_create_pool(*args, **kwargs)

        try:
            # Patch create_pool to simulate failures
            with patch(
                "asyncpg.create_pool", side_effect=mock_create_pool_with_failures
            ):
                await db.initdb()
            assert (
                attempt_count["value"] == 3
            ), f"Expected 3 attempts, got {attempt_count['value']}"
            assert db.pool is not None, "Pool should be initialized after retries"
            # Verify database is actually working
            result = await db.query("SELECT 1 as test", multirows=False)
            assert result.get("test") == 1
            print(
                f"\n✅ Test passed: Retry mechanism worked, connected after {attempt_count['value']} attempts"
            )
            print("=" * 80)
        finally:
            await self._close_pool(db)

    @pytest.mark.asyncio
    async def test_query_retry_with_real_db(self, db_config):
        """
        Test query-level retry with simulated connection issues.
        Tests that queries retry on transient failures by simulating
        a temporary database unavailability.
        Uses new HA defaults (10 retries, 3s backoff).
        """
        print("\n" + "=" * 80)
        print("INTEGRATION TEST 3: Query-Level Retry")
        print("=" * 80)
        db = PostgreSQLDB(db_config)
        try:
            # First initialize normally
            await db.initdb()
            print(" ✓ Database initialized")
            # Close the pool to simulate connection loss
            print(" → Simulating connection loss (closing pool)...")
            await db.pool.close()
            db.pool = None
            # Now query should trigger pool recreation and retry
            print(" → Attempting query (should auto-reconnect)...")
            result = await db.query("SELECT 1 as test", multirows=False)
            assert result.get("test") == 1, "Query should succeed after reconnection"
            assert db.pool is not None, "Pool should be recreated"
            print(" ✓ Query succeeded after automatic reconnection")
            print("\n✅ Test passed: Auto-reconnection works correctly")
            print("=" * 80)
        finally:
            await self._close_pool(db)

    @pytest.mark.asyncio
    async def test_concurrent_queries_with_real_db(self, db_config):
        """
        Test concurrent queries to validate thread safety and connection pooling.
        Runs multiple concurrent queries to ensure no deadlocks or race conditions,
        and checks that every query returns its own result.
        Uses new HA defaults (10 retries, 3s backoff).
        """
        print("\n" + "=" * 80)
        print("INTEGRATION TEST 4: Concurrent Queries")
        print("=" * 80)
        db = PostgreSQLDB(db_config)
        try:
            await db.initdb()
            print(" ✓ Database initialized")
            # Launch 10 concurrent queries
            num_queries = 10
            print(f" → Launching {num_queries} concurrent queries...")

            async def run_query(query_id):
                result = await db.query(
                    f"SELECT {query_id} as id, pg_sleep(0.1)", multirows=False
                )
                return result.get("id")

            # perf_counter is monotonic, unlike time.time(), so the measured
            # duration cannot be skewed by wall-clock adjustments.
            start_time = time.perf_counter()
            results = await asyncio.gather(
                *(run_query(i) for i in range(num_queries)), return_exceptions=True
            )
            elapsed = time.perf_counter() - start_time
            # gather(return_exceptions=True) can also return BaseException
            # subclasses such as CancelledError; treat those as failures too.
            errors = [r for r in results if isinstance(r, BaseException)]
            successful = len(results) - len(errors)
            print(f" → Completed in {elapsed:.2f}s")
            print(f" → Results: {successful} successful, {len(errors)} failed")
            assert not errors, (
                f"All {num_queries} queries should succeed; "
                f"{len(errors)} failed, first error: {errors[0]!r}"
            )
            # Each query selects its own id, so a mixed-up result would show
            # up here even when no exception was raised.
            assert sorted(results) == list(
                range(num_queries)
            ), f"Each query should return its own id, got {results!r}"
            print("\n✅ Test passed: All concurrent queries succeeded, no deadlocks")
            print("=" * 80)
        finally:
            await self._close_pool(db)

    @pytest.mark.asyncio
    async def test_pool_close_timeout_real(self, db_config):
        """
        Test pool close timeout protection with real database.
        Uses new HA defaults (10 retries, 3s backoff).
        """
        print("\n" + "=" * 80)
        print("INTEGRATION TEST 5: Pool Close Timeout")
        print("=" * 80)
        db = PostgreSQLDB(db_config)
        try:
            await db.initdb()
            print(" ✓ Database initialized")
            # Trigger pool reset (which includes close)
            print(" → Triggering pool reset...")
            start_time = time.perf_counter()
            await db._reset_pool()
            elapsed = time.perf_counter() - start_time
            print(f" ✓ Pool reset completed in {elapsed:.2f}s")
            assert db.pool is None, "Pool should be None after reset"
            assert (
                elapsed < db.pool_close_timeout + 1
            ), "Reset should complete within timeout"
            print("\n✅ Test passed: Pool reset handled correctly")
            print("=" * 80)
        finally:
            # On success _reset_pool() has already cleared db.pool, so this is
            # a no-op. It only closes a pool left open when the test fails
            # before the reset completes.
            await self._close_pool(db)

    @pytest.mark.asyncio
    async def test_configuration_from_env(self, db_config):
        """
        Test that configuration is correctly loaded from environment variables.
        """
        print("\n" + "=" * 80)
        print("INTEGRATION TEST 6: Environment Configuration")
        print("=" * 80)
        db = PostgreSQLDB(db_config)
        print(" → Configuration loaded:")
        print(f" • Host: {db.host}")
        print(f" • Port: {db.port}")
        print(f" • Database: {db.database}")
        print(f" • User: {db.user}")
        print(f" • Workspace: {db.workspace}")
        print(f" • Max Connections: {db.max}")
        print(f" • Retry Attempts: {db.connection_retry_attempts}")
        print(f" • Retry Backoff: {db.connection_retry_backoff}s")
        print(f" • Max Backoff: {db.connection_retry_backoff_max}s")
        print(f" • Pool Close Timeout: {db.pool_close_timeout}s")
        # Verify required fields are present
        assert db.host, "Host should be configured"
        assert db.port, "Port should be configured"
        assert db.user, "User should be configured"
        assert db.database, "Database should be configured"
        print("\n✅ Test passed: All configuration loaded correctly from .env")
        print("=" * 80)
def run_integration_tests():
    """Run this module's integration tests through pytest with verbose output.

    Returns:
        The exit code reported by ``pytest.main`` (0 when every test passed),
        or ``None`` when ``POSTGRES_HOST`` is not set and the run is skipped.
    """
    print("\n" + "=" * 80)
    print("POSTGRESQL RETRY MECHANISM - INTEGRATION TESTS")
    print("Testing with REAL database from .env configuration")
    print("=" * 80)
    # Require an explicit host so this entry point never silently falls back
    # to the fixture's "localhost" default.
    if not os.getenv("POSTGRES_HOST"):
        print("\n⚠️ WARNING: No POSTGRES_HOST in .env file")
        print("Please ensure .env file exists with PostgreSQL configuration.")
        return None
    print("\nRunning integration tests...\n")
    # Return pytest's exit code instead of discarding it, so script callers
    # can report test failures to the shell.
    return pytest.main(
        [
            __file__,
            "-v",
            "-s",  # Don't capture output
            "--tb=short",  # Short traceback format
            "--color=yes",
            "-x",  # Stop on first failure
        ]
    )
if __name__ == "__main__":
    # Propagate the runner's result as the process exit status so shells and
    # CI see failures. A None result (run skipped) exits with status 0.
    raise SystemExit(run_integration_tests())
|