| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110 |
- """Recursive character chunking — the ``"R"`` strategy.
- Wraps LangChain's :class:`RecursiveCharacterTextSplitter` and delivers
- output rows in the LightRAG file-chunker schema. The splitter walks the
- ``separators`` list from longest semantic boundary (``\\n\\n`` by default)
- to weakest (the empty string), recursively re-splitting any segment that
- still exceeds the token cap.
- Token accounting goes through the LightRAG :class:`Tokenizer` via the
- ``length_function`` plug-in — without that, ``chunk_size`` would be
- measured in characters and ``chunk_token_size`` would lose its meaning.
- Output cap is *not* enforced internally: oversized segments are produced
- when no separator can break them, and
- :func:`lightrag.utils.enforce_chunk_token_limit_before_embedding` does the
- final hard split before embedding.
- """
- from __future__ import annotations
- from typing import Any
- from lightrag.utils import Tokenizer, logger
- try:
- from langchain_text_splitters import RecursiveCharacterTextSplitter
- _LANGCHAIN_TEXT_SPLITTERS_AVAILABLE = True
- except ImportError:
- _LANGCHAIN_TEXT_SPLITTERS_AVAILABLE = False
- RecursiveCharacterTextSplitter = None # type: ignore[assignment]
def chunking_by_recursive_character(
    tokenizer: Tokenizer,
    content: str,
    chunk_token_size: int = 1200,
    *,
    chunk_overlap_token_size: int = 100,
    separators: list[str] | None = None,
) -> list[dict[str, Any]]:
    """Recursive character splitter — the ``"R"`` chunking strategy.

    Builds a LangChain ``RecursiveCharacterTextSplitter`` whose length
    function counts tokens via ``tokenizer.encode``, splits ``content``
    with it, and converts the non-empty pieces into chunk rows.

    Args:
        tokenizer: LightRAG tokenizer; used as the length function so
            ``chunk_token_size`` and ``chunk_overlap_token_size`` are
            interpreted in tokens, not characters.
        content: Text to split. Empty or whitespace-only input yields
            an empty list.
        chunk_token_size: Target size for each chunk (tokens). Values
            below 1 are raised to 1. This function does not itself
            truncate pieces returned by the splitter, so it is a target
            rather than a hard cap.
        chunk_overlap_token_size: Token overlap between adjacent chunks.
            Negative values are raised to 0; values larger than the
            effective chunk size are clamped down to it (with a warning)
            instead of letting the splitter reject the configuration.
        separators: Cascade of split candidates. ``None`` defers to
            LangChain's defaults: ``["\\n\\n", "\\n", " ", ""]``.

    Returns:
        Ordered list of ``{"tokens", "content", "chunk_order_index"}``
        dicts. ``content`` is whitespace-stripped, ``tokens`` is the
        token count of that stripped text, and ``chunk_order_index``
        numbers the emitted rows consecutively from 0.

    Raises:
        ImportError: If ``langchain-text-splitters`` is not installed.
    """
    if not _LANGCHAIN_TEXT_SPLITTERS_AVAILABLE:
        raise ImportError(
            "langchain-text-splitters is required for the 'R' chunking "
            "strategy; install with `pip install langchain-text-splitters>=0.3`."
        )

    # Strip once: the result doubles as the emptiness check and as the
    # fallback chunk body further down.
    stripped_content = content.strip() if content else ""
    if not stripped_content:
        return []

    def count_tokens(text: str) -> int:
        return len(tokenizer.encode(text))

    chunk_size = max(int(chunk_token_size), 1)
    chunk_overlap = max(int(chunk_overlap_token_size), 0)
    if chunk_overlap > chunk_size:
        # LangChain's TextSplitter raises ValueError when the overlap is
        # larger than the chunk size. With the default overlap of 100 a
        # small chunk_token_size would otherwise crash, so clamp instead.
        logger.warning(
            "[recursive_character] chunk_overlap_token_size=%d exceeds "
            "chunk_token_size=%d; clamping overlap to %d.",
            chunk_overlap,
            chunk_size,
            chunk_size,
        )
        chunk_overlap = chunk_size

    splitter_kwargs: dict[str, Any] = {
        "chunk_size": chunk_size,
        "chunk_overlap": chunk_overlap,
        "length_function": count_tokens,
    }
    if separators is not None:
        # Copy so the splitter never holds a reference to the caller's list.
        splitter_kwargs["separators"] = list(separators)

    splitter = RecursiveCharacterTextSplitter(**splitter_kwargs)
    stripped_pieces = (piece.strip() for piece in splitter.split_text(content))
    bodies = [body for body in stripped_pieces if body]

    if not bodies:
        # Defensive: splitter returned only whitespace fragments. Emit a
        # single chunk of stripped content so downstream callers always
        # receive at least one row when input is non-empty.
        logger.warning(
            "[recursive_character] splitter produced no non-empty chunks "
            "for %d-char input; emitting single fallback chunk.",
            len(content),
        )
        bodies = [stripped_content]

    return [
        {
            "tokens": count_tokens(body),
            "content": body,
            "chunk_order_index": index,
        }
        for index, body in enumerate(bodies)
    ]
|