"""Shared helpers for parsing and re-emitting ```` markup. These primitives are used by the paragraph-semantic chunker (Stage B oversized-table re-split) and by the native multimodal surrounding-context extractor. Both call sites need to: * recognise a post-rewrite ``
`` tag, * decide whether the body is JSON or HTML, * enumerate row-level units (JSON list items or HTML ```` rows along with their ```` / ```` / ```` wrappers), and * re-serialise a subset of rows while preserving the structural wrappers. Keeping the regexes and helpers in one place avoids subtle drift when either consumer evolves. """ from __future__ import annotations import json import re from typing import Any # Strict regex for a post-rewrite table tag emitted by the sidecar # writer (``lightrag.sidecar.writer``): # {rows_json}
# blocks.jsonl invariants guarantee the tag has no embedded newlines. TABLE_TAG_RE = re.compile( r"[^>]*)>(?P.*?)", re.DOTALL, ) # Format detection regex inside the attrs string, e.g. format="json". _TABLE_FORMAT_RE = re.compile(r"""format\s*=\s*["'](?P[^"']+)["']""") # HTML ... row extractor. Standard HTML disallows nested , # so a non-greedy match is sufficient for well-formed input. HTML_TR_RE = re.compile(r"]*>.*?", re.DOTALL | re.IGNORECASE) # Combined scanner for row-grouping wrappers and rows themselves. Used # to attribute each to its surrounding // so # the wrapper can be reconstructed around chunk boundaries instead of # being silently dropped during row-level table splitting. HTML_ROW_PARTS_RE = re.compile( r"(?P]*>)" r"|(?P]*>.*?)", re.DOTALL | re.IGNORECASE, ) HTML_WRAPPER_TAG_RE = re.compile( r"<(?P/?)(?Pthead|tbody|tfoot)\b", re.IGNORECASE ) def detect_table_format(attrs: str, body: str) -> str | None: """Return ``"json"``, ``"html"`` or ``None`` for a parsed ```` tag. Prefers an explicit ``format="…"`` attribute. When silent, sniffs the body: a leading ``[`` / ``{`` (after whitespace) implies JSON; the presence of any `` tuple[str, list[Any]] | None: """Parse a JSON ``
{rows_json}
``. Returns ``(attrs_str, rows)`` or ``None`` if the tag is malformed (does not match ``TABLE_TAG_RE``, body is not JSON, or body decodes to something other than a list). """ match = TABLE_TAG_RE.match((text or "").strip()) if not match: return None body = match.group("body") try: rows = json.loads(body) except json.JSONDecodeError: return None if not isinstance(rows, list): return None return match.group("attrs"), rows def split_html_rows(body: str) -> list[tuple[str, str]] | None: """Extract ``...`` rows tagged with their wrapper context. Returns a list of ``(wrapper_name, tr_str)`` tuples where ``wrapper_name`` is ``"thead"`` / ``"tbody"`` / ``"tfoot"`` (lower- cased) for rows that sit inside the corresponding wrapper, or ``""`` for rows outside any of those wrappers. ``None`` signals "no row found" so the caller falls through to character splitting. Whitespace, captions, comments, ```` and any other text outside the recognised row-wrappers is dropped — this is a regex extractor, not a full DOM parser. Wrapper attributes (e.g. ````) are also dropped on re-emission; chunked output uses bare wrapper tags. """ rows: list[tuple[str, str]] = [] current_wrapper = "" for match in HTML_ROW_PARTS_RE.finditer(body or ""): wrap = match.group("wrap") tr = match.group("tr") if wrap is not None: tag = HTML_WRAPPER_TAG_RE.match(wrap) if tag: slash = tag.group("slash") name = tag.group("name").lower() if slash == "/": if current_wrapper == name: current_wrapper = "" else: current_wrapper = name elif tr is not None: rows.append((current_wrapper, tr)) if not rows: return None return rows def serialize_html_rows(rows: list[tuple[str, str]]) -> str: """Re-emit ``(wrapper, tr)`` rows grouped under their original ```` / ```` / ```` wrappers. Consecutive rows sharing the same wrapper name collapse into a single wrapper block; transitions emit a closing tag for the previous wrapper and an opening tag for the next. Rows tagged with ``""`` (no wrapper) emit bare ``...``. """ parts: list[str] = [] current_wrapper = "" for wrapper, tr in rows: if wrapper != current_wrapper: if current_wrapper: parts.append(f"") if wrapper: parts.append(f"<{wrapper}>") current_wrapper = wrapper parts.append(tr) if current_wrapper: parts.append(f"") return "".join(parts)