| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627 |
- """Spec-compliant sidecar writer.
- This module is the *single executable specification* of the LightRAG sidecar
- format (``docs/LightRAGSidecarFormat-zh.md``). Engine adapters hand it an
- :class:`IRDoc`; it emits the ``*.parsed/`` directory.
- Responsibilities (none of these belong in adapters):
- - id allocation: ``tb-/im-/eq-<doc_hash>-NNNN`` (4-digit zero-padded,
- global per-doc sequence)
- - placeholder rendering: ``{{TBL:k}}`` / ``{{IMG:k}}`` / ``{{EQ:k}}`` /
- ``{{EQI:k}}`` → spec-shaped XML-style tags
- - blockid computation: ``md5(doc_id:block_index:heading:content)``
- - assets dir creation and file copying; ``asset_dir`` flag in meta is
- derived from "directory exists and is non-empty"
- - merged_text + document_hash
- - meta line shape (spec §3.1)
- - conditional writes: ``tables.json`` / ``drawings.json`` / ``equations.json``
- appear only when their dict is non-empty
- """
- from __future__ import annotations
- import hashlib
- import json
- import re
- import shutil
- from datetime import datetime, timezone
- from pathlib import Path
- from typing import Any
- from lightrag.constants import FULL_DOCS_FORMAT_LIGHTRAG
- from lightrag.sidecar.ir import (
- AssetSpec,
- IRBlock,
- IRDoc,
- IRDrawing,
- IREquation,
- IRTable,
- )
- from lightrag.sidecar.placeholders import (
- render_drawing_tag,
- render_equation_tag,
- render_table_tag,
- render_template,
- table_body_for_rows,
- )
- from lightrag.utils import logger
- # ---------------------------------------------------------------------------
- # Public entry point
- # ---------------------------------------------------------------------------
# Accepted values for ``block_drawing_path_style``; ``write_sidecar`` rejects
# anything else with ``ValueError`` before touching the filesystem.
_VALID_BLOCK_DRAWING_PATH_STYLES = {"with_prefix", "basename_only"}


