| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604 |
- """Regression tests for paragraph-semantic Stage B oversized-table handling."""
- import json
- import pytest
- from lightrag.chunker.paragraph_semantic import (
- _detect_table_format,
- _expand_block_with_table_splits,
- _split_html_rows,
- _split_long_block,
- _split_rows_by_tokens,
- _split_table_text,
- chunking_by_paragraph_semantic,
- )
- from lightrag.utils import Tokenizer, TokenizerInterface
class _CharTokenizer(TokenizerInterface):
    """Test tokenizer in which every character is exactly one token.

    Token ids are the characters' Unicode code points, so a string's token
    count equals its length and the arithmetic in assertions stays obvious.
    """

    def encode(self, content: str):
        """Return the list of code points for the characters of ``content``."""
        return list(map(ord, content))

    def decode(self, tokens):
        """Rebuild the string whose code points are given by ``tokens``."""
        return "".join(map(chr, tokens))
def _make_tokenizer() -> Tokenizer:
    """Return a ``Tokenizer`` named "char" that wraps a ``_CharTokenizer``."""
    char_backend = _CharTokenizer()
    return Tokenizer(model_name="char", tokenizer=char_backend)
@pytest.mark.offline
def test_split_rows_by_tokens_few_rows_huge_total_no_empty_slice():
    """Splitting a handful of huge rows must never yield an empty row slice.

    Regression guard: when target_chunks exceeded len(rows), target_rows fell
    below 1, int((i+1)*target_rows) collapsed onto start, and empty slices
    were appended (which would later serialise as <table>[]…).
    """
    # Three rows that are each larger than target_max on their own, so both
    # math.ceil(total/target_ideal) and math.ceil(total/target_max) come out
    # far above len(rows).
    rows = [[{"col": letter * 800}] for letter in "xyz"]
    chunks = _split_rows_by_tokens(
        rows,
        _make_tokenizer(),
        target_max=200,
        target_ideal=150,
        last_min=64,
    )

    assert chunks, "expected at least one chunk"
    assert all(chunks), "Stage B must never emit an empty row slice"

    # Flattening the chunks must give back every row, in the original order.
    flattened = [row for chunk in chunks for row in chunk]
    assert flattened == rows
@pytest.mark.offline
def test_split_rows_by_tokens_balanced_split_yields_one_row_per_chunk():
    """Four 300-character rows split under target_max=200 lose and duplicate nothing.

    Targets the path where target_chunks is capped at len(rows): the test
    checks that no chunk is empty and that every row appears exactly once,
    in order, across the chunks.
    """
    payload = "a" * 300
    rows = [[{"col": payload}] for _ in range(4)]
    chunks = _split_rows_by_tokens(
        rows,
        _make_tokenizer(),
        target_max=200,
        target_ideal=150,
        last_min=10,  # small enough that the tail-merge step stays idle
    )

    for chunk in chunks:
        assert chunk

    # Row-for-row equality with the input proves nothing was lost or repeated.
    flattened = [row for chunk in chunks for row in chunk]
    assert flattened == rows
