| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256 |
- import sys
- import warnings
- from ..utils import verbose_debug
- if sys.version_info < (3, 9):
- pass
- else:
- pass
- import pipmaster as pm # Pipmaster for dynamic library install
- # install specific modules
- if not pm.is_installed("zhipuai"):
- pm.install("zhipuai")
- from openai import (
- APIConnectionError,
- RateLimitError,
- APITimeoutError,
- )
- from tenacity import (
- retry,
- stop_after_attempt,
- wait_exponential,
- retry_if_exception_type,
- )
- from lightrag.utils import (
- wrap_embedding_func_with_attrs,
- logger,
- )
- import numpy as np
- from typing import Union, List, Optional, Dict
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10),
    retry=retry_if_exception_type(
        (RateLimitError, APIConnectionError, APITimeoutError)
    ),
)
async def zhipu_complete_if_cache(
    prompt: Union[str, List[Dict[str, str]]],
    model: str = "glm-4-flashx",  # The most cost/performance balance model in glm-4 series
    api_key: Optional[str] = None,
    system_prompt: Optional[str] = None,
    history_messages: Optional[List[Dict[str, str]]] = None,
    enable_cot: bool = False,  # LightRAG output switch: include reasoning_content as <think>...</think>
    thinking: Optional[
        Dict[str, object]
    ] = None,  # Zhipu request param: use {"type": "enabled"} to enable thinking
    **kwargs,
) -> str:
    """Call Zhipu chat completions with optional official thinking support.

    The request is assembled as: one system message, then `history_messages`,
    then a user message carrying `prompt`. The text of the first returned
    choice is the result.

    Parameter roles:
    - `prompt`: content of the final user message.
    - `model`: model name passed to the chat-completions call.
    - `api_key`: when falsy, the SDK client is built without arguments; set
      `ZHIPUAI_API_KEY` in the environment in that case.
    - `system_prompt`: when empty or None, a built-in default prompt is used.
    - `history_messages`: earlier messages placed between the system and
      user messages. `None` means no history.
    - `thinking`: forwarded to the Zhipu API as-is. To enable thinking output,
      pass a config such as `{"type": "enabled"}`.
    - `enable_cot`: LightRAG-only formatting switch. When True and the API
      returns `reasoning_content`, it is preserved in the final string as
      `<think>...</think>`. It is forced off whenever a `response_format`
      is in effect.
    - `response_format`: forwarded as Zhipu's OpenAI-compatible structured
      output parameter when supplied by callers.
    - Deprecated `keyword_extraction` and `entity_extraction` booleans are
      compatibility shims; when no explicit `response_format` is supplied,
      they are mapped to `{"type": "json_object"}`.
    - `hashing_kv` is dropped from `kwargs`; every other keyword argument is
      forwarded to the chat-completions call.

    Returns:
        The message content, optionally prefixed with
        `<think>reasoning</think>`. An empty string is returned when the
        response has no choices or the first choice has no message.

    Raises:
        ImportError: if the `zhipuai` package cannot be imported.
    """
    # Function-scope import, in the same style as the lazy zhipuai import.
    import asyncio

    # dynamically load ZhipuAI
    try:
        from zhipuai import ZhipuAI
    except ImportError as exc:
        raise ImportError(
            "Please install zhipuai before initialize zhipuai backend."
        ) from exc

    # Without an explicit key the client is constructed bare; please set
    # ZHIPUAI_API_KEY in your environment for that path.
    client = ZhipuAI(api_key=api_key) if api_key else ZhipuAI()

    if not system_prompt:
        system_prompt = (
            "You are a helpful assistant. Note that sensitive words in the "
            "content should be replaced with ***"
        )

    # A system prompt is always present at this point, so it always leads.
    messages = [{"role": "system", "content": system_prompt}]
    messages.extend(history_messages or [])
    messages.append({"role": "user", "content": prompt})

    # Add debug logging
    logger.debug("===== Query Input to LLM =====")
    logger.debug(f"Query: {prompt}")
    verbose_debug(f"System prompt: {system_prompt}")

    # Deprecation shims: map legacy extraction booleans to response_format only
    # when an explicit response_format was not supplied by the caller.
    # entity_extraction takes precedence when both flags are set.
    keyword_extraction = kwargs.pop("keyword_extraction", False)
    entity_extraction = kwargs.pop("entity_extraction", False)
    if kwargs.get("response_format") is None:
        if entity_extraction:
            legacy_flag = "entity_extraction"
        elif keyword_extraction:
            legacy_flag = "keyword_extraction"
        else:
            legacy_flag = None
        if legacy_flag is not None:
            warnings.warn(
                f"zhipu_complete_if_cache({legacy_flag}=True) is deprecated; "
                "pass response_format={'type': 'json_object'} instead.",
                DeprecationWarning,
                stacklevel=2,
            )
            kwargs["response_format"] = {"type": "json_object"}

    # Structured output and COT are mutually exclusive here because
    # reasoning_content would corrupt the JSON payload expected by callers.
    # This also covers the legacy-flag path handled just above.
    if kwargs.get("response_format") is not None:
        enable_cot = False

    # Remove unsupported kwargs (the legacy flags were already popped above).
    kwargs.pop("hashing_kv", None)

    # `thinking` is an official Zhipu request field. Example:
    # {"type": "enabled"} enables reasoning output on supported models.
    if thinking is not None:
        kwargs["thinking"] = thinking

    # The SDK call is synchronous; run it in a worker thread so this coroutine
    # does not block the event loop while waiting for the response.
    response = await asyncio.to_thread(
        client.chat.completions.create, model=model, messages=messages, **kwargs
    )

    if not response.choices or response.choices[0].message is None:
        return ""

    message = response.choices[0].message
    content = message.content or ""
    reasoning_content = getattr(message, "reasoning_content", "") or ""
    if enable_cot and reasoning_content.strip():
        # With empty content this collapses to just the <think> block.
        return f"<think>{reasoning_content}</think>{content}"
    return content
