import sys
import warnings
from ..utils import verbose_debug
if sys.version_info < (3, 9):
pass
else:
pass
import pipmaster as pm # Pipmaster for dynamic library install
# install specific modules
if not pm.is_installed("zhipuai"):
pm.install("zhipuai")
from openai import (
APIConnectionError,
RateLimitError,
APITimeoutError,
)
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type,
)
from lightrag.utils import (
wrap_embedding_func_with_attrs,
logger,
)
import numpy as np
from typing import Union, List, Optional, Dict
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10),
retry=retry_if_exception_type(
(RateLimitError, APIConnectionError, APITimeoutError)
),
)
async def zhipu_complete_if_cache(
prompt: Union[str, List[Dict[str, str]]],
model: str = "glm-4-flashx", # The most cost/performance balance model in glm-4 series
api_key: Optional[str] = None,
system_prompt: Optional[str] = None,
history_messages: List[Dict[str, str]] = [],
enable_cot: bool = False, # LightRAG output switch: include reasoning_content as ...
thinking: Optional[
Dict[str, object]
] = None, # Zhipu request param: use {"type": "enabled"} to enable thinking
**kwargs,
) -> str:
"""Call Zhipu chat completions with optional official thinking support.
Parameter roles:
- `thinking`: forwarded to the Zhipu API as-is. To enable thinking output,
pass a config such as `{"type": "enabled"}`.
- `enable_cot`: LightRAG-only formatting switch. When True and the API
returns `reasoning_content`, it is preserved in the final string as
`...`.
- `response_format`: forwarded as Zhipu's OpenAI-compatible structured
output parameter when supplied by callers.
- Deprecated `keyword_extraction` and `entity_extraction` booleans are
compatibility shims; when no explicit `response_format` is supplied,
they are mapped to `{"type": "json_object"}`.
"""
# dynamically load ZhipuAI
try:
from zhipuai import ZhipuAI
except ImportError:
raise ImportError("Please install zhipuai before initialize zhipuai backend.")
if api_key:
client = ZhipuAI(api_key=api_key)
else:
# please set ZHIPUAI_API_KEY in your environment
# os.environ["ZHIPUAI_API_KEY"]
client = ZhipuAI()
messages = []
if not system_prompt:
system_prompt = "You are a helpful assistant. Note that sensitive words in the content should be replaced with ***"
# Add system prompt if provided
if system_prompt:
messages.append({"role": "system", "content": system_prompt})
messages.extend(history_messages)
messages.append({"role": "user", "content": prompt})
# Add debug logging
logger.debug("===== Query Input to LLM =====")
logger.debug(f"Query: {prompt}")
verbose_debug(f"System prompt: {system_prompt}")
# Deprecation shims: map legacy extraction booleans to response_format only
# when an explicit response_format was not supplied by the caller. The
# legacy path also forces enable_cot=False so reasoning_content cannot
# corrupt the JSON payload expected by callers relying on it.
keyword_extraction = kwargs.pop("keyword_extraction", False)
entity_extraction = kwargs.pop("entity_extraction", False)
if kwargs.get("response_format") is None:
if entity_extraction:
warnings.warn(
"zhipu_complete_if_cache(entity_extraction=True) is deprecated; "
"pass response_format={'type': 'json_object'} instead.",
DeprecationWarning,
stacklevel=2,
)
kwargs["response_format"] = {"type": "json_object"}
enable_cot = False
elif keyword_extraction:
warnings.warn(
"zhipu_complete_if_cache(keyword_extraction=True) is deprecated; "
"pass response_format={'type': 'json_object'} instead.",
DeprecationWarning,
stacklevel=2,
)
kwargs["response_format"] = {"type": "json_object"}
enable_cot = False
# Structured output and COT are mutually exclusive here because
# reasoning_content would corrupt the JSON payload expected by callers.
if kwargs.get("response_format") is not None:
enable_cot = False
# Remove unsupported kwargs
kwargs = {
k: v
for k, v in kwargs.items()
if k not in ["hashing_kv", "keyword_extraction", "entity_extraction"]
}
# `thinking` is an official Zhipu request field. Example:
# {"type": "enabled"} enables reasoning output on supported models.
if thinking is not None:
kwargs["thinking"] = thinking
response = client.chat.completions.create(model=model, messages=messages, **kwargs)
if not response.choices or response.choices[0].message is None:
return ""
message = response.choices[0].message
content = message.content or ""
reasoning_content = getattr(message, "reasoning_content", "") or ""
if enable_cot and reasoning_content.strip():
if content:
return f"{reasoning_content}{content}"
return f"{reasoning_content}"
return content
async def zhipu_complete(
prompt,
system_prompt=None,
history_messages=[],
keyword_extraction=False,
entity_extraction=False,
enable_cot: bool = False,
**kwargs,
):
"""Zhipu completion wrapper with LightRAG structured-output shims.
Structured output note:
- This adapter accepts OpenAI-style ``response_format`` and forwards it to
Zhipu's compatible chat-completions API.
- Deprecated ``keyword_extraction`` and ``entity_extraction`` booleans are
compatibility shims; when no explicit ``response_format`` is supplied,
they are mapped to ``{"type": "json_object"}``.
"""
# Pop legacy extraction flags from kwargs to avoid passing them downstream.
keyword_extraction = kwargs.pop("keyword_extraction", keyword_extraction)
entity_extraction = kwargs.pop("entity_extraction", entity_extraction)
# Deprecation shims: map legacy boolean flags to response_format only when
# an explicit response_format was not supplied by the caller. The legacy
# path also forces enable_cot=False so that reasoning_content cannot
# corrupt the JSON payload expected by callers that were relying on it.
if kwargs.get("response_format") is None:
if entity_extraction:
warnings.warn(
"zhipu_complete(entity_extraction=True) is deprecated; "
"pass response_format={'type': 'json_object'} instead.",
DeprecationWarning,
stacklevel=2,
)
kwargs["response_format"] = {"type": "json_object"}
enable_cot = False
elif keyword_extraction:
warnings.warn(
"zhipu_complete(keyword_extraction=True) is deprecated; "
"pass response_format={'type': 'json_object'} instead.",
DeprecationWarning,
stacklevel=2,
)
kwargs["response_format"] = {"type": "json_object"}
enable_cot = False
return await zhipu_complete_if_cache(
prompt=prompt,
system_prompt=system_prompt,
history_messages=history_messages,
enable_cot=enable_cot,
**kwargs,
)
@wrap_embedding_func_with_attrs(
embedding_dim=1024, max_token_size=8192, model_name="embedding-3"
)
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=60),
retry=retry_if_exception_type(
(RateLimitError, APIConnectionError, APITimeoutError)
),
)
async def zhipu_embedding(
texts: list[str],
model: str = "embedding-3",
api_key: str = None,
embedding_dim: int | None = None,
**kwargs,
) -> np.ndarray:
# dynamically load ZhipuAI
try:
from zhipuai import ZhipuAI
except ImportError:
raise ImportError("Please install zhipuai before initialize zhipuai backend.")
if api_key:
client = ZhipuAI(api_key=api_key)
else:
# please set ZHIPUAI_API_KEY in your environment
# os.environ["ZHIPUAI_API_KEY"]
client = ZhipuAI()
# Convert single text to list if needed
if isinstance(texts, str):
texts = [texts]
embeddings = []
for text in texts:
try:
request_kwargs = dict(kwargs)
if embedding_dim is not None:
request_kwargs["dimensions"] = embedding_dim
response = client.embeddings.create(
model=model, input=[text], **request_kwargs
)
embeddings.append(response.data[0].embedding)
except Exception as e:
raise Exception(f"Error calling ChatGLM Embedding API: {str(e)}")
return np.array(embeddings)