test_llm_cache_tools_opensearch.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. """
  2. Offline tests for OpenSearch support in LLM cache tools.
  3. """
  4. from types import SimpleNamespace
  5. from unittest.mock import AsyncMock, patch
  6. import pytest
  7. pytest.importorskip(
  8. "opensearchpy",
  9. reason="opensearchpy is required for OpenSearch tool tests",
  10. )
  11. from lightrag.tools.clean_llm_query_cache import CleanupStats, CleanupTool
  12. from lightrag.tools.migrate_llm_cache import MigrationTool
  13. pytestmark = pytest.mark.offline
  14. class FakeOpenSearchStorage:
  15. def __init__(self, batches, workspace="test-workspace"):
  16. self._batches = batches
  17. self.workspace = workspace
  18. self.deleted_batches = []
  19. async def _iter_raw_docs(self, batch_size=1000):
  20. for batch in self._batches:
  21. yield batch
  22. async def delete(self, ids):
  23. self.deleted_batches.append(list(ids))
  24. def _flatten(batches):
  25. return [item for batch in batches for item in batch]
  26. class TestCleanupToolOpenSearch:
  27. @pytest.mark.asyncio
  28. async def test_count_query_caches_opensearch(self):
  29. tool = CleanupTool()
  30. storage = FakeOpenSearchStorage(
  31. [
  32. [
  33. {"_id": "mix:query:1", "_source": {}},
  34. {"_id": "mix:keywords:1", "_source": {}},
  35. {"_id": "default:extract:1", "_source": {}},
  36. ],
  37. [
  38. {"_id": "hybrid:query:1", "_source": {}},
  39. {"_id": "local:keywords:1", "_source": {}},
  40. {"_id": "other:key:1", "_source": {}},
  41. ],
  42. ]
  43. )
  44. counts = await tool.count_query_caches(storage, "OpenSearchKVStorage")
  45. assert counts["mix"] == {"query": 1, "keywords": 1}
  46. assert counts["hybrid"] == {"query": 1, "keywords": 0}
  47. assert counts["local"] == {"query": 0, "keywords": 1}
  48. assert counts["global"] == {"query": 0, "keywords": 0}
  49. @pytest.mark.asyncio
  50. @pytest.mark.parametrize(
  51. ("cleanup_type", "expected_ids"),
  52. [
  53. (
  54. "all",
  55. [
  56. "mix:query:1",
  57. "mix:keywords:1",
  58. "global:query:1",
  59. "local:keywords:1",
  60. ],
  61. ),
  62. ("query", ["mix:query:1", "global:query:1"]),
  63. ("keywords", ["mix:keywords:1", "local:keywords:1"]),
  64. ],
  65. )
  66. async def test_delete_query_caches_opensearch(self, cleanup_type, expected_ids):
  67. tool = CleanupTool()
  68. tool.batch_size = 2
  69. storage = FakeOpenSearchStorage(
  70. [
  71. [
  72. {"_id": "mix:query:1", "_source": {}},
  73. {"_id": "mix:keywords:1", "_source": {}},
  74. ],
  75. [
  76. {"_id": "global:query:1", "_source": {}},
  77. {"_id": "local:keywords:1", "_source": {}},
  78. {"_id": "default:extract:1", "_source": {}},
  79. ],
  80. ]
  81. )
  82. stats = CleanupStats()
  83. await tool.delete_query_caches(
  84. storage, "OpenSearchKVStorage", cleanup_type, stats
  85. )
  86. assert _flatten(storage.deleted_batches) == expected_ids
  87. assert all(len(batch) <= 2 for batch in storage.deleted_batches)
  88. assert stats.successfully_deleted == len(expected_ids)
  89. assert stats.successful_batches == len(storage.deleted_batches)
  90. def test_check_config_ini_for_storage_opensearch(self, tmp_path, monkeypatch):
  91. monkeypatch.chdir(tmp_path)
  92. (tmp_path / "config.ini").write_text("[opensearch]\nhosts = localhost:9200\n")
  93. assert CleanupTool().check_config_ini_for_storage("OpenSearchKVStorage")
  94. def test_get_storage_class_opensearch(self):
  95. cleanup_cls = CleanupTool().get_storage_class("OpenSearchKVStorage")
  96. migrate_cls = MigrationTool().get_storage_class("OpenSearchKVStorage")
  97. assert cleanup_cls.__name__ == "OpenSearchKVStorage"
  98. assert migrate_cls.__name__ == "OpenSearchKVStorage"
  99. class TestMigrationToolOpenSearch:
  100. @pytest.mark.asyncio
  101. async def test_count_and_stream_default_caches_opensearch(self):
  102. tool = MigrationTool()
  103. storage = FakeOpenSearchStorage(
  104. [
  105. [
  106. {"_id": "default:extract:1", "_source": {"return": "a"}},
  107. {"_id": "mix:query:1", "_source": {"return": "ignored"}},
  108. ],
  109. [
  110. {"_id": "default:summary:1", "_source": {"return": "b"}},
  111. {"_id": "default:extract:2", "_source": {"return": "c"}},
  112. ],
  113. ]
  114. )
  115. count = await tool.count_default_caches(storage, "OpenSearchKVStorage")
  116. streamed = [
  117. batch
  118. async for batch in tool.stream_default_caches(
  119. storage, "OpenSearchKVStorage", batch_size=2
  120. )
  121. ]
  122. assert count == 3
  123. assert streamed == [
  124. {
  125. "default:extract:1": {"return": "a"},
  126. "default:summary:1": {"return": "b"},
  127. },
  128. {"default:extract:2": {"return": "c"}},
  129. ]
  130. def test_count_available_storage_types_includes_opensearch(
  131. self, tmp_path, monkeypatch
  132. ):
  133. monkeypatch.chdir(tmp_path)
  134. (tmp_path / "config.ini").write_text("[opensearch]\nhosts = localhost:9200\n")
  135. with patch.dict("os.environ", {}, clear=True):
  136. assert MigrationTool().count_available_storage_types() == 2
  137. @pytest.mark.asyncio
  138. async def test_setup_storage_returns_effective_workspace(self, monkeypatch):
  139. tool = MigrationTool()
  140. fake_storage = SimpleNamespace(workspace="forced-workspace")
  141. monkeypatch.setattr(tool, "check_env_vars", lambda _: True)
  142. monkeypatch.setattr(
  143. tool, "initialize_storage", AsyncMock(return_value=fake_storage)
  144. )
  145. monkeypatch.setattr(tool, "count_default_caches", AsyncMock(return_value=3))
  146. with patch("builtins.input", return_value="5"):
  147. storage, storage_name, workspace, total_count = await tool.setup_storage(
  148. "Source", use_streaming=True
  149. )
  150. assert storage is fake_storage
  151. assert storage_name == "OpenSearchKVStorage"
  152. assert workspace == "forced-workspace"
  153. assert total_count == 3