"""Surrounding-context enrichment for native multimodal sidecars. See ``docs/NativeMultimodalSurroundingContextPlan-zh.md``. For each entry in ``drawings.json`` / ``tables.json`` / ``equations.json``, this module locates the matching ````, ``…
`` / table ```` or ```` inside the *single* ``blocks.jsonl`` content row referenced by the entry's ``blockid``, then extracts up to ``max_tokens`` of leading and trailing text from the same row (without crossing block rows). Sidecar entries gain an optional ``surrounding`` field: { "leading": "…", "trailing": "…" } with both halves capped at ``max_tokens`` tokens (default 2000). Truncation prefers paragraph / sentence / clause boundaries (using the recursive separator cascade from ``CHUNK_R_SEPARATORS`` / falling back to :data:`lightrag.constants.DEFAULT_R_SEPARATORS`); only when a single closest segment alone exceeds the budget does the splitter fall through to a character-level binary search. Multimodal tags (````, ````, ``…
``) inside the candidate text are treated as atomic so the splitter cannot cut a tag in half. For ``tables.json`` entries — where the surrounding should describe text around the target table without dragging other tables along — every ``…
`` is removed from the candidate text *before* token counting and segmentation, so the saved surrounding string and the tokens budgeted against it stay in sync. For ``drawings.json`` / ``equations.json`` entries the table tags are preserved when they fit; oversized JSON or HTML tables are row-trimmed (tail rows for leading, head rows for trailing) so the surrounding keeps the rows physically closest to the target. Parser-internal identifiers (``id`` / ``path`` / ``src`` / ``refid``) are stripped from the candidate text via :func:`lightrag.chunk_schema.strip_internal_multimodal_markup_for_extraction` **before** atomization and token-budgeted truncation. This mirrors the treatment given to chunk content prior to entity extraction (see ``lightrag.operate._process_single_content``) and ensures the multimodal analysis prompt never sees those internal markers. Cleaning before truncation also guarantees the truncation point can never land inside an ``id="…"`` attribute and leave a malformed tag the strip regex would no longer recognize. Unlike the entity-extraction call site, the surrounding path invokes the cleaner with ``keep_cite_tag=True``: parser-internal ``refid`` is removed but the ```` wrapper is preserved so the VLM/LLM can still tell a reference label apart from inline prose (e.g. ``表1`` makes it obvious the visible text "表1" denotes another table elsewhere in the document, rather than appearing as an ordinary noun phrase). Note this only affects ``drawings.json`` / ``equations.json`` surroundings — ``tables.json`` surroundings still drop all cite tags via :func:`remove_table_tags` because the target-table analysis should not be steered by dangling references to other tables. """ from __future__ import annotations import json import logging import os import re from html import escape as html_escape from html import unescape as html_unescape from pathlib import Path from lightrag.chunk_schema import strip_internal_multimodal_markup_for_extraction from lightrag.constants import DEFAULT_R_SEPARATORS from lightrag.table_markup import ( TABLE_TAG_RE, detect_table_format, parse_table_tag, serialize_html_rows, split_html_rows, ) from lightrag.utils import Tokenizer logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Tag scanner — atomises a string into a list of ``(kind, text)`` pieces so # the recursive splitter can treat ````, ```` # and ``…
`` as indivisible. # --------------------------------------------------------------------------- _MM_TAG_RE = re.compile( r"]*/>" r"|]*>.*?" r"|]*>.*?", re.DOTALL, ) _TABLE_CITE_RE = re.compile( r']*\btype\s*=\s*"table")[^>]*>.*?
', re.DOTALL, ) def _atomize(text: str) -> list[tuple[str, str]]: """Split ``text`` into ``(kind, content)`` atoms. ``kind`` ∈ ``{"text", "drawing", "equation", "table"}``. Concatenating all atom contents reproduces ``text`` verbatim. """ atoms: list[tuple[str, str]] = [] pos = 0 for match in _MM_TAG_RE.finditer(text): if match.start() > pos: atoms.append(("text", text[pos : match.start()])) tag_text = match.group(0) if tag_text.startswith(" re.Pattern[str]: esc = re.escape(item_id) return re.compile( rf']*?\bid\s*=\s*"{esc}"[^>]*?/>', re.DOTALL, ) def _table_pattern(item_id: str) -> re.Pattern[str]: esc = re.escape(item_id) return re.compile( rf']*?\bid\s*=\s*"{esc}"[^>]*?>.*?' rf'|]*\btype\s*=\s*"table")' rf'(?=[^>]*\brefid\s*=\s*"{esc}")[^>]*>.*?', re.DOTALL, ) def _equation_pattern(item_id: str) -> re.Pattern[str]: esc = re.escape(item_id) return re.compile( rf']*?\bid\s*=\s*"{esc}"[^>]*?>.*?', re.DOTALL, ) def find_target_span( kind: str, item_id: str, block_content: str ) -> tuple[int, int] | None: """Locate the target multimodal marker with the given ``id`` inside ``block_content``. Returns ``(start, end)`` byte offsets, or ``None`` if not found. ``kind`` is the sidecar root key — ``"drawings"`` / ``"tables"`` / ``"equations"``. """ if kind == "drawings": pattern = _drawing_pattern(item_id) elif kind == "tables": pattern = _table_pattern(item_id) elif kind == "equations": pattern = _equation_pattern(item_id) else: return None match = pattern.search(block_content) if not match: return None return match.start(), match.end() # --------------------------------------------------------------------------- # Recursive splitter that respects multimodal tag atoms. # --------------------------------------------------------------------------- def _split_text_segment(text: str, separators: list[str]) -> tuple[list[str], int]: """Split ``text`` using the first separator that produces >1 pieces. Returns ``(segments, sep_index)`` where ``segments`` reproduces ``text`` verbatim when concatenated and ``sep_index`` is the index in ``separators`` of the separator that was used. When no listed separator yields >1 piece the original string is returned as a single-element list with ``sep_index = len(separators)`` — the caller is responsible for any further char-level fallback. The separator is kept attached to the preceding segment so the assembled accumulator preserves whitespace boundaries. """ if not text: return [text], len(separators) for idx, sep in enumerate(separators): if not sep: continue if sep in text: parts = text.split(sep) assembled: list[str] = [] for j, part in enumerate(parts): if j < len(parts) - 1: assembled.append(part + sep) else: if part: assembled.append(part) if len(assembled) > 1: return assembled, idx return [text], len(separators) def _count_tokens(tokenizer: Tokenizer, text: str) -> int: if not text: return 0 return len(tokenizer.encode(text)) def _char_trim_leading(text: str, max_tokens: int, tokenizer: Tokenizer) -> str: """Drop characters from the head until the token count fits. Used as the final char-level fallback for the ``leading`` half — we want to keep the *tail* of the text (closest to the target). """ if _count_tokens(tokenizer, text) <= max_tokens: return text lo, hi = 0, len(text) while lo < hi: mid = (lo + hi) // 2 if _count_tokens(tokenizer, text[mid:]) <= max_tokens: hi = mid else: lo = mid + 1 return text[lo:] def _char_trim_trailing(text: str, max_tokens: int, tokenizer: Tokenizer) -> str: """Drop characters from the tail until the token count fits. Used as the final char-level fallback for the ``trailing`` half — we keep the *head* (closest to the target). """ if _count_tokens(tokenizer, text) <= max_tokens: return text lo, hi = 0, len(text) while lo < hi: mid = (lo + hi + 1) // 2 if _count_tokens(tokenizer, text[:mid]) <= max_tokens: lo = mid else: hi = mid - 1 return text[:lo] # --------------------------------------------------------------------------- # Row-aware table trimming for drawings / equations surrounding. # --------------------------------------------------------------------------- def _row_trim_table_leading( tag_text: str, max_tokens: int, tokenizer: Tokenizer ) -> str | None: """Return a smaller ``…
`` whose tail rows fit ``max_tokens``. For a JSON table, takes the last ``k`` rows (closest to the target) such that the re-wrapped tag still fits. For an HTML table, takes the last ``k`` ````s with their wrapper context. Returns ``None`` when no row-bounded trim fits. """ match = TABLE_TAG_RE.match(tag_text.strip()) if not match: return None attrs = match.group("attrs") body = match.group("body") fmt = detect_table_format(attrs, body) if fmt == "json": parsed = parse_table_tag(tag_text) if not parsed: return None attrs_str, rows = parsed for k in range(len(rows) - 1, 0, -1): candidate = ( f"" f"{json.dumps(rows[-k:], ensure_ascii=False)}" f"
" ) if _count_tokens(tokenizer, candidate) <= max_tokens: return candidate return _char_fallback_json_table( attrs_str, json.dumps(rows[-1], ensure_ascii=False) if rows else body, max_tokens, tokenizer, keep_tail=True, ) if fmt == "html": rows = split_html_rows(body) if not rows: return None for k in range(len(rows) - 1, 0, -1): inner = serialize_html_rows(rows[-k:]) candidate = f"{inner}
" if _count_tokens(tokenizer, candidate) <= max_tokens: return candidate return _char_fallback_html_table( attrs, rows[-1][1] if rows else body, max_tokens, tokenizer, keep_tail=True, ) return None def _row_trim_table_trailing( tag_text: str, max_tokens: int, tokenizer: Tokenizer ) -> str | None: """Return a smaller ``…
`` whose head rows fit ``max_tokens``.""" match = TABLE_TAG_RE.match(tag_text.strip()) if not match: return None attrs = match.group("attrs") body = match.group("body") fmt = detect_table_format(attrs, body) if fmt == "json": parsed = parse_table_tag(tag_text) if not parsed: return None attrs_str, rows = parsed for k in range(len(rows) - 1, 0, -1): candidate = ( f"" f"{json.dumps(rows[:k], ensure_ascii=False)}" f"
" ) if _count_tokens(tokenizer, candidate) <= max_tokens: return candidate return _char_fallback_json_table( attrs_str, json.dumps(rows[0], ensure_ascii=False) if rows else body, max_tokens, tokenizer, keep_tail=False, ) if fmt == "html": rows = split_html_rows(body) if not rows: return None for k in range(len(rows) - 1, 0, -1): inner = serialize_html_rows(rows[:k]) candidate = f"{inner}
" if _count_tokens(tokenizer, candidate) <= max_tokens: return candidate return _char_fallback_html_table( attrs, rows[0][1] if rows else body, max_tokens, tokenizer, keep_tail=False, ) return None def _empty_table(attrs: str) -> str: return f"
" def _char_fallback_json_table( attrs: str, source_text: str, max_tokens: int, tokenizer: Tokenizer, *, keep_tail: bool, ) -> str | None: """Fit one oversized JSON table row while keeping a valid table tag. The fallback stores the truncated serialized row text as a JSON string inside a one-row table. That preserves JSON validity and keeps the closest side of the oversized row when no complete row can fit. """ empty = _empty_table(attrs) if _count_tokens(tokenizer, empty) > max_tokens: return None def candidate(chars: int) -> str: snippet = source_text[-chars:] if keep_tail and chars else source_text[:chars] if not chars: return empty body = json.dumps([[snippet]], ensure_ascii=False) return f"{body}
" if _count_tokens(tokenizer, candidate(len(source_text))) <= max_tokens: return candidate(len(source_text)) lo, hi = 0, len(source_text) while lo < hi: mid = (lo + hi + 1) // 2 if _count_tokens(tokenizer, candidate(mid)) <= max_tokens: lo = mid else: hi = mid - 1 return candidate(lo) def _char_fallback_html_table( attrs: str, row_html: str, max_tokens: int, tokenizer: Tokenizer, *, keep_tail: bool, ) -> str | None: """Fit one oversized HTML row without emitting broken table markup.""" empty = _empty_table(attrs) if _count_tokens(tokenizer, empty) > max_tokens: return None text = html_unescape(re.sub(r"<[^>]+>", "", row_html or "")) def candidate(chars: int) -> str: snippet = text[-chars:] if keep_tail and chars else text[:chars] if not chars: return empty return f"
{html_escape(snippet)}
" if _count_tokens(tokenizer, candidate(len(text))) <= max_tokens: return candidate(len(text)) lo, hi = 0, len(text) while lo < hi: mid = (lo + hi + 1) // 2 if _count_tokens(tokenizer, candidate(mid)) <= max_tokens: lo = mid else: hi = mid - 1 return candidate(lo) def remove_table_tags(text: str) -> str: """Strip every table marker from ``text``. Used to pre-clean candidate text for ``tables.json`` surroundings: we never include sibling tables, so they must be dropped *before* token counting and segmentation so the budget matches the persisted string exactly. """ return _TABLE_CITE_RE.sub("", TABLE_TAG_RE.sub("", text)) # --------------------------------------------------------------------------- # Core leading / trailing builders. # --------------------------------------------------------------------------- def _build_leading( source: str, *, kind: str, tokenizer: Tokenizer, max_tokens: int, separators: list[str], ) -> str: """Build the ``leading`` half: suffix of ``source`` within budget. ``source`` is cleaned via :func:`lightrag.chunk_schema.strip_internal_multimodal_markup_for_extraction` *before* atomization and token-budgeted accumulation, so parser-internal identifiers (``id`` / ``path`` / ``src`` / ``refid``) never reach the accumulated output and the token budget reflects what the LLM actually sees. Cleaning before truncation also prevents a truncation point from landing inside an ``id="…"`` attribute and producing a malformed tag that the strip regex would no longer recognize. """ if not source or max_tokens <= 0: return "" if kind == "tables": source = remove_table_tags(source) if not source: return "" source = strip_internal_multimodal_markup_for_extraction(source, keep_cite_tag=True) if not source: return "" accumulated = "" atoms = _atomize(source) for atom_idx in range(len(atoms) - 1, -1, -1): atom_kind, atom_text = atoms[atom_idx] if not atom_text: continue if atom_kind in {"drawing", "equation"}: candidate = atom_text + accumulated if _count_tokens(tokenizer, candidate) <= max_tokens: accumulated = candidate continue break if atom_kind == "table": # Only reached for drawings/equations surroundings — table # tags are pre-stripped for the ``tables`` kind above. candidate = atom_text + accumulated if _count_tokens(tokenizer, candidate) <= max_tokens: accumulated = candidate continue remaining = max_tokens - _count_tokens(tokenizer, accumulated) if remaining > 0: trimmed = _row_trim_table_leading(atom_text, remaining, tokenizer) if trimmed is not None: accumulated = trimmed + accumulated break # Plain text atom — segment with separator cascade and accumulate # from the right. addition = _accumulate_text_leading( atom_text, existing=accumulated, tokenizer=tokenizer, max_tokens=max_tokens, separators=separators, ) if addition is None: # Even a partial fit was not possible; we stop here. break accumulated = addition + accumulated if _count_tokens(tokenizer, accumulated) >= max_tokens: break return accumulated def _accumulate_text_leading( text: str, *, existing: str, tokenizer: Tokenizer, max_tokens: int, separators: list[str], ) -> str | None: """Add as much of ``text`` (suffix) as fits into the remaining budget. Returns the chunk to prepend to ``existing``, or ``None`` to signal "stop walking earlier atoms" (i.e. budget exhausted with no useful addition). """ segments, sep_idx = _split_text_segment(text, separators) if not segments: return None # Try to add whole segments from the right. ``buf`` is what we will # prepend to ``existing``. buf = "" for i in range(len(segments) - 1, -1, -1): candidate = segments[i] + buf # Total tokens once we prepend ``candidate`` to ``existing``. if _count_tokens(tokenizer, candidate + existing) <= max_tokens: buf = candidate continue # Cannot fit segment ``i`` whole. Two cases: if buf: # We already added at least one segment — stop here without # char-truncating a more-distant segment. return buf # ``buf`` is empty: the closest segment alone overflows. Recurse # into the next separator level so we try a finer split before # falling back to characters. weaker = separators[sep_idx + 1 :] if sep_idx < len(separators) else [] if weaker: return _accumulate_text_leading( segments[i], existing=existing, tokenizer=tokenizer, max_tokens=max_tokens, separators=weaker, ) # Char-level fallback: take the longest suffix of this segment # that fits the remaining budget. remaining = max_tokens - _count_tokens(tokenizer, existing) if remaining <= 0: return None trimmed = _char_trim_leading(segments[i], remaining, tokenizer) return trimmed if trimmed else None return buf if buf else None def _build_trailing( source: str, *, kind: str, tokenizer: Tokenizer, max_tokens: int, separators: list[str], ) -> str: """Build the ``trailing`` half: prefix of ``source`` within budget. See :func:`_build_leading` for the rationale behind stripping parser-internal markers *before* atomization and truncation. """ if not source or max_tokens <= 0: return "" if kind == "tables": source = remove_table_tags(source) if not source: return "" source = strip_internal_multimodal_markup_for_extraction(source, keep_cite_tag=True) if not source: return "" accumulated = "" atoms = _atomize(source) for atom_kind, atom_text in atoms: if not atom_text: continue if atom_kind in {"drawing", "equation"}: candidate = accumulated + atom_text if _count_tokens(tokenizer, candidate) <= max_tokens: accumulated = candidate continue break if atom_kind == "table": candidate = accumulated + atom_text if _count_tokens(tokenizer, candidate) <= max_tokens: accumulated = candidate continue remaining = max_tokens - _count_tokens(tokenizer, accumulated) if remaining > 0: trimmed = _row_trim_table_trailing(atom_text, remaining, tokenizer) if trimmed is not None: accumulated = accumulated + trimmed break addition = _accumulate_text_trailing( atom_text, existing=accumulated, tokenizer=tokenizer, max_tokens=max_tokens, separators=separators, ) if addition is None: break accumulated = accumulated + addition if _count_tokens(tokenizer, accumulated) >= max_tokens: break return accumulated def _accumulate_text_trailing( text: str, *, existing: str, tokenizer: Tokenizer, max_tokens: int, separators: list[str], ) -> str | None: segments, sep_idx = _split_text_segment(text, separators) if not segments: return None buf = "" for i, seg in enumerate(segments): candidate = buf + seg if _count_tokens(tokenizer, existing + candidate) <= max_tokens: buf = candidate continue if buf: return buf weaker = separators[sep_idx + 1 :] if sep_idx < len(separators) else [] if weaker: return _accumulate_text_trailing( seg, existing=existing, tokenizer=tokenizer, max_tokens=max_tokens, separators=weaker, ) remaining = max_tokens - _count_tokens(tokenizer, existing) if remaining <= 0: return None trimmed = _char_trim_trailing(seg, remaining, tokenizer) return trimmed if trimmed else None return buf if buf else None # --------------------------------------------------------------------------- # Public entrypoints. # --------------------------------------------------------------------------- def load_chunk_separators() -> list[str]: """Resolve the recursive-character separator cascade. Reads ``CHUNK_R_SEPARATORS`` and falls back to :data:`lightrag.constants.DEFAULT_R_SEPARATORS` on missing / invalid JSON. The returned list always has the empty-string sentinel dropped — char fallback is signalled separately by the caller. """ raw = os.getenv("CHUNK_R_SEPARATORS") separators: list[str] if raw: try: parsed = json.loads(raw) if isinstance(parsed, list) and all(isinstance(s, str) for s in parsed): separators = parsed else: separators = list(DEFAULT_R_SEPARATORS) except json.JSONDecodeError: separators = list(DEFAULT_R_SEPARATORS) else: separators = list(DEFAULT_R_SEPARATORS) return [s for s in separators if s] def load_content_rows_by_blockid(blocks_path: str) -> dict[str, str]: """Read ``blocks.jsonl`` and return ``{blockid: content_str}``. Only ``type == "content"`` rows are kept. When the same blockid appears multiple times, the first occurrence wins. """ rows: dict[str, str] = {} path = Path(blocks_path) if not path.exists(): return rows with path.open("r", encoding="utf-8") as fh: for line in fh: line = line.strip() if not line: continue try: obj = json.loads(line) except json.JSONDecodeError: continue if not isinstance(obj, dict): continue if obj.get("type") != "content": continue blockid = obj.get("blockid") if not isinstance(blockid, str) or not blockid: continue if blockid in rows: continue content = obj.get("content") if isinstance(content, str): rows[blockid] = content return rows DEFAULT_SURROUNDING_MAX_TOKENS = 2000 def _resolve_surrounding_budget( leading_max_tokens: int | None, trailing_max_tokens: int | None, ) -> tuple[int, int]: """Resolve per-half token budgets, defaulting to env vars then 2000. Reads ``SURROUNDING_LEADING_MAX_TOKENS`` / ``SURROUNDING_TRAILING_MAX_TOKENS`` when the caller passes ``None``. Invalid env values fall back to :data:`DEFAULT_SURROUNDING_MAX_TOKENS`. """ def _from_env(env_var: str) -> int: raw = os.getenv(env_var) if raw is None or not raw.strip(): return DEFAULT_SURROUNDING_MAX_TOKENS try: value = int(raw) except ValueError: logger.warning( "[multimodal_context] invalid %s=%r; falling back to %d", env_var, raw, DEFAULT_SURROUNDING_MAX_TOKENS, ) return DEFAULT_SURROUNDING_MAX_TOKENS return max(0, value) leading = ( leading_max_tokens if leading_max_tokens is not None else _from_env("SURROUNDING_LEADING_MAX_TOKENS") ) trailing = ( trailing_max_tokens if trailing_max_tokens is not None else _from_env("SURROUNDING_TRAILING_MAX_TOKENS") ) return leading, trailing _CONTENT_TRUNCATION_MARKER = ( "\n" ) def trim_content_to_budget( content: str, *, kind: str, max_tokens: int, tokenizer: Tokenizer | None, ) -> tuple[str, bool]: """Trim sidecar ``content`` to fit within ``max_tokens``, preserving the head. Used by ``analyze_multimodal`` to keep the EXTRACT-role prompt within :data:`lightrag.constants.DEFAULT_MAX_EXTRACT_INPUT_TOKENS`. Only ``content`` is compressed — surrounding/captions/footnotes already have their own caps and the prompt template is fixed. Strategy: - ``tables`` (``…
`` wrapped): row-aware trim via :func:`_row_trim_table_trailing` (keep head rows / first k ); falls back to ``_char_fallback_*`` (still ````-wrapped) when no single row fits. Non-``
`` content falls through to char trim from the tail. - ``equations`` / other: :func:`_char_trim_trailing` (keep head chars). A trailing HTML-comment marker is appended *outside* the ``
`` wrapper (when trimmed) so the LLM knows the body is incomplete. The marker is included in the token budget. Returns ``(possibly_trimmed_content, was_trimmed)``. When ``max_tokens <= 0`` or ``tokenizer is None`` the input is returned unchanged with ``was_trimmed=False``. """ if not content or tokenizer is None or max_tokens <= 0: return content, False original_tokens = _count_tokens(tokenizer, content) if original_tokens <= max_tokens: return content, False # Reserve token room for the truncation marker before trimming. marker_probe = _CONTENT_TRUNCATION_MARKER.format( original=original_tokens, final=max_tokens ) marker_tokens = _count_tokens(tokenizer, marker_probe) inner_budget = max(0, max_tokens - marker_tokens) trimmed_inner: str | None = None if kind == "tables" and TABLE_TAG_RE.match(content.strip()): # _row_trim_table_trailing keeps head rows and internally falls back # to char-level fits while preserving the
wrapper. Only # malformed / unrecognized-format markup returns None. trimmed_inner = _row_trim_table_trailing(content, inner_budget, tokenizer) if trimmed_inner is None: trimmed_inner = _char_trim_trailing(content, inner_budget, tokenizer) final_tokens = _count_tokens(tokenizer, trimmed_inner) marker = _CONTENT_TRUNCATION_MARKER.format( original=original_tokens, final=final_tokens ) return trimmed_inner + marker, True def build_surrounding( *, kind: str, block_content: str, span: tuple[int, int], tokenizer: Tokenizer, leading_max_tokens: int, trailing_max_tokens: int, separators: list[str], ) -> dict[str, str]: """Compute ``{"leading": …, "trailing": …}`` for one sidecar entry. ``leading_max_tokens`` and ``trailing_max_tokens`` are independent per-half caps so deployments can tune the two contexts separately via ``SURROUNDING_LEADING_MAX_TOKENS`` / ``SURROUNDING_TRAILING_MAX_TOKENS``. The returned strings have parser-internal markers (``id`` / ``path`` / ``src`` / ``refid``) stripped — the cleaning happens before token-budgeted truncation inside :func:`_build_leading` / :func:`_build_trailing`, so the budget reflects the LLM-visible content and truncation cannot leave malformed tags behind. """ start, end = span leading_src = block_content[:start] trailing_src = block_content[end:] leading = _build_leading( leading_src, kind=kind, tokenizer=tokenizer, max_tokens=leading_max_tokens, separators=separators, ) trailing = _build_trailing( trailing_src, kind=kind, tokenizer=tokenizer, max_tokens=trailing_max_tokens, separators=separators, ) return {"leading": leading, "trailing": trailing} def enrich_sidecars_with_surrounding( *, blocks_path: str, enabled_modalities: set[str], tokenizer: Tokenizer, leading_max_tokens: int | None = None, trailing_max_tokens: int | None = None, separators: list[str] | None = None, ) -> dict[str, int]: """Backfill ``surrounding`` on enabled-modality sidecars. Args: blocks_path: path to the ``…blocks.jsonl`` artifact. enabled_modalities: subset of ``{"drawings", "tables", "equations"}`` reflecting the document's ``process_options``. tokenizer: tokenizer used to enforce the per-half token budget. leading_max_tokens: leading-half cap. ``None`` reads ``SURROUNDING_LEADING_MAX_TOKENS`` (default 2000). trailing_max_tokens: trailing-half cap. ``None`` reads ``SURROUNDING_TRAILING_MAX_TOKENS`` (default 2000). separators: explicit separator cascade. Defaults to the cascade resolved from ``CHUNK_R_SEPARATORS`` (or ``DEFAULT_R_SEPARATORS``). Returns: ``{modality: updated_entries}`` for diagnostics. Modalities without a sidecar on disk are silently skipped (consistent with the rest of the multimodal pipeline). """ counts = {"drawings": 0, "tables": 0, "equations": 0} if not enabled_modalities: return counts blocks_file = Path(blocks_path) if not blocks_file.exists(): return counts content_by_blockid = load_content_rows_by_blockid(blocks_path) if separators is None: separators = load_chunk_separators() leading_tokens, trailing_tokens = _resolve_surrounding_budget( leading_max_tokens, trailing_max_tokens ) base = str(blocks_file) if base.endswith(".blocks.jsonl"): base = base[: -len(".blocks.jsonl")] for root_key in ("drawings", "tables", "equations"): if root_key not in enabled_modalities: continue sidecar_path = Path(base + f".{root_key}.json") if not sidecar_path.exists(): continue try: payload = json.loads(sidecar_path.read_text(encoding="utf-8")) except (OSError, json.JSONDecodeError) as exc: logger.warning( "[multimodal_context] failed to read %s: %s", sidecar_path, exc, ) continue items = payload.get(root_key) if not isinstance(items, dict): continue updated = 0 for item_id, item in items.items(): if not isinstance(item, dict): continue blockid = item.get("blockid") if not isinstance(blockid, str) or not blockid: continue block_content = content_by_blockid.get(blockid) if block_content is None: continue span = find_target_span(root_key, item_id, block_content) if span is None: logger.debug( "[multimodal_context] %s/%s: id not found in block %s", root_key, item_id, blockid, ) continue surrounding = build_surrounding( kind=root_key, block_content=block_content, span=span, tokenizer=tokenizer, leading_max_tokens=leading_tokens, trailing_max_tokens=trailing_tokens, separators=separators, ) item["surrounding"] = surrounding updated += 1 counts[root_key] = updated try: sidecar_path.write_text( json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8", ) except OSError as exc: logger.warning( "[multimodal_context] failed to write %s: %s", sidecar_path, exc, ) continue logger.debug( "[multimodal_context] %s: surrounding written for %d entries", root_key, updated, ) return counts __all__ = [ "DEFAULT_SURROUNDING_MAX_TOKENS", "build_surrounding", "enrich_sidecars_with_surrounding", "find_target_span", "load_chunk_separators", "load_content_rows_by_blockid", "remove_table_tags", "trim_content_to_budget", ]