async def zhipu_complete(
    prompt,
    system_prompt=None,
    history_messages=None,
    keyword_extraction=False,
    entity_extraction=False,
    enable_cot: bool = False,
    **kwargs,
):
    """Zhipu completion wrapper with LightRAG structured-output shims.

    Translates the legacy extraction flags into a ``response_format`` and then
    delegates to ``zhipu_complete_if_cache``, returning its result.

    Structured output note:
    - This adapter accepts OpenAI-style ``response_format`` and forwards it to
      Zhipu's compatible chat-completions API.
    - Deprecated ``keyword_extraction`` and ``entity_extraction`` booleans are
      compatibility shims; when no explicit ``response_format`` is supplied,
      they are mapped to ``{"type": "json_object"}``.

    Args:
        prompt: Prompt forwarded to ``zhipu_complete_if_cache``.
        system_prompt: Optional system prompt, forwarded unchanged.
        history_messages: Earlier conversation messages; ``None`` is treated
            as an empty history.
        keyword_extraction: Deprecated flag, see the note above.
        entity_extraction: Deprecated flag; takes precedence over
            ``keyword_extraction`` when both are set.
        enable_cot: Forwarded as-is, except that the legacy-flag path forces
            it to ``False`` so reasoning text cannot corrupt the JSON payload.
        **kwargs: Forwarded to ``zhipu_complete_if_cache``.

    Returns:
        Whatever ``zhipu_complete_if_cache`` returns.
    """
    # The legacy flags are explicit parameters, so they can never appear in
    # ``kwargs`` and are not passed downstream.
    #
    # Deprecation shims: map legacy boolean flags to response_format only when
    # an explicit response_format was not supplied by the caller.
    if kwargs.get("response_format") is None:
        if entity_extraction:
            legacy_flag = "entity_extraction"
        elif keyword_extraction:
            legacy_flag = "keyword_extraction"
        else:
            legacy_flag = None
        if legacy_flag is not None:
            warnings.warn(
                f"zhipu_complete({legacy_flag}=True) is deprecated; "
                "pass response_format={'type': 'json_object'} instead.",
                DeprecationWarning,
                stacklevel=2,
            )
            kwargs["response_format"] = {"type": "json_object"}
            enable_cot = False

    return await zhipu_complete_if_cache(
        prompt=prompt,
        system_prompt=system_prompt,
        # Always hand a list downstream, so a None default never reaches it.
        history_messages=[] if history_messages is None else history_messages,
        enable_cot=enable_cot,
        **kwargs,
    )
@wrap_embedding_func_with_attrs(
    embedding_dim=1024, max_token_size=8192, model_name="embedding-3"
)
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=60),
    retry=retry_if_exception_type(
        (RateLimitError, APIConnectionError, APITimeoutError)
    ),
)
async def zhipu_embedding(
    texts: list[str],
    model: str = "embedding-3",
    api_key: str | None = None,
    embedding_dim: int | None = None,
    **kwargs,
) -> np.ndarray:
    """Embed texts with the Zhipu embeddings API, one request per text.

    Args:
        texts: Texts to embed. A single string is accepted and treated as a
            one-element list.
        model: Embedding model name passed to the API.
        api_key: When falsy, the SDK client is built without arguments; set
            ``ZHIPUAI_API_KEY`` in the environment in that case.
        embedding_dim: When not None, sent to the API as ``dimensions``.
        **kwargs: Extra keyword arguments forwarded to every
            ``client.embeddings.create`` call.

    Returns:
        ``np.array`` built from the per-text embedding vectors, in input order.

    Raises:
        ImportError: if the ``zhipuai`` package cannot be imported.
        Exception: for any non-retryable failure of an embedding request,
            with the original error chained as the cause.
        RateLimitError, APIConnectionError, APITimeoutError: propagated
            unchanged so the ``@retry`` decorator above can act on them.
    """
    # Function-scope import, in the same style as the lazy zhipuai import.
    import asyncio

    # dynamically load ZhipuAI
    try:
        from zhipuai import ZhipuAI
    except ImportError as exc:
        raise ImportError(
            "Please install zhipuai before initialize zhipuai backend."
        ) from exc

    # Without an explicit key the client is constructed bare; please set
    # ZHIPUAI_API_KEY in your environment for that path.
    client = ZhipuAI(api_key=api_key) if api_key else ZhipuAI()

    # Convert single text to list if needed
    if isinstance(texts, str):
        texts = [texts]

    # The request options are identical for every text: build them once.
    request_kwargs = dict(kwargs)
    if embedding_dim is not None:
        request_kwargs["dimensions"] = embedding_dim

    embeddings = []
    for text in texts:
        try:
            # The SDK call is synchronous; run it in a worker thread so the
            # event loop stays responsive during the request.
            response = await asyncio.to_thread(
                client.embeddings.create,
                model=model,
                input=[text],
                **request_kwargs,
            )
            embeddings.append(response.data[0].embedding)
        except (RateLimitError, APIConnectionError, APITimeoutError):
            # Wrapping these in a plain Exception would hide them from the
            # retry predicate and silently disable the retry policy.
            # NOTE(review): these classes are imported from `openai`; confirm
            # the zhipuai SDK actually raises them.
            raise
        except Exception as e:
            raise Exception(f"Error calling ChatGLM Embedding API: {str(e)}") from e
    return np.array(embeddings)
|