test_postgres_performance_timing.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. import importlib
  2. from unittest.mock import AsyncMock, MagicMock, patch
  3. import pytest
  4. import lightrag.utils as utils_module
  5. from lightrag.kg.postgres_impl import PGGraphStorage, PostgreSQLDB
  6. from lightrag.namespace import NameSpace
  7. def make_db() -> PostgreSQLDB:
  8. return PostgreSQLDB(
  9. {
  10. "host": "localhost",
  11. "port": 5432,
  12. "user": "postgres",
  13. "password": "postgres",
  14. "database": "postgres",
  15. "workspace": "test_ws",
  16. "max_connections": 10,
  17. "connection_retry_attempts": 3,
  18. "connection_retry_backoff": 0,
  19. "connection_retry_backoff_max": 0,
  20. "pool_close_timeout": 5.0,
  21. }
  22. )
  23. @pytest.mark.asyncio
  24. async def test_execute_timing_logs_success():
  25. db = make_db()
  26. async def fake_run_with_retry(operation, **kwargs):
  27. conn = AsyncMock()
  28. conn.execute = AsyncMock(return_value="INSERT 0 1")
  29. await operation(conn)
  30. db._run_with_retry = AsyncMock(side_effect=fake_run_with_retry)
  31. with patch("lightrag.kg.postgres_impl.performance_timing_log") as timing_log:
  32. await db.execute("SELECT 1", timing_label="test label")
  33. assert any(
  34. "connection.execute completed" in call.args[0]
  35. for call in timing_log.call_args_list
  36. )
  37. @pytest.mark.asyncio
  38. async def test_execute_timing_logs_failure():
  39. db = make_db()
  40. async def fake_run_with_retry(operation, **kwargs):
  41. conn = AsyncMock()
  42. conn.execute = AsyncMock(side_effect=RuntimeError("boom"))
  43. await operation(conn)
  44. db._run_with_retry = AsyncMock(side_effect=fake_run_with_retry)
  45. with patch("lightrag.kg.postgres_impl.performance_timing_log") as timing_log:
  46. with pytest.raises(RuntimeError, match="boom"):
  47. await db.execute("SELECT 1", timing_label="test label")
  48. assert any(
  49. "connection.execute failed" in call.args[0]
  50. for call in timing_log.call_args_list
  51. )
  52. @pytest.mark.asyncio
  53. async def test_graph_upsert_node_passes_timing_label():
  54. storage = PGGraphStorage(
  55. namespace=NameSpace.GRAPH_STORE_CHUNK_ENTITY_RELATION,
  56. workspace="test_ws",
  57. global_config={},
  58. embedding_func=AsyncMock(),
  59. )
  60. storage.graph_name = "test_graph"
  61. storage._query = AsyncMock(return_value=[])
  62. await storage.upsert_node(
  63. "node-1",
  64. {
  65. "entity_id": "node-1",
  66. "description": "desc",
  67. },
  68. )
  69. assert storage._query.await_args.kwargs["timing_label"] == (
  70. "test_ws PGGraphStorage.upsert_node"
  71. )
  72. @pytest.mark.asyncio
  73. async def test_graph_upsert_edge_passes_timing_label():
  74. storage = PGGraphStorage(
  75. namespace=NameSpace.GRAPH_STORE_CHUNK_ENTITY_RELATION,
  76. workspace="test_ws",
  77. global_config={},
  78. embedding_func=AsyncMock(),
  79. )
  80. storage.graph_name = "test_graph"
  81. # upsert_edge drives the lock + cypher via db._run_with_retry, not _query.
  82. storage.db = MagicMock()
  83. storage.db._run_with_retry = AsyncMock(return_value=None)
  84. await storage.upsert_edge(
  85. "node-1",
  86. "node-2",
  87. {
  88. "weight": 1.0,
  89. "description": "desc",
  90. },
  91. )
  92. assert storage.db._run_with_retry.await_args.kwargs["timing_label"] == (
  93. "test_ws PGGraphStorage.upsert_edge"
  94. )
  95. def test_performance_timing_logs_reads_new_env_only(monkeypatch):
  96. with monkeypatch.context() as m:
  97. m.setenv("LIGHTRAG_DOC_QUERY_TIMING_LOGS", "false")
  98. m.setenv("LIGHTRAG_PERFORMANCE_TIMING_LOGS", "true")
  99. reloaded = importlib.reload(utils_module)
  100. assert reloaded.PERFORMANCE_TIMING_LOGS is True
  101. importlib.reload(utils_module)
  102. def test_performance_timing_logs_ignores_old_env(monkeypatch):
  103. with monkeypatch.context() as m:
  104. m.setenv("LIGHTRAG_DOC_QUERY_TIMING_LOGS", "true")
  105. m.setenv("LIGHTRAG_PERFORMANCE_TIMING_LOGS", "false")
  106. reloaded = importlib.reload(utils_module)
  107. assert reloaded.PERFORMANCE_TIMING_LOGS is False
  108. importlib.reload(utils_module)