- def _build_oversized_table_text(num_rows: int, row_payload_size: int) -> str:
- rows = [[f"r{idx}-" + "x" * row_payload_size] for idx in range(num_rows)]
- return f'<table id="tb-1" format="json">{json.dumps(rows)}</table>'
- def _write_blocks_jsonl(tmp_path, content: str) -> str:
- path = tmp_path / "doc.blocks.jsonl"
- row = {
- "type": "content",
- "heading": "Section",
- "parent_headings": [],
- "level": 2,
- "content": content,
- }
- path.write_text(json.dumps(row, ensure_ascii=False), encoding="utf-8")
- return str(path)
@pytest.mark.offline
def test_expand_block_assigns_first_and_last_roles_to_glued_blocks():
    """An oversized table between two paragraphs yields first/middle/last roles.

    The leading paragraph is expected to be glued to the "first" slice and
    the trailing paragraph to the "last" slice, with any slices in between
    marked "middle". Per the regression note, the boundary blocks used to
    default to "none" and so lost their directional merge-protection.
    """
    paragraphs = [
        {"text": "lead paragraph", "is_table": False},
        {
            "text": _build_oversized_table_text(num_rows=6, row_payload_size=200),
            "is_table": True,
        },
        {"text": "trailing paragraph", "is_table": False},
    ]
    block = {
        "heading": "Section",
        "parent_headings": ["Doc"],
        "level": 2,
        "paragraphs": paragraphs,
    }

    out = _expand_block_with_table_splits(
        block,
        tokenizer=_make_tokenizer(),
        table_max=400,
        table_ideal=300,
        table_min_last=128,
    )

    roles = [sub["table_chunk_role"] for sub in out]
    assert roles[0] == "first", f"expected leading block role=first, got {roles}"
    assert roles[-1] == "last", f"expected trailing block role=last, got {roles}"
    non_middle = [role for role in roles[1:-1] if role != "middle"]
    assert not non_middle, f"expected middle slices between first/last, got {roles}"

    # Boundary glue: the surrounding text travels with the edge slices.
    first_block, last_block = out[0], out[-1]
    assert any(
        para["text"] == "lead paragraph" for para in first_block["paragraphs"]
    ), "leading paragraph must glue with the first table slice"
    assert any(
        para["text"] == "trailing paragraph" for para in last_block["paragraphs"]
    ), "trailing paragraph must glue with the last table slice"

    # No sub-block heading may carry the old table-fragment suffix.
    for sub in out:
        assert (
            "表格片段" not in sub["heading"]
        ), "Stage B should not expose legacy table-fragment heading suffixes"
@pytest.mark.offline
def test_expand_block_two_oversized_tables_separates_last_and_first_roles():
    """Two oversized tables under one heading keep distinct boundary roles.

    The tail of the first split must stay role="last" and the head of the
    second split must stay role="first"; the two must not be silently merged.
    """

    def table_paragraph():
        # A fresh dict per call so the two table paragraphs are independent.
        return {
            "text": _build_oversized_table_text(num_rows=4, row_payload_size=200),
            "is_table": True,
        }

    block = {
        "heading": "Section",
        "parent_headings": [],
        "level": 2,
        "paragraphs": [
            table_paragraph(),
            {"text": "between tables", "is_table": False},
            table_paragraph(),
        ],
    }

    out = _expand_block_with_table_splits(
        block,
        tokenizer=_make_tokenizer(),
        table_max=400,
        table_ideal=300,
        table_min_last=128,
    )
    roles = [sub["table_chunk_role"] for sub in out]

    # The sequence opens with "first" and closes with "last" ...
    assert roles[0] == "first"
    assert roles[-1] == "last"
    assert "last" in roles

    # ... and somewhere in between a "last" is directly followed by a
    # "first": the seam between the two split tables.
    adjacent_pairs = zip(roles, roles[1:])
    assert any(
        pair == ("last", "first") for pair in adjacent_pairs
    ), f"expected a last->first boundary between the two split tables, got {roles}"
@pytest.mark.offline
def test_expand_block_duplicates_short_text_between_oversized_tables():
    """Short text between two oversized tables appears on both sides of the seam.

    The bridge paragraph must be present in the content of the "last" block
    of the first table and in the "first" block of the second table.
    """
    bridge = "between tables"
    table_text = _build_oversized_table_text(num_rows=4, row_payload_size=200)
    block = {
        "heading": "Section",
        "parent_headings": [],
        "level": 2,
        "paragraphs": [
            {"text": table_text, "is_table": True},
            {"text": bridge, "is_table": False},
            {"text": table_text, "is_table": True},
        ],
    }

    out = _expand_block_with_table_splits(
        block,
        tokenizer=_make_tokenizer(),
        table_max=400,
        table_ideal=300,
        table_min_last=128,
        target_max=800,
        chunk_overlap_token_size=100,
    )

    roles = [sub["table_chunk_role"] for sub in out]
    # Index of the first "last" block that is directly followed by a "first".
    boundary_idx = next(
        pos
        for pos in range(len(roles) - 1)
        if roles[pos] == "last" and roles[pos + 1] == "first"
    )

    before_seam = out[boundary_idx]
    after_seam = out[boundary_idx + 1]
    assert bridge in before_seam["content"]
    assert bridge in after_seam["content"]
