| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902 |
- import importlib
- import logging
- import os
- import sys
- from types import SimpleNamespace
- from unittest.mock import AsyncMock, Mock, patch
- import pytest
- from fastapi import APIRouter
- from fastapi.testclient import TestClient
- from lightrag.llm.bedrock import (
- bedrock_complete,
- bedrock_complete_if_cache,
- bedrock_embed,
- )
def _reload_api_modules_if_mocked() -> None:
    """Evict Mock stand-ins for lightrag.api modules from ``sys.modules``.

    Other test files (e.g. test_token_auto_renewal.py) replace
    ``sys.modules["lightrag.api.config"]`` with a Mock at import time. When
    pytest collects those files before this one, a later
    ``from .config import global_args`` inside lightrag_server resolves to
    the Mock, which breaks ``create_app`` in the create_app_* tests below.
    Removing the Mock entries lets the next import load the real modules.
    """
    candidates = (
        "lightrag.api.lightrag_server",
        "lightrag.api.auth",
        "lightrag.api.config",
    )
    # Only entries that are actually Mock instances are dropped; real modules
    # and missing entries are left untouched.
    mocked = [name for name in candidates if isinstance(sys.modules.get(name), Mock)]
    for name in mocked:
        del sys.modules[name]
class _FakeBedrockClient:
    """Async-context-manager fake of a Bedrock runtime client.

    Every ``converse`` call appends its keyword arguments to the list supplied
    at construction time and answers with a canned keyword-extraction payload.
    """

    def __init__(self, captured_calls: list[dict]):
        self._captured_calls = captured_calls

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Returning False means exceptions raised in the ``async with`` body
        # are never suppressed.
        return False

    async def converse(self, **kwargs):
        """Record ``kwargs`` and return a reply holding one text block."""
        self._captured_calls.append(kwargs)
        text_block = {
            "text": '{"high_level_keywords":["AI"],"low_level_keywords":["RAG"]}'
        }
        message = {"content": [text_block]}
        return {"output": {"message": message}}
class _FakeSession:
    """Session fake that logs ``client()`` keyword arguments.

    The tests in this module patch ``lightrag.llm.bedrock.aioboto3.Session``
    to return an instance of this class.
    """

    def __init__(self, captured_calls: list[dict], client_kwargs_calls: list[dict]):
        self._captured_calls = captured_calls
        self._client_kwargs_calls = client_kwargs_calls

    def client(self, *_args, **kwargs):
        """Record a copy of ``kwargs`` and hand back a new ``_FakeBedrockClient``."""
        kwargs_snapshot = {**kwargs}
        self._client_kwargs_calls.append(kwargs_snapshot)
        return _FakeBedrockClient(self._captured_calls)
class _FakeReasoningClient(_FakeBedrockClient):
    """Fake client whose reply puts a reasoning block ahead of the text block."""

    async def converse(self, **kwargs):
        """Record ``kwargs`` and return reasoning content followed by plain text."""
        self._captured_calls.append(kwargs)
        reasoning_block = {
            "reasoningContent": {"reasoningText": {"text": "internal thought"}}
        }
        answer_block = {"text": "final answer"}
        return {
            "output": {"message": {"content": [reasoning_block, answer_block]}}
        }
class _FakeReasoningSession(_FakeSession):
    """Session fake whose ``client()`` yields a ``_FakeReasoningClient``."""

    def client(self, *_args, **kwargs):
        """Record a copy of ``kwargs`` and return a reasoning-capable fake client."""
        kwargs_snapshot = {**kwargs}
        self._client_kwargs_calls.append(kwargs_snapshot)
        return _FakeReasoningClient(self._captured_calls)
@pytest.mark.offline
@pytest.mark.asyncio
async def test_bedrock_complete_skips_reasoning_content_block(monkeypatch):
    """Only the plain text block is returned when a reasoning block precedes it."""
    monkeypatch.delenv("AWS_REGION", raising=False)
    converse_requests: list[dict] = []
    fake_session = _FakeReasoningSession(converse_requests, [])

    with patch("lightrag.llm.bedrock.aioboto3.Session", return_value=fake_session):
        answer = await bedrock_complete_if_cache(
            model="bedrock-model",
            prompt="hello",
            extra_fields={"reasoning_config": {"type": "enabled"}},
        )

    assert answer == "final answer"
@pytest.mark.offline
@pytest.mark.asyncio
async def test_bedrock_complete_forwards_keyword_extraction_to_if_cache():
    """``bedrock_complete`` passes ``keyword_extraction`` on to the cached call."""
    kv_stub = SimpleNamespace(global_config={"llm_model_name": "bedrock-model"})
    complete_stub = AsyncMock(return_value="{}")

    with patch("lightrag.llm.bedrock.bedrock_complete_if_cache", complete_stub):
        await bedrock_complete(
            prompt="hello",
            hashing_kv=kv_stub,
            keyword_extraction=True,
        )

    forwarded_kwargs = complete_stub.await_args.kwargs
    assert forwarded_kwargs["keyword_extraction"] is True
