"""Shared helpers for parsing and re-emitting ``
`` markup.
These primitives are used by the paragraph-semantic chunker (Stage B
oversized-table re-split) and by the native multimodal surrounding-context
extractor. Both call sites need to:
* recognise a post-rewrite ```` tag,
* decide whether the body is JSON or HTML,
* enumerate row-level units (JSON list items or HTML ```` rows along
with their ```` / ``
`` / ```` wrappers), and
* re-serialise a subset of rows while preserving the structural wrappers.
Keeping the regexes and helpers in one place avoids subtle drift when
either consumer evolves.
"""
from __future__ import annotations
import json
import re
from typing import Any
# Strict regex for a post-rewrite table tag emitted by the sidecar
# writer (``lightrag.sidecar.writer``), i.e. ``<table ...>...</table>``.
#
# blocks.jsonl invariants guarantee the tag has no embedded newlines.
#
# Named groups: ``attrs`` is everything between ``<table`` and the first
# ``>`` (leading whitespace included), ``body`` is the text up to the first
# ``</table>``.
# NOTE(review): pattern reconstructed from the group names read by the
# helpers in this module -- confirm against the sidecar writer's output.
TABLE_TAG_RE = re.compile(
    r"<table\b(?P<attrs>[^>]*)>(?P<body>.*?)</table>",
    re.DOTALL,
)

# Format detection regex inside the attrs string, e.g. format="json".
# NOTE(review): the group name ``fmt`` is reconstructed; the value is also
# reachable positionally as ``group(1)``.
_TABLE_FORMAT_RE = re.compile(r"""format\s*=\s*["'](?P<fmt>[^"']+)["']""")

# HTML <tr>...</tr> row extractor. Standard HTML disallows nested <tr>,
# so a non-greedy match is sufficient for well-formed input.
HTML_TR_RE = re.compile(r"<tr\b[^>]*>.*?</tr>", re.DOTALL | re.IGNORECASE)

# Combined scanner for row-grouping wrappers and rows themselves. Used
# to attribute each <tr> to its surrounding <thead> / <tbody> / <tfoot> so
# the wrapper can be reconstructed around chunk boundaries instead of
# being silently dropped during row-level table splitting.
#
# Exactly one of the ``wrap`` / ``tr`` groups is set on every match.
HTML_ROW_PARTS_RE = re.compile(
    r"(?P<wrap></?(?:thead|tbody|tfoot)\b[^>]*>)"
    r"|(?P<tr><tr\b[^>]*>.*?</tr>)",
    re.DOTALL | re.IGNORECASE,
)

# Splits a wrapper tag into its optional closing slash and its tag name.
# The name keeps the casing of the input; callers lower-case it.
HTML_WRAPPER_TAG_RE = re.compile(
    r"<(?P<slash>/?)(?P<name>thead|tbody|tfoot)\b", re.IGNORECASE
)
def detect_table_format(attrs: str, body: str) -> str | None:
    """Return ``"json"``, ``"html"`` or ``None`` for a parsed ``<table>`` tag.

    Prefers an explicit ``format="…"`` attribute. When silent, sniffs
    the body: a leading ``[`` / ``{`` (after whitespace) implies JSON;
    the presence of any ``<tr`` implies HTML. ``None`` is returned when
    neither signal is present.

    NOTE(review): the body of this function was reconstructed from the
    description above. In particular, an explicit ``format`` value other
    than ``json`` / ``html`` falls through to body sniffing here --
    confirm that this matches the intended contract.

    Args:
        attrs: Attribute string captured from the opening ``<table>`` tag.
        body: Text between the opening and closing ``<table>`` tags.
    """
    declared = _TABLE_FORMAT_RE.search(attrs or "")
    if declared:
        # Positional access keeps this independent of the group's name.
        fmt = declared.group(1).strip().lower()
        if fmt in ("json", "html"):
            return fmt
    stripped = (body or "").lstrip()
    if stripped.startswith(("[", "{")):
        return "json"
    # Case-insensitive to match the HTML row regexes used in this module.
    if re.search(r"<tr\b", stripped, re.IGNORECASE):
        return "html"
    return None


