| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
2771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554 |
- """Tests for the ``chunk_options`` snapshot mechanism.
- Three properties under test:
- 1. **env-driven snapshot**: env vars (CHUNK_R_OVERLAP_SIZE,
- CHUNK_V_BREAKPOINT_THRESHOLD_TYPE, …) flow into
- ``addon_params['chunker']`` via
- :func:`lightrag.parser.routing.default_chunker_config`, then into
- ``full_docs[doc_id]['chunk_options']`` at enqueue time via
- :func:`lightrag.parser.routing.resolve_chunk_options`.
- 2. **caller-supplied chunk_options**: an explicit ``chunk_options``
- kwarg passed to ``apipeline_enqueue_documents`` is persisted
- verbatim and reaches the dispatched chunker as keyword args.
- 3. **per-file chunk_options as a list**: when chunk_options is a
- ``list[dict]`` aligned with ``input``, each doc gets its own
- independent persisted snapshot.
- """
- import asyncio
- from pathlib import Path
- import numpy as np
- import pytest
- from lightrag import LightRAG, ROLES, RoleLLMConfig
- from lightrag.constants import DEFAULT_R_SEPARATORS
- from lightrag.utils import EmbeddingFunc, Tokenizer, TokenizerInterface
class _SimpleTokenizerImpl(TokenizerInterface):
    """Character-level tokenizer stub: one token per Unicode code point."""

    def encode(self, content: str):
        """Return the code point of every character in ``content``."""
        return list(map(ord, content))

    def decode(self, tokens):
        """Rebuild the text from a sequence of code points."""
        return "".join(map(chr, tokens))
- async def _mock_embedding(texts: list[str]) -> np.ndarray:
- return np.random.rand(len(texts), 32)
- async def _mock_llm(prompt, **kwargs):
- return '{"name":"x","summary":"s","detail_description":"d"}'
- _ROLE_FIELD_SUFFIXES = (
- ("_llm_model_func", "func"),
- ("_llm_model_kwargs", "kwargs"),
- ("_llm_model_max_async", "max_async"),
- ("_llm_timeout", "timeout"),
- )
def _new_rag(tmp_path: Path, **kwargs) -> LightRAG:
    """Build a ``LightRAG`` wired to the mock LLM, embedding and tokenizer.

    Keyword arguments named ``<role><suffix>`` (for each role in ``ROLES``
    and each suffix in ``_ROLE_FIELD_SUFFIXES``) are removed from ``kwargs``
    and gathered into per-role ``RoleLLMConfig`` objects, which are passed
    on as ``role_llm_configs``. All remaining keyword arguments are
    forwarded to the ``LightRAG`` constructor unchanged.
    """
    per_role: dict[str, RoleLLMConfig] = {}
    for spec in ROLES:
        # Pull out only the role-prefixed kwargs the caller actually supplied.
        fields = {
            target: kwargs.pop(f"{spec.name}{suffix}")
            for suffix, target in _ROLE_FIELD_SUFFIXES
            if f"{spec.name}{suffix}" in kwargs
        }
        if not fields:
            continue
        per_role[spec.name] = RoleLLMConfig(**fields)

    if per_role:
        kwargs["role_llm_configs"] = per_role

    embedding = EmbeddingFunc(
        embedding_dim=32,
        max_token_size=4096,
        func=_mock_embedding,
    )
    tokenizer = Tokenizer("mock-tokenizer", _SimpleTokenizerImpl())
    return LightRAG(
        working_dir=str(tmp_path),
        workspace=f"chunkopts-{tmp_path.name}",
        llm_model_func=_mock_llm,
        embedding_func=embedding,
        tokenizer=tokenizer,
        **kwargs,
    )
@pytest.mark.offline
def test_env_driven_snapshot_persisted_in_full_docs(tmp_path, monkeypatch):
    """Check that env-derived chunker settings reach ``full_docs.chunk_options``.

    The persisted snapshot is slim: only the strategy slot selected by
    ``process_options`` is kept. Each strategy (F, R, V) is therefore
    enqueued on its own with the matching selector and verified separately.
    """
    for env_name, env_value in (
        ("CHUNK_R_OVERLAP_SIZE", "42"),
        ("CHUNK_V_BREAKPOINT_THRESHOLD_TYPE", "interquartile"),
        ("CHUNK_V_BUFFER_SIZE", "3"),
    ):
        monkeypatch.setenv(env_name, env_value)

    async def _enqueue_all():
        from lightrag.parser.routing import resolve_chunk_options

        rag = _new_rag(tmp_path)
        await rag.initialize_storages()
        try:
            # F slot: same steps as ``LightRAG.ainsert`` -- resolve an
            # F-scoped snapshot from addon_params plus the F runtime split
            # arguments, then pass it to enqueue explicitly.
            f_snapshot = resolve_chunk_options(
                rag.addon_params,
                process_options="F",
                split_by_character="\n\n",
                split_by_character_only=True,
            )
            await rag.apipeline_enqueue_documents(
                "Body for F-strategy snapshot test.",
                ids=["doc-snap-f"],
                file_paths="snap-f.txt",
                track_id="track-snap-f",
                chunk_options=f_snapshot,
            )
            stored_f = await rag.full_docs.get_by_id("doc-snap-f")

            # R slot: CHUNK_R_OVERLAP_SIZE travels via
            # addon_params['chunker'] into the persisted snapshot.
            await rag.apipeline_enqueue_documents(
                "Body for R-strategy snapshot test.",
                ids=["doc-snap-r"],
                file_paths="snap-r.[native-R].txt",
                track_id="track-snap-r",
                process_options="R",
            )
            stored_r = await rag.full_docs.get_by_id("doc-snap-r")

            # V slot: the CHUNK_V_* variables take the same route.
            await rag.apipeline_enqueue_documents(
                "Body for V-strategy snapshot test.",
                ids=["doc-snap-v"],
                file_paths="snap-v.[native-V].txt",
                track_id="track-snap-v",
                process_options="V",
            )
            stored_v = await rag.full_docs.get_by_id("doc-snap-v")
        finally:
            await rag.finalize_storages()
        return stored_f, stored_r, stored_v

    row_f, row_r, row_v = asyncio.run(_enqueue_all())
    assert row_f is not None and row_r is not None and row_v is not None

    f_opts = row_f["chunk_options"]
    fixed = f_opts["fixed_token"]
    assert fixed["split_by_character"] == "\n\n"
    assert fixed["split_by_character_only"] is True
    # Slim contract: every inactive strategy slot is absent.
    for dropped_slot in ("recursive_character", "semantic_vector", "paragraph_semantic"):
        assert dropped_slot not in f_opts

    r_opts = row_r["chunk_options"]
    assert r_opts["recursive_character"]["chunk_overlap_token_size"] == 42
    assert "fixed_token" not in r_opts

    v_opts = row_v["chunk_options"]
    semantic = v_opts["semantic_vector"]
    assert semantic["breakpoint_threshold_type"] == "interquartile"
    assert semantic["buffer_size"] == 3
    assert "fixed_token" not in v_opts
@pytest.mark.offline
def test_caller_supplied_chunk_options_reach_chunker(tmp_path, monkeypatch):
    """An explicit ``chunk_options`` dict must reach the chunker call.

    The recursive-character chunker is replaced with a recording stub so
    the test can inspect the positional ``chunk_token_size`` and the
    keyword arguments the dispatcher passes through.
    """
    pytest.importorskip("langchain_text_splitters")
    import lightrag.chunker as chunker_pkg

    fixed_slot = {
        "chunk_overlap_token_size": 5,
        "split_by_character": None,
        "split_by_character_only": False,
    }
    recursive_slot = {
        "chunk_overlap_token_size": 0,
        "separators": ["|", ""],
    }
    semantic_slot = {
        "breakpoint_threshold_type": "percentile",
        "breakpoint_threshold_amount": None,
        "buffer_size": 1,
    }
    custom_options = {
        "chunk_token_size": 100,
        "fixed_token": fixed_slot,
        "recursive_character": recursive_slot,
        "semantic_vector": semantic_slot,
        "paragraph_semantic": {},
    }

    seen: dict = {}

    def _recording_r_chunker(tokenizer, content, chunk_token_size, **kwargs):
        # Record what the dispatcher handed over, then return a stub chunk.
        seen["chunk_token_size"] = chunk_token_size
        seen["kwargs"] = dict(kwargs)
        return [{"tokens": 5, "content": "stub", "chunk_order_index": 0}]

    monkeypatch.setattr(
        chunker_pkg, "chunking_by_recursive_character", _recording_r_chunker
    )

    async def _enqueue_and_process():
        rag = _new_rag(tmp_path)
        await rag.initialize_storages()
        try:
            await rag.apipeline_enqueue_documents(
                "alpha|beta|gamma|delta",
                file_paths="caller.[native-R].txt",
                track_id="track-caller",
                process_options="R",
                chunk_options=custom_options,
            )
            await rag.apipeline_process_enqueue_documents()
        finally:
            await rag.finalize_storages()

    asyncio.run(_enqueue_and_process())

    received_size = seen.get("chunk_token_size")
    assert (
        received_size == 100
    ), f"R chunker must receive caller-supplied chunk_token_size; got {seen!r}"
    passed_kwargs = seen["kwargs"]
    assert passed_kwargs["separators"] == ["|", ""]
    assert passed_kwargs["chunk_overlap_token_size"] == 0