@pytest.mark.offline
@pytest.mark.asyncio
async def test_bedrock_keyword_extraction_does_not_inject_system_prompt(monkeypatch):
    """A JSON ``response_format`` request sends no ``system`` field to ``converse``."""
    monkeypatch.delenv("AWS_REGION", raising=False)
    converse_requests: list[dict] = []
    client_kwargs_log: list[dict] = []
    fake_session = _FakeSession(converse_requests, client_kwargs_log)

    with patch("lightrag.llm.bedrock.aioboto3.Session", return_value=fake_session):
        reply = await bedrock_complete_if_cache(
            model="bedrock-model",
            prompt="hello",
            response_format={"type": "json_object"},
        )

    assert reply == '{"high_level_keywords":["AI"],"low_level_keywords":["RAG"]}'
    assert len(converse_requests) == 1
    only_request = converse_requests[0]
    assert "system" not in only_request
    assert client_kwargs_log[-1] == {"region_name": None}
@pytest.mark.offline
@pytest.mark.asyncio
async def test_bedrock_default_endpoint_sentinel_uses_sdk_default(monkeypatch):
    """The ``DEFAULT_BEDROCK_ENDPOINT`` sentinel is not forwarded as ``endpoint_url``."""
    monkeypatch.delenv("AWS_REGION", raising=False)
    converse_requests: list[dict] = []
    client_kwargs_log: list[dict] = []
    fake_session = _FakeSession(converse_requests, client_kwargs_log)

    with patch("lightrag.llm.bedrock.aioboto3.Session", return_value=fake_session):
        await bedrock_complete_if_cache(
            model="bedrock-model",
            prompt="hello",
            endpoint_url="DEFAULT_BEDROCK_ENDPOINT",
        )

    latest_client_kwargs = client_kwargs_log[-1]
    assert latest_client_kwargs == {"region_name": None}
@pytest.mark.offline
@pytest.mark.asyncio
async def test_bedrock_empty_endpoint_url_uses_sdk_default(monkeypatch):
    """An empty ``endpoint_url`` is not forwarded to the session's ``client()``."""
    monkeypatch.delenv("AWS_REGION", raising=False)
    converse_requests: list[dict] = []
    client_kwargs_log: list[dict] = []
    fake_session = _FakeSession(converse_requests, client_kwargs_log)

    with patch("lightrag.llm.bedrock.aioboto3.Session", return_value=fake_session):
        await bedrock_complete_if_cache(
            model="bedrock-model",
            prompt="hello",
            endpoint_url="",
        )

    latest_client_kwargs = client_kwargs_log[-1]
    assert latest_client_kwargs == {"region_name": None}
@pytest.mark.offline
@pytest.mark.asyncio
async def test_bedrock_custom_endpoint_url_is_forwarded(monkeypatch):
    """A custom ``endpoint_url`` reaches the session's ``client()`` unchanged."""
    monkeypatch.delenv("AWS_REGION", raising=False)
    converse_requests: list[dict] = []
    client_kwargs_log: list[dict] = []
    fake_session = _FakeSession(converse_requests, client_kwargs_log)

    with patch("lightrag.llm.bedrock.aioboto3.Session", return_value=fake_session):
        await bedrock_complete_if_cache(
            model="bedrock-model",
            prompt="hello",
            endpoint_url="https://proxy.example.com",
        )

    expected_client_kwargs = {
        "region_name": None,
        "endpoint_url": "https://proxy.example.com",
    }
    assert client_kwargs_log[-1] == expected_client_kwargs
class _FakeEmbeddingBody:
    """Fake response body whose ``json()`` yields a fixed 1024-element embedding."""

    async def json(self):
        """Return a payload with 1024 copies of ``0.1`` under ``"embedding"``."""
        vector = [0.1 for _ in range(1024)]
        return {"embedding": vector}
class _FakeEmbeddingResponse:
    """Fake ``invoke_model`` response exposing only the ``"body"`` entry."""

    def get(self, key):
        """Return a new ``_FakeEmbeddingBody``; any key other than ``"body"`` fails."""
        assert key == "body"
        body = _FakeEmbeddingBody()
        return body
class _FakeEmbeddingClient(_FakeBedrockClient):
    """Fake client that answers ``invoke_model`` with a canned embedding response."""

    async def invoke_model(self, **_request):
        """Ignore the request arguments and return a ``_FakeEmbeddingResponse``."""
        response = _FakeEmbeddingResponse()
        return response
class _FakeEmbeddingSession(_FakeSession):
    """Session fake whose ``client()`` yields a ``_FakeEmbeddingClient``."""

    def client(self, *_args, **kwargs):
        """Record a copy of ``kwargs`` and return an embedding-capable fake client."""
        kwargs_snapshot = {**kwargs}
        self._client_kwargs_calls.append(kwargs_snapshot)
        return _FakeEmbeddingClient(self._captured_calls)
