| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232 |
- import copy
- import os
- import warnings
- from functools import lru_cache
- import pipmaster as pm # Pipmaster for dynamic library install
# Import-time dependency bootstrap: for each package this module needs, ask
# pipmaster whether it is installed and install it if not. The order matches
# the imports that follow (transformers, torch, numpy).
for _required_pkg in ("transformers", "torch", "numpy"):
    if not pm.is_installed(_required_pkg):
        pm.install(_required_pkg)
del _required_pkg  # do not leave the loop variable in the module namespace
- from transformers import AutoTokenizer, AutoModelForCausalLM
- from tenacity import (
- retry,
- stop_after_attempt,
- wait_exponential,
- retry_if_exception_type,
- )
- from lightrag.exceptions import (
- APIConnectionError,
- RateLimitError,
- APITimeoutError,
- )
- import torch
- import numpy as np
- from lightrag.utils import wrap_embedding_func_with_attrs
- os.environ["TOKENIZERS_PARALLELISM"] = "false"
@lru_cache(maxsize=1)
def initialize_hf_model(model_name):
    """Load the causal LM and tokenizer for ``model_name``.

    The result is memoized with ``lru_cache(maxsize=1)``: repeated calls with
    the same name reuse the loaded objects, while a call with a different
    name replaces the single cached entry.

    Args:
        model_name: Identifier passed straight to ``from_pretrained``.

    Returns:
        A ``(model, tokenizer)`` tuple. Note the order: model first.
    """
    # NOTE(review): trust_remote_code=True lets the model repository run its
    # own Python code at load time; only use with model sources you trust.
    load_options = {"device_map": "auto", "trust_remote_code": True}

    tokenizer = AutoTokenizer.from_pretrained(model_name, **load_options)
    causal_lm = AutoModelForCausalLM.from_pretrained(model_name, **load_options)

    # Padding is requested at tokenization time by the callers, so a pad
    # token must exist; reuse the EOS token when the tokenizer defines none.
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    return causal_lm, tokenizer
def _build_hf_prompt(hf_tokenizer, messages):
    """Render chat ``messages`` into a single prompt string.

    Three strategies are tried in order:

    1. The tokenizer's own chat template.
    2. If that fails and the first message is a system message, the system
       text is folded into the following message (wrapped in
       ``<system>...</system>``) and the template is tried again. This covers
       templates that reject the ``system`` role.
    3. A plain ``<role>content</role>`` line per message.

    The input list and its dicts are never modified.
    """
    from lightrag.utils import logger

    try:
        return hf_tokenizer.apply_chat_template(
            messages, tokenize=False, add_generation_prompt=True
        )
    except Exception as exc:
        # Deliberately broad: chat templates are model-provided and can fail
        # in many ways; any failure just means "try the next strategy".
        logger.debug("Chat template failed, trying fallbacks: %s", exc)

    if len(messages) > 1 and messages[0]["role"] == "system":
        try:
            system_msg, first_msg, *remaining = messages
            # Build a new dict instead of editing first_msg in place: it may
            # be an object owned by the caller's history list.
            merged_first = {
                **first_msg,
                "content": "<system>"
                + system_msg["content"]
                + "</system>\n"
                + first_msg["content"],
            }
            return hf_tokenizer.apply_chat_template(
                [merged_first, *remaining],
                tokenize=False,
                add_generation_prompt=True,
            )
        except Exception as exc:
            logger.debug("Chat template failed without system role: %s", exc)

    # Last resort: simple role-tagged transcript of the original messages.
    return "".join(
        f"<{msg['role']}>{msg['content']}</{msg['role']}>\n" for msg in messages
    )


@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10),
    retry=retry_if_exception_type(
        (RateLimitError, APIConnectionError, APITimeoutError)
    ),
)
async def hf_model_if_cache(
    model,
    prompt,
    system_prompt=None,
    history_messages=None,
    enable_cot: bool = False,
    **kwargs,
) -> str:
    """Generate a completion from a locally loaded Hugging Face causal LM.

    Args:
        model: Model name; forwarded to ``initialize_hf_model``.
        prompt: The user message for this turn.
        system_prompt: Optional system message placed first.
        history_messages: Optional earlier messages (dicts with ``role`` and
            ``content``) inserted between the system and user messages.
            ``None`` means no history.
        enable_cot: Accepted for interface compatibility; not supported here
            and only logged at debug level.
        **kwargs: Accepted for interface compatibility (e.g. ``hashing_kv``);
            none of them influence generation.

    Returns:
        The decoded text of the newly generated tokens only (the prompt is
        sliced off). Generation is capped at 512 new tokens.
    """
    if enable_cot:
        from lightrag.utils import logger

        logger.debug(
            "enable_cot=True is not supported for Hugging Face local models and will be ignored."
        )

    hf_model, hf_tokenizer = initialize_hf_model(model)

    messages = []
    if system_prompt:
        messages.append({"role": "system", "content": system_prompt})
    messages.extend(history_messages or [])
    messages.append({"role": "user", "content": prompt})

    input_prompt = _build_hf_prompt(hf_tokenizer, messages)

    encoded = hf_tokenizer(
        input_prompt, return_tensors="pt", padding=True, truncation=True
    )
    # Place the tensors on whatever device the model lives on rather than
    # assuming CUDA, so CPU-only and MPS hosts work too.
    inputs = {name: tensor.to(hf_model.device) for name, tensor in encoded.items()}

    # NOTE(review): generate() is a synchronous call inside this coroutine.
    output = hf_model.generate(
        **inputs, max_new_tokens=512, num_return_sequences=1, early_stopping=True
    )

    # output[0] starts with the prompt tokens; keep only what was generated.
    prompt_token_count = len(inputs["input_ids"][0])
    return hf_tokenizer.decode(
        output[0][prompt_token_count:], skip_special_tokens=True
    )