@pytest.mark.offline
def test_expand_block_emits_middle_text_when_table_bridge_is_long():
    """A long bridge between two oversized tables is split into three parts.

    With a 45-token overlap and a 45+50+45 character bridge, the test
    expects the 45 "A"s to stay with the preceding "last" block, the 45 "C"s
    to open the following "first" block, and the 50 "B"s to form their own
    block with role "none". No block may exceed 400 tokens.
    """
    head_part = "A" * 45
    core_part = "B" * 50
    tail_part = "C" * 45
    bridge = head_part + core_part + tail_part

    table_text = _build_oversized_table_text(num_rows=6, row_payload_size=120)
    block = {
        "heading": "Section",
        "parent_headings": [],
        "level": 2,
        "paragraphs": [
            {"text": table_text, "is_table": True},
            {"text": bridge, "is_table": False},
            {"text": table_text, "is_table": True},
        ],
    }

    out = _expand_block_with_table_splits(
        block,
        tokenizer=_make_tokenizer(),
        table_max=260,
        table_ideal=180,
        table_min_last=32,
        target_max=400,
        chunk_overlap_token_size=45,
    )

    # Locate the standalone block that carries only the bridge's middle part.
    middle_idx = next(
        pos
        for pos, sub in enumerate(out)
        if sub["table_chunk_role"] == "none" and sub["content"] == core_part
    )
    previous_block = out[middle_idx - 1]
    following_block = out[middle_idx + 1]

    assert previous_block["table_chunk_role"] == "last"
    assert head_part in previous_block["content"]
    assert core_part not in previous_block["content"]

    assert following_block["table_chunk_role"] == "first"
    assert following_block["content"].startswith(tail_part)
    assert core_part not in following_block["content"]

    token_counts = [sub["tokens"] for sub in out]
    assert not [count for count in token_counts if count > 400], token_counts
@pytest.mark.offline
def test_public_chunking_adds_part_suffixes_to_all_table_split_fragments(tmp_path):
    """The public chunker numbers every fragment of a split block.

    A block made of lead text, an oversized table and trailing text must
    come back as several chunks whose headings read "Section [part N]" for
    N = 1..len(chunks), none of them carrying the old "表格片段" suffix.
    """
    table_text = _build_oversized_table_text(num_rows=6, row_payload_size=200)
    body = f"lead paragraph\n{table_text}\ntrailing paragraph"
    blocks_path = _write_blocks_jsonl(tmp_path, body)

    chunks = chunking_by_paragraph_semantic(
        _make_tokenizer(),
        body,
        chunk_token_size=800,
        blocks_path=blocks_path,
        chunk_overlap_token_size=0,
    )

    assert len(chunks) > 1
    headings = [chunk["heading"]["heading"] for chunk in chunks]
    expected = [f"Section [part {number}]" for number, _ in enumerate(chunks, start=1)]
    assert headings == expected
    assert all("表格片段" not in heading for heading in headings)
- # ---------------------------------------------------------------------------
- # Table-aware fallback tests (row-boundary first, character last).
- # ---------------------------------------------------------------------------
@pytest.mark.offline
def test_detect_table_format_explicit_attr():
    """An explicit ``format`` attribute decides the detected table format."""
    recognised = [
        ('id="t1" format="json"', "[]", "json"),
        ("format='html'", "<tr></tr>", "html"),
    ]
    for attrs, body, expected in recognised:
        assert _detect_table_format(attrs, body) == expected

    # An unrecognised format yields None, which pushes the caller onto the
    # character fallback.
    assert _detect_table_format('format="markdown"', "...") is None