@pytest.mark.offline
@pytest.mark.asyncio
async def test_bedrock_embed_custom_endpoint_url_is_forwarded(monkeypatch):
    """``bedrock_embed`` forwards a custom ``endpoint_url`` to ``client()``."""
    monkeypatch.delenv("AWS_REGION", raising=False)
    recorded_requests: list[dict] = []
    client_kwargs_log: list[dict] = []
    fake_session = _FakeEmbeddingSession(recorded_requests, client_kwargs_log)

    with patch("lightrag.llm.bedrock.aioboto3.Session", return_value=fake_session):
        await bedrock_embed(
            texts=["hello"],
            endpoint_url="https://proxy.example.com",
        )

    expected_client_kwargs = {
        "region_name": None,
        "endpoint_url": "https://proxy.example.com",
    }
    assert client_kwargs_log[-1] == expected_client_kwargs
@pytest.mark.offline
@pytest.mark.asyncio
async def test_bedrock_embed_default_endpoint_sentinel_uses_sdk_default(monkeypatch):
    """``bedrock_embed`` does not forward the ``DEFAULT_BEDROCK_ENDPOINT`` sentinel."""
    monkeypatch.delenv("AWS_REGION", raising=False)
    recorded_requests: list[dict] = []
    client_kwargs_log: list[dict] = []
    fake_session = _FakeEmbeddingSession(recorded_requests, client_kwargs_log)

    with patch("lightrag.llm.bedrock.aioboto3.Session", return_value=fake_session):
        await bedrock_embed(
            texts=["hello"],
            endpoint_url="DEFAULT_BEDROCK_ENDPOINT",
        )

    latest_client_kwargs = client_kwargs_log[-1]
    assert latest_client_kwargs == {"region_name": None}
@pytest.mark.offline
@pytest.mark.asyncio
async def test_bedrock_embed_empty_endpoint_url_uses_sdk_default(monkeypatch):
    """``bedrock_embed`` does not forward an empty ``endpoint_url``."""
    monkeypatch.delenv("AWS_REGION", raising=False)
    recorded_requests: list[dict] = []
    client_kwargs_log: list[dict] = []
    fake_session = _FakeEmbeddingSession(recorded_requests, client_kwargs_log)

    with patch("lightrag.llm.bedrock.aioboto3.Session", return_value=fake_session):
        await bedrock_embed(
            texts=["hello"],
            endpoint_url="",
        )

    latest_client_kwargs = client_kwargs_log[-1]
    assert latest_client_kwargs == {"region_name": None}
@pytest.mark.offline
@pytest.mark.asyncio
async def test_bedrock_complete_forwards_explicit_sigv4_client_kwargs(monkeypatch):
    """Explicit region, credentials and endpoint all reach the session's ``client()``."""
    monkeypatch.delenv("AWS_REGION", raising=False)
    converse_requests: list[dict] = []
    client_kwargs_log: list[dict] = []
    fake_session = _FakeSession(converse_requests, client_kwargs_log)

    with patch("lightrag.llm.bedrock.aioboto3.Session", return_value=fake_session):
        await bedrock_complete_if_cache(
            model="bedrock-model",
            prompt="hello",
            aws_region="us-west-2",
            aws_access_key_id="akid",
            aws_secret_access_key="secret",
            aws_session_token="session",
            endpoint_url="https://proxy.example.com",
        )

    expected_client_kwargs = {
        "region_name": "us-west-2",
        "endpoint_url": "https://proxy.example.com",
        "aws_access_key_id": "akid",
        "aws_secret_access_key": "secret",
        "aws_session_token": "session",
    }
    assert client_kwargs_log[-1] == expected_client_kwargs
@pytest.mark.offline
@pytest.mark.asyncio
async def test_bedrock_extra_fields_maps_to_additional_model_request_fields(
    monkeypatch,
):
    """``extra_fields`` is sent to ``converse`` as ``additionalModelRequestFields``."""
    monkeypatch.delenv("AWS_REGION", raising=False)
    converse_requests: list[dict] = []
    fake_session = _FakeSession(converse_requests, [])

    with patch("lightrag.llm.bedrock.aioboto3.Session", return_value=fake_session):
        await bedrock_complete_if_cache(
            model="bedrock-model",
            prompt="hello",
            extra_fields={"reasoning_config": {"type": "enabled"}},
        )

    latest_request = converse_requests[-1]
    assert latest_request["additionalModelRequestFields"] == {
        "reasoning_config": {"type": "enabled"}
    }
@pytest.mark.offline
@pytest.mark.asyncio
async def test_bedrock_empty_extra_fields_is_dropped(monkeypatch):
    """``extra_fields`` of ``None`` or ``{}`` adds no ``additionalModelRequestFields``.

    Issues one completion per empty variant against the fake session and
    checks every recorded ``converse`` request.
    """
    monkeypatch.delenv("AWS_REGION", raising=False)
    captured_calls: list[dict] = []
    empty_variants = (None, {})

    with patch(
        "lightrag.llm.bedrock.aioboto3.Session",
        return_value=_FakeSession(captured_calls, []),
    ):
        for extra_fields in empty_variants:
            await bedrock_complete_if_cache(
                model="bedrock-model",
                prompt="hello",
                extra_fields=extra_fields,
            )

    # Guard against a vacuous pass: without this, the loop below would succeed
    # even if no converse request had been recorded at all.
    assert len(captured_calls) == len(empty_variants)
    for call in captured_calls:
        assert "additionalModelRequestFields" not in call