def write_sidecar(
    ir: IRDoc,
    *,
    parsed_dir: Path,
    doc_id: str,
    engine: str,
    clean_parsed_dir: bool = True,
    block_drawing_path_style: str = "with_prefix",
) -> dict[str, Any]:
    """Emit a spec-compliant ``*.parsed/`` directory from an IR.

    The function runs in stages: validate arguments, (optionally) wipe and
    recreate ``parsed_dir``, materialize assets, walk the blocks to allocate
    ids / render content / collect sidecar items, then write
    ``<base>.blocks.jsonl`` and the per-modality JSON files that have at
    least one entry.

    Args:
        ir: Document IR produced by an engine adapter.
        parsed_dir: Output directory. By default cleared and recreated; the
            caller is responsible for placing it under
            ``__parsed__/<base>.parsed/``.
        doc_id: ``doc-<md5>``; ``doc_hash`` for sidecar ids is the 32-char
            tail after stripping the ``doc-`` prefix.
        engine: One of ``native`` / ``mineru`` / ``docling`` / ``legacy``;
            written verbatim to ``meta.parse_engine``.
        clean_parsed_dir: When True (default) the writer ``rmtree``s
            ``parsed_dir`` before writing. Set to False when the caller has
            already pre-populated the directory with side artifacts that
            must survive — e.g. the native docx adapter pre-extracts image
            bytes into ``<base>.blocks.assets/`` before the writer runs,
            and passing ``AssetSpec.source=None`` lets the writer record
            them without copying.
        block_drawing_path_style: How ``<drawing path="...">`` in
            ``blocks.jsonl`` resolves the asset path. ``"with_prefix"``
            (default) renders ``<base>.blocks.assets/<filename>`` — matches
            the path stored in ``drawings.json``. ``"basename_only"``
            renders just ``<filename>``; legacy native docx convention
            (downstream consumers read the file path from ``drawings.json``,
            not from this attribute, so the basename-only form is purely
            cosmetic but kept for byte-equivalence with the original
            adapter).

    Returns:
        Dict shaped like the pipeline's existing ``parsed_data`` payload:
        ``{doc_id, file_path, parse_format, content, blocks_path}``.
        ``file_path`` is ``ir.document_name``; the caller resolves it to the
        actual on-disk path it wants persisted. ``content`` is the merged
        text of all emitted blocks joined by blank lines.

    Raises:
        ValueError: If ``block_drawing_path_style`` is not one of the values
            in ``_VALID_BLOCK_DRAWING_PATH_STYLES``.
    """
    # Validate before any destructive filesystem operation.
    if block_drawing_path_style not in _VALID_BLOCK_DRAWING_PATH_STYLES:
        allowed = ", ".join(sorted(_VALID_BLOCK_DRAWING_PATH_STYLES))
        raise ValueError(
            f"block_drawing_path_style must be one of {allowed}, "
            f"got {block_drawing_path_style!r}"
        )

    if clean_parsed_dir and parsed_dir.exists():
        shutil.rmtree(parsed_dir)
    parsed_dir.mkdir(parents=True, exist_ok=True)

    # Every output file shares the document's stem as its filename prefix;
    # fall back to the full name when the stem is empty.
    base_name = Path(ir.document_name).stem or ir.document_name
    blocks_path = parsed_dir / f"{base_name}.blocks.jsonl"
    tables_path = parsed_dir / f"{base_name}.tables.json"
    drawings_path = parsed_dir / f"{base_name}.drawings.json"
    equations_path = parsed_dir / f"{base_name}.equations.json"
    assets_dir = parsed_dir / f"{base_name}.blocks.assets"

    # ``clean_parsed_dir=False`` is reserved for callers that pre-populate
    # the directory with artifacts that must survive (e.g. the native docx
    # adapter pre-extracts assets). If a stale ``blocks.jsonl`` is sitting
    # there, the caller forgot to pre-clean — warn so the leftover doesn't
    # get silently overwritten with partially-stale neighbors.
    if not clean_parsed_dir and blocks_path.exists():
        logger.warning(
            "[sidecar] clean_parsed_dir=False but %s already exists; "
            "caller is expected to pre-clean before invoking write_sidecar",
            blocks_path,
        )

    # Stage 1: realize assets first so drawings can carry resolved paths.
    asset_paths = _materialize_assets(ir.assets, assets_dir)

    # Stage 2: walk blocks, allocate ids, render templates, accumulate
    # sidecar item dicts and blocks.jsonl lines.
    # ``removeprefix`` leaves a ``doc_id`` without the ``doc-`` prefix as-is.
    doc_hash = doc_id.removeprefix("doc-")
    tables: dict[str, dict[str, Any]] = {}
    drawings: dict[str, dict[str, Any]] = {}
    equations: dict[str, dict[str, Any]] = {}
    blocks_lines: list[str] = []
    merged_parts: list[str] = []
    # Per-modality counters; each one is global across the whole document.
    table_seq = 0
    drawing_seq = 0
    equation_seq = 0
    asset_prefix = f"{assets_dir.name}/"

    # ``block_index`` in the blockid hash refers to the position in the
    # SOURCE block list (``enumerate`` over ``ir.blocks``), not the emitted
    # position. Otherwise an editor turning a previously-non-empty block
    # into an empty one — which then gets dropped — would shift the
    # blockids of every block after it; we want stable ids across edits.
    for block_index, block in enumerate(ir.blocks):
        # Allocate ids for items declared on this block. Order: tables ->
        # drawings -> equations (per-block deterministic; the global
        # sequence advances across blocks).
        # Ids are allocated before the empty-block check below, so a block
        # that ends up dropped still consumes its sequence numbers.
        table_id_by_key: dict[str, str] = {}
        for table in block.tables:
            table_seq += 1
            tb_id = f"tb-{doc_hash}-{table_seq:04d}"
            table_id_by_key[table.placeholder_key] = tb_id
        drawing_id_by_key: dict[str, str] = {}
        for drawing in block.drawings:
            drawing_seq += 1
            im_id = f"im-{doc_hash}-{drawing_seq:04d}"
            drawing_id_by_key[drawing.placeholder_key] = im_id
        equation_id_by_key: dict[str, str] = {}
        for equation in block.equations:
            # Inline equations receive no id and no sidecar entry.
            if not equation.is_block:
                continue
            equation_seq += 1
            eq_id = f"eq-{doc_hash}-{equation_seq:04d}"
            equation_id_by_key[equation.placeholder_key] = eq_id

        # Render placeholder template.
        rendered = _render_block_content(
            block,
            table_id_by_key=table_id_by_key,
            drawing_id_by_key=drawing_id_by_key,
            equation_id_by_key=equation_id_by_key,
            asset_paths=asset_paths,
            asset_prefix=asset_prefix,
            block_drawing_path_style=block_drawing_path_style,
        )
        rendered = rendered.strip()
        if not rendered:
            # Drop empty blocks entirely — neither blocks.jsonl entry nor
            # sidecar items (the items were tied to the placeholder; if it
            # vanished, the items are orphans). This mirrors the existing
            # native_docx behaviour and ensures merged_text is contiguous.
            continue

        # The hash covers the stripped, fully rendered text, so any change
        # to the rendered content changes the blockid.
        blockid = hashlib.md5(
            f"{doc_id}:{block_index}:{block.heading}:{rendered}".encode("utf-8")
        ).hexdigest()

        # Realize per-block sidecar item dicts now that blockid is known.
        # Defensive: an adapter that declares an item on block.tables /
        # drawings / equations but omits the matching ``{{TBL/IMG/EQ:k}}``
        # token from ``content_template`` would leave the rendered text
        # without the corresponding tag. We detect that by checking whether
        # the allocated id (which is doc-unique) appears in the rendered
        # output, warn, and skip the sidecar entry — otherwise the per-
        # modality JSON would reference a blockid whose body never names it.
        for table in block.tables:
            tb_id = table_id_by_key[table.placeholder_key]
            if tb_id not in rendered:
                logger.warning(
                    "[sidecar] orphan table id=%s on block %d "
                    "(placeholder %r not referenced in content_template); "
                    "skipping sidecar entry",
                    tb_id,
                    block_index,
                    table.placeholder_key,
                )
                continue
            tables[tb_id] = _table_item_dict(tb_id, blockid, block.heading, table)
        for drawing in block.drawings:
            im_id = drawing_id_by_key[drawing.placeholder_key]
            if im_id not in rendered:
                logger.warning(
                    "[sidecar] orphan drawing id=%s on block %d "
                    "(placeholder %r not referenced in content_template); "
                    "skipping sidecar entry",
                    im_id,
                    block_index,
                    drawing.placeholder_key,
                )
                continue
            drawings[im_id] = _drawing_item_dict(
                im_id, blockid, block.heading, drawing, asset_paths, asset_prefix
            )
        for equation in block.equations:
            if not equation.is_block:
                continue
            eq_id = equation_id_by_key[equation.placeholder_key]
            if eq_id not in rendered:
                logger.warning(
                    "[sidecar] orphan equation id=%s on block %d "
                    "(placeholder %r not referenced in content_template); "
                    "skipping sidecar entry",
                    eq_id,
                    block_index,
                    equation.placeholder_key,
                )
                continue
            equations[eq_id] = _equation_item_dict(
                eq_id, blockid, block.heading, equation
            )

        # One ``content`` row per emitted block; unset ``session_type`` /
        # ``table_slice`` fall back to "body" / "none".
        row: dict[str, Any] = {
            "type": "content",
            "blockid": blockid,
            "format": "plain_text",
            "content": rendered,
            "heading": block.heading,
            "parent_headings": list(block.parent_headings),
            "level": int(block.level),
            "session_type": block.session_type or "body",
            "table_slice": block.table_slice or "none",
            "positions": [p.to_jsonable() for p in block.positions],
        }
        # ``table_header`` is only emitted when the block carries one.
        if block.table_header:
            row["table_header"] = block.table_header
        blocks_lines.append(json.dumps(row, ensure_ascii=False))
        merged_parts.append(rendered)

    # Stage 3: doc-level metadata.
    merged_text = "\n\n".join(p for p in merged_parts if p.strip())
    document_hash = hashlib.sha256(merged_text.encode("utf-8")).hexdigest()
    parse_time = datetime.now(timezone.utc).isoformat()
    # ``asset_dir`` in meta is true only when the directory exists AND holds
    # at least one entry; an empty directory is removed on a best-effort
    # basis (a failure to remove it is deliberately ignored).
    asset_dir_present = assets_dir.exists() and any(assets_dir.iterdir())
    if not asset_dir_present and assets_dir.exists():
        try:
            assets_dir.rmdir()
        except OSError:
            pass

    meta: dict[str, Any] = {
        "type": "meta",
        "format": "lightrag",
        "version": "1.0",
        "document_name": ir.document_name,
        "document_format": ir.document_format,
        "document_hash": f"sha256:{document_hash}",
        "table_file": bool(tables),
        "equation_file": bool(equations),
        "drawing_file": bool(drawings),
        "asset_dir": asset_dir_present,
        "split_option": dict(ir.split_option or {}),
        "blocks": len(blocks_lines),
        "doc_id": doc_id,
        "parse_engine": engine,
        "parse_time": parse_time,
        "doc_title": ir.doc_title,
    }
    if ir.bbox_attributes is not None:
        meta["bbox_attributes"] = dict(ir.bbox_attributes)

    # blocks.jsonl: the meta line first, then one JSON line per emitted block.
    blocks_path.write_text(
        "\n".join([json.dumps(meta, ensure_ascii=False)] + blocks_lines) + "\n",
        encoding="utf-8",
    )

    # Sidecar JSONs end with a trailing newline (POSIX text-file convention;
    # also keeps end-of-file linters / pre-commit hooks happy and matches the
    # ``blocks.jsonl`` convention above).
    # Each file is written only when its dict has at least one entry.
    if tables:
        tables_path.write_text(
            json.dumps(
                {"version": "1.0", "tables": tables},
                ensure_ascii=False,
                indent=2,
            )
            + "\n",
            encoding="utf-8",
        )
    if drawings:
        drawings_path.write_text(
            json.dumps(
                {"version": "1.0", "drawings": drawings},
                ensure_ascii=False,
                indent=2,
            )
            + "\n",
            encoding="utf-8",
        )
    if equations:
        equations_path.write_text(
            json.dumps(
                {"version": "1.0", "equations": equations},
                ensure_ascii=False,
                indent=2,
            )
            + "\n",
            encoding="utf-8",
        )

    logger.info(
        "[sidecar] wrote %d blocks for doc_id=%s "
        "(%d tables, %d drawings, %d equations, assets=%s, engine=%s)",
        len(blocks_lines),
        doc_id,
        len(tables),
        len(drawings),
        len(equations),
        asset_dir_present,
        engine,
    )

    return {
        "doc_id": doc_id,
        "file_path": ir.document_name,
        "parse_format": FULL_DOCS_FORMAT_LIGHTRAG,
        "content": merged_text,
        "blocks_path": str(blocks_path),
    }