@pytest.mark.offline
def test_per_file_chunk_options_list(tmp_path, monkeypatch):
    """A ``chunk_options`` list aligned with ``input`` is persisted per doc.

    Both docs use ``process_options="R"``, so the slim snapshot keeps each
    doc's own R-strategy params while the F/V/P sub-dicts supplied by the
    caller are dropped by design.
    """

    def _options_with_separator(separator: str) -> dict:
        # Build a fresh, fully independent options dict on every call.
        return {
            "chunk_token_size": 1200,
            "fixed_token": {
                "chunk_overlap_token_size": 100,
                "split_by_character": None,
                "split_by_character_only": False,
            },
            "recursive_character": {
                "chunk_overlap_token_size": 100,
                "separators": [separator],
            },
            "semantic_vector": {
                "breakpoint_threshold_type": "percentile",
                "breakpoint_threshold_amount": None,
                "buffer_size": 1,
            },
            "paragraph_semantic": {},
        }

    opts_a = _options_with_separator("A_SEP")
    opts_b = _options_with_separator("B_SEP")

    async def _enqueue_pair():
        rag = _new_rag(tmp_path)
        await rag.initialize_storages()
        try:
            await rag.apipeline_enqueue_documents(
                ["doc one body", "doc two body"],
                ids=["doc-aaaaa-list", "doc-bbbbb-list"],
                file_paths=["a.[native-R].txt", "b.[native-R].txt"],
                track_id="track-list",
                process_options=["R", "R"],
                chunk_options=[opts_a, opts_b],
            )
            first = await rag.full_docs.get_by_id("doc-aaaaa-list")
            second = await rag.full_docs.get_by_id("doc-bbbbb-list")
        finally:
            await rag.finalize_storages()
        return first, second

    row_a, row_b = asyncio.run(_enqueue_pair())
    assert row_a is not None and row_b is not None

    snapshot_a = row_a["chunk_options"]
    snapshot_b = row_b["chunk_options"]
    sep_a = snapshot_a["recursive_character"]["separators"]
    sep_b = snapshot_b["recursive_character"]["separators"]
    assert sep_a == ["A_SEP"]
    assert sep_b == ["B_SEP"]

    # Independence: a mutation of one snapshot must not show up in the other.
    sep_a.append("MUT")
    assert "MUT" not in row_b["chunk_options"]["recursive_character"]["separators"]

    # Slim contract: non-R slots are never consumed at process time, so
    # they are absent from the persisted snapshot.
    for dropped_slot in ("fixed_token", "semantic_vector", "paragraph_semantic"):
        assert dropped_slot not in row_a["chunk_options"]
@pytest.mark.offline
def test_constructor_chunk_size_overlays_addon_params(tmp_path, monkeypatch):
    """``LightRAG(chunk_token_size=N, chunk_overlap_token_size=M)`` must
    actually take effect.

    The per-doc snapshot is built from ``addon_params['chunker']``, so the
    constructor values have to be overlaid onto it; otherwise the
    env-driven values would silently win.
    """
    # Pin the legacy env vars to values that differ from the constructor
    # arguments so the env path would be observably different from the
    # constructor path.
    monkeypatch.setenv("CHUNK_SIZE", "1200")
    monkeypatch.setenv("CHUNK_OVERLAP_SIZE", "100")
    # Strategy-specific overlap env vars take precedence over the legacy
    # constructor field (see test_strategy_env_wins_over_legacy_ctor_field).
    # Clear them so an ambient value cannot break the F/R assertions below.
    monkeypatch.delenv("CHUNK_F_OVERLAP_SIZE", raising=False)
    monkeypatch.delenv("CHUNK_R_OVERLAP_SIZE", raising=False)

    async def _run():
        rag = _new_rag(
            tmp_path,
            chunk_token_size=7,
            chunk_overlap_token_size=2,
        )
        await rag.initialize_storages()
        try:
            await rag.apipeline_enqueue_documents(
                "Body for constructor overlay test.",
                ids=["doc-ctor-overlay"],
                file_paths="ctor.txt",
                track_id="track-ctor",
            )
            row = await rag.full_docs.get_by_id("doc-ctor-overlay")
        finally:
            await rag.finalize_storages()
        return row, rag.addon_params

    row, addon_params = asyncio.run(_run())
    assert row is not None
    chunk_opts = row["chunk_options"]

    # Top-level chunk_token_size carries the constructor value.
    assert chunk_opts["chunk_token_size"] == 7
    # Default-F doc: the persisted slim snapshot only carries F's slot.
    assert chunk_opts["fixed_token"]["chunk_overlap_token_size"] == 2
    assert "recursive_character" not in chunk_opts
    assert "semantic_vector" not in chunk_opts
    assert "paragraph_semantic" not in chunk_opts

    # addon_params still reflects the constructor overlay across every
    # strategy so subsequent enqueues with other selectors pick up the
    # same baseline. V doesn't have chunk_overlap_token_size and must
    # remain unchanged.
    chunker_cfg = addon_params["chunker"]
    assert chunker_cfg["chunk_token_size"] == 7
    assert chunker_cfg["fixed_token"]["chunk_overlap_token_size"] == 2
    assert chunker_cfg["recursive_character"]["chunk_overlap_token_size"] == 2
    assert chunker_cfg["paragraph_semantic"]["chunk_overlap_token_size"] == 2
    assert "chunk_overlap_token_size" not in chunker_cfg["semantic_vector"]
@pytest.mark.offline
def test_addon_params_chunker_wins_when_constructor_field_unset(tmp_path):
    """An explicit ``addon_params={'chunker': {...}}`` must survive intact
    when the constructor chunk fields are left at their default (``None``).
    """
    explicit_chunker = {
        "chunk_token_size": 5000,
        "fixed_token": {
            "chunk_overlap_token_size": 250,
            "split_by_character": None,
            "split_by_character_only": False,
        },
        "recursive_character": {
            "chunk_overlap_token_size": 250,
            "separators": ["\n\n", "\n", " ", ""],
        },
        "semantic_vector": {
            "breakpoint_threshold_type": "percentile",
            "breakpoint_threshold_amount": None,
            "buffer_size": 1,
        },
        "paragraph_semantic": {},
    }

    async def _enqueue_one():
        rag = _new_rag(tmp_path, addon_params={"chunker": explicit_chunker})
        await rag.initialize_storages()
        try:
            await rag.apipeline_enqueue_documents(
                "Body for addon-only overlay test.",
                ids=["doc-addon-only"],
                file_paths="addon.txt",
                track_id="track-addon",
            )
            stored = await rag.full_docs.get_by_id("doc-addon-only")
        finally:
            await rag.finalize_storages()
        return stored, rag.chunk_token_size, rag.chunk_overlap_token_size

    row, ctor_size, ctor_overlap = asyncio.run(_enqueue_one())
    assert row is not None

    persisted = row["chunk_options"]
    assert persisted["chunk_token_size"] == 5000
    assert persisted["fixed_token"]["chunk_overlap_token_size"] == 250

    # The legacy instance fields are back-filled from addon_params rather
    # than from env defaults.
    assert ctor_size == 5000
    assert ctor_overlap == 250