@pytest.mark.offline
@pytest.mark.asyncio
async def test_bedrock_api_key_is_ignored_and_does_not_mutate_env(monkeypatch):
    """Passing ``api_key`` warns and leaves the AWS environment variables untouched."""
    sigv4_env_vars = (
        "AWS_ACCESS_KEY_ID",
        "AWS_SECRET_ACCESS_KEY",
        "AWS_SESSION_TOKEN",
    )
    monkeypatch.delenv("AWS_REGION", raising=False)
    monkeypatch.setenv("AWS_BEARER_TOKEN_BEDROCK", "absk-from-env")
    for env_var in sigv4_env_vars:
        monkeypatch.delenv(env_var, raising=False)

    fake_session = _FakeSession([], [])
    with (
        patch("lightrag.llm.bedrock.aioboto3.Session", return_value=fake_session),
        pytest.warns(DeprecationWarning, match="api_key=.*ignored"),
    ):
        await bedrock_complete_if_cache(
            model="bedrock-model",
            prompt="hello",
            api_key="absk-should-be-ignored",
            aws_access_key_id="akid",
            aws_secret_access_key="secret",
            aws_session_token="session",
        )

    # The pre-existing bearer token survives and the explicit SigV4 values
    # were not exported into the process environment.
    assert os.environ.get("AWS_BEARER_TOKEN_BEDROCK") == "absk-from-env"
    for env_var in sigv4_env_vars:
        assert os.environ.get(env_var) is None
@pytest.mark.offline
@pytest.mark.asyncio
async def test_bedrock_embed_forwards_sigv4_and_ignores_api_key(monkeypatch):
    """``bedrock_embed`` forwards SigV4 kwargs, warns on ``api_key``, sets no token."""
    monkeypatch.delenv("AWS_REGION", raising=False)
    monkeypatch.delenv("AWS_BEARER_TOKEN_BEDROCK", raising=False)
    client_kwargs_log: list[dict] = []
    fake_session = _FakeEmbeddingSession([], client_kwargs_log)

    with (
        patch("lightrag.llm.bedrock.aioboto3.Session", return_value=fake_session),
        pytest.warns(DeprecationWarning, match="api_key=.*ignored"),
    ):
        await bedrock_embed(
            texts=["hello"],
            api_key="absk-embedding-key",
            aws_region="us-east-1",
            aws_access_key_id="akid",
            aws_secret_access_key="secret",
            aws_session_token="session",
        )

    expected_client_kwargs = {
        "region_name": "us-east-1",
        "aws_access_key_id": "akid",
        "aws_secret_access_key": "secret",
        "aws_session_token": "session",
    }
    assert client_kwargs_log[-1] == expected_client_kwargs
    assert os.environ.get("AWS_BEARER_TOKEN_BEDROCK") is None
@pytest.mark.offline
def test_bedrock_auth_docstrings_describe_generic_api_key_behavior():
    """The Bedrock docstrings mention the bearer-token and API-key variable names."""
    completion_doc = bedrock_complete_if_cache.__doc__
    for variable_name in ("AWS_BEARER_TOKEN_BEDROCK", "LLM_BINDING_API_KEY"):
        assert variable_name in completion_doc

    # ``bedrock_embed`` exposes the documented callable via its ``func`` attribute.
    embedding_doc = bedrock_embed.func.__doc__
    assert "EMBEDDING_BINDING_API_KEY" in embedding_doc