- # ---------------------------------------------------------------------------
- # Helpers
- # ---------------------------------------------------------------------------
def _materialize_assets(
    assets: list[AssetSpec],
    assets_dir: Path,
) -> dict[str, str]:
    """Materialize :class:`AssetSpec` objects into ``assets_dir``.

    For each spec the target filename is derived from ``suggested_name``
    and the payload is realized according to the type of ``spec.source``:

    - ``str`` / ``Path``: copied from that path (skipped with a warning if
      the source file does not exist; no copy when source and target
      resolve to the same file).
    - ``bytes``: written verbatim.
    - ``None``: assumed to be on disk at the target location already; only
      its presence is verified.
    - anything else: skipped with a warning.

    Args:
        assets: Asset specs declared by the adapter. An empty list returns
            immediately without creating ``assets_dir``.
        assets_dir: Directory the assets live in; created if missing.

    Returns:
        ``{ref: filename_inside_assets_dir}`` for every asset that was
        successfully realized. Skipped assets are absent from the mapping.

    Collision policy: if two specs map to the same target name, the second
    gets a ``-2``, ``-3``, ... suffix on the stem. We never overwrite a file
    we've already produced.

    Containment policy: a target that does not resolve to a location
    strictly inside ``assets_dir`` (absolute ``suggested_name``, ``..``
    components, or an empty name that resolves to the directory itself) is
    skipped with a warning instead of being written.
    """
    if not assets:
        return {}
    assets_dir.mkdir(parents=True, exist_ok=True)
    # Resolved once; used to verify every target stays inside the directory.
    assets_root = assets_dir.resolve()
    out: dict[str, str] = {}
    used_names: set[str] = set()
    for spec in assets:
        target_name = _allocate_unique_name(spec.suggested_name, used_names)
        target_path = assets_dir / target_name
        # ``suggested_name`` originates from the adapter (and ultimately the
        # parsed document), so never let it steer a write or copy outside
        # the assets directory.
        if assets_root not in target_path.resolve().parents:
            logger.warning(
                "[sidecar] asset ref=%s has unsafe target name %r "
                "(resolves outside %s); skipping",
                spec.ref,
                spec.suggested_name,
                assets_dir,
            )
            continue
        if isinstance(spec.source, (str, Path)):
            src_path = Path(spec.source)
            if not src_path.exists():
                logger.warning(
                    "[sidecar] asset source missing for ref=%s (%s); skipping copy",
                    spec.ref,
                    src_path,
                )
                continue
            # Copying a file onto itself would fail; treat it as in place.
            if src_path.resolve() != target_path.resolve():
                shutil.copyfile(src_path, target_path)
        elif isinstance(spec.source, bytes):
            target_path.write_bytes(spec.source)
        elif spec.source is None:
            # Assumed already on disk at the target location (native_docx
            # writes assets during extraction). Verify presence; warn if
            # missing.
            if not target_path.exists():
                logger.warning(
                    "[sidecar] asset ref=%s declared in place but %s is absent",
                    spec.ref,
                    target_path,
                )
                continue
        else:
            logger.warning(
                "[sidecar] unsupported AssetSpec.source type for ref=%s: %s",
                spec.ref,
                type(spec.source).__name__,
            )
            continue
        # Only successfully realized assets reserve their name.
        used_names.add(target_name)
        out[spec.ref] = target_name
    return out