@pytest.mark.offline
def test_strategy_env_wins_over_legacy_ctor_field(tmp_path, monkeypatch):
    """Precedence is ordered by specificity: a strategy-specific env var
    beats the strategy-agnostic legacy constructor field.

    ``CHUNK_R_OVERLAP_SIZE=42`` targets R only, while
    ``LightRAG(chunk_overlap_token_size=2)`` is the legacy field. R must
    resolve to 42 (env wins on its own slot); F and P must resolve to 2
    because no strategy-specific env is set for them.
    """
    monkeypatch.setenv("CHUNK_R_OVERLAP_SIZE", "42")
    for unset_name in ("CHUNK_F_OVERLAP_SIZE", "CHUNK_OVERLAP_SIZE"):
        monkeypatch.delenv(unset_name, raising=False)

    async def _enqueue_three():
        rag = _new_rag(tmp_path, chunk_overlap_token_size=2)
        await rag.initialize_storages()
        try:
            # R doc: the strategy-specific env var wins.
            await rag.apipeline_enqueue_documents(
                "Body for R precedence test.",
                ids=["doc-prec-r"],
                file_paths="prec-r.[native-R].txt",
                track_id="track-prec-r",
                process_options="R",
            )
            stored_r = await rag.full_docs.get_by_id("doc-prec-r")

            # F doc: no F-specific env, so the constructor field fills in.
            await rag.apipeline_enqueue_documents(
                "Body for F precedence test.",
                ids=["doc-prec-f"],
                file_paths="prec-f.txt",
                track_id="track-prec-f",
            )
            stored_f = await rag.full_docs.get_by_id("doc-prec-f")

            # P doc: no P-specific env, so the constructor field fills in.
            await rag.apipeline_enqueue_documents(
                "Body for P precedence test.",
                ids=["doc-prec-p"],
                file_paths="prec-p.[native-P].txt",
                track_id="track-prec-p",
                process_options="P",
            )
            stored_p = await rag.full_docs.get_by_id("doc-prec-p")
        finally:
            await rag.finalize_storages()
        return stored_r, stored_f, stored_p, rag.chunk_overlap_token_size

    row_r, row_f, row_p, ctor_field = asyncio.run(_enqueue_three())

    r_overlap = row_r["chunk_options"]["recursive_character"][
        "chunk_overlap_token_size"
    ]
    assert r_overlap == 42, (
        "Strategy-specific CHUNK_R_OVERLAP_SIZE must win over the "
        "legacy chunk_overlap_token_size constructor field."
    )

    f_overlap = row_f["chunk_options"]["fixed_token"]["chunk_overlap_token_size"]
    assert f_overlap == 2, (
        "Without a CHUNK_F_OVERLAP_SIZE override, the F slot falls back "
        "to the legacy constructor field."
    )

    p_overlap = row_p["chunk_options"]["paragraph_semantic"][
        "chunk_overlap_token_size"
    ]
    assert p_overlap == 2

    # self.chunk_overlap_token_size mirrors the value resolved for F.
    assert ctor_field == 2
@pytest.mark.offline
def test_legacy_env_is_final_fallback(tmp_path, monkeypatch):
    """With no strategy env and no legacy ctor field, the legacy
    ``CHUNK_OVERLAP_SIZE`` env is the last fallback for the F, R and P
    overlap slots.

    A second, "mixed" pass sets ``CHUNK_F_OVERLAP_SIZE`` and checks that F
    follows it while R and P still fall back to the legacy env.
    """
    for unset_name in ("CHUNK_F_OVERLAP_SIZE", "CHUNK_R_OVERLAP_SIZE"):
        monkeypatch.delenv(unset_name, raising=False)
    monkeypatch.setenv("CHUNK_OVERLAP_SIZE", "77")

    async def _enqueue_trio(tag, bodies):
        # Fresh instance without a chunk_overlap_token_size kwarg; one doc
        # each for the default (F), R and P selectors.
        rag = _new_rag(tmp_path)
        await rag.initialize_storages()
        try:
            await rag.apipeline_enqueue_documents(
                list(bodies),
                ids=[f"doc-{tag}-f", f"doc-{tag}-r", f"doc-{tag}-p"],
                file_paths=[
                    f"{tag}-f.txt",
                    f"{tag}-r.[native-R].txt",
                    f"{tag}-p.[native-P].txt",
                ],
                track_id=f"track-{tag}",
                process_options=["", "R", "P"],
            )
            stored_f = await rag.full_docs.get_by_id(f"doc-{tag}-f")
            stored_r = await rag.full_docs.get_by_id(f"doc-{tag}-r")
            stored_p = await rag.full_docs.get_by_id(f"doc-{tag}-p")
        finally:
            await rag.finalize_storages()
        return stored_f, stored_r, stored_p, rag.chunk_overlap_token_size

    def _overlap(row, slot):
        return row["chunk_options"][slot]["chunk_overlap_token_size"]

    row_f, row_r, row_p, ctor_field = asyncio.run(
        _enqueue_trio("legacy", ("F body", "R body", "P body"))
    )
    assert _overlap(row_f, "fixed_token") == 77
    assert _overlap(row_r, "recursive_character") == 77
    assert _overlap(row_p, "paragraph_semantic") == 77
    assert ctor_field == 77

    # Mixed case: an F-specific env is set, the legacy env still backs R and P.
    monkeypatch.setenv("CHUNK_F_OVERLAP_SIZE", "10")

    row_f, row_r, row_p, _ = asyncio.run(
        _enqueue_trio("mixed", ("F mixed body", "R mixed body", "P mixed body"))
    )
    assert _overlap(row_f, "fixed_token") == 10
    assert _overlap(row_r, "recursive_character") == 77
    assert _overlap(row_p, "paragraph_semantic") == 77
@pytest.mark.offline
def test_p_strategy_uses_dedicated_chunk_size_env(tmp_path, monkeypatch):
    """``CHUNK_P_SIZE`` gives the P strategy its own ``chunk_token_size``,
    independent of the global ``CHUNK_SIZE`` shared by F/R/V."""
    import lightrag.chunker as chunker_pkg

    monkeypatch.setenv("CHUNK_SIZE", "1200")
    monkeypatch.setenv("CHUNK_P_SIZE", "999")

    seen: dict = {}

    def _recording_p_chunker(
        tokenizer, content, chunk_token_size, *, blocks_path=None, **kwargs
    ):
        # Record every argument of interest, then return a single stub chunk.
        seen["chunk_token_size"] = chunk_token_size
        seen["blocks_path"] = blocks_path
        seen["kwargs"] = dict(kwargs)
        return [{"tokens": 5, "content": "stub", "chunk_order_index": 0}]

    monkeypatch.setattr(
        chunker_pkg, "chunking_by_paragraph_semantic", _recording_p_chunker
    )

    async def _enqueue_and_process():
        rag = _new_rag(tmp_path)
        await rag.initialize_storages()
        try:
            await rag.apipeline_enqueue_documents(
                "stand-in body for paragraph-semantic chunker",
                file_paths="ctor.[native-P].txt",
                track_id="track-p-size",
                process_options="P",
            )
            await rag.apipeline_process_enqueue_documents()
        finally:
            await rag.finalize_storages()

    asyncio.run(_enqueue_and_process())

    received_size = seen.get("chunk_token_size")
    assert received_size == 999, (
        "P chunker must receive CHUNK_P_SIZE-derived chunk_token_size, "
        f"not the global CHUNK_SIZE; got {seen!r}"
    )
    # The dispatcher must not also pass chunk_token_size as a keyword.
    assert "chunk_token_size" not in seen["kwargs"]
@pytest.mark.offline
def test_p_strategy_defaults_to_dedicated_size_when_env_unset(tmp_path, monkeypatch):
    """With ``CHUNK_P_SIZE`` unset, P uses ``DEFAULT_CHUNK_P_SIZE``.

    It must not inherit the global ``CHUNK_SIZE`` or
    ``LightRAG(chunk_token_size=...)``: paragraph-semantic merging needs
    more headroom than the global default to keep related paragraphs
    together, and silently inheriting the smaller global ceiling would
    defeat the strategy's purpose.
    """
    from lightrag.constants import DEFAULT_CHUNK_P_SIZE
    import lightrag.chunker as chunker_pkg

    for unset_name in ("CHUNK_P_SIZE", "CHUNK_SIZE"):
        monkeypatch.delenv(unset_name, raising=False)

    seen: dict = {}

    def _recording_p_chunker(
        tokenizer, content, chunk_token_size, *, blocks_path=None, **kwargs
    ):
        seen["chunk_token_size"] = chunk_token_size
        return [{"tokens": 5, "content": "stub", "chunk_order_index": 0}]

    monkeypatch.setattr(
        chunker_pkg, "chunking_by_paragraph_semantic", _recording_p_chunker
    )

    async def _enqueue_and_process():
        # The explicit ctor chunk_token_size differs from the P default,
        # which shows that P is decoupled from the global chain.
        rag = _new_rag(tmp_path, chunk_token_size=333)
        await rag.initialize_storages()
        try:
            await rag.apipeline_enqueue_documents(
                "fallback body",
                file_paths="ctor.[native-P].txt",
                track_id="track-p-fallback",
                process_options="P",
            )
            await rag.apipeline_process_enqueue_documents()
        finally:
            await rag.finalize_storages()

    asyncio.run(_enqueue_and_process())

    received_size = seen.get("chunk_token_size")
    assert received_size == DEFAULT_CHUNK_P_SIZE