@pytest.mark.offline
def test_detect_table_format_sniff_when_attrs_silent():
    """With no ``format`` attribute the format is sniffed from the body."""
    sniffed = [
        ('[{"a":1}]', "json"),
        ("<tr><td>x</td></tr>", "html"),
    ]
    for body, expected in sniffed:
        assert _detect_table_format("", body) == expected

    # A body that resembles neither JSON nor HTML is reported as unknown.
    assert _detect_table_format("", "plain text rows") is None
@pytest.mark.offline
def test_split_html_rows_extracts_tr_elements():
    """Each ``<tr>`` is extracted together with the name of its wrapper."""
    head_section = "<thead><tr><th>h</th></tr></thead>"
    body_section = "<tbody><tr><td>a</td></tr><tr><td>b</td></tr></tbody>"

    rows = _split_html_rows(head_section + body_section)

    assert rows is not None
    assert len(rows) == 3

    # Rows remember their parent wrapper so the chunk serialiser can rebuild
    # <thead>/<tbody> rather than silently dropping them.
    wrappers = [wrapper for wrapper, _tr in rows]
    assert wrappers == ["thead", "tbody", "tbody"]
    for _wrapper, tr in rows:
        assert tr.startswith("<tr") and tr.endswith("</tr>")
@pytest.mark.offline
def test_split_html_rows_no_tr_returns_none():
    """Bodies without any ``<tr>`` element, including the empty string, give None."""
    for body in ("just text, no rows", ""):
        assert _split_html_rows(body) is None
@pytest.mark.offline
def test_split_table_text_single_row_oversized_falls_to_character_split():
    """A one-row table with a huge cell is cut into pieces that obey target_max.

    Row boundaries cannot shrink a single row, so the splitter has to fall
    back to character splitting and still keep every piece within the cap.
    """
    tokenizer = _make_tokenizer()
    payload = json.dumps([[{"col": "x" * 2000}]])
    table_text = '<table id="tb-1" format="json">' + payload + "</table>"

    pieces = _split_table_text(
        table_text,
        tokenizer=tokenizer,
        target_max=500,
        target_ideal=350,
        last_min=128,
    )

    assert len(pieces) >= 2, "single-row oversized table must produce multiple pieces"
    # The reported contract violation was a single 2000-token table being
    # emitted whole, so the cap is checked on every piece.
    for piece in pieces:
        assert _count_tokens(tokenizer, piece) <= 500
@pytest.mark.offline
def test_split_table_text_multirow_one_huge_row_mixed_output():
    """Small rows stay table-wrapped while one huge row is character-split.

    The table has two small rows around one row that alone exceeds the cap.
    The output must contain both ``<table ``-prefixed pieces and plain-text
    pieces, all of them within target_max.
    """
    tokenizer = _make_tokenizer()
    small_row = [{"col": "ok"}]
    huge_row = [{"col": "z" * 2000}]
    payload = json.dumps([small_row, huge_row, small_row])
    table_text = '<table id="tb-1" format="json">' + payload + "</table>"

    pieces = _split_table_text(
        table_text,
        tokenizer=tokenizer,
        target_max=500,
        target_ideal=350,
        last_min=64,
    )

    assert all(_count_tokens(tokenizer, piece) <= 500 for piece in pieces)

    # Sort the pieces into table-wrapped markup and bare text fragments.
    table_pieces, text_pieces = [], []
    for piece in pieces:
        bucket = table_pieces if piece.startswith("<table ") else text_pieces
        bucket.append(piece)

    # The rows that fit must survive as legal markup ...
    assert table_pieces, "expected at least one <table>-wrapped piece for fit-able rows"
    # ... while the huge row can only come out as character-split text.
    assert text_pieces, "huge row must yield character-split text fragments"
