"""Regression tests for paragraph-semantic Stage B oversized-table handling.""" import json import pytest from lightrag.chunker.paragraph_semantic import ( _detect_table_format, _expand_block_with_table_splits, _split_html_rows, _split_long_block, _split_rows_by_tokens, _split_table_text, chunking_by_paragraph_semantic, ) from lightrag.utils import Tokenizer, TokenizerInterface class _CharTokenizer(TokenizerInterface): """1:1 character-to-token mapping — keeps math obvious in assertions.""" def encode(self, content: str): return [ord(ch) for ch in content] def decode(self, tokens): return "".join(chr(t) for t in tokens) def _make_tokenizer() -> Tokenizer: return Tokenizer(model_name="char", tokenizer=_CharTokenizer()) @pytest.mark.offline def test_split_rows_by_tokens_few_rows_huge_total_no_empty_slice(): # Reproduces the bug where target_chunks > len(rows) made target_rows # < 1, so int((i+1)*target_rows) collapsed to start and the loop # appended empty slices (which would later serialise as []…). tokenizer = _make_tokenizer() # 3 rows that each individually exceed target_max — forces # math.ceil(total/target_ideal) and math.ceil(total/target_max) to # both be much greater than len(rows). rows = [ [{"col": "x" * 800}], [{"col": "y" * 800}], [{"col": "z" * 800}], ] chunks = _split_rows_by_tokens( rows, tokenizer, target_max=200, target_ideal=150, last_min=64, ) assert chunks, "expected at least one chunk" for chunk in chunks: assert chunk, "Stage B must never emit an empty row slice" # Concatenation preserves all rows in order. flat: list = [] for chunk in chunks: flat.extend(chunk) assert flat == rows @pytest.mark.offline def test_split_rows_by_tokens_balanced_split_yields_one_row_per_chunk(): # When target_chunks gets capped at len(rows), each chunk holds one # row — verifies the cap kicks in and forward progress is preserved. tokenizer = _make_tokenizer() rows = [[{"col": "a" * 300}] for _ in range(4)] chunks = _split_rows_by_tokens( rows, tokenizer, target_max=200, target_ideal=150, last_min=10, # low enough that the tail-merge step doesn't fire ) assert all(chunk for chunk in chunks) # Each row appears exactly once across the chunks. flat: list = [] for chunk in chunks: flat.extend(chunk) assert flat == rows def _build_oversized_table_text(num_rows: int, row_payload_size: int) -> str: rows = [[f"r{idx}-" + "x" * row_payload_size] for idx in range(num_rows)] return f'
{json.dumps(rows)}
' def _write_blocks_jsonl(tmp_path, content: str) -> str: path = tmp_path / "doc.blocks.jsonl" row = { "type": "content", "heading": "Section", "parent_headings": [], "level": 2, "content": content, } path.write_text(json.dumps(row, ensure_ascii=False), encoding="utf-8") return str(path) @pytest.mark.offline def test_expand_block_assigns_first_and_last_roles_to_glued_blocks(): # An oversized table sandwiched between leading and trailing paragraphs # produces three slices: "first" (glued with leading paras), # "middle" (standalone), "last" (glued with trailing paras). Before # the fix, the first/last blocks defaulted to "none" and lost their # directional merge-protection. tokenizer = _make_tokenizer() table_text = _build_oversized_table_text(num_rows=6, row_payload_size=200) block = { "heading": "Section", "parent_headings": ["Doc"], "level": 2, "paragraphs": [ {"text": "lead paragraph", "is_table": False}, {"text": table_text, "is_table": True}, {"text": "trailing paragraph", "is_table": False}, ], } out = _expand_block_with_table_splits( block, tokenizer=tokenizer, table_max=400, table_ideal=300, table_min_last=128, ) roles = [b["table_chunk_role"] for b in out] assert roles[0] == "first", f"expected leading block role=first, got {roles}" assert roles[-1] == "last", f"expected trailing block role=last, got {roles}" assert all( r == "middle" for r in roles[1:-1] ), f"expected middle slices between first/last, got {roles}" # Boundary glue still works: leading text sits inside the first block, # trailing text sits inside the last block. assert any( p["text"] == "lead paragraph" for p in out[0]["paragraphs"] ), "leading paragraph must glue with the first table slice" assert any( p["text"] == "trailing paragraph" for p in out[-1]["paragraphs"] ), "trailing paragraph must glue with the last table slice" assert all( "表格片段" not in b["heading"] for b in out ), "Stage B should not expose legacy table-fragment heading suffixes" @pytest.mark.offline def test_expand_block_two_oversized_tables_separates_last_and_first_roles(): # Two oversized tables in the same heading block: the tail of the first # split must carry role="last" and not be silently merged into the # head of the second split (which must carry role="first"). tokenizer = _make_tokenizer() block = { "heading": "Section", "parent_headings": [], "level": 2, "paragraphs": [ { "text": _build_oversized_table_text(num_rows=4, row_payload_size=200), "is_table": True, }, {"text": "between tables", "is_table": False}, { "text": _build_oversized_table_text(num_rows=4, row_payload_size=200), "is_table": True, }, ], } out = _expand_block_with_table_splits( block, tokenizer=tokenizer, table_max=400, table_ideal=300, table_min_last=128, ) roles = [b["table_chunk_role"] for b in out] # We expect the role sequence to start with "first", end with "last", # and contain at least one "last" -> "first" transition (the boundary # between the two oversized tables) without any boundary block losing # its role. assert roles[0] == "first" assert roles[-1] == "last" assert "last" in roles # The transition: there must be a "last" immediately followed by a # "first" somewhere in the middle of the role sequence. transitions = list(zip(roles, roles[1:])) assert ( ("last", "first") in transitions ), f"expected a last->first boundary between the two split tables, got {roles}" @pytest.mark.offline def test_expand_block_duplicates_short_text_between_oversized_tables(): tokenizer = _make_tokenizer() bridge = "between tables" block = { "heading": "Section", "parent_headings": [], "level": 2, "paragraphs": [ { "text": _build_oversized_table_text(num_rows=4, row_payload_size=200), "is_table": True, }, {"text": bridge, "is_table": False}, { "text": _build_oversized_table_text(num_rows=4, row_payload_size=200), "is_table": True, }, ], } out = _expand_block_with_table_splits( block, tokenizer=tokenizer, table_max=400, table_ideal=300, table_min_last=128, target_max=800, chunk_overlap_token_size=100, ) roles = [b["table_chunk_role"] for b in out] boundary_idx = next( i for i, (left, right) in enumerate(zip(roles, roles[1:])) if (left, right) == ("last", "first") ) assert bridge in out[boundary_idx]["content"] assert bridge in out[boundary_idx + 1]["content"] @pytest.mark.offline def test_expand_block_emits_middle_text_when_table_bridge_is_long(): tokenizer = _make_tokenizer() bridge = ("A" * 45) + ("B" * 50) + ("C" * 45) block = { "heading": "Section", "parent_headings": [], "level": 2, "paragraphs": [ { "text": _build_oversized_table_text(num_rows=6, row_payload_size=120), "is_table": True, }, {"text": bridge, "is_table": False}, { "text": _build_oversized_table_text(num_rows=6, row_payload_size=120), "is_table": True, }, ], } out = _expand_block_with_table_splits( block, tokenizer=tokenizer, table_max=260, table_ideal=180, table_min_last=32, target_max=400, chunk_overlap_token_size=45, ) middle_idx = next( i for i, blk in enumerate(out) if blk["table_chunk_role"] == "none" and blk["content"] == "B" * 50 ) assert out[middle_idx - 1]["table_chunk_role"] == "last" assert "A" * 45 in out[middle_idx - 1]["content"] assert "B" * 50 not in out[middle_idx - 1]["content"] assert out[middle_idx + 1]["table_chunk_role"] == "first" assert out[middle_idx + 1]["content"].startswith("C" * 45) assert "B" * 50 not in out[middle_idx + 1]["content"] assert all(b["tokens"] <= 400 for b in out), [b["tokens"] for b in out] @pytest.mark.offline def test_public_chunking_adds_part_suffixes_to_all_table_split_fragments(tmp_path): tokenizer = _make_tokenizer() body = "\n".join( [ "lead paragraph", _build_oversized_table_text(num_rows=6, row_payload_size=200), "trailing paragraph", ] ) blocks_path = _write_blocks_jsonl(tmp_path, body) chunks = chunking_by_paragraph_semantic( tokenizer, body, chunk_token_size=800, blocks_path=blocks_path, chunk_overlap_token_size=0, ) assert len(chunks) > 1 assert [chunk["heading"]["heading"] for chunk in chunks] == [ f"Section [part {idx}]" for idx in range(1, len(chunks) + 1) ] assert all("表格片段" not in chunk["heading"]["heading"] for chunk in chunks) # --------------------------------------------------------------------------- # Table-aware fallback tests (row-boundary first, character last). # --------------------------------------------------------------------------- @pytest.mark.offline def test_detect_table_format_explicit_attr(): assert _detect_table_format('id="t1" format="json"', "[]") == "json" assert _detect_table_format("format='html'", "") == "html" # Unknown formats fall through (force the caller to use char fallback). assert _detect_table_format('format="markdown"', "...") is None @pytest.mark.offline def test_detect_table_format_sniff_when_attrs_silent(): assert _detect_table_format("", '[{"a":1}]') == "json" assert _detect_table_format("", "x") == "html" # Body that doesn't look like JSON or HTML → unknown. assert _detect_table_format("", "plain text rows") is None @pytest.mark.offline def test_split_html_rows_extracts_tr_elements(): body = ( "h" "ab" ) rows = _split_html_rows(body) assert rows is not None assert len(rows) == 3 # Each row carries its parent wrapper so the chunk serialiser can # rebuild / instead of dropping them silently. assert [w for w, _ in rows] == ["thead", "tbody", "tbody"] assert all(tr.startswith("") for _, tr in rows) @pytest.mark.offline def test_split_html_rows_no_tr_returns_none(): assert _split_html_rows("just text, no rows") is None assert _split_html_rows("") is None @pytest.mark.offline def test_split_table_text_single_row_oversized_falls_to_character_split(): # A 1-row table whose single cell is huge cannot be reduced via row # boundary — the function must fall to character splitting and respect # target_max on every output piece. tokenizer = _make_tokenizer() rows = [[{"col": "x" * 2000}]] table_text = f'{json.dumps(rows)}
' pieces = _split_table_text( table_text, tokenizer=tokenizer, target_max=500, target_ideal=350, last_min=128, ) assert len(pieces) >= 2, "single-row oversized table must produce multiple pieces" # Every piece honors the cap (this is the contract violation the user # reported when the previous code emitted a single 2000-token table). assert all(_count_tokens(tokenizer, p) <= 500 for p in pieces) @pytest.mark.offline def test_split_table_text_multirow_one_huge_row_mixed_output(): # A multi-row table where most rows fit but one row is itself huge. # The fit-able rows must keep ...
wrapping; the huge # row's chunk falls to character splitting. tokenizer = _make_tokenizer() small_row = [{"col": "ok"}] huge_row = [{"col": "z" * 2000}] rows = [small_row, huge_row, small_row] table_text = f'{json.dumps(rows)}
' pieces = _split_table_text( table_text, tokenizer=tokenizer, target_max=500, target_ideal=350, last_min=64, ) assert all(_count_tokens(tokenizer, p) <= 500 for p in pieces) # At least one fragment for the small rows must survive as legal markup. table_pieces = [p for p in pieces if p.startswith("-wrapped piece for fit-able rows" # The huge row must produce non-table text fragments (character split). text_pieces = [p for p in pieces if not p.startswith("
...; each output fragment must # remain a legal
{rows}
string. tokenizer = _make_tokenizer() body = "".join(f"{'r' * 200}" for _ in range(5)) table_text = f'{body}
' pieces = _split_table_text( table_text, tokenizer=tokenizer, target_max=500, target_ideal=350, last_min=64, ) assert len(pieces) >= 2 # All pieces should be legal ...
fragments (none of the # rows individually exceeds target_max, so no character fallback). assert all(p.startswith("") for p in pieces) assert all(_count_tokens(tokenizer, p) <= 500 for p in pieces) @pytest.mark.offline def test_split_table_text_html_preserves_thead_tbody_wrappers(): # When an HTML table mixes and , the row splitter # used to drop the wrappers entirely — the chunked output came back # as bare sequences. The fix re-emits each wrapper around its # rows in every chunk so the table structure survives splitting. tokenizer = _make_tokenizer() head_row = "" body_rows = "".join(f"" for i in range(4)) body = f"{head_row}{body_rows}" table_text = f'
" + ("h" * 80) + "
{'b' * 80}{i}
{body}
' pieces = _split_table_text( table_text, tokenizer=tokenizer, target_max=400, target_ideal=280, last_min=64, ) # Multiple chunks expected and every chunk must remain a legal # -wrapped fragment. assert len(pieces) >= 2 assert all(p.startswith("
") for p in pieces) # Every chunk that contains the header row must still wrap it in # ...; every chunk with body rows must wrap them in # .... Before the fix, both wrappers vanished. for piece in pieces: if "" in piece and "" in piece, piece if "" in piece and "" in piece, piece # Round-trip: concatenating just the row payloads from every chunk # recovers the original row sequence in order. extracted_rows: list[str] = [] import re for piece in pieces: extracted_rows.extend( re.findall(r"]*>.*?", piece, re.DOTALL | re.IGNORECASE) ) expected_rows = re.findall(r"]*>.*?", body, re.DOTALL | re.IGNORECASE) assert extracted_rows == expected_rows @pytest.mark.offline def test_split_table_text_unknown_format_falls_to_character(): # No format attr, body that doesn't look like JSON/HTML → unknown. tokenizer = _make_tokenizer() table_text = '
" in piece: assert "
" in piece: assert "
' + ("plain row text " * 300) + "
" pieces = _split_table_text( table_text, tokenizer=tokenizer, target_max=500, target_ideal=350, last_min=64, ) assert len(pieces) >= 2 assert all(_count_tokens(tokenizer, p) <= 500 for p in pieces) @pytest.mark.offline def test_expand_block_single_row_table_no_longer_left_intact(): # Stage B integration: previously a single-row oversized table was # appended back to cur_paras unchanged, leading the block to reach # Stage C with the table whole and the character fallback shredding # the tag. After the fix, Stage B itself produces multiple # pieces for such a table. tokenizer = _make_tokenizer() rows = [[{"col": "x" * 2000}]] # single huge row table_text = f'
{json.dumps(rows)}
' block = { "heading": "Section", "parent_headings": [], "level": 2, "paragraphs": [ {"text": "lead", "is_table": False}, {"text": table_text, "is_table": True}, {"text": "trail", "is_table": False}, ], } out = _expand_block_with_table_splits( block, tokenizer=tokenizer, table_max=400, table_ideal=300, table_min_last=128, ) # Multiple sub-blocks must be produced; the oversized table no longer # passes through whole. assert len(out) >= 2 # First/last role protection still fires when the table was reduced. roles = [b["table_chunk_role"] for b in out] assert ( "first" in roles or "last" in roles ), f"expected first/last role assignment after table split, got {roles}" @pytest.mark.offline def test_split_long_block_table_dominant_no_anchor_keeps_some_table_markup(): # Stage C integration: a block dominated by an oversized table with no # anchor candidates used to be character-split end-to-end, destroying # the tag. After the fix, at least some output sub-blocks # retain legal
...
markup for the rows that fit. tokenizer = _make_tokenizer() # Many small rows -> row-boundary split produces multiple legal # fragments, none of which individually exceed target_max. rows = [[{"col": f"r{i}-" + "v" * 200}] for i in range(8)] table_text = f'
{json.dumps(rows)}
' paragraphs = [ {"text": "Sufficiently long lead paragraph " * 30, "is_table": False}, {"text": table_text, "is_table": True}, ] sub_blocks = _split_long_block( paragraphs, heading="Heading", parent_headings=[], level=2, table_chunk_role="none", tokenizer=tokenizer, target_max=600, target_ideal=450, ) # Every sub-block respects the cap. assert all(b["tokens"] <= 600 for b in sub_blocks) # At least one sub-block keeps an unbroken fragment somewhere # in its content (proof that row-boundary preservation kicked in). contents = [b["content"] for b in sub_blocks] assert any( ("
" in c) for c in contents ), "expected at least one sub-block to retain a legal
fragment" @pytest.mark.offline def test_split_table_text_budgets_wrapper_overhead_for_target_max(): # ``_split_rows_by_tokens`` measures only the body (json.dumps(rows)); # the surrounding ``
`` wrapper costs tokens too. # Without wrapper-aware budgeting, a chunk whose body just fits # target_max would overflow once wrapped and trigger character # fallback — shredding the row structure for no good reason. tokenizer = _make_tokenizer() # A long attrs string forces a non-trivial wrapper overhead so the # body-only budget previously chosen (==target_max) overflows when # the wrapper is added back in. attrs_padding = "x" * 80 rows = [[{"col": "y" * 80}] for _ in range(4)] table_text = f'{json.dumps(rows)}
' pieces = _split_table_text( table_text, tokenizer=tokenizer, target_max=250, target_ideal=180, last_min=64, ) # Every output piece honors the cap. assert all(_count_tokens(tokenizer, p) <= 250 for p in pieces), [ _count_tokens(tokenizer, p) for p in pieces ] # Row structure preserved — none of the pieces fell back to # character fragments because of accidental wrapper overflow. assert all(p.startswith("") for p in pieces) def _count_tokens(tokenizer: Tokenizer, text: str) -> int: return len(tokenizer.encode(text))