# NOTE(review): the ``def`` line below was reconstructed -- the name
# ``parse_json_table`` and the parameter name ``text`` (taken from the
# body) should be confirmed against the callers.
def parse_json_table(text: str) -> tuple[str, list[Any]] | None:
    """Parse a JSON ``<table>`` tag.

    Returns ``(attrs_str, rows)`` or ``None`` if the tag is malformed
    (does not match ``TABLE_TAG_RE``, body is not JSON, or body decodes
    to something other than a list).
    """
    # ``None`` / empty input is treated as an empty string, which simply
    # fails to match and yields ``None``.
    match = TABLE_TAG_RE.match((text or "").strip())
    if not match:
        return None
    try:
        rows = json.loads(match.group("body"))
    except json.JSONDecodeError:
        return None
    if not isinstance(rows, list):
        return None
    return match.group("attrs"), rows
def split_html_rows(body: str) -> list[tuple[str, str]] | None:
    """Extract ``<tr>...</tr>`` rows tagged with their wrapper context.

    The body is scanned left to right with ``HTML_ROW_PARTS_RE``. Each
    opening ``<thead>`` / ``<tbody>`` / ``<tfoot>`` tag becomes the active
    wrapper; a closing tag clears it only when it names the wrapper that
    is currently active. Every row is paired with the wrapper active at
    the point it was found.

    Returns:
        A list of ``(wrapper_name, tr_str)`` tuples, where ``wrapper_name``
        is the lower-cased ``"thead"`` / ``"tbody"`` / ``"tfoot"`` or ``""``
        for a row outside any of those wrappers. ``None`` when no row was
        found, so the caller can fall through to character splitting.

    Anything the scanner does not match -- whitespace, captions, comments,
    ``<colgroup>`` and other text between rows -- is dropped; this is a
    regex extractor, not a full DOM parser. Only the wrapper *name* is
    recorded, so wrapper attributes (e.g. ``<thead class="x">``) are not
    carried over to the result.
    """
    tagged_rows: list[tuple[str, str]] = []
    active = ""
    for part in HTML_ROW_PARTS_RE.finditer(body or ""):
        wrapper_markup = part.group("wrap")
        row_markup = part.group("tr")
        if wrapper_markup is None:
            if row_markup is not None:
                tagged_rows.append((active, row_markup))
            continue
        parsed = HTML_WRAPPER_TAG_RE.match(wrapper_markup)
        if not parsed:
            continue
        closing = parsed.group("slash") == "/"
        wrapper_name = parsed.group("name").lower()
        if not closing:
            active = wrapper_name
        elif active == wrapper_name:
            # A stray closing tag for a different wrapper is ignored.
            active = ""
    return tagged_rows or None
def serialize_html_rows(rows: list[tuple[str, str]]) -> str:
    """Re-emit ``(wrapper, tr)`` rows grouped under their original
    ``<thead>`` / ``<tbody>`` / ``<tfoot>`` wrappers.

    Consecutive rows sharing the same wrapper name collapse into a
    single wrapper block; transitions emit a closing tag for the
    previous wrapper and an opening tag for the next. Rows tagged with
    ``""`` (no wrapper) emit bare ``<tr>...</tr>``.

    Args:
        rows: ``(wrapper_name, tr_str)`` pairs in document order, where
            ``wrapper_name`` is a tag name or ``""``.

    Returns:
        The concatenated markup; an empty string for empty input.
        Wrapper tags are emitted bare (no attributes).
    """
    parts: list[str] = []
    current_wrapper = ""
    for wrapper, tr in rows:
        if wrapper != current_wrapper:
            if current_wrapper:
                # Close with a real end tag (``</name>``) so the output
                # stays well-formed HTML.
                parts.append(f"</{current_wrapper}>")
            if wrapper:
                parts.append(f"<{wrapper}>")
            current_wrapper = wrapper
        parts.append(tr)
    # Close whichever wrapper is still open after the last row.
    if current_wrapper:
        parts.append(f"</{current_wrapper}>")
    return "".join(parts)