@pytest.mark.offline
def test_split_table_text_html_table_split_by_tr():
    """An HTML table is split on ``<tr>`` boundaries into legal table fragments.

    Every output piece must still read ``<table {attrs}>{rows}</table>`` and
    stay within target_max.
    """
    tokenizer = _make_tokenizer()
    cell = "r" * 200
    body = "".join(f"<tr><td>{cell}</td></tr>" for _ in range(5))
    table_text = '<table id="tb-h1" format="html">' + body + "</table>"

    pieces = _split_table_text(
        table_text,
        tokenizer=tokenizer,
        target_max=500,
        target_ideal=350,
        last_min=64,
    )

    assert len(pieces) >= 2
    # No single row exceeds target_max, so there is no reason for character
    # fallback: every piece must be a complete <table>...</table> fragment.
    assert all(
        piece.startswith("<table ") and piece.endswith("</table>") for piece in pieces
    )
    assert all(_count_tokens(tokenizer, piece) <= 500 for piece in pieces)
@pytest.mark.offline
def test_split_table_text_html_preserves_thead_tbody_wrappers():
    """Splitting an HTML table keeps ``<thead>``/``<tbody>`` around its rows.

    Per the regression note, the row splitter used to drop the wrappers and
    return bare ``<tr>`` sequences. Each chunk must now re-emit the wrapper
    of the rows it holds, and the rows must round-trip in their original
    order.
    """
    import re

    row_pattern = re.compile(r"<tr\b[^>]*>.*?</tr>", re.DOTALL | re.IGNORECASE)

    tokenizer = _make_tokenizer()
    header_cell = "h" * 80
    body_cell = "b" * 80
    head_row = "<tr><th>" + header_cell + "</th></tr>"
    body_rows = "".join(f"<tr><td>{body_cell}{i}</td></tr>" for i in range(4))
    body = f"<thead>{head_row}</thead><tbody>{body_rows}</tbody>"
    table_text = f'<table id="tb-mixed" format="html">{body}</table>'

    pieces = _split_table_text(
        table_text,
        tokenizer=tokenizer,
        target_max=400,
        target_ideal=280,
        last_min=64,
    )

    # Several chunks are expected, each one a legal <table>-wrapped fragment.
    assert len(pieces) >= 2
    assert all(
        piece.startswith("<table ") and piece.endswith("</table>") for piece in pieces
    )

    # A chunk holding the header row must wrap it in <thead>...</thead>; a
    # chunk holding body rows must wrap them in <tbody>...</tbody>.
    for piece in pieces:
        if "<th>" in piece:
            assert "<thead>" in piece and "</thead>" in piece, piece
        if "<td>" in piece:
            assert "<tbody>" in piece and "</tbody>" in piece, piece

    # Round-trip: the row payloads collected from the chunks, in order, must
    # equal the rows of the original body.
    extracted_rows: list[str] = [
        row for piece in pieces for row in row_pattern.findall(piece)
    ]
    expected_rows = row_pattern.findall(body)
    assert extracted_rows == expected_rows
@pytest.mark.offline
def test_split_table_text_unknown_format_falls_to_character():
    """A table of unknown format is still cut into pieces that obey target_max.

    The table has no ``format`` attribute and a body that looks like neither
    JSON nor HTML.
    """
    tokenizer = _make_tokenizer()
    filler = "plain row text " * 300
    table_text = '<table id="weird">' + filler + "</table>"

    pieces = _split_table_text(
        table_text,
        tokenizer=tokenizer,
        target_max=500,
        target_ideal=350,
        last_min=64,
    )

    assert len(pieces) >= 2
    for piece in pieces:
        assert _count_tokens(tokenizer, piece) <= 500