class _FakeLightRAG:
    """Fake ``LightRAG`` that records constructor kwargs and serves canned snapshots.

    The most recent constructor kwargs and instance are kept on the class so
    tests can inspect what ``create_app`` passed in and adjust the snapshots
    returned by the status accessors.
    """

    # Copy of the kwargs passed to the most recent constructor call.
    last_init_kwargs = None
    # The most recently constructed instance.
    last_instance = None

    def __init__(self, **kwargs):
        cls = type(self)
        cls.last_init_kwargs = dict(kwargs)
        cls.last_instance = self

        role_configs = kwargs.get("role_llm_configs") or {}
        self.role_config_snapshot = {}
        for role, cfg in role_configs.items():
            self.role_config_snapshot[role] = self._snapshot_role(cfg)

        self.queue_status_snapshot = {}
        self.embedding_queue_status_snapshot = {}
        self.rerank_queue_status_snapshot = {}

    @staticmethod
    def _snapshot_role(cfg):
        """Flatten one role config into a dict; missing attributes become defaults."""
        metadata = dict(getattr(cfg, "metadata", None) or {})
        model_kwargs = getattr(cfg, "kwargs", None)
        return {
            "binding": metadata.get("binding"),
            "model": metadata.get("model"),
            "host": metadata.get("host"),
            "is_cross_provider": metadata.get("is_cross_provider", False),
            "max_async": getattr(cfg, "max_async", None),
            "timeout": getattr(cfg, "timeout", None),
            "has_model_kwargs": model_kwargs is not None,
            "metadata": metadata,
        }

    def register_role_llm_builder(self, _builder) -> None:
        """Accept and ignore a role LLM builder."""
        return None

    def set_role_llm_metadata(self, _role: str, **_metadata) -> None:
        """Accept and ignore role metadata updates."""
        return None

    def get_llm_role_config(self):
        """Return the current role-config snapshot."""
        return self.role_config_snapshot

    async def get_llm_queue_status(self, include_base=True):
        """Return the LLM queue snapshot; ``include_base`` is ignored."""
        return self.queue_status_snapshot

    async def get_embedding_queue_status(self):
        """Return the embedding queue snapshot."""
        return self.embedding_queue_status_snapshot

    async def get_rerank_queue_status(self):
        """Return the rerank queue snapshot."""
        return self.rerank_queue_status_snapshot
class _FakeOllamaAPI:
    """Fake ``OllamaAPI`` that only exposes an empty ``router``."""

    def __init__(self, *_args, **_kwargs):
        # Constructor arguments are accepted and ignored.
        empty_router = APIRouter()
        self.router = empty_router
def _make_args(tmp_path) -> SimpleNamespace:
    """Build a fresh argument namespace for a Bedrock-bound server.

    Both the LLM and embedding bindings are ``"bedrock"``, global AWS
    credentials are set, and the query-role AWS overrides are ``None``.
    ``input_dir`` and ``working_dir`` are placed under ``tmp_path``.

    Args:
        tmp_path: Base directory; must support the ``/`` path operator.

    Returns:
        A new ``SimpleNamespace`` on every call, so callers may mutate it.
    """
    input_dir = str(tmp_path / "inputs")
    working_dir = str(tmp_path / "rag_storage")
    settings = {
        # Server, CORS and authentication
        "host": "127.0.0.1",
        "port": 9621,
        "log_level": "INFO",
        "verbose": False,
        "cors_origins": "*",
        "whitelist_paths": "/health,/api/*",
        "auth_accounts": "",
        "token_secret": None,
        "token_expire_hours": 48,
        "guest_token_expire_hours": 24,
        "jwt_algorithm": "HS256",
        "token_auto_renew": True,
        "token_renew_threshold": 0.5,
        # Bindings and TLS
        "llm_binding": "bedrock",
        "embedding_binding": "bedrock",
        "llm_binding_host": "DEFAULT_BEDROCK_ENDPOINT",
        "embedding_binding_host": "DEFAULT_BEDROCK_ENDPOINT",
        "ssl": False,
        "ssl_certfile": None,
        "ssl_keyfile": None,
        "key": None,
        # Directories
        "input_dir": input_dir,
        "workspace": "",
        "working_dir": working_dir,
        # Credentials
        "llm_binding_api_key": None,
        "embedding_binding_api_key": "",
        "aws_region": "us-east-1",
        "aws_access_key_id": "global-akid",
        "aws_secret_access_key": "global-secret",
        "aws_session_token": "global-session",
        "query_aws_region": None,
        "query_aws_access_key_id": None,
        "query_aws_secret_access_key": None,
        "query_aws_session_token": None,
        # Models and embedding options
        "llm_model": "us.amazon.nova-lite-v1:0",
        "embedding_model": None,
        "embedding_dim": None,
        "embedding_send_dim": False,
        "embedding_token_limit": None,
        "embedding_document_prefix": None,
        "embedding_document_prefix_configured": False,
        "embedding_query_prefix": None,
        "embedding_query_prefix_configured": False,
        "embedding_prefix_no_prefix_sentinel": "NO_PREFIX",
        "embedding_prefixes_configured": False,
        "embedding_asymmetric": False,
        "embedding_asymmetric_configured": False,
        # Summarisation and chunking
        "max_async": 4,
        "summary_max_tokens": 512,
        "summary_context_size": 4096,
        "force_llm_summary_on_merge": 8,
        "chunk_size": 1200,
        "chunk_overlap_size": 100,
        # Storage backends
        "kv_storage": "JsonKVStorage",
        "graph_storage": "NetworkXStorage",
        "vector_storage": "NanoVectorDBStorage",
        "doc_status_storage": "JsonDocStatusStorage",
        # Retrieval, caching and misc
        "cosine_threshold": 0.2,
        "enable_llm_cache_for_extract": True,
        "enable_llm_cache": True,
        "vlm_process_enable": False,
        "max_parallel_insert": 2,
        "max_graph_nodes": 1000,
        "simulated_model_name": "lightrag",
        "simulated_model_tag": "latest",
        "summary_language": "English",
        # Rerank
        "rerank_binding": "null",
        "rerank_model": None,
        "rerank_binding_host": None,
        "rerank_binding_api_key": None,
        # Concurrency and timeouts
        "embedding_func_max_async": 8,
        "embedding_batch_num": 10,
        "min_rerank_score": 0.0,
        "related_chunk_number": 5,
        "top_k": 10,
        "llm_timeout": 180,
        "embedding_timeout": 30,
        "rerank_max_async": 4,
        "rerank_timeout": 30,
    }
    return SimpleNamespace(**settings)
