# table_markup.py
"""Shared helpers for parsing and re-emitting ``<table>`` markup.

These primitives are used by the paragraph-semantic chunker (Stage B
oversized-table re-split) and by the native multimodal surrounding-context
extractor. Both call sites need to:

* recognise a post-rewrite ``<table id="…" format="…">…</table>`` tag,
* decide whether the body is JSON or HTML,
* enumerate row-level units (JSON list items or HTML ``<tr>`` rows along
  with their ``<thead>`` / ``<tbody>`` / ``<tfoot>`` wrappers), and
* re-serialise a subset of rows while preserving the structural wrappers.

Keeping the regexes and helpers in one place avoids subtle drift when
either consumer evolves.
"""
  13. from __future__ import annotations
  14. import json
  15. import re
  16. from typing import Any
  17. # Strict regex for a post-rewrite table tag emitted by the sidecar
  18. # writer (``lightrag.sidecar.writer``):
  19. # <table id="tb-…" format="json"[ caption="…"]>{rows_json}</table>
  20. # blocks.jsonl invariants guarantee the tag has no embedded newlines.
  21. TABLE_TAG_RE = re.compile(
  22. r"<table\s+(?P<attrs>[^>]*)>(?P<body>.*?)</table>",
  23. re.DOTALL,
  24. )
  25. # Format detection regex inside the attrs string, e.g. format="json".
  26. _TABLE_FORMAT_RE = re.compile(r"""format\s*=\s*["'](?P<fmt>[^"']+)["']""")
  27. # HTML <tr>...</tr> row extractor. Standard HTML disallows nested <tr>,
  28. # so a non-greedy match is sufficient for well-formed input.
  29. HTML_TR_RE = re.compile(r"<tr\b[^>]*>.*?</tr>", re.DOTALL | re.IGNORECASE)
  30. # Combined scanner for row-grouping wrappers and rows themselves. Used
  31. # to attribute each <tr> to its surrounding <thead>/<tbody>/<tfoot> so
  32. # the wrapper can be reconstructed around chunk boundaries instead of
  33. # being silently dropped during row-level table splitting.
  34. HTML_ROW_PARTS_RE = re.compile(
  35. r"(?P<wrap></?(?:thead|tbody|tfoot)\b[^>]*>)" r"|(?P<tr><tr\b[^>]*>.*?</tr>)",
  36. re.DOTALL | re.IGNORECASE,
  37. )
  38. HTML_WRAPPER_TAG_RE = re.compile(
  39. r"<(?P<slash>/?)(?P<name>thead|tbody|tfoot)\b", re.IGNORECASE
  40. )
  41. def detect_table_format(attrs: str, body: str) -> str | None:
  42. """Return ``"json"``, ``"html"`` or ``None`` for a parsed ``<table>`` tag.
  43. Prefers an explicit ``format="…"`` attribute. When silent, sniffs
  44. the body: a leading ``[`` / ``{`` (after whitespace) implies JSON;
  45. the presence of any ``<tr`` tag implies HTML. Anything else is
  46. unknown and the caller should fall back to character splitting.
  47. """
  48. match = _TABLE_FORMAT_RE.search(attrs or "")
  49. if match:
  50. fmt = match.group("fmt").strip().lower()
  51. if fmt in {"json", "html"}:
  52. return fmt
  53. return None
  54. stripped = (body or "").lstrip()
  55. if stripped.startswith(("[", "{")):
  56. return "json"
  57. if "<tr" in stripped.lower():
  58. return "html"
  59. return None
  60. def parse_table_tag(text: str) -> tuple[str, list[Any]] | None:
  61. """Parse a JSON ``<table …>{rows_json}</table>``.
  62. Returns ``(attrs_str, rows)`` or ``None`` if the tag is malformed
  63. (does not match ``TABLE_TAG_RE``, body is not JSON, or body decodes
  64. to something other than a list).
  65. """
  66. match = TABLE_TAG_RE.match((text or "").strip())
  67. if not match:
  68. return None
  69. body = match.group("body")
  70. try:
  71. rows = json.loads(body)
  72. except json.JSONDecodeError:
  73. return None
  74. if not isinstance(rows, list):
  75. return None
  76. return match.group("attrs"), rows
  77. def split_html_rows(body: str) -> list[tuple[str, str]] | None:
  78. """Extract ``<tr>...</tr>`` rows tagged with their wrapper context.
  79. Returns a list of ``(wrapper_name, tr_str)`` tuples where
  80. ``wrapper_name`` is ``"thead"`` / ``"tbody"`` / ``"tfoot"`` (lower-
  81. cased) for rows that sit inside the corresponding wrapper, or ``""``
  82. for rows outside any of those wrappers. ``None`` signals "no row
  83. found" so the caller falls through to character splitting.
  84. Whitespace, captions, comments, ``<colgroup>`` and any other text
  85. outside the recognised row-wrappers is dropped — this is a regex
  86. extractor, not a full DOM parser. Wrapper attributes (e.g.
  87. ``<thead class="…">``) are also dropped on re-emission; chunked
  88. output uses bare wrapper tags.
  89. """
  90. rows: list[tuple[str, str]] = []
  91. current_wrapper = ""
  92. for match in HTML_ROW_PARTS_RE.finditer(body or ""):
  93. wrap = match.group("wrap")
  94. tr = match.group("tr")
  95. if wrap is not None:
  96. tag = HTML_WRAPPER_TAG_RE.match(wrap)
  97. if tag:
  98. slash = tag.group("slash")
  99. name = tag.group("name").lower()
  100. if slash == "/":
  101. if current_wrapper == name:
  102. current_wrapper = ""
  103. else:
  104. current_wrapper = name
  105. elif tr is not None:
  106. rows.append((current_wrapper, tr))
  107. if not rows:
  108. return None
  109. return rows
  110. def serialize_html_rows(rows: list[tuple[str, str]]) -> str:
  111. """Re-emit ``(wrapper, tr)`` rows grouped under their original
  112. ``<thead>`` / ``<tbody>`` / ``<tfoot>`` wrappers.
  113. Consecutive rows sharing the same wrapper name collapse into a
  114. single wrapper block; transitions emit a closing tag for the
  115. previous wrapper and an opening tag for the next. Rows tagged with
  116. ``""`` (no wrapper) emit bare ``<tr>...</tr>``.
  117. """
  118. parts: list[str] = []
  119. current_wrapper = ""
  120. for wrapper, tr in rows:
  121. if wrapper != current_wrapper:
  122. if current_wrapper:
  123. parts.append(f"</{current_wrapper}>")
  124. if wrapper:
  125. parts.append(f"<{wrapper}>")
  126. current_wrapper = wrapper
  127. parts.append(tr)
  128. if current_wrapper:
  129. parts.append(f"</{current_wrapper}>")
  130. return "".join(parts)