semantic_vector.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. """Semantic vector chunking — the ``"V"`` strategy.
  2. Wraps LangChain's :class:`SemanticChunker` (from ``langchain-experimental``)
  3. which splits text by sentence embeddings: it first segments the input into
  4. sentences, embeds each sentence (in adjacent windows of ``buffer_size``),
  5. and finds breakpoints where the cosine distance between consecutive
  6. windows crosses a threshold derived from the chosen distribution
  7. (``percentile`` / ``standard_deviation`` / ``interquartile`` /
  8. ``gradient``).
  9. The chunker exposed here is ``async`` because LightRAG's
  10. :class:`EmbeddingFunc` is async. Internally we call SemanticChunker
  11. synchronously inside :func:`asyncio.to_thread` and bridge the embedding
  12. calls back to the main event loop via
  13. :func:`asyncio.run_coroutine_threadsafe`.
  14. Caveats:
- SemanticChunker does NOT enforce a maximum chunk size natively; any piece
  exceeding the caller's ``chunk_token_size`` is re-split here via
  :func:`lightrag.chunker.chunking_by_recursive_character` (zero overlap)
  instead of being left for the embedding-time hard split in
  :func:`lightrag.utils.enforce_chunk_token_limit_before_embedding`.
  19. - When ``embedding_func`` is ``None`` we log a warning and fall back to
  20. :func:`lightrag.chunker.chunking_by_recursive_character` — V's only
  21. differentiator is embeddings, and R is the closest structural-only
  22. alternative.
  23. """
  24. from __future__ import annotations
  25. import asyncio
  26. from typing import Any
  27. from lightrag.constants import DEFAULT_SENTENCE_SPLIT_REGEX
  28. from lightrag.utils import EmbeddingFunc, Tokenizer, logger
# ``langchain-experimental`` is treated as an optional dependency: record
# whether it imported so ``chunking_by_semantic_vector`` can raise a helpful
# ImportError at call time instead of this module failing at import time.
try:
    from langchain_core.embeddings import Embeddings
    from langchain_experimental.text_splitter import SemanticChunker

    _LANGCHAIN_EXPERIMENTAL_AVAILABLE = True