- def _allocate_unique_name(suggested: str, used: set[str]) -> str:
- """Make ``suggested`` unique within ``used``: ``foo.png`` → ``foo-2.png``."""
- if suggested not in used:
- return suggested
- stem = Path(suggested).stem
- suffix = Path(suggested).suffix
- n = 2
- while True:
- cand = f"{stem}-{n}{suffix}"
- if cand not in used:
- return cand
- n += 1
def _render_block_content(
    block: IRBlock,
    *,
    table_id_by_key: dict[str, str],
    drawing_id_by_key: dict[str, str],
    equation_id_by_key: dict[str, str],
    asset_paths: dict[str, str],
    asset_prefix: str,
    block_drawing_path_style: str = "with_prefix",
) -> str:
    """Expand the placeholder tokens of ``block.content_template`` into tags.

    Args:
        block: Block whose template is rendered; its ``tables`` /
            ``drawings`` / ``equations`` are looked up by ``placeholder_key``.
        table_id_by_key: Allocated table ids keyed by placeholder key.
        drawing_id_by_key: Allocated drawing ids keyed by placeholder key.
        equation_id_by_key: Allocated block-equation ids keyed by
            placeholder key.
        asset_paths: ``{asset_ref: filename}`` of realized assets.
        asset_prefix: Prefix put in front of the filename unless the style
            is ``"basename_only"``.
        block_drawing_path_style: ``"basename_only"`` renders the bare
            filename; any other value renders ``asset_prefix + filename``.

    Returns:
        Whatever ``render_template`` produces for the template with the four
        renderers below. A token whose key matches no item declared on the
        block renders as the empty string.
    """
    table_lookup = {}
    for tbl in block.tables:
        table_lookup[tbl.placeholder_key] = tbl
    drawing_lookup = {}
    for drw in block.drawings:
        drawing_lookup[drw.placeholder_key] = drw
    equation_lookup = {}
    for eqn in block.equations:
        equation_lookup[eqn.placeholder_key] = eqn

    def _table(key: str) -> str:
        tbl = table_lookup.get(key)
        if tbl is None:
            return ""
        table_id = table_id_by_key.get(key, "")
        if tbl.body_override is not None:
            # Verbatim body supplied by the adapter (keeps the parser's
            # original whitespace/escaping); the declared format still
            # follows whether structured rows exist.
            body = tbl.body_override
            fmt = "html" if tbl.rows is None else "json"
        elif tbl.rows is not None:
            body = table_body_for_rows(tbl.rows)
            fmt = "json"
        else:
            body = tbl.html or ""
            fmt = "html"
        return render_table_tag(table_id, fmt, body)

    def _drawing(key: str) -> str:
        drw = drawing_lookup.get(key)
        if drw is None:
            return ""
        drawing_id = drawing_id_by_key.get(key, "")
        # An explicit override (external/linked reference) wins unchanged.
        resolved = drw.path_override
        if resolved is None:
            asset_file = asset_paths.get(drw.asset_ref, "")
            if not asset_file:
                resolved = ""
            elif block_drawing_path_style == "basename_only":
                resolved = asset_file
            else:
                resolved = f"{asset_prefix}{asset_file}"
        return render_drawing_tag(
            drawing_id,
            drw.fmt,
            drw.caption,
            resolved,
            drw.src,
        )

    def _equation(key: str) -> str:
        eqn = equation_lookup.get(key)
        if eqn is None:
            return ""
        # An EQ token pointing at a non-block equation is an adapter
        # mistake; render it id-less (inline form) rather than leaving a
        # dangling token.
        eq_id = equation_id_by_key.get(key, "") if eqn.is_block else None
        return render_equation_tag(eq_id, eqn.latex, eqn.caption)

    def _inline_equation(key: str) -> str:
        eqn = equation_lookup.get(key)
        return "" if eqn is None else render_equation_tag(None, eqn.latex, eqn.caption)

    return render_template(
        block.content_template,
        table_renderer=_table,
        drawing_renderer=_drawing,
        equation_renderer=_equation,
        inline_equation_renderer=_inline_equation,
    )