@pytest.mark.offline
def test_p_strategy_default_size_survives_partial_addon_params(tmp_path, monkeypatch):
    """A partial ``addon_params['chunker']`` must still yield the P default.

    When the caller's dict lacks ``paragraph_semantic.chunk_token_size``,
    ``normalize_addon_params`` does NOT re-run ``default_chunker_config``,
    so the slot would silently fall back to the top-level resolved chunk
    size in the pipeline. ``_apply_chunk_size_overlay`` backfills
    ``DEFAULT_CHUNK_P_SIZE`` as the last guard.
    """
    from lightrag.constants import DEFAULT_CHUNK_P_SIZE
    import lightrag.chunker as chunker_pkg

    for unset_name in ("CHUNK_P_SIZE", "CHUNK_SIZE"):
        monkeypatch.delenv(unset_name, raising=False)

    seen: dict = {}

    def _recording_p_chunker(
        tokenizer, content, chunk_token_size, *, blocks_path=None, **kwargs
    ):
        seen["chunk_token_size"] = chunk_token_size
        return [{"tokens": 5, "content": "stub", "chunk_order_index": 0}]

    monkeypatch.setattr(
        chunker_pkg, "chunking_by_paragraph_semantic", _recording_p_chunker
    )

    partial_addon_params = {"chunker": {"paragraph_semantic": {}}}

    async def _enqueue_and_process():
        rag = _new_rag(
            tmp_path,
            chunk_token_size=333,
            addon_params=partial_addon_params,
        )
        await rag.initialize_storages()
        try:
            await rag.apipeline_enqueue_documents(
                "partial addon body",
                file_paths="ctor.[native-P].txt",
                track_id="track-p-partial",
                process_options="P",
            )
            await rag.apipeline_process_enqueue_documents()
        finally:
            await rag.finalize_storages()

    asyncio.run(_enqueue_and_process())

    received_size = seen.get("chunk_token_size")
    assert received_size == DEFAULT_CHUNK_P_SIZE, (
        "P chunker must use DEFAULT_CHUNK_P_SIZE even when caller passes "
        "a partial addon_params chunker dict; got "
        f"{received_size!r}"
    )
@pytest.mark.offline
def test_p_strategy_partial_addon_params_still_picks_up_env(tmp_path, monkeypatch):
    """A partial ``addon_params['chunker']`` must not mask ``CHUNK_P_SIZE``.

    When the caller's dict lacks ``paragraph_semantic.chunk_token_size``
    and the ``CHUNK_P_SIZE`` env IS set, the overlay must use the env
    value instead of jumping straight to ``DEFAULT_CHUNK_P_SIZE``.

    Precedence: explicit addon_params > CHUNK_P_SIZE env >
    DEFAULT_CHUNK_P_SIZE. Without an env-aware backfill the
    partial-addon-params path would silently ignore deployment .env
    settings.
    """
    import lightrag.chunker as chunker_pkg

    monkeypatch.setenv("CHUNK_P_SIZE", "4096")
    monkeypatch.delenv("CHUNK_SIZE", raising=False)

    seen: dict = {}

    def _recording_p_chunker(
        tokenizer, content, chunk_token_size, *, blocks_path=None, **kwargs
    ):
        seen["chunk_token_size"] = chunk_token_size
        return [{"tokens": 5, "content": "stub", "chunk_order_index": 0}]

    monkeypatch.setattr(
        chunker_pkg, "chunking_by_paragraph_semantic", _recording_p_chunker
    )

    partial_addon_params = {"chunker": {"paragraph_semantic": {}}}

    async def _enqueue_and_process():
        rag = _new_rag(
            tmp_path,
            chunk_token_size=333,
            addon_params=partial_addon_params,
        )
        await rag.initialize_storages()
        try:
            await rag.apipeline_enqueue_documents(
                "partial addon body with env",
                file_paths="ctor.[native-P].txt",
                track_id="track-p-partial-env",
                process_options="P",
            )
            await rag.apipeline_process_enqueue_documents()
        finally:
            await rag.finalize_storages()

    asyncio.run(_enqueue_and_process())

    received_size = seen.get("chunk_token_size")
    assert received_size == 4096, (
        "Partial addon_params must not mask CHUNK_P_SIZE env; got "
        f"{received_size!r}"
    )
@pytest.mark.offline
def test_p_strategy_runtime_chunker_mutation_picks_up_env(tmp_path, monkeypatch):
    """A runtime ``rag.addon_params["chunker"] = {...}`` must still honour
    ``CHUNK_P_SIZE``.

    The assignment goes through ``ObservableAddonParams.__setitem__``,
    which only marks addon_params dirty and does NOT re-run
    ``_apply_chunk_size_overlay``. ``resolve_chunk_options`` is the last
    chokepoint and must backfill P's chunk_token_size from the
    ``CHUNK_P_SIZE`` env (or ``DEFAULT_CHUNK_P_SIZE``) when the mutation
    left the slot empty.

    Without that backfill, P silently inherits the top-level
    ``chunk_token_size`` (here ``333``), which is the exact failure mode
    the dedicated default exists to prevent.
    """
    import lightrag.chunker as chunker_pkg

    monkeypatch.setenv("CHUNK_P_SIZE", "4096")
    monkeypatch.delenv("CHUNK_SIZE", raising=False)

    seen: dict = {}

    def _recording_p_chunker(
        tokenizer, content, chunk_token_size, *, blocks_path=None, **kwargs
    ):
        seen["chunk_token_size"] = chunk_token_size
        return [{"tokens": 5, "content": "stub", "chunk_order_index": 0}]

    monkeypatch.setattr(
        chunker_pkg, "chunking_by_paragraph_semantic", _recording_p_chunker
    )

    async def _mutate_enqueue_and_process():
        rag = _new_rag(tmp_path, chunk_token_size=333)
        await rag.initialize_storages()
        try:
            # Subscript assignment: this path bypasses _apply_chunk_size_overlay.
            rag.addon_params["chunker"] = {"paragraph_semantic": {}}
            await rag.apipeline_enqueue_documents(
                "runtime mutation body",
                file_paths="ctor.[native-P].txt",
                track_id="track-p-runtime",
                process_options="P",
            )
            await rag.apipeline_process_enqueue_documents()
        finally:
            await rag.finalize_storages()

    asyncio.run(_mutate_enqueue_and_process())

    received_size = seen.get("chunk_token_size")
    assert received_size == 4096, (
        "Runtime chunker mutation must not let P inherit the top-level "
        f"chunk_token_size; got {received_size!r}"
    )
@pytest.mark.offline
def test_p_strategy_runtime_chunker_mutation_uses_default_when_env_unset(
    tmp_path, monkeypatch
):
    """Counterpart of the env-aware runtime-mutation case.

    With ``CHUNK_P_SIZE`` unset, an enqueue after a runtime chunker
    mutation still gets ``DEFAULT_CHUNK_P_SIZE`` rather than the top-level
    ``chunk_token_size``.
    """
    from lightrag.constants import DEFAULT_CHUNK_P_SIZE
    import lightrag.chunker as chunker_pkg

    for unset_name in ("CHUNK_P_SIZE", "CHUNK_SIZE"):
        monkeypatch.delenv(unset_name, raising=False)

    seen: dict = {}

    def _recording_p_chunker(
        tokenizer, content, chunk_token_size, *, blocks_path=None, **kwargs
    ):
        seen["chunk_token_size"] = chunk_token_size
        return [{"tokens": 5, "content": "stub", "chunk_order_index": 0}]

    monkeypatch.setattr(
        chunker_pkg, "chunking_by_paragraph_semantic", _recording_p_chunker
    )

    async def _mutate_enqueue_and_process():
        rag = _new_rag(tmp_path, chunk_token_size=333)
        await rag.initialize_storages()
        try:
            # Replace the chunker config at runtime, leaving the P slot empty.
            rag.addon_params["chunker"] = {"paragraph_semantic": {}}
            await rag.apipeline_enqueue_documents(
                "runtime mutation default body",
                file_paths="ctor.[native-P].txt",
                track_id="track-p-runtime-default",
                process_options="P",
            )
            await rag.apipeline_process_enqueue_documents()
        finally:
            await rag.finalize_storages()

    asyncio.run(_mutate_enqueue_and_process())

    received_size = seen.get("chunk_token_size")
    assert received_size == DEFAULT_CHUNK_P_SIZE