@pytest.mark.offline
@pytest.mark.asyncio
async def test_create_app_query_role_uses_bedrock_binding(tmp_path, monkeypatch):
    """The query role built by ``create_app`` calls Bedrock with the global AWS options."""
    _reload_api_modules_if_mocked()
    monkeypatch.setattr(sys, "argv", ["pytest"])

    config = importlib.import_module("lightrag.api.config")
    config.initialize_config(_make_args(tmp_path), force=True)
    server = importlib.import_module("lightrag.api.lightrag_server")

    def _empty_router(*_args, **_kwargs):
        return APIRouter()

    # Replace heavy collaborators so create_app only wires up configuration.
    monkeypatch.setattr(server, "LightRAG", _FakeLightRAG)
    monkeypatch.setattr(server, "check_frontend_build", lambda: (True, False))
    for factory_name in (
        "create_document_routes",
        "create_query_routes",
        "create_graph_routes",
    ):
        monkeypatch.setattr(server, factory_name, _empty_router)
    monkeypatch.setattr(server, "OllamaAPI", _FakeOllamaAPI)

    args = _make_args(tmp_path)
    bedrock_stub = AsyncMock(return_value="bedrock-ok")
    openai_stub = AsyncMock(
        side_effect=AssertionError("OpenAI fallback should not be used")
    )
    with (
        patch("lightrag.llm.bedrock.bedrock_complete_if_cache", bedrock_stub),
        patch("lightrag.llm.openai.openai_complete_if_cache", openai_stub),
    ):
        server.create_app(args)
        query_cfg = _FakeLightRAG.last_init_kwargs["role_llm_configs"]["query"]
        result = await query_cfg.func("hello")

    metadata = query_cfg.metadata
    assert metadata["binding"] == "bedrock"
    assert metadata["model"] == "us.amazon.nova-lite-v1:0"
    assert metadata["host"] == "DEFAULT_BEDROCK_ENDPOINT"
    assert metadata["api_key"] is None
    assert metadata["bedrock_aws_options"]["aws_region"] == "us-east-1"

    assert result == "bedrock-ok"
    assert openai_stub.await_count == 0
    assert bedrock_stub.await_count == 1

    call = bedrock_stub.await_args
    assert call.args[:2] == ("us.amazon.nova-lite-v1:0", "hello")
    assert "api_key" not in call.kwargs
    assert call.kwargs["endpoint_url"] == "DEFAULT_BEDROCK_ENDPOINT"
    assert call.kwargs["aws_region"] == "us-east-1"
    assert call.kwargs["aws_access_key_id"] == "global-akid"
@pytest.mark.offline
@pytest.mark.asyncio
async def test_create_app_bedrock_query_role_uses_role_sigv4_credentials(
    tmp_path, monkeypatch
):
    """Query-role AWS overrides, not the global ones, are passed to Bedrock."""
    _reload_api_modules_if_mocked()
    monkeypatch.setattr(sys, "argv", ["pytest"])

    config = importlib.import_module("lightrag.api.config")
    config.initialize_config(_make_args(tmp_path), force=True)
    server = importlib.import_module("lightrag.api.lightrag_server")

    def _empty_router(*_args, **_kwargs):
        return APIRouter()

    monkeypatch.setattr(server, "LightRAG", _FakeLightRAG)
    monkeypatch.setattr(server, "check_frontend_build", lambda: (True, False))
    for factory_name in (
        "create_document_routes",
        "create_query_routes",
        "create_graph_routes",
    ):
        monkeypatch.setattr(server, factory_name, _empty_router)
    monkeypatch.setattr(server, "OllamaAPI", _FakeOllamaAPI)

    role_overrides = {
        "query_aws_region": "us-west-2",
        "query_aws_access_key_id": "query-akid",
        "query_aws_secret_access_key": "query-secret",
        "query_aws_session_token": "query-session",
    }
    args = _make_args(tmp_path)
    for attr_name, value in role_overrides.items():
        setattr(args, attr_name, value)

    bedrock_stub = AsyncMock(return_value="bedrock-ok")
    with patch("lightrag.llm.bedrock.bedrock_complete_if_cache", bedrock_stub):
        server.create_app(args)
        query_func = _FakeLightRAG.last_init_kwargs["role_llm_configs"]["query"].func
        await query_func("hello")

    forwarded = bedrock_stub.await_args.kwargs
    assert forwarded["aws_region"] == "us-west-2"
    assert forwarded["aws_access_key_id"] == "query-akid"
    assert forwarded["aws_secret_access_key"] == "query-secret"
    assert forwarded["aws_session_token"] == "query-session"
