anthropic.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322
  1. from ..utils import verbose_debug, VERBOSE_DEBUG
  2. import sys
  3. import os
  4. import logging
  5. import warnings
  6. from typing import Any, Union, AsyncIterator
  7. import pipmaster as pm # Pipmaster for dynamic library install
  8. if sys.version_info < (3, 9):
  9. from typing import AsyncIterator
  10. else:
  11. from collections.abc import AsyncIterator
  12. # Install Anthropic SDK if not present
  13. if not pm.is_installed("anthropic"):
  14. pm.install("anthropic")
  15. from anthropic import (
  16. AsyncAnthropic,
  17. APIConnectionError,
  18. RateLimitError,
  19. APITimeoutError,
  20. )
  21. from tenacity import (
  22. retry,
  23. stop_after_attempt,
  24. wait_exponential,
  25. retry_if_exception_type,
  26. )
  27. from lightrag.utils import (
  28. safe_unicode_decode,
  29. logger,
  30. )
  31. from lightrag.api import __api_version__
  32. # Custom exception for retry mechanism
  33. class InvalidResponseError(Exception):
  34. """Custom exception class for triggering retry mechanism"""
  35. pass
  36. # Core Anthropic completion function with retry
  37. @retry(
  38. stop=stop_after_attempt(3),
  39. wait=wait_exponential(multiplier=1, min=4, max=10),
  40. retry=retry_if_exception_type(
  41. (RateLimitError, APIConnectionError, APITimeoutError, InvalidResponseError)
  42. ),
  43. )
  44. async def anthropic_complete_if_cache(
  45. model: str,
  46. prompt: str,
  47. system_prompt: str | None = None,
  48. history_messages: list[dict[str, Any]] | None = None,
  49. enable_cot: bool = False,
  50. base_url: str | None = None,
  51. api_key: str | None = None,
  52. image_inputs: list[Any] | None = None,
  53. **kwargs: Any,
  54. ) -> Union[str, AsyncIterator[str]]:
  55. """Call Anthropic Messages API with LightRAG-compatible shims.
  56. Structured output note:
  57. - This adapter does not support OpenAI-style ``response_format`` JSON mode.
  58. - If callers pass ``response_format``, it is stripped before the request.
  59. - Deprecated ``keyword_extraction`` and ``entity_extraction`` booleans are
  60. accepted only as compatibility shims; they emit warnings and are ignored.
  61. """
  62. if history_messages is None:
  63. history_messages = []
  64. if enable_cot:
  65. logger.debug(
  66. "enable_cot=True is not supported for the Anthropic API and will be ignored."
  67. )
  68. if not api_key:
  69. api_key = os.environ.get("ANTHROPIC_API_KEY")
  70. default_headers = {
  71. "User-Agent": f"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_8) LightRAG/{__api_version__}",
  72. "Content-Type": "application/json",
  73. }
  74. # Set logger level to INFO when VERBOSE_DEBUG is off
  75. if not VERBOSE_DEBUG and logger.level == logging.DEBUG:
  76. logging.getLogger("anthropic").setLevel(logging.INFO)
  77. kwargs.pop("hashing_kv", None)
  78. # Anthropic Messages API has no JSON mode; drop legacy flags and
  79. # response_format. Emit DeprecationWarning when the booleans were set.
  80. if kwargs.pop("keyword_extraction", False):
  81. warnings.warn(
  82. "anthropic_complete_if_cache(keyword_extraction=True) is deprecated; "
  83. "pass response_format={'type': 'json_object'} instead.",
  84. DeprecationWarning,
  85. stacklevel=2,
  86. )
  87. if kwargs.pop("entity_extraction", False):
  88. warnings.warn(
  89. "anthropic_complete_if_cache(entity_extraction=True) is deprecated; "
  90. "pass response_format={'type': 'json_object'} instead.",
  91. DeprecationWarning,
  92. stacklevel=2,
  93. )
  94. kwargs.pop("response_format", None)
  95. timeout = kwargs.pop("timeout", None)
  96. # Require max_tokens; the Anthropic SDK errors if it's missing
  97. kwargs.setdefault("max_tokens", 8192)
  98. # Pop stream from kwargs so it doesn't leak into create_params;
  99. # default to False (non-streaming) for consistency with other providers
  100. stream = kwargs.pop("stream", False)
  101. anthropic_async_client = (
  102. AsyncAnthropic(
  103. default_headers=default_headers, api_key=api_key, timeout=timeout
  104. )
  105. if base_url is None
  106. else AsyncAnthropic(
  107. base_url=base_url,
  108. default_headers=default_headers,
  109. api_key=api_key,
  110. timeout=timeout,
  111. )
  112. )
  113. messages: list[dict[str, Any]] = []
  114. messages.extend(history_messages)
  115. if image_inputs:
  116. from lightrag.llm._vision_utils import normalize_image_inputs
  117. normalized_images = normalize_image_inputs(image_inputs)
  118. user_content: list[dict[str, Any]] = []
  119. for img in normalized_images:
  120. user_content.append(
  121. {
  122. "type": "image",
  123. "source": {
  124. "type": "base64",
  125. "media_type": img.mime_type,
  126. "data": img.base64_str,
  127. },
  128. }
  129. )
  130. user_content.append({"type": "text", "text": prompt})
  131. messages.append({"role": "user", "content": user_content})
  132. else:
  133. messages.append({"role": "user", "content": prompt})
  134. logger.debug("===== Sending Query to Anthropic LLM =====")
  135. logger.debug(f"Model: {model} Base URL: {base_url}")
  136. logger.debug(f"Additional kwargs: {kwargs}")
  137. verbose_debug(f"Query: {prompt}")
  138. verbose_debug(f"System prompt: {system_prompt}")
  139. try:
  140. create_params = {
  141. "model": model,
  142. "messages": messages,
  143. "stream": stream,
  144. **kwargs,
  145. }
  146. if system_prompt:
  147. create_params["system"] = system_prompt
  148. response = await anthropic_async_client.messages.create(**create_params)
  149. except APIConnectionError as e:
  150. logger.error(f"Anthropic API Connection Error: {e}")
  151. raise
  152. except RateLimitError as e:
  153. logger.error(f"Anthropic API Rate Limit Error: {e}")
  154. raise
  155. except APITimeoutError as e:
  156. logger.error(f"Anthropic API Timeout Error: {e}")
  157. raise
  158. except Exception as e:
  159. body = getattr(e, "body", None)
  160. request_id = getattr(e, "request_id", None)
  161. req = getattr(e, "request", None)
  162. extra_parts = []
  163. if body:
  164. extra_parts.append(f"Response body: {body}")
  165. if request_id:
  166. extra_parts.append(f"Request ID: {request_id}")
  167. if req is not None:
  168. extra_parts.append(f"Request URL: {req.url}")
  169. extra = ("\n" + "\n".join(extra_parts)) if extra_parts else ""
  170. logger.error(
  171. f"Anthropic API Call Failed,\nModel: {model},\nParams: {kwargs}, Got: {e}{extra}"
  172. )
  173. raise
  174. if not stream:
  175. return response.content[0].text
  176. async def stream_response():
  177. try:
  178. async for event in response:
  179. content = (
  180. event.delta.text
  181. if hasattr(event, "delta")
  182. and hasattr(event.delta, "text")
  183. and event.delta.text
  184. else None
  185. )
  186. if content is None:
  187. continue
  188. if r"\u" in content:
  189. content = safe_unicode_decode(content.encode("utf-8"))
  190. yield content
  191. except Exception as e:
  192. logger.error(f"Error in stream response: {str(e)}")
  193. raise
  194. return stream_response()
  195. # Generic Anthropic completion function
  196. async def anthropic_complete(
  197. prompt: str,
  198. system_prompt: str | None = None,
  199. history_messages: list[dict[str, Any]] | None = None,
  200. enable_cot: bool = False,
  201. **kwargs: Any,
  202. ) -> Union[str, AsyncIterator[str]]:
  203. if history_messages is None:
  204. history_messages = []
  205. model_name = kwargs["hashing_kv"].global_config["llm_model_name"]
  206. return await anthropic_complete_if_cache(
  207. model_name,
  208. prompt,
  209. system_prompt=system_prompt,
  210. history_messages=history_messages,
  211. enable_cot=enable_cot,
  212. **kwargs,
  213. )
  214. # Claude 3 Opus specific completion
  215. async def claude_3_opus_complete(
  216. prompt: str,
  217. system_prompt: str | None = None,
  218. history_messages: list[dict[str, Any]] | None = None,
  219. enable_cot: bool = False,
  220. **kwargs: Any,
  221. ) -> Union[str, AsyncIterator[str]]:
  222. if history_messages is None:
  223. history_messages = []
  224. return await anthropic_complete_if_cache(
  225. "claude-3-opus-20240229",
  226. prompt,
  227. system_prompt=system_prompt,
  228. history_messages=history_messages,
  229. enable_cot=enable_cot,
  230. **kwargs,
  231. )
  232. # Claude 3 Sonnet specific completion
  233. async def claude_3_sonnet_complete(
  234. prompt: str,
  235. system_prompt: str | None = None,
  236. history_messages: list[dict[str, Any]] | None = None,
  237. enable_cot: bool = False,
  238. **kwargs: Any,
  239. ) -> Union[str, AsyncIterator[str]]:
  240. if history_messages is None:
  241. history_messages = []
  242. return await anthropic_complete_if_cache(
  243. "claude-3-sonnet-20240229",
  244. prompt,
  245. system_prompt=system_prompt,
  246. history_messages=history_messages,
  247. enable_cot=enable_cot,
  248. **kwargs,
  249. )
  250. # Claude 3 Haiku specific completion
  251. async def claude_3_haiku_complete(
  252. prompt: str,
  253. system_prompt: str | None = None,
  254. history_messages: list[dict[str, Any]] | None = None,
  255. enable_cot: bool = False,
  256. **kwargs: Any,
  257. ) -> Union[str, AsyncIterator[str]]:
  258. if history_messages is None:
  259. history_messages = []
  260. return await anthropic_complete_if_cache(
  261. "claude-3-haiku-20240307",
  262. prompt,
  263. system_prompt=system_prompt,
  264. history_messages=history_messages,
  265. enable_cot=enable_cot,
  266. **kwargs,
  267. )
  268. # Backward-compatibility shim: the previous embedding implementation lived in
  269. # this module under the (misleading) name ``anthropic_embed`` even though it
  270. # called Voyage AI under the hood. The real implementation now lives in
  271. # ``lightrag.llm.voyageai.voyageai_embed``. Keep the old name importable for one
  272. # release cycle so downstream users get a clear deprecation warning instead of
  273. # an ImportError. Remove in a future major version.
  274. def anthropic_embed(*args, **kwargs):
  275. """Deprecated alias for :func:`lightrag.llm.voyageai.voyageai_embed`.
  276. This shim accepts the same arguments as the original ``anthropic_embed``
  277. function (which was always backed by VoyageAI) and forwards them to
  278. :func:`voyageai_embed`. It will be removed in a future release.
  279. """
  280. warnings.warn(
  281. "lightrag.llm.anthropic.anthropic_embed is deprecated and will be "
  282. "removed in a future release. Import "
  283. "lightrag.llm.voyageai.voyageai_embed instead.",
  284. DeprecationWarning,
  285. stacklevel=2,
  286. )
  287. from lightrag.llm.voyageai import voyageai_embed
  288. return voyageai_embed.func(*args, **kwargs)