@pytest.mark.offline
def test_p_strategy_caller_chunk_options_picks_up_env(tmp_path, monkeypatch):
    """Explicit ``chunk_options`` with an empty P slot still honors the P env.

    Passing ``chunk_options=...`` to ``apipeline_enqueue_documents`` skips
    ``resolve_chunk_options`` and goes straight through
    ``slim_chunk_options``.  The P backfill has to run on that path too, so
    a caller dict without a P size yields ``CHUNK_P_SIZE`` (4096) instead of
    the top-level ``chunk_token_size`` (333).
    """
    monkeypatch.setenv("CHUNK_P_SIZE", "4096")
    monkeypatch.delenv("CHUNK_SIZE", raising=False)

    import lightrag.chunker as chunker_module

    captured: dict = {}

    def _record_p_call(
        tokenizer, content, chunk_token_size, *, blocks_path=None, **kwargs
    ):
        # Remember the size argument and hand back one canned chunk.
        captured["chunk_token_size"] = chunk_token_size
        return [{"tokens": 5, "content": "stub", "chunk_order_index": 0}]

    monkeypatch.setattr(
        chunker_module, "chunking_by_paragraph_semantic", _record_p_call
    )

    # Caller-supplied options for the explicit kwarg path: a top-level size
    # that P must NOT inherit, plus an empty paragraph_semantic slot.
    caller_options = {
        "chunk_token_size": 333,
        "paragraph_semantic": {},
    }

    async def _scenario():
        engine = _new_rag(tmp_path, chunk_token_size=333)
        await engine.initialize_storages()
        try:
            await engine.apipeline_enqueue_documents(
                "caller chunk_options body",
                file_paths="ctor.[native-P].txt",
                track_id="track-p-caller-chunkopts",
                process_options="P",
                chunk_options=caller_options,
            )
            await engine.apipeline_process_enqueue_documents()
        finally:
            await engine.finalize_storages()

    asyncio.run(_scenario())

    assert captured.get("chunk_token_size") == 4096, (
        "P must not inherit caller-supplied top-level chunk_token_size; "
        f"got {captured.get('chunk_token_size')!r}"
    )
@pytest.mark.offline
def test_p_strategy_caller_chunk_options_uses_default_when_env_unset(
    tmp_path, monkeypatch
):
    """Caller ``chunk_options`` without a P size and no env yields the default.

    Companion to the env-aware variant: ``CHUNK_P_SIZE`` is removed and the
    caller passes a ``chunk_options`` dict whose P slot is empty.  The P
    backfill is expected to produce ``DEFAULT_CHUNK_P_SIZE``, not the
    caller's top-level ``chunk_token_size`` (333).
    """
    from lightrag.constants import DEFAULT_CHUNK_P_SIZE

    for env_name in ("CHUNK_P_SIZE", "CHUNK_SIZE"):
        monkeypatch.delenv(env_name, raising=False)

    import lightrag.chunker as chunker_module

    captured: dict = {}

    def _record_p_call(
        tokenizer, content, chunk_token_size, *, blocks_path=None, **kwargs
    ):
        # Remember the size argument and hand back one canned chunk.
        captured["chunk_token_size"] = chunk_token_size
        return [{"tokens": 5, "content": "stub", "chunk_order_index": 0}]

    monkeypatch.setattr(
        chunker_module, "chunking_by_paragraph_semantic", _record_p_call
    )

    # Top-level size present, paragraph_semantic slot deliberately empty.
    caller_options = {
        "chunk_token_size": 333,
        "paragraph_semantic": {},
    }

    async def _scenario():
        engine = _new_rag(tmp_path, chunk_token_size=333)
        await engine.initialize_storages()
        try:
            await engine.apipeline_enqueue_documents(
                "caller chunk_options default body",
                file_paths="ctor.[native-P].txt",
                track_id="track-p-caller-default",
                process_options="P",
                chunk_options=caller_options,
            )
            await engine.apipeline_process_enqueue_documents()
        finally:
            await engine.finalize_storages()

    asyncio.run(_scenario())

    observed_size = captured.get("chunk_token_size")
    assert observed_size == DEFAULT_CHUNK_P_SIZE
@pytest.mark.offline
def test_p_strategy_caller_chunk_options_respects_explicit_p_size(
    tmp_path, monkeypatch
):
    """An explicit P size inside caller ``chunk_options`` takes priority.

    With ``CHUNK_P_SIZE`` set to 4096, a caller dict carrying
    ``paragraph_semantic.chunk_token_size = 8192`` must reach the chunker
    unchanged, beating both the env value and ``DEFAULT_CHUNK_P_SIZE``.
    """
    monkeypatch.setenv("CHUNK_P_SIZE", "4096")
    monkeypatch.delenv("CHUNK_SIZE", raising=False)

    import lightrag.chunker as chunker_module

    captured: dict = {}

    def _record_p_call(
        tokenizer, content, chunk_token_size, *, blocks_path=None, **kwargs
    ):
        # Remember the size argument and hand back one canned chunk.
        captured["chunk_token_size"] = chunk_token_size
        return [{"tokens": 5, "content": "stub", "chunk_order_index": 0}]

    monkeypatch.setattr(
        chunker_module, "chunking_by_paragraph_semantic", _record_p_call
    )

    explicit_options = {
        "paragraph_semantic": {"chunk_token_size": 8192},
    }

    async def _scenario():
        engine = _new_rag(tmp_path)
        await engine.initialize_storages()
        try:
            await engine.apipeline_enqueue_documents(
                "caller chunk_options explicit P size body",
                file_paths="ctor.[native-P].txt",
                track_id="track-p-caller-explicit",
                process_options="P",
                chunk_options=explicit_options,
            )
            await engine.apipeline_process_enqueue_documents()
        finally:
            await engine.finalize_storages()

    asyncio.run(_scenario())

    observed_size = captured.get("chunk_token_size")
    assert observed_size == 8192
@pytest.mark.offline
def test_p_strategy_respects_explicit_addon_params_chunk_size(tmp_path, monkeypatch):
    """An explicit P size in ``addon_params`` survives default backfilling.

    The backfill must behave like ``setdefault``: a caller-provided
    ``paragraph_semantic.chunk_token_size`` (4096 here) is left untouched
    and is the value the paragraph-semantic chunker receives.
    """
    for env_name in ("CHUNK_P_SIZE", "CHUNK_SIZE"):
        monkeypatch.delenv(env_name, raising=False)

    import lightrag.chunker as chunker_module

    captured: dict = {}

    def _record_p_call(
        tokenizer, content, chunk_token_size, *, blocks_path=None, **kwargs
    ):
        # Remember the size argument and hand back one canned chunk.
        captured["chunk_token_size"] = chunk_token_size
        return [{"tokens": 5, "content": "stub", "chunk_order_index": 0}]

    monkeypatch.setattr(
        chunker_module, "chunking_by_paragraph_semantic", _record_p_call
    )

    explicit_addon_params = {
        "chunker": {"paragraph_semantic": {"chunk_token_size": 4096}}
    }

    async def _scenario():
        engine = _new_rag(tmp_path, addon_params=explicit_addon_params)
        await engine.initialize_storages()
        try:
            await engine.apipeline_enqueue_documents(
                "explicit addon body",
                file_paths="ctor.[native-P].txt",
                track_id="track-p-explicit",
                process_options="P",
            )
            await engine.apipeline_process_enqueue_documents()
        finally:
            await engine.finalize_storages()

    asyncio.run(_scenario())

    observed_size = captured.get("chunk_token_size")
    assert observed_size == 4096