@pytest.mark.offline
@pytest.mark.asyncio
async def test_create_app_keyword_openai_role_forwards_nested_extra_body(
    tmp_path, monkeypatch, caplog
):
    """An OpenAI keyword role gets the nested ``extra_body`` and logs no secret."""
    _reload_api_modules_if_mocked()
    monkeypatch.setattr(sys, "argv", ["pytest"])
    # Presumably needed so caplog's handler receives "lightrag" records.
    monkeypatch.setattr(logging.getLogger("lightrag"), "propagate", True)
    monkeypatch.setenv(
        "KEYWORD_OPENAI_LLM_EXTRA_BODY",
        '{"chat_template_kwargs": {"enable_thinking": false}}',
    )

    config = importlib.import_module("lightrag.api.config")
    config.initialize_config(_make_args(tmp_path), force=True)
    server = importlib.import_module("lightrag.api.lightrag_server")

    def _empty_router(*_args, **_kwargs):
        return APIRouter()

    monkeypatch.setattr(server, "LightRAG", _FakeLightRAG)
    monkeypatch.setattr(server, "check_frontend_build", lambda: (True, False))
    for factory_name in (
        "create_document_routes",
        "create_query_routes",
        "create_graph_routes",
    ):
        monkeypatch.setattr(server, factory_name, _empty_router)
    monkeypatch.setattr(server, "OllamaAPI", _FakeOllamaAPI)

    keyword_role = {
        "keyword_llm_binding": "openai",
        "keyword_llm_model": "xhd/Qwen3.5-35B-A3B",
        "keyword_llm_binding_host": "https://keyword.example/v1",
        "keyword_llm_binding_api_key": "keyword-secret",
    }
    args = _make_args(tmp_path)
    for attr_name, value in keyword_role.items():
        setattr(args, attr_name, value)

    openai_stub = AsyncMock(
        return_value='{"high_level_keywords":[],"low_level_keywords":[]}'
    )
    with (
        caplog.at_level("INFO", logger="lightrag"),
        patch("lightrag.llm.openai.openai_complete_if_cache", openai_stub),
    ):
        server.create_app(args)
        keyword_cfg = _FakeLightRAG.last_init_kwargs["role_llm_configs"]["keyword"]
        result = await keyword_cfg.func(
            "keyword prompt", response_format={"type": "json_object"}
        )

    expected_extra_body = {"chat_template_kwargs": {"enable_thinking": False}}
    assert result == '{"high_level_keywords":[],"low_level_keywords":[]}'
    assert keyword_cfg.metadata["binding"] == "openai"
    assert keyword_cfg.metadata["provider_options"]["extra_body"] == expected_extra_body

    assert openai_stub.await_count == 1
    call = openai_stub.await_args
    assert call.args[:2] == ("xhd/Qwen3.5-35B-A3B", "keyword prompt")
    assert call.kwargs["base_url"] == "https://keyword.example/v1"
    assert call.kwargs["api_key"] == "keyword-secret"
    assert call.kwargs["response_format"] == {"type": "json_object"}
    assert call.kwargs["extra_body"] == expected_extra_body

    log_text = "\n".join(record.getMessage() for record in caplog.records)
    for fragment in (
        "Role LLM Option:",
        " - extract: Bedrock {}",
        " - keyword: OpenAI {'extra_body':",
        " - query: Bedrock {}",
        " - vlm: Bedrock {}",
        "chat_template_kwargs",
    ):
        assert fragment in log_text
    # Unset options and the role API key must not appear in the log output.
    for fragment in ("reasoning_effort", "frequency_penalty", "keyword-secret"):
        assert fragment not in log_text
@pytest.mark.offline
def test_create_app_rejects_bedrock_role_api_key(tmp_path, monkeypatch):
    """``create_app`` raises ``ValueError`` when a Bedrock role carries an API key."""
    _reload_api_modules_if_mocked()
    monkeypatch.setattr(sys, "argv", ["pytest"])

    config = importlib.import_module("lightrag.api.config")
    config.initialize_config(_make_args(tmp_path), force=True)
    server = importlib.import_module("lightrag.api.lightrag_server")
    monkeypatch.setattr(server, "check_frontend_build", lambda: (True, False))

    args_with_role_key = _make_args(tmp_path)
    setattr(args_with_role_key, "query_llm_binding_api_key", "absk-role")

    with pytest.raises(ValueError, match="does not support role-specific"):
        server.create_app(args_with_role_key)