except ImportError:
    _LANGCHAIN_EXPERIMENTAL_AVAILABLE = False
    # Placeholders keep the module importable: ``Embeddings`` is used as a
    # base class below, so the name must still resolve to a class.
    Embeddings = object  # type: ignore[assignment,misc]
    SemanticChunker = None  # type: ignore[assignment]
  37. class _AsyncEmbeddingFuncAdapter(Embeddings):
  38. """Bridge a LightRAG :class:`EmbeddingFunc` (async) to LangChain's
  39. sync :class:`Embeddings` interface used by ``SemanticChunker``.
  40. The adapter must be constructed inside the running event loop so it
  41. can capture the loop reference; the blocking ``embed_documents`` /
  42. ``embed_query`` calls are then made from a worker thread (via
  43. :func:`asyncio.to_thread` in the public chunker) and bounce back to
  44. the captured loop with :func:`asyncio.run_coroutine_threadsafe`.
  45. """
  46. def __init__(
  47. self,
  48. embedding_func: EmbeddingFunc,
  49. loop: asyncio.AbstractEventLoop,
  50. ) -> None:
  51. self._embedding_func = embedding_func
  52. self._loop = loop
  53. def _run(self, texts: list[str], context: str) -> list[list[float]]:
  54. future = asyncio.run_coroutine_threadsafe(
  55. self._embedding_func(texts, context=context),
  56. self._loop,
  57. )
  58. result = future.result()
  59. return [list(map(float, vec)) for vec in result]
  60. def embed_documents(self, texts: list[str]) -> list[list[float]]:
  61. return self._run(list(texts), context="document")
  62. def embed_query(self, text: str) -> list[float]:
  63. return self._run([text], context="query")[0]
  64. async def chunking_by_semantic_vector(
  65. tokenizer: Tokenizer,
  66. content: str,
  67. chunk_token_size: int = 1200,
  68. *,
  69. embedding_func: EmbeddingFunc | None = None,
  70. breakpoint_threshold_type: str = "percentile",
  71. breakpoint_threshold_amount: float | None = None,
  72. buffer_size: int = 1,
  73. sentence_split_regex: str = DEFAULT_SENTENCE_SPLIT_REGEX,
  74. ) -> list[dict[str, Any]]:
  75. """Semantic vector chunker — the ``"V"`` chunking strategy.
  76. Args:
  77. tokenizer: LightRAG tokenizer (used for output token counts).
  78. content: Text to split.
  79. chunk_token_size: Hard upper bound (tokens). SemanticChunker does
  80. NOT enforce a maximum natively, so any piece that exceeds
  81. this value is re-split via
  82. :func:`chunking_by_recursive_character` before being emitted.
  83. embedding_func: LightRAG :class:`EmbeddingFunc`. When ``None``
  84. this chunker logs a warning and falls back to
  85. :func:`chunking_by_recursive_character`.
  86. breakpoint_threshold_type: ``percentile`` | ``standard_deviation``
  87. | ``interquartile`` | ``gradient`` (LangChain default:
  88. ``percentile``).
  89. breakpoint_threshold_amount: Threshold magnitude. ``None`` lets
  90. LangChain pick the per-type default (e.g. 95 for percentile).
  91. buffer_size: Number of adjacent sentences combined when computing
  92. distances (LangChain default: 1).
  93. sentence_split_regex: Pattern fed to LangChain's
  94. :class:`SemanticChunker` for the initial sentence split.
  95. Default extends the upstream English-only pattern with
  96. Chinese sentence terminators ``。?!`` so mixed-language and
  97. pure-Chinese inputs split correctly.
  98. Returns:
  99. Ordered list of ``{"tokens", "content", "chunk_order_index"}``
  100. dicts.
  101. """
  102. if not content or not content.strip():
  103. return []
  104. if embedding_func is None:
  105. # V's only differentiator is embeddings — without them the
  106. # closest neighbour is R's structural splitting. V chunks are
  107. # non-overlapping by design (semantic boundaries), so the
  108. # fallback uses ``chunk_overlap_token_size=0`` to preserve that
  109. # semantic and avoid LangChain's "overlap > chunk_size" guard
  110. # for very small ``chunk_token_size``.
  111. logger.warning(
  112. "[semantic_vector] embedding_func is None; falling back to "
  113. "recursive-character chunking."
  114. )
  115. from lightrag.chunker.recursive_character import (
  116. chunking_by_recursive_character,
  117. )
  118. return chunking_by_recursive_character(
  119. tokenizer,
  120. content,
  121. chunk_token_size,
  122. chunk_overlap_token_size=0,
  123. )
  124. if not _LANGCHAIN_EXPERIMENTAL_AVAILABLE:
  125. raise ImportError(
  126. "langchain-experimental is required for the 'V' chunking "
  127. "strategy; install with `pip install langchain-experimental>=0.3`."
  128. )
  129. loop = asyncio.get_running_loop()
  130. adapter = _AsyncEmbeddingFuncAdapter(embedding_func, loop)
  131. chunker_kwargs: dict[str, Any] = {
  132. "embeddings": adapter,
  133. "buffer_size": int(buffer_size),
  134. "breakpoint_threshold_type": breakpoint_threshold_type,
  135. "sentence_split_regex": sentence_split_regex,
  136. }
  137. if breakpoint_threshold_amount is not None:
  138. chunker_kwargs["breakpoint_threshold_amount"] = float(
  139. breakpoint_threshold_amount
  140. )
  141. splitter = SemanticChunker(**chunker_kwargs)
  142. pieces = await asyncio.to_thread(splitter.split_text, content)
  143. # SemanticChunker has no internal size cap; oversized pieces here
  144. # would otherwise rely on the embedding-time hard fallback (which
  145. # uses ``embedding_token_limit``, not ``chunk_token_size``) to split
  146. # them. Enforce ``chunk_token_size`` directly via R for any piece
  147. # that exceeds it so the user-configured size is actually honored.
  148. # Lazy import dodges the recursive_character ↔ semantic_vector
  149. # circular dependency (same pattern as the embedding-None fallback
  150. # above).
  151. from lightrag.chunker.recursive_character import (
  152. chunking_by_recursive_character,
  153. )
  154. target_max = max(int(chunk_token_size), 1)
  155. results: list[dict[str, Any]] = []
  156. for piece in pieces:
  157. body = piece.strip()
  158. if not body:
  159. continue
  160. piece_tokens = len(tokenizer.encode(body))
  161. if piece_tokens <= target_max:
  162. results.append(
  163. {
  164. "tokens": piece_tokens,
  165. "content": body,
  166. "chunk_order_index": len(results),
  167. }
  168. )
  169. continue
  170. # Oversized semantic piece: re-split via R while preserving the
  171. # surrounding chunk order. ``chunk_overlap_token_size=0`` keeps
  172. # V's non-overlapping semantics.
  173. sub_pieces = chunking_by_recursive_character(
  174. tokenizer,
  175. body,
  176. target_max,
  177. chunk_overlap_token_size=0,
  178. )
  179. for sub in sub_pieces:
  180. sub_body = sub.get("content", "")
  181. if not sub_body:
  182. continue
  183. results.append(
  184. {
  185. "tokens": sub.get("tokens", len(tokenizer.encode(sub_body))),
  186. "content": sub_body,
  187. "chunk_order_index": len(results),
  188. }
  189. )
  190. return results