@pytest.mark.offline
def test_p_strategy_uses_dedicated_overlap_env(tmp_path, monkeypatch):
    """``CHUNK_P_OVERLAP_SIZE`` drives P's overlap, not ``CHUNK_OVERLAP_SIZE``.

    With the global overlap env at 11 and the P-specific one at 66, both the
    persisted ``chunk_options`` row and the keyword arguments passed to the
    paragraph-semantic chunker must carry 66.
    """
    overlap_env = {"CHUNK_OVERLAP_SIZE": "11", "CHUNK_P_OVERLAP_SIZE": "66"}
    for env_name, env_value in overlap_env.items():
        monkeypatch.setenv(env_name, env_value)

    import lightrag.chunker as chunker_module

    captured: dict = {}

    def _record_p_kwargs(
        tokenizer, content, chunk_token_size, *, blocks_path=None, **kwargs
    ):
        # Snapshot the extra keyword arguments and return one canned chunk.
        captured["kwargs"] = dict(kwargs)
        return [{"tokens": 5, "content": "stub", "chunk_order_index": 0}]

    monkeypatch.setattr(
        chunker_module, "chunking_by_paragraph_semantic", _record_p_kwargs
    )

    async def _scenario():
        engine = _new_rag(tmp_path)
        await engine.initialize_storages()
        try:
            await engine.apipeline_enqueue_documents(
                "P overlap body",
                ids=["doc-p-overlap"],
                file_paths="ctor.[native-P].txt",
                track_id="track-p-overlap",
                process_options="P",
            )
            # Read the stored row right after enqueue, before processing runs.
            stored_row = await engine.full_docs.get_by_id("doc-p-overlap")
            await engine.apipeline_process_enqueue_documents()
        finally:
            await engine.finalize_storages()
        return stored_row

    row = asyncio.run(_scenario())

    persisted_p_options = row["chunk_options"]["paragraph_semantic"]
    assert persisted_p_options["chunk_overlap_token_size"] == 66
    assert captured["kwargs"]["chunk_overlap_token_size"] == 66
@pytest.mark.offline
def test_addon_params_strategy_wins_over_strategy_env(tmp_path, monkeypatch):
    """Top-tier precedence: ``addon_params`` beats a strategy-specific env.

    ``CHUNK_R_OVERLAP_SIZE`` is set to 42 while
    ``addon_params['chunker']['recursive_character']`` carries an explicit
    ``chunk_overlap_token_size`` of 999; the persisted ``chunk_options`` for
    the enqueued document must hold 999.
    """
    monkeypatch.setenv("CHUNK_R_OVERLAP_SIZE", "42")

    recursive_overrides = {
        "chunk_overlap_token_size": 999,
        "separators": ["\n\n", "\n", " ", ""],
    }

    async def _scenario():
        engine = _new_rag(
            tmp_path,
            addon_params={"chunker": {"recursive_character": recursive_overrides}},
        )
        await engine.initialize_storages()
        try:
            await engine.apipeline_enqueue_documents(
                "Body for addon-vs-env precedence test.",
                ids=["doc-addon-vs-env"],
                file_paths="addon.[native-R].txt",
                track_id="track-addon",
                process_options="R",
            )
            stored_row = await engine.full_docs.get_by_id("doc-addon-vs-env")
        finally:
            await engine.finalize_storages()
        return stored_row

    row = asyncio.run(_scenario())

    persisted_r_options = row["chunk_options"]["recursive_character"]
    assert (
        persisted_r_options["chunk_overlap_token_size"] == 999
    ), "addon_params explicit value must beat strategy-specific env."
@pytest.mark.offline
def test_runtime_addon_params_mutation_affects_subsequent_enqueue(tmp_path):
    """Runtime edits to ``rag.addon_params['chunker']`` apply going forward only.

    A document enqueued before the mutation keeps the snapshot it was stored
    with; a document enqueued afterwards reflects the mutated separators.
    """
    mutated_separators = ["##", "\n"]

    async def _scenario():
        engine = _new_rag(tmp_path)
        await engine.initialize_storages()
        try:
            # First document: enqueued under the untouched configuration.  The
            # R strategy is used so separators show up in the stored snapshot.
            await engine.apipeline_enqueue_documents(
                "first doc body",
                ids=["doc-pre-mutation"],
                file_paths=["pre.[native-R].txt"],
                track_id="track-pre",
                process_options="R",
            )
            before_row = await engine.full_docs.get_by_id("doc-pre-mutation")
            # Copy the list now, before the runtime defaults are changed.
            before_separators = list(
                before_row["chunk_options"]["recursive_character"]["separators"]
            )

            # Change the runtime defaults in place.
            engine.addon_params["chunker"]["recursive_character"]["separators"] = [
                "##",
                "\n",
            ]

            # Second document: enqueued after the change.
            await engine.apipeline_enqueue_documents(
                "second doc body",
                ids=["doc-post-mutation"],
                file_paths=["post.[native-R].txt"],
                track_id="track-post",
                process_options="R",
            )
            after_row = await engine.full_docs.get_by_id("doc-post-mutation")
        finally:
            await engine.finalize_storages()
        return before_separators, after_row

    sep_pre, row_post = asyncio.run(_scenario())

    # The earlier document still carries the default separators.
    assert sep_pre == list(DEFAULT_R_SEPARATORS)
    # The later document carries the mutated ones.
    stored_after = row_post["chunk_options"]["recursive_character"]["separators"]
    assert stored_after == mutated_separators
@pytest.mark.offline
def test_r_strategy_uses_dedicated_chunk_size_env(tmp_path, monkeypatch):
    """``CHUNK_R_SIZE`` sets R's ``chunk_token_size`` independently of ``CHUNK_SIZE``.

    With ``CHUNK_SIZE=1200`` and ``CHUNK_R_SIZE=777`` the recursive-character
    chunker must be called with 777 as its positional size, and the size must
    not also appear among its keyword arguments.
    """
    size_env = {"CHUNK_SIZE": "1200", "CHUNK_R_SIZE": "777"}
    for env_name, env_value in size_env.items():
        monkeypatch.setenv(env_name, env_value)

    import lightrag.chunker as chunker_module

    captured: dict = {}

    def _record_r_call(tokenizer, content, chunk_token_size, **kwargs):
        # Keep both the positional size and a copy of the extra kwargs.
        captured["chunk_token_size"] = chunk_token_size
        captured["kwargs"] = dict(kwargs)
        return [{"tokens": 5, "content": "stub", "chunk_order_index": 0}]

    monkeypatch.setattr(
        chunker_module, "chunking_by_recursive_character", _record_r_call
    )

    async def _scenario():
        engine = _new_rag(tmp_path)
        await engine.initialize_storages()
        try:
            await engine.apipeline_enqueue_documents(
                "stand-in body for recursive-character chunker",
                file_paths="ctor.[native-R].txt",
                track_id="track-r-size",
                process_options="R",
            )
            await engine.apipeline_process_enqueue_documents()
        finally:
            await engine.finalize_storages()

    asyncio.run(_scenario())

    assert captured.get("chunk_token_size") == 777, (
        "R chunker must receive CHUNK_R_SIZE-derived chunk_token_size, "
        f"not the global CHUNK_SIZE; got {captured!r}"
    )
    # The size must arrive positionally only, never duplicated as a kwarg.
    assert "chunk_token_size" not in captured["kwargs"]
@pytest.mark.offline
def test_r_strategy_falls_back_to_global_chunk_size(tmp_path, monkeypatch):
    """Without ``CHUNK_R_SIZE`` or a per-doc override, R uses the top-level size.

    Both ``CHUNK_R_SIZE`` and ``CHUNK_SIZE`` are removed, so the value the
    recursive-character chunker receives must be the one resolved through the
    standard chain — here the constructor's ``chunk_token_size`` (444).
    """
    for env_name in ("CHUNK_R_SIZE", "CHUNK_SIZE"):
        monkeypatch.delenv(env_name, raising=False)

    import lightrag.chunker as chunker_module

    captured: dict = {}

    def _record_r_call(tokenizer, content, chunk_token_size, **kwargs):
        # Remember the size argument and hand back one canned chunk.
        captured["chunk_token_size"] = chunk_token_size
        return [{"tokens": 5, "content": "stub", "chunk_order_index": 0}]

    monkeypatch.setattr(
        chunker_module, "chunking_by_recursive_character", _record_r_call
    )

    async def _scenario():
        engine = _new_rag(tmp_path, chunk_token_size=444)
        await engine.initialize_storages()
        try:
            await engine.apipeline_enqueue_documents(
                "fallback body",
                file_paths="ctor.[native-R].txt",
                track_id="track-r-fallback",
                process_options="R",
            )
            await engine.apipeline_process_enqueue_documents()
        finally:
            await engine.finalize_storages()

    asyncio.run(_scenario())

    observed_size = captured.get("chunk_token_size")
    assert observed_size == 444
