| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185 |
- """
- Offline tests for OpenSearch support in LLM cache tools.
- """
- from types import SimpleNamespace
- from unittest.mock import AsyncMock, patch
- import pytest
- pytest.importorskip(
- "opensearchpy",
- reason="opensearchpy is required for OpenSearch tool tests",
- )
- from lightrag.tools.clean_llm_query_cache import CleanupStats, CleanupTool
- from lightrag.tools.migrate_llm_cache import MigrationTool
# Tag every test in this module as ``offline``: the tests below use an
# in-memory storage double and mocks rather than a live OpenSearch cluster.
pytestmark = pytest.mark.offline
class FakeOpenSearchStorage:
    """In-memory double handed to the cache tools in place of ``OpenSearchKVStorage``.

    It exposes only what these tests exercise: a ``workspace`` attribute, an
    async ``_iter_raw_docs`` generator that replays canned batches, and an
    async ``delete`` that records every id list it receives.
    """

    def __init__(self, batches, workspace="test-workspace"):
        self.workspace = workspace
        # One entry per ``delete`` call, each a list snapshot of the ids passed.
        self.deleted_batches = []
        # Pre-chunked raw hits, replayed verbatim (same objects) on every scan.
        self._batches = batches

    async def _iter_raw_docs(self, batch_size=1000):
        """Yield the stored batches in order.

        ``batch_size`` is accepted for signature compatibility but ignored, so
        tests control batch boundaries directly through the constructor.
        """
        for chunk in self._batches:
            yield chunk

    async def delete(self, ids):
        """Record ``ids`` (materialized as a list) as one deletion batch."""
        snapshot = list(ids)
        self.deleted_batches.append(snapshot)
- def _flatten(batches):
- return [item for batch in batches for item in batch]
class TestCleanupToolOpenSearch:
    """Exercise CleanupTool against the in-memory OpenSearch storage double."""

    @staticmethod
    def _hits(*doc_ids):
        """Return raw hits for ``doc_ids``, each with an empty ``_source``."""
        return [{"_id": doc_id, "_source": {}} for doc_id in doc_ids]

    @pytest.mark.asyncio
    async def test_count_query_caches_opensearch(self):
        tool = CleanupTool()
        # Only the mix/hybrid/local/global buckets are asserted; the
        # "default:extract" and "other:key" entries are noise in the scan.
        fake = FakeOpenSearchStorage(
            [
                self._hits("mix:query:1", "mix:keywords:1", "default:extract:1"),
                self._hits("hybrid:query:1", "local:keywords:1", "other:key:1"),
            ]
        )

        result = await tool.count_query_caches(fake, "OpenSearchKVStorage")

        expected = {
            "mix": {"query": 1, "keywords": 1},
            "hybrid": {"query": 1, "keywords": 0},
            "local": {"query": 0, "keywords": 1},
            "global": {"query": 0, "keywords": 0},
        }
        for mode, per_type in expected.items():
            assert result[mode] == per_type

    @pytest.mark.asyncio
    @pytest.mark.parametrize(
        ("cleanup_type", "expected_ids"),
        [
            (
                "all",
                [
                    "mix:query:1",
                    "mix:keywords:1",
                    "global:query:1",
                    "local:keywords:1",
                ],
            ),
            ("query", ["mix:query:1", "global:query:1"]),
            ("keywords", ["mix:keywords:1", "local:keywords:1"]),
        ],
    )
    async def test_delete_query_caches_opensearch(self, cleanup_type, expected_ids):
        max_batch = 2
        tool = CleanupTool()
        tool.batch_size = max_batch
        # The fake replays raw batches as-is; the assertions below cap each
        # delete() call at ``max_batch`` ids and never expect the
        # "default:extract:1" entry to be removed.
        fake = FakeOpenSearchStorage(
            [
                self._hits("mix:query:1", "mix:keywords:1"),
                self._hits("global:query:1", "local:keywords:1", "default:extract:1"),
            ]
        )
        stats = CleanupStats()

        await tool.delete_query_caches(fake, "OpenSearchKVStorage", cleanup_type, stats)

        deleted = fake.deleted_batches
        assert _flatten(deleted) == expected_ids
        assert all(len(chunk) <= max_batch for chunk in deleted)
        assert stats.successfully_deleted == len(expected_ids)
        assert stats.successful_batches == len(deleted)

    def test_check_config_ini_for_storage_opensearch(self, tmp_path, monkeypatch):
        config_file = tmp_path / "config.ini"
        config_file.write_text("[opensearch]\nhosts = localhost:9200\n")
        # The tool is expected to look for config.ini in the working directory.
        monkeypatch.chdir(tmp_path)
        assert CleanupTool().check_config_ini_for_storage("OpenSearchKVStorage")

    def test_get_storage_class_opensearch(self):
        resolved = [
            CleanupTool().get_storage_class("OpenSearchKVStorage"),
            MigrationTool().get_storage_class("OpenSearchKVStorage"),
        ]
        assert [cls.__name__ for cls in resolved] == ["OpenSearchKVStorage"] * 2
class TestMigrationToolOpenSearch:
    """Exercise MigrationTool against OpenSearch-backed storage (faked or mocked)."""

    @pytest.mark.asyncio
    async def test_count_and_stream_default_caches_opensearch(self):
        def hit(doc_id, value):
            # One raw hit whose _source carries a single "return" payload.
            return {"_id": doc_id, "_source": {"return": value}}

        tool = MigrationTool()
        # "mix:query:1" is not a default cache and must not be counted/streamed.
        fake = FakeOpenSearchStorage(
            [
                [hit("default:extract:1", "a"), hit("mix:query:1", "ignored")],
                [hit("default:summary:1", "b"), hit("default:extract:2", "c")],
            ]
        )

        total = await tool.count_default_caches(fake, "OpenSearchKVStorage")
        chunks = []
        async for chunk in tool.stream_default_caches(
            fake, "OpenSearchKVStorage", batch_size=2
        ):
            chunks.append(chunk)

        assert total == 3
        assert chunks == [
            {
                "default:extract:1": {"return": "a"},
                "default:summary:1": {"return": "b"},
            },
            {"default:extract:2": {"return": "c"}},
        ]

    def test_count_available_storage_types_includes_opensearch(
        self, tmp_path, monkeypatch
    ):
        (tmp_path / "config.ini").write_text("[opensearch]\nhosts = localhost:9200\n")
        monkeypatch.chdir(tmp_path)
        # With the environment emptied, OpenSearch can only be detected via
        # the config.ini written above.
        with patch.dict("os.environ", {}, clear=True):
            available = MigrationTool().count_available_storage_types()
        assert available == 2

    @pytest.mark.asyncio
    async def test_setup_storage_returns_effective_workspace(self, monkeypatch):
        tool = MigrationTool()
        opened = SimpleNamespace(workspace="forced-workspace")
        stubs = {
            "check_env_vars": lambda _: True,
            "initialize_storage": AsyncMock(return_value=opened),
            "count_default_caches": AsyncMock(return_value=3),
        }
        for attr, stub in stubs.items():
            monkeypatch.setattr(tool, attr, stub)

        # Menu answer "5" is expected to select OpenSearchKVStorage (asserted below);
        # the returned workspace must come from the initialized storage object.
        with patch("builtins.input", return_value="5"):
            result = await tool.setup_storage("Source", use_streaming=True)

        storage, storage_name, workspace, total_count = result
        assert storage is opened
        assert storage_name == "OpenSearchKVStorage"
        assert workspace == "forced-workspace"
        assert total_count == 3
|