@pytest.mark.offline
def test_health_role_llm_config_uses_runtime_snapshot(tmp_path, monkeypatch):
    """``/health`` reports role config and queue status from the live RAG instance."""
    _reload_api_modules_if_mocked()
    monkeypatch.setattr(sys, "argv", ["pytest"])

    config = importlib.import_module("lightrag.api.config")
    config.initialize_config(_make_args(tmp_path), force=True)
    server = importlib.import_module("lightrag.api.lightrag_server")

    def _empty_router(*_args, **_kwargs):
        return APIRouter()

    monkeypatch.setattr(server, "LightRAG", _FakeLightRAG)
    monkeypatch.setattr(server, "check_frontend_build", lambda: (True, False))
    for factory_name in (
        "create_document_routes",
        "create_query_routes",
        "create_graph_routes",
    ):
        monkeypatch.setattr(server, factory_name, _empty_router)
    monkeypatch.setattr(server, "OllamaAPI", _FakeOllamaAPI)
    monkeypatch.setattr(
        server, "get_namespace_data", AsyncMock(return_value={"busy": False})
    )
    monkeypatch.setattr(server, "get_default_workspace", lambda: "default")
    monkeypatch.setattr(
        server,
        "cleanup_keyed_lock",
        lambda: {"cleanup_performed": {}, "current_status": {}},
    )

    app = server.create_app(_make_args(tmp_path))

    # Overwrite the snapshots after construction so the response can only
    # match if the endpoint reads them at request time.
    rag = _FakeLightRAG.last_instance
    rag.role_config_snapshot = {
        "query": {
            "binding": "runtime-binding",
            "model": "runtime-model",
            "host": "https://runtime.example/v1",
            "max_async": 9,
            "metadata": {"binding": "runtime-binding"},
        }
    }
    rag.queue_status_snapshot = {"query": {"available": True, "rejected_total": 2}}
    rag.embedding_queue_status_snapshot = {"available": True, "running": 1}
    rag.rerank_queue_status_snapshot = {"available": False}

    response = TestClient(app).get("/health")
    assert response.status_code == 200

    body = response.json()
    role_cfg = body["configuration"]["role_llm_config"]["query"]
    expected_role_fields = {
        "binding": "runtime-binding",
        "model": "runtime-model",
        "host": "https://runtime.example/v1",
        "max_async": 9,
    }
    for field_name, expected_value in expected_role_fields.items():
        assert role_cfg[field_name] == expected_value
    assert role_cfg["model"] != "us.amazon.nova-lite-v1:0"

    assert body["llm_queue_status"]["query"]["rejected_total"] == 2
    assert body["embedding_queue_status"]["running"] == 1
    assert body["rerank_queue_status"]["available"] is False
@pytest.mark.offline
@pytest.mark.parametrize(
    "pipeline_state, expected_active",
    [
        ({"busy": False}, False),
        ({"busy": True}, True),
        ({"busy": False, "scanning": True}, True),
        ({"busy": False, "destructive_busy": True}, True),
        ({"busy": False, "pending_enqueues": 2}, True),
        (
            {
                "busy": False,
                "scanning": False,
                "destructive_busy": False,
                "pending_enqueues": 0,
            },
            False,
        ),
    ],
)
def test_health_pipeline_active_derivation(
    tmp_path, monkeypatch, pipeline_state, expected_active
):
    """``/health`` echoes each pipeline flag and reports the expected ``pipeline_active``."""
    _reload_api_modules_if_mocked()
    monkeypatch.setattr(sys, "argv", ["pytest"])

    config = importlib.import_module("lightrag.api.config")
    config.initialize_config(_make_args(tmp_path), force=True)
    server = importlib.import_module("lightrag.api.lightrag_server")

    def _empty_router(*_args, **_kwargs):
        return APIRouter()

    monkeypatch.setattr(server, "LightRAG", _FakeLightRAG)
    monkeypatch.setattr(server, "check_frontend_build", lambda: (True, False))
    for factory_name in (
        "create_document_routes",
        "create_query_routes",
        "create_graph_routes",
    ):
        monkeypatch.setattr(server, factory_name, _empty_router)
    monkeypatch.setattr(server, "OllamaAPI", _FakeOllamaAPI)
    # The parametrized state is what the patched namespace lookup returns.
    monkeypatch.setattr(
        server, "get_namespace_data", AsyncMock(return_value=pipeline_state)
    )
    monkeypatch.setattr(server, "get_default_workspace", lambda: "default")
    monkeypatch.setattr(
        server,
        "cleanup_keyed_lock",
        lambda: {"cleanup_performed": {}, "current_status": {}},
    )

    app = server.create_app(_make_args(tmp_path))
    response = TestClient(app).get("/health")
    assert response.status_code == 200

    body = response.json()
    busy = bool(pipeline_state.get("busy", False))
    scanning = bool(pipeline_state.get("scanning", False))
    destructive_busy = bool(pipeline_state.get("destructive_busy", False))
    pending_enqueues = int(pipeline_state.get("pending_enqueues", 0))

    assert body["pipeline_busy"] is busy
    assert body["pipeline_scanning"] is scanning
    assert body["pipeline_destructive_busy"] is destructive_busy
    assert body["pipeline_pending_enqueues"] == pending_enqueues
    assert body["pipeline_active"] is expected_active
|