@pytest.mark.offline
def test_v_strategy_uses_dedicated_chunk_size_env(tmp_path, monkeypatch):
    """``CHUNK_V_SIZE`` sets V's ``chunk_token_size`` independently of ``CHUNK_SIZE``.

    With ``CHUNK_SIZE=1200`` and ``CHUNK_V_SIZE=2500`` the semantic-vector
    chunker must be called with 2500 as its positional size (its advisory
    ceiling), and the size must not also appear among its keyword arguments.
    """
    size_env = {"CHUNK_SIZE": "1200", "CHUNK_V_SIZE": "2500"}
    for env_name, env_value in size_env.items():
        monkeypatch.setenv(env_name, env_value)

    import lightrag.chunker as chunker_module

    captured: dict = {}

    async def _record_v_call(
        tokenizer, content, chunk_token_size, *, embedding_func=None, **kwargs
    ):
        # Keep both the positional size and a copy of the extra kwargs.
        captured["chunk_token_size"] = chunk_token_size
        captured["kwargs"] = dict(kwargs)
        return [{"tokens": 5, "content": "stub", "chunk_order_index": 0}]

    monkeypatch.setattr(chunker_module, "chunking_by_semantic_vector", _record_v_call)

    async def _scenario():
        engine = _new_rag(tmp_path)
        await engine.initialize_storages()
        try:
            await engine.apipeline_enqueue_documents(
                "stand-in body for semantic-vector chunker",
                file_paths="ctor.[native-V].txt",
                track_id="track-v-size",
                process_options="V",
            )
            await engine.apipeline_process_enqueue_documents()
        finally:
            await engine.finalize_storages()

    asyncio.run(_scenario())

    assert captured.get("chunk_token_size") == 2500, (
        "V chunker must receive CHUNK_V_SIZE-derived chunk_token_size, "
        f"not the global CHUNK_SIZE; got {captured!r}"
    )
    # The size must arrive positionally only, never duplicated as a kwarg.
    assert "chunk_token_size" not in captured["kwargs"]
@pytest.mark.offline
def test_v_strategy_falls_back_to_global_chunk_size(tmp_path, monkeypatch):
    """Without ``CHUNK_V_SIZE`` or a per-doc override, V uses the top-level size.

    Both ``CHUNK_V_SIZE`` and ``CHUNK_SIZE`` are removed, so the value the
    semantic-vector chunker receives must be the one resolved through the
    standard chain — here the constructor's ``chunk_token_size`` (555).
    """
    for env_name in ("CHUNK_V_SIZE", "CHUNK_SIZE"):
        monkeypatch.delenv(env_name, raising=False)

    import lightrag.chunker as chunker_module

    captured: dict = {}

    async def _record_v_call(
        tokenizer, content, chunk_token_size, *, embedding_func=None, **kwargs
    ):
        # Remember the size argument and hand back one canned chunk.
        captured["chunk_token_size"] = chunk_token_size
        return [{"tokens": 5, "content": "stub", "chunk_order_index": 0}]

    monkeypatch.setattr(chunker_module, "chunking_by_semantic_vector", _record_v_call)

    async def _scenario():
        engine = _new_rag(tmp_path, chunk_token_size=555)
        await engine.initialize_storages()
        try:
            await engine.apipeline_enqueue_documents(
                "fallback body",
                file_paths="ctor.[native-V].txt",
                track_id="track-v-fallback",
                process_options="V",
            )
            await engine.apipeline_process_enqueue_documents()
        finally:
            await engine.finalize_storages()

    asyncio.run(_scenario())

    observed_size = captured.get("chunk_token_size")
    assert observed_size == 555
@pytest.mark.offline
def test_f_strategy_honors_subdict_chunk_size(tmp_path, monkeypatch):
    """F uses a per-doc ``fixed_token.chunk_token_size`` when one is supplied.

    Following the F cleanup, a caller-supplied ``chunk_options`` may carry
    its own size inside the ``fixed_token`` sub-dict, and that value wins
    over the top-level/global size, in line with R/V/P.  Before the cleanup
    such a slot was impossible: ``**f_opts`` would clash with the positional
    ``chunk_token_size`` and raise ``TypeError``.
    """
    monkeypatch.setenv("CHUNK_SIZE", "1200")

    import lightrag.chunker as chunker_module

    captured: dict = {}

    def _record_f_call(tokenizer, content, chunk_token_size, **kwargs):
        # Keep both the positional size and a copy of the extra kwargs.
        captured["chunk_token_size"] = chunk_token_size
        captured["kwargs"] = dict(kwargs)
        return [{"tokens": 5, "content": "stub", "chunk_order_index": 0}]

    monkeypatch.setattr(chunker_module, "chunking_by_fixed_token", _record_f_call)

    fixed_token_overrides = {
        "chunk_token_size": 333,
        "chunk_overlap_token_size": 7,
        "split_by_character": None,
        "split_by_character_only": False,
    }
    custom_options = {
        # Top-level fallback size; the sub-dict value below must override it.
        "chunk_token_size": 1200,
        "fixed_token": fixed_token_overrides,
    }

    async def _scenario():
        engine = _new_rag(tmp_path)
        await engine.initialize_storages()
        try:
            await engine.apipeline_enqueue_documents(
                "stand-in body for fixed-token chunker",
                file_paths="ctor-f.txt",
                track_id="track-f-size",
                process_options="F",
                chunk_options=custom_options,
            )
            await engine.apipeline_process_enqueue_documents()
        finally:
            await engine.finalize_storages()

    asyncio.run(_scenario())

    assert captured.get("chunk_token_size") == 333, (
        "F chunker must receive the fixed_token.chunk_token_size override, "
        f"not the top-level/global size; got {captured!r}"
    )
    # The size must have been removed from the splatted kwargs (else TypeError),
    # while the remaining sub-dict entries still pass through.
    forwarded_kwargs = captured["kwargs"]
    assert "chunk_token_size" not in forwarded_kwargs
    assert forwarded_kwargs["chunk_overlap_token_size"] == 7
@pytest.mark.offline
def test_f_strategy_falls_back_to_top_level_size(tmp_path, monkeypatch):
    """When the F sub-dict carries no ``chunk_token_size``, F still inherits
    the top-level resolved size (here from ``LightRAG(chunk_token_size=…)``) —
    the cleanup must not regress the existing global-size fallback."""
    # Clear the strategy-specific env as well as the global one.  An ambient
    # CHUNK_F_SIZE takes precedence over the constructor field (that is what
    # test_f_strategy_env_size_wins_over_legacy_ctor_field asserts), so leaving
    # it in place would make this fallback check depend on the host
    # environment.  The R and V fallback tests clear their strategy env too.
    monkeypatch.delenv("CHUNK_F_SIZE", raising=False)
    monkeypatch.delenv("CHUNK_SIZE", raising=False)

    import lightrag.chunker as chunker_pkg

    captured: dict = {}

    def _f_spy(tokenizer, content, chunk_token_size, **kwargs):
        # Record the size argument and return one canned chunk.
        captured["chunk_token_size"] = chunk_token_size
        return [{"tokens": 5, "content": "stub", "chunk_order_index": 0}]

    monkeypatch.setattr(chunker_pkg, "chunking_by_fixed_token", _f_spy)

    async def _run():
        rag = _new_rag(tmp_path, chunk_token_size=456)
        await rag.initialize_storages()
        try:
            await rag.apipeline_enqueue_documents(
                "fallback body",
                file_paths="ctor-f.txt",
                track_id="track-f-fallback",
                process_options="F",
            )
            await rag.apipeline_process_enqueue_documents()
        finally:
            # Always release storages, even if enqueue/processing raised.
            await rag.finalize_storages()

    asyncio.run(_run())

    assert captured.get("chunk_token_size") == 456