- def _table_item_dict(
- table_id: str,
- blockid: str,
- heading: str,
- table: IRTable,
- ) -> dict[str, Any]:
- if table.rows is not None:
- fmt = "json"
- content = table_body_for_rows(table.rows)
- else:
- fmt = "html"
- content = table.html or ""
- item: dict[str, Any] = {
- "id": table_id,
- "blockid": blockid,
- "heading": heading,
- "dimension": [int(table.num_rows), int(table.num_cols)],
- "format": fmt,
- "content": content,
- "caption": table.caption,
- "footnotes": list(table.footnotes),
- }
- if table.table_header is not None:
- # Spec §5: stored as JSON string.
- item["table_header"] = json.dumps(table.table_header, ensure_ascii=False)
- if table.self_ref:
- item["self_ref"] = table.self_ref
- if table.extras:
- item["extras"] = dict(table.extras)
- return item
- def _drawing_item_dict(
- drawing_id: str,
- blockid: str,
- heading: str,
- drawing: IRDrawing,
- asset_paths: dict[str, str],
- asset_prefix: str,
- ) -> dict[str, Any]:
- if drawing.path_override is not None:
- path = drawing.path_override
- else:
- filename = asset_paths.get(drawing.asset_ref, "")
- path = f"{asset_prefix}{filename}" if filename else ""
- item: dict[str, Any] = {
- "id": drawing_id,
- "blockid": blockid,
- "heading": heading,
- "format": drawing.fmt,
- "path": path,
- "src": drawing.src,
- "caption": drawing.caption,
- "footnotes": list(drawing.footnotes),
- }
- if drawing.self_ref:
- item["self_ref"] = drawing.self_ref
- if drawing.extras:
- item["extras"] = dict(drawing.extras)
- return item
- _LATEX_DOLLAR_RE = re.compile(r"^\s*\$\$?(.+?)\$\$?\s*$", re.DOTALL)
- def _strip_latex_dollar_wrappers(latex: str) -> str:
- """Strip leading/trailing ``$``/``$$`` wrappers from a latex string.
- ``equations.json`` stores clean latex (per the MinerU adapter contract:
- ``blocks.jsonl`` keeps the parser's raw form so the rendered
- ``<equation>`` body is byte-identical to the source, while the
- per-equation sidecar carries delimiter-free latex). Leaves strings
- without wrappers untouched.
- """
- if not latex:
- return latex
- m = _LATEX_DOLLAR_RE.match(latex)
- return m.group(1).strip() if m else latex.strip()
def _equation_item_dict(
    eq_id: str,
    blockid: str,
    heading: str,
    equation: IREquation,
) -> dict[str, Any]:
    """Build the ``equations.json`` entry for one block equation.

    ``content`` is the equation's latex passed through
    ``_strip_latex_dollar_wrappers``; ``format`` is always ``"latex"``.
    ``self_ref`` and ``extras`` are added only when present.
    """
    clean_latex = _strip_latex_dollar_wrappers(equation.latex)
    record: dict[str, Any] = {
        "id": eq_id,
        "blockid": blockid,
        "heading": heading,
        "format": "latex",
        "content": clean_latex,
        "caption": equation.caption,
        "footnotes": list(equation.footnotes),
    }
    if equation.self_ref:
        record["self_ref"] = equation.self_ref
    if equation.extras:
        record["extras"] = dict(equation.extras)
    return record
|