@pytest.mark.offline
def test_expand_block_single_row_table_no_longer_left_intact():
    """Stage B itself must break up a single-row oversized table.

    Per the regression note, such a table used to be appended back to
    cur_paras unchanged, so the block reached Stage C with the table whole
    and the character fallback shredded the <table> tag.
    """
    single_huge_row = [[{"col": "x" * 2000}]]
    table_text = (
        '<table id="tb-1" format="json">' + json.dumps(single_huge_row) + "</table>"
    )
    block = {
        "heading": "Section",
        "parent_headings": [],
        "level": 2,
        "paragraphs": [
            {"text": "lead", "is_table": False},
            {"text": table_text, "is_table": True},
            {"text": "trail", "is_table": False},
        ],
    }

    out = _expand_block_with_table_splits(
        block,
        tokenizer=_make_tokenizer(),
        table_max=400,
        table_ideal=300,
        table_min_last=128,
    )

    # The oversized table may not pass through whole: several sub-blocks
    # have to come out.
    assert len(out) >= 2

    # Role protection must still fire once the table has been reduced.
    roles = [sub["table_chunk_role"] for sub in out]
    has_boundary_role = "first" in roles or "last" in roles
    assert (
        has_boundary_role
    ), f"expected first/last role assignment after table split, got {roles}"
@pytest.mark.offline
def test_split_long_block_table_dominant_no_anchor_keeps_some_table_markup():
    """Stage C keeps some legal ``<table>`` markup in a table-dominated block.

    Per the regression note, a block dominated by an oversized table with no
    anchor candidates used to be character-split end to end, destroying the
    <table> tag. Now at least one sub-block must hold a complete fragment
    and all sub-blocks must respect target_max.
    """
    # Many small rows: a row-boundary split gives several legal <table>
    # fragments, none of which exceeds target_max on its own.
    filler = "v" * 200
    rows = [[{"col": f"r{i}-" + filler}] for i in range(8)]
    table_text = '<table id="tb-1" format="json">' + json.dumps(rows) + "</table>"
    lead_text = "Sufficiently long lead paragraph " * 30
    paragraphs = [
        {"text": lead_text, "is_table": False},
        {"text": table_text, "is_table": True},
    ]

    sub_blocks = _split_long_block(
        paragraphs,
        heading="Heading",
        parent_headings=[],
        level=2,
        table_chunk_role="none",
        tokenizer=_make_tokenizer(),
        target_max=600,
        target_ideal=450,
    )

    # The cap holds for every sub-block.
    for sub in sub_blocks:
        assert sub["tokens"] <= 600

    # Row-boundary preservation shows as at least one sub-block whose
    # content holds both an opening and a closing table tag.
    contents = [sub["content"] for sub in sub_blocks]
    assert any(
        "<table " in text and "</table>" in text for text in contents
    ), "expected at least one sub-block to retain a legal <table> fragment"
@pytest.mark.offline
def test_split_table_text_budgets_wrapper_overhead_for_target_max():
    """The ``<table {attrs}></table>`` wrapper must be counted against target_max.

    ``_split_rows_by_tokens`` measures only the body (json.dumps(rows)), but
    the wrapper costs tokens as well. Without wrapper-aware budgeting a chunk
    whose body just fits would overflow once wrapped and trigger character
    fallback, shredding the row structure for no good reason.
    """
    tokenizer = _make_tokenizer()
    # A long id attribute makes the wrapper overhead non-trivial, so a
    # body-only budget equal to target_max overflows once the wrapper is
    # added back.
    attrs_padding = "x" * 80
    cell = "y" * 80
    rows = [[{"col": cell}] for _ in range(4)]
    table_text = f'<table id="{attrs_padding}" format="json">{json.dumps(rows)}</table>'

    pieces = _split_table_text(
        table_text,
        tokenizer=tokenizer,
        target_max=250,
        target_ideal=180,
        last_min=64,
    )

    # The cap holds for every piece; the sizes are reported on failure.
    sizes = [_count_tokens(tokenizer, piece) for piece in pieces]
    assert all(size <= 250 for size in sizes), sizes

    # Row structure is intact: no piece degraded into character fragments
    # because of accidental wrapper overflow.
    for piece in pieces:
        assert piece.startswith("<table ") and piece.endswith("</table>")
def _count_tokens(tokenizer: Tokenizer, text: str) -> int:
    """Return the number of tokens ``tokenizer.encode`` produces for ``text``."""
    encoded = tokenizer.encode(text)
    return len(encoded)
|