async def hf_model_complete(
    prompt,
    system_prompt=None,
    history_messages=None,
    keyword_extraction=False,
    entity_extraction=False,
    enable_cot: bool = False,
    **kwargs,
) -> str:
    """Run local Hugging Face inference with LightRAG-compatible shims.

    The model name is read from
    ``kwargs["hashing_kv"].global_config["llm_model_name"]`` and the request
    is delegated to ``hf_model_if_cache``.

    Structured output note:
    - This adapter does not support OpenAI-style ``response_format`` JSON mode.
    - If callers pass ``response_format``, it is stripped before generation.
    - Deprecated ``keyword_extraction`` and ``entity_extraction`` booleans are
      accepted only as compatibility shims; they emit warnings and are ignored.

    Args:
        prompt: The user message for this turn.
        system_prompt: Optional system message.
        history_messages: Optional list of earlier messages; ``None`` means
            no history.
        keyword_extraction: Deprecated; a truthy value only triggers a
            ``DeprecationWarning``.
        entity_extraction: Deprecated; a truthy value only triggers a
            ``DeprecationWarning``.
        enable_cot: Forwarded to ``hf_model_if_cache``.
        **kwargs: Must contain ``hashing_kv``. ``response_format`` is
            dropped; everything else is forwarded unchanged.

    Returns:
        The generated text.

    Raises:
        KeyError: If ``hashing_kv`` is not supplied.
    """
    # The legacy flags are named parameters, so they can never arrive through
    # **kwargs; checking the parameters themselves is sufficient.
    legacy_flags = (
        ("keyword_extraction", keyword_extraction),
        ("entity_extraction", entity_extraction),
    )
    for flag_name, flag_value in legacy_flags:
        if flag_value:
            warnings.warn(
                f"hf_model_complete({flag_name}=True) is deprecated; "
                "pass response_format={'type': 'json_object'} instead.",
                DeprecationWarning,
                stacklevel=2,
            )

    # HuggingFace local inference has no JSON mode; drop response_format.
    kwargs.pop("response_format", None)

    if "hashing_kv" not in kwargs:
        raise KeyError(
            "hf_model_complete requires a 'hashing_kv' keyword argument whose "
            "global_config provides 'llm_model_name'"
        )
    model_name = kwargs["hashing_kv"].global_config["llm_model_name"]

    return await hf_model_if_cache(
        model_name,
        prompt,
        system_prompt=system_prompt,
        # Always hand over a list so the callee can extend from it.
        history_messages=history_messages or [],
        enable_cot=enable_cot,
        **kwargs,
    )
@wrap_embedding_func_with_attrs(
    embedding_dim=1024,
    max_token_size=8192,
    model_name="hf_embedding_model",
    supports_asymmetric=True,
)
async def hf_embed(
    texts: list[str],
    tokenizer,
    embed_model,
    context: str = "document",
    query_prefix: str | None = None,
    document_prefix: str | None = None,
) -> np.ndarray:
    """Embed a batch of texts with a Hugging Face encoder model.

    Each text is optionally prefixed according to ``context``, tokenized with
    padding and truncation, run through ``embed_model`` without gradient
    tracking, and reduced to one vector by averaging ``last_hidden_state``
    over dimension 1.

    Args:
        texts (list[str]): Input texts to embed.
        tokenizer: Hugging Face tokenizer matching ``embed_model``.
        embed_model: Hugging Face model whose output exposes
            ``last_hidden_state``.
        context (str): ``"query"`` or ``"document"``; selects which prefix,
            if any, is prepended. Other values apply no prefix.
        query_prefix (str | None): Prefix for texts when context is "query".
        document_prefix (str | None): Prefix for texts when context is
            "document".

    Returns:
        np.ndarray: One embedding row per input text.
    """
    # Choose the execution device. When CUDA is present the model's current
    # device is kept as-is; otherwise prefer Apple MPS, then CPU.
    if torch.cuda.is_available():
        target_device = next(embed_model.parameters()).device
    else:
        fallback_name = "mps" if torch.backends.mps.is_available() else "cpu"
        target_device = torch.device(fallback_name)
    embed_model = embed_model.to(target_device)

    # Pick the prefix that belongs to this context (may be None or empty).
    active_prefix = None
    if context == "query":
        active_prefix = query_prefix
    elif context == "document":
        active_prefix = document_prefix
    if active_prefix:
        texts = [active_prefix + item for item in texts]

    # Tokenize as one padded batch on the same device as the model.
    batch = tokenizer(texts, return_tensors="pt", padding=True, truncation=True)
    batch = batch.to(target_device)

    with torch.no_grad():
        model_out = embed_model(
            input_ids=batch["input_ids"],
            attention_mask=batch["attention_mask"],
        )
        # Plain mean over dim 1; the attention mask is not applied here.
        pooled = model_out.last_hidden_state.mean(dim=1)

    pooled = pooled.detach()
    # bfloat16 tensors are upcast to float32 before the NumPy conversion.
    if pooled.dtype == torch.bfloat16:
        pooled = pooled.to(torch.float32)
    return pooled.cpu().numpy()
|