@pytest.mark.offline
def test_f_strategy_uses_dedicated_chunk_size_env(tmp_path, monkeypatch):
    """``CHUNK_F_SIZE`` sets F's ``chunk_token_size`` independently of ``CHUNK_SIZE``.

    Mirrors the ``CHUNK_R_SIZE`` / ``CHUNK_V_SIZE`` cases: with
    ``CHUNK_SIZE=1200`` acting as the fallback and ``CHUNK_F_SIZE=777``, the
    fixed-token chunker must be called with 777 as its positional size, and
    the size must not also appear among its keyword arguments.
    """
    size_env = {"CHUNK_SIZE": "1200", "CHUNK_F_SIZE": "777"}
    for env_name, env_value in size_env.items():
        monkeypatch.setenv(env_name, env_value)

    import lightrag.chunker as chunker_module

    captured: dict = {}

    def _record_f_call(tokenizer, content, chunk_token_size, **kwargs):
        # Keep both the positional size and a copy of the extra kwargs.
        captured["chunk_token_size"] = chunk_token_size
        captured["kwargs"] = dict(kwargs)
        return [{"tokens": 5, "content": "stub", "chunk_order_index": 0}]

    monkeypatch.setattr(chunker_module, "chunking_by_fixed_token", _record_f_call)

    async def _scenario():
        engine = _new_rag(tmp_path)
        await engine.initialize_storages()
        try:
            await engine.apipeline_enqueue_documents(
                "stand-in body for fixed-token chunker",
                file_paths="ctor-f.txt",
                track_id="track-f-size",
                process_options="F",
            )
            await engine.apipeline_process_enqueue_documents()
        finally:
            await engine.finalize_storages()

    asyncio.run(_scenario())

    assert captured.get("chunk_token_size") == 777, (
        "F chunker must receive CHUNK_F_SIZE-derived chunk_token_size, "
        f"not the global CHUNK_SIZE; got {captured!r}"
    )
    # The size must arrive positionally only, never duplicated as a kwarg.
    assert "chunk_token_size" not in captured["kwargs"]
@pytest.mark.offline
def test_f_strategy_env_size_wins_over_legacy_ctor_field(tmp_path, monkeypatch):
    """Precedence by specificity: strategy env (tier 2) over ctor field (tier 3).

    ``CHUNK_F_SIZE=640`` must be the size the fixed-token chunker receives
    even though the strategy-agnostic legacy constructor field is set to 999.
    """
    monkeypatch.setenv("CHUNK_F_SIZE", "640")
    monkeypatch.delenv("CHUNK_SIZE", raising=False)

    import lightrag.chunker as chunker_module

    captured: dict = {}

    def _record_f_call(tokenizer, content, chunk_token_size, **kwargs):
        # Remember the size argument and hand back one canned chunk.
        captured["chunk_token_size"] = chunk_token_size
        return [{"tokens": 5, "content": "stub", "chunk_order_index": 0}]

    monkeypatch.setattr(chunker_module, "chunking_by_fixed_token", _record_f_call)

    async def _scenario():
        engine = _new_rag(tmp_path, chunk_token_size=999)
        await engine.initialize_storages()
        try:
            await engine.apipeline_enqueue_documents(
                "precedence body",
                file_paths="ctor-f.txt",
                track_id="track-f-prec",
                process_options="F",
            )
            await engine.apipeline_process_enqueue_documents()
        finally:
            await engine.finalize_storages()

    asyncio.run(_scenario())

    observed_size = captured.get("chunk_token_size")
    assert observed_size == 640
@pytest.mark.offline
def test_ainsert_legacy_path_honors_f_size_env(tmp_path, monkeypatch):
    """The legacy ``chunking_func`` branch of ``ainsert`` respects ``CHUNK_F_SIZE``.

    ``rag.ainsert()`` deliberately passes no ``process_options`` selector, so
    it goes through the legacy ``chunking_func`` branch (which keeps any
    user-supplied chunking_func working).  That branch still has to honor
    ``CHUNK_F_SIZE`` (i.e. ``fixed_token.chunk_token_size``) rather than just
    the global ``CHUNK_SIZE``; otherwise the SDK path would silently ignore it.
    """
    size_env = {"CHUNK_SIZE": "1200", "CHUNK_F_SIZE": "640"}
    for env_name, env_value in size_env.items():
        monkeypatch.setenv(env_name, env_value)

    captured: dict = {}

    def _legacy_chunker_probe(
        tokenizer,
        content,
        split_by_character,
        split_by_character_only,
        overlap,
        chunk_token_size,
    ):
        # Remember the size argument and hand back one canned chunk.
        captured["chunk_token_size"] = chunk_token_size
        return [{"tokens": 5, "content": "stub", "chunk_order_index": 0}]

    async def _scenario():
        engine = _new_rag(tmp_path)
        # Swap in the six-argument legacy chunking_func so its size is visible.
        engine.chunking_func = _legacy_chunker_probe
        await engine.initialize_storages()
        try:
            await engine.ainsert("legacy path body", file_paths="legacy-f.txt")
        finally:
            await engine.finalize_storages()

    asyncio.run(_scenario())

    assert captured.get("chunk_token_size") == 640, (
        "ainsert legacy chunking_func must receive CHUNK_F_SIZE-derived size, "
        f"not the global CHUNK_SIZE; got {captured!r}"
    )
@pytest.mark.offline
def test_partial_chunker_config_still_picks_up_size_env(tmp_path, monkeypatch):
    """Strategy size envs are applied even to a partial ``chunker`` config.

    A partial ``addon_params['chunker']`` bypasses ``default_chunker_config``
    (``normalize_addon_params`` only defaults the whole ``chunker`` key when
    it is absent), so ``_apply_chunk_size_overlay`` has to replicate the
    strategy size-env seeding.  Otherwise ``CHUNK_F_SIZE`` / ``CHUNK_R_SIZE``
    / ``CHUNK_V_SIZE`` would be silently dropped for partial configs.
    """
    size_env = {
        "CHUNK_F_SIZE": "640",
        "CHUNK_R_SIZE": "777",
        "CHUNK_V_SIZE": "888",
    }
    for env_name, env_value in size_env.items():
        monkeypatch.setenv(env_name, env_value)

    # Only F's split_by_character is given; no strategy has a
    # chunk_token_size, so each one has to be filled in from the env.
    partial_addon_params = {"chunker": {"fixed_token": {"split_by_character": "\n"}}}
    engine = _new_rag(tmp_path, addon_params=partial_addon_params)

    chunker_config = engine.addon_params["chunker"]
    fixed_token_config = chunker_config["fixed_token"]

    assert fixed_token_config["chunk_token_size"] == 640
    # The caller's explicit entry sits next to the env-derived size.
    assert fixed_token_config["split_by_character"] == "\n"
    assert chunker_config["recursive_character"]["chunk_token_size"] == 777
    assert chunker_config["semantic_vector"]["chunk_token_size"] == 888
@pytest.mark.offline
def test_partial_chunker_config_explicit_size_beats_env(tmp_path, monkeypatch):
    """Tier 1 beats tier 2: an explicit size in a partial config overrides env.

    ``fixed_token.chunk_token_size = 320`` supplied through ``addon_params``
    must remain 320 even though ``CHUNK_F_SIZE`` is set to 640.
    """
    monkeypatch.setenv("CHUNK_F_SIZE", "640")

    explicit_addon_params = {"chunker": {"fixed_token": {"chunk_token_size": 320}}}
    engine = _new_rag(tmp_path, addon_params=explicit_addon_params)

    fixed_token_config = engine.addon_params["chunker"]["fixed_token"]
    assert fixed_token_config["chunk_token_size"] == 320
@pytest.mark.offline
def test_partial_chunker_config_no_size_env_leaves_slot_absent(tmp_path, monkeypatch):
    """With no size env, a partial config gets no ``chunk_token_size`` slot.

    Leaving the slot absent lets the strategy inherit the top-level
    ``chunk_token_size`` at consumption time, i.e. behavior is unchanged.
    """
    for env_name in ("CHUNK_F_SIZE", "CHUNK_R_SIZE"):
        monkeypatch.delenv(env_name, raising=False)

    partial_addon_params = {"chunker": {"recursive_character": {"separators": ["X"]}}}
    engine = _new_rag(tmp_path, addon_params=partial_addon_params)

    chunker_config = engine.addon_params["chunker"]
    assert "chunk_token_size" not in chunker_config["recursive_character"]
    assert "chunk_token_size" not in chunker_config["fixed_token"]
|