utils_pipeline.py 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704
  1. """Pipeline-specific helpers for document status, identity, and content.
  2. These helpers are shared by the LightRAG pipeline mixin (lightrag/pipeline.py)
  3. and by other LightRAG methods that touch the document ingestion paths
  4. (custom-chunks ingest, deletion, etc.). They are kept out of utils.py because
  5. they are tied to the doc_status / full_docs domain rather than to general
  6. text/token utilities.
  7. """
  8. from __future__ import annotations
  9. import hashlib
  10. import json
  11. import os
  12. import re
  13. import time
  14. from pathlib import Path
  15. from typing import Any, cast
  16. from urllib.parse import quote, unquote, urlsplit
  17. from lightrag.base import DocProcessingStatus, DocStatus, DocStatusStorage
  18. from lightrag.constants import (
  19. FULL_DOCS_FORMAT_LIGHTRAG,
  20. LIGHTRAG_DOC_CONTENT_PREFIX,
  21. PARSED_DIR_NAME,
  22. )
  23. from lightrag.parser.routing import canonicalize_parser_hinted_basename
  24. from lightrag.utils import (
  25. compute_mdhash_id,
  26. logger,
  27. move_file_to_parsed_dir,
  28. )
# ``file_path`` values that do not identify a real source: the empty string
# plus two sentinel spellings. ``resolve_doc_file_path`` skips them and
# ``normalize_document_file_path`` collapses them to ``"unknown_source"``.
PLACEHOLDER_DOCUMENT_SOURCES = {"", "no-file-path", "unknown_source"}
# Sentinel stored in ``full_docs.sidecar_location`` when no sidecar URI is
# known; ``resolve_sidecar_uri`` maps it to ``None``.
SIDECAR_LOCATION_UNKNOWN = "unknown_source"
  31. def build_chunks_dict_from_chunking_result(
  32. chunking_result: list[dict[str, Any]],
  33. *,
  34. doc_id: str,
  35. file_path: str,
  36. ) -> dict[str, dict[str, Any]]:
  37. """Assemble the per-doc chunks dict written into chunks_vdb / text_chunks.
  38. Resolves a stable ``chunk_key`` for each entry — preferring an explicit
  39. ``chunk_id`` (auto-prefixed with ``doc_id-`` if not already), falling back
  40. to a positional ``chunk-NNN`` derived from ``chunk_order_index``, and
  41. finally hashing on collision so two entries inside one document never
  42. overwrite each other.
  43. """
  44. chunks: dict[str, dict[str, Any]] = {}
  45. for dp in chunking_result:
  46. chunk_content = dp.get("content", "")
  47. if not chunk_content:
  48. continue
  49. raw_chunk_id = dp.get("chunk_id", "")
  50. order = dp.get("chunk_order_index")
  51. if isinstance(raw_chunk_id, str) and raw_chunk_id.strip():
  52. chunk_key = (
  53. raw_chunk_id
  54. if raw_chunk_id.startswith(f"{doc_id}-")
  55. else f"{doc_id}-{raw_chunk_id}"
  56. )
  57. elif isinstance(order, int):
  58. chunk_key = f"{doc_id}-chunk-{order:03d}"
  59. else:
  60. chunk_key = compute_mdhash_id(f"{doc_id}:{chunk_content}", prefix="chunk-")
  61. # Hard collision guard (same chunk_id inside one document).
  62. if chunk_key in chunks:
  63. chunk_key = compute_mdhash_id(
  64. f"{doc_id}:{order}:{chunk_content}",
  65. prefix="chunk-",
  66. )
  67. # Preserve any pre-populated cache ids on dp (multimodal chunks
  68. # arrive with analysis cache ids already attached so document
  69. # deletion can find them via the per-chunk llm_cache_list).
  70. existing_cache_list = dp.get("llm_cache_list")
  71. seed_cache_list: list[str] = []
  72. if isinstance(existing_cache_list, list):
  73. seen: set[str] = set()
  74. for entry in existing_cache_list:
  75. key = str(entry or "").strip()
  76. if key and key not in seen:
  77. seen.add(key)
  78. seed_cache_list.append(key)
  79. chunks[chunk_key] = {
  80. **dp,
  81. "full_doc_id": doc_id,
  82. "file_path": file_path,
  83. "llm_cache_list": seed_cache_list,
  84. }
  85. return chunks
  86. def chunk_fields_from_status_doc(
  87. status_doc: DocProcessingStatus,
  88. ) -> tuple[list[str], int]:
  89. """Return (chunks_list, chunks_count) preserved from a status document.
  90. Filters out any non-string or empty chunk IDs. When chunks_count is
  91. absent or invalid, it is inferred from the length of chunks_list.
  92. """
  93. chunks_list: list[str] = []
  94. if isinstance(status_doc.chunks_list, list):
  95. chunks_list = [
  96. chunk_id
  97. for chunk_id in status_doc.chunks_list
  98. if isinstance(chunk_id, str) and chunk_id
  99. ]
  100. if isinstance(status_doc.chunks_count, int) and status_doc.chunks_count >= 0:
  101. return chunks_list, status_doc.chunks_count
  102. return chunks_list, len(chunks_list)
  103. def resolve_doc_file_path(
  104. status_doc: DocProcessingStatus | None = None,
  105. content_data: dict[str, Any] | None = None,
  106. ) -> str:
  107. """Resolve the best available document file path.
  108. Returns the first non-placeholder ``file_path`` from doc_status, then
  109. full_docs. Both are already canonicalized at write time, so this only
  110. has to skip placeholder sentinels.
  111. """
  112. for source in (
  113. getattr(status_doc, "file_path", None),
  114. content_data.get("file_path") if content_data else None,
  115. ):
  116. if not isinstance(source, str):
  117. continue
  118. candidate = source.strip()
  119. if candidate and candidate not in PLACEHOLDER_DOCUMENT_SOURCES:
  120. return candidate
  121. return "unknown_source"
  122. def normalize_document_file_path(file_path: Any) -> str:
  123. """Return the canonical basename stored as ``file_path``.
  124. Strips any supported ``[hint]`` segment so ``abc.docx`` and
  125. ``abc.[native-iet].docx`` map to the same key. Collapses placeholders to
  126. ``"unknown_source"``. Idempotent.
  127. """
  128. source = str(file_path or "").strip()
  129. if source in PLACEHOLDER_DOCUMENT_SOURCES:
  130. return "unknown_source"
  131. canonical = canonicalize_parser_hinted_basename(source).strip()
  132. if canonical in PLACEHOLDER_DOCUMENT_SOURCES:
  133. return "unknown_source"
  134. return canonical or "unknown_source"
# Back-compat alias kept until every call site that imports the old name has
# been switched over; the public name is ``normalize_document_file_path``.
document_canonical_key = normalize_document_file_path
  138. def has_known_document_source(source_key: str) -> bool:
  139. return source_key not in PLACEHOLDER_DOCUMENT_SOURCES
  140. def doc_status_field(doc: Any, field: str, default: Any = "") -> Any:
  141. if isinstance(doc, dict):
  142. return doc.get(field, default)
  143. return getattr(doc, field, default)
  144. # Long-lived per-document metadata fields that must survive every
  145. # doc_status state transition. ``process_options`` records the user's
  146. # per-file processing strategy at enqueue time and is read by analyze /
  147. # chunk / KG-skip stages and by admin/list APIs throughout the document's
  148. # lifetime, so we cannot let an intermediate transition (PARSING /
  149. # ANALYZING / PROCESSING / PROCESSED / FAILED upsert) clobber it.
  150. # ``parse_warnings`` records non-fatal parser warnings (e.g. legacy docx
  151. # tables missing ``w14:paraId``) that admins should be able to surface
  152. # alongside the document record after PROCESSED.
  153. # ``chunk_opts`` is written when entering PROCESSING (via ``extraction_meta``)
  154. # and records the actual chunker params used for that document in the same
  155. # format as the ``Chunking <strategy>: ...`` log line (params portion only).
  156. # Carrying it forward keeps the value visible after PROCESSING -> FAILED,
  157. # whose ``metadata_extra`` only carries timing fields.
  158. # ``parsing_start_time`` / ``analyzing_start_time`` are Unix epoch seconds
  159. # stamped at the entry of ``_parse_worker`` / ``_analyze_worker`` (mirrors
  160. # the existing ``processing_start_time`` set when entering PROCESSING) so
  161. # per-stage durations can be derived from doc_status post-mortem.
  162. # ``parsing_end_time`` is the paired Unix epoch seconds stamped by
  163. # ``_parse_worker`` when the parse stage actually runs (cache-miss branch,
  164. # covering ``parse_native`` too which has no cache concept). Absent on
  165. # cache-hit attempts (``parse_stage_skipped`` is set instead).
  166. # ``analyzing_end_time`` is the paired Unix epoch seconds stamped by
  167. # ``_analyze_worker`` only when ``analyze_multimodal`` returns with
  168. # ``multimodal_processed=True`` (the explicit "fully completed" sentinel).
  169. # It is intentionally NOT stamped on soft-swallowed exception paths or on
  170. # malformed/empty sidecar early returns inside ``analyze_multimodal``, so
  171. # operators can distinguish "analyze actually completed" from "analyze
  172. # attempted but bailed".
  173. # ``parse_stage_skipped`` is written by ``parse_mineru`` / ``parse_docling``
  174. # when the raw bundle cache is valid and the parse stage round trip is
  175. # skipped; absence == not skipped (e.g. native parser, or cache miss).
  176. # ``analyzing_stage_skipped`` is its analyze-stage counterpart, written by
  177. # ``analyze_multimodal``'s three user/config early-return branches (no
  178. # blocks_path, blocks file missing, or user opted out of every i/t/e
  179. # modality). Soft-swallowed exception paths are intentionally NOT considered
  180. # "skipped" — they write neither end_time nor skipped (failure is its own
  181. # state, captured via the FAILED transition's ``error_msg``).
  182. # Within each stage, the ``*_end_time`` and ``*_stage_skipped`` fields are
  183. # mutually exclusive (at most one is written per attempt; both may be
  184. # absent if analyze soft-failed).
  185. # ``source_file_name`` records the original pending-parse source basename used
  186. # by parser workers; it is intentionally separate from canonical ``file_path``.
  187. #
  188. # The order of this tuple is the rendering order of metadata fields in
  189. # the WebUI ``DocumentStatusDetailsDialog`` (carry-over builds the new
  190. # metadata dict by iterating this tuple, and dict / JSON / JSX preserve
  191. # insertion order all the way to the rendered output). Keep fields
  192. # grouped by stage: parse-stage fields together, analyze-stage fields
  193. # together, etc., so the dialog reads top-to-bottom along the pipeline.
# Tuple order is the rendering order of these metadata fields in the WebUI
# ``DocumentStatusDetailsDialog``; keep each stage's fields contiguous.
_DOC_STATUS_METADATA_CARRY_OVER_KEYS: tuple[str, ...] = (
    # Document-level, long-lived fields.
    "process_options",
    "source_file_name",
    "parse_warnings",
    "chunk_opts",
    # Parse stage: start time, then either end time or the skip marker.
    "parsing_start_time",
    "parsing_end_time",
    "parse_stage_skipped",
    # Analyze stage: start time, then either end time or the skip marker.
    "analyzing_start_time",
    "analyzing_end_time",
    "analyzing_stage_skipped",
)
  206. def doc_status_metadata_carry_over(status_doc: Any) -> dict[str, Any]:
  207. """Return the subset of ``status_doc.metadata`` to preserve across upserts.
  208. ``doc_status`` storage backends generally treat the ``metadata`` field
  209. as an opaque blob and **replace** it on every upsert, so callers must
  210. explicitly carry forward fields they want to keep. This helper centralises
  211. the list of fields we always carry: today only ``process_options``, but
  212. new long-lived metadata can be added by extending
  213. ``_DOC_STATUS_METADATA_CARRY_OVER_KEYS``.
  214. """
  215. if status_doc is None:
  216. return {}
  217. raw_metadata = doc_status_field(status_doc, "metadata", {})
  218. if not isinstance(raw_metadata, dict):
  219. return {}
  220. carry: dict[str, Any] = {}
  221. for key in _DOC_STATUS_METADATA_CARRY_OVER_KEYS:
  222. if key in raw_metadata and raw_metadata[key] not in (None, ""):
  223. carry[key] = raw_metadata[key]
  224. return carry
  225. def doc_status_transition_metadata(
  226. status_doc: Any,
  227. *,
  228. extra: dict[str, Any] | None = None,
  229. ) -> dict[str, Any]:
  230. """Build a doc_status ``metadata`` payload that preserves carry-over fields.
  231. Use at every state-transition upsert site so the user's
  232. ``process_options`` (and any future long-lived metadata fields) survive
  233. PENDING → PARSING → ANALYZING → PROCESSING → PROCESSED / FAILED.
  234. """
  235. payload = doc_status_metadata_carry_over(status_doc)
  236. if extra:
  237. payload.update(extra)
  238. return payload
  239. def doc_status_value(doc: Any) -> str:
  240. status = doc_status_field(doc, "status", "")
  241. if isinstance(status, DocStatus):
  242. return status.value
  243. return str(status or "")
# Sidecar item ids embed ``doc_hash`` (= doc_id without the ``doc-`` prefix),
# and for pending_parse uploads doc_id derives from the filename — so the
# same content under two filenames renders with different ids in
# ``merged_text``. Strip those surfaces before hashing so cross-filename
# content_hash dedup actually fires.
#
# Matches ``tb-`` / ``im-`` / ``eq-`` ids of the form
# ``<prefix>-<32 lowercase hex>-<4 digits>``. Group 1 (prefix) and group 2
# (sequence) are kept by the substitution in
# ``normalize_merged_text_for_hash``; only the hex part is replaced.
_SIDECAR_ID_PATTERN = re.compile(r"\b(tb|im|eq)-[0-9a-f]{32}-(\d{4})\b")
# Matches text right after ``path="`` up to and including the last
# ``.blocks.assets/`` inside that attribute value (``[^"]*`` cannot run past
# the closing quote), i.e. the filename-dependent asset directory prefix.
_ASSET_PATH_PATTERN = re.compile(r'(?<=path=")[^"]*\.blocks\.assets/')
  251. def normalize_merged_text_for_hash(content: str) -> str:
  252. """Strip filename-derived prefixes from sidecar ids and asset paths.
  253. Idempotent and safe on plain text (matches the doc_hash literal only —
  254. 32 lowercase hex digits between the modality prefix and a 4-digit
  255. sequence). RAW text bodies without sidecar markup pass through
  256. unchanged.
  257. """
  258. if not content:
  259. return content
  260. content = _SIDECAR_ID_PATTERN.sub(r"\1-<DOC>-\2", content)
  261. content = _ASSET_PATH_PATTERN.sub("<ASSETS>/", content)
  262. return content
  263. def compute_text_content_hash(content: str) -> str:
  264. """MD5 hex digest of text content used for cross-filename dedup.
  265. Input is normalized via :func:`normalize_merged_text_for_hash` first so
  266. sidecar-rendered bodies dedupe across filenames despite carrying
  267. filename-derived item ids and asset paths.
  268. """
  269. return compute_mdhash_id(normalize_merged_text_for_hash(content), prefix="")
  270. def compute_file_content_hash(path_str: str) -> str | None:
  271. """Stream-compute MD5 of a file's bytes; returns None if unreadable.
  272. Resolves the LightRAG ``*.blocks.jsonl`` conventions used by
  273. ``_load_lightrag_document_content`` so the hash matches the actual
  274. document body regardless of whether ``path_str`` points at the blocks
  275. file directly or its parent directory/base name.
  276. """
  277. if not path_str:
  278. return None
  279. try:
  280. path = Path(path_str)
  281. if path.is_dir():
  282. candidates = sorted(path.glob("*.blocks.jsonl"))
  283. if not candidates:
  284. return None
  285. path = candidates[0]
  286. elif not (path.exists() and path.is_file()):
  287. blocks_path = Path(path_str + ".blocks.jsonl")
  288. if blocks_path.exists() and blocks_path.is_file():
  289. path = blocks_path
  290. else:
  291. return None
  292. h = hashlib.md5()
  293. with path.open("rb") as f:
  294. for chunk in iter(lambda: f.read(65536), b""):
  295. h.update(chunk)
  296. return h.hexdigest()
  297. except Exception as e:
  298. logger.warning(f"Failed to compute file content hash for {path_str}: {e}")
  299. return None
  300. def configured_input_dir() -> Path:
  301. input_dir = os.getenv("INPUT_DIR", "").strip()
  302. return Path(input_dir) if input_dir else Path.cwd() / "inputs"
  303. async def get_existing_doc_by_file_basename(
  304. doc_status: DocStatusStorage, file_path: Any
  305. ) -> tuple[str, Any] | None:
  306. """Find an existing doc_status record by canonical file basename.
  307. Inputs are normalized via :func:`normalize_document_file_path` so callers
  308. may pass either the bare canonical name (``abc.docx``) or a hint-bearing
  309. variant (``abc.[native-iet].docx``); both resolve to the same logical
  310. document.
  311. """
  312. basename = normalize_document_file_path(file_path)
  313. if basename == "unknown_source":
  314. return None
  315. return await doc_status.get_doc_by_file_basename(basename)
  316. async def get_existing_doc_by_content_hash(
  317. doc_status: DocStatusStorage, content_hash: str
  318. ) -> tuple[str, Any] | None:
  319. """Find an existing doc_status record by content hash."""
  320. if not content_hash:
  321. return None
  322. return await doc_status.get_doc_by_content_hash(content_hash)
  323. async def get_duplicate_doc_by_content_hash(
  324. doc_status: DocStatusStorage, content_hash: str, current_doc_id: str
  325. ) -> tuple[str, Any] | None:
  326. """Find another doc_status record with the same content hash."""
  327. if not content_hash:
  328. return None
  329. match = await doc_status.get_doc_by_content_hash(content_hash)
  330. if match and match[0] != current_doc_id:
  331. return match
  332. try:
  333. docs = await doc_status.get_docs_by_statuses(list(DocStatus))
  334. except Exception:
  335. return None
  336. for doc_id, doc in docs.items():
  337. if doc_id == current_doc_id:
  338. continue
  339. if doc_status_field(doc, "content_hash", "") == content_hash:
  340. return doc_id, doc
  341. return None
  342. def make_lightrag_doc_content(merged_text: str) -> str:
  343. """Build the ``full_docs.content`` value for ``format=lightrag`` records.
  344. The result has shape ``"{{LRdoc}}<merged_text>"`` — the marker prefix
  345. distinguishes lightrag-format full_docs from raw-format ones, and the
  346. body is the complete merged text from the ``.blocks.jsonl`` content
  347. lines so F-chunking can run identically on raw and lightrag inputs
  348. (the prefix is stripped at chunking time via
  349. ``strip_lightrag_doc_prefix``).
  350. """
  351. return f"{LIGHTRAG_DOC_CONTENT_PREFIX}{merged_text or ''}"
  352. def strip_lightrag_doc_prefix(content: str | None, parse_format: str | None) -> str:
  353. """Return the bare body for a stored ``full_docs.content`` value.
  354. The ``{{LRdoc}}`` marker is stripped **only** when ``parse_format``
  355. indicates the record is in lightrag format. Any other ``parse_format``
  356. (``raw``, ``pending_parse``, ``None`` ...) returns the content
  357. unchanged so a raw document whose literal body happens to start with
  358. ``{{LRdoc}}`` is never silently truncated.
  359. Centralizing the format check here turns "must check format before
  360. stripping" from a caller-side discipline into a structural property of
  361. the function: any future call site that forgets to gate is protected
  362. automatically.
  363. """
  364. if (
  365. parse_format == FULL_DOCS_FORMAT_LIGHTRAG
  366. and isinstance(content, str)
  367. and content.startswith(LIGHTRAG_DOC_CONTENT_PREFIX)
  368. ):
  369. return content[len(LIGHTRAG_DOC_CONTENT_PREFIX) :]
  370. return content or ""
  371. # ---------------------------------------------------------------------------
  372. # Document path / artifact helpers (moved from _PipelineMixin)
  373. # ---------------------------------------------------------------------------
def input_dir_path() -> Path:
    """Return the configured input directory (pass-through to ``configured_input_dir``)."""
    return configured_input_dir()
  376. def parsed_dir() -> Path:
  377. """Return the project-wide parsed-artifact root: ``<input_dir>/__parsed__``."""
  378. return input_dir_path() / PARSED_DIR_NAME
  379. def parsed_artifact_dir_for(
  380. file_path: str, *, parent_hint: Path | str | None = None
  381. ) -> Path:
  382. """Return the per-document sidecar directory for ``file_path``.
  383. ``file_path`` must already be canonical (run ``normalize_document_file_path``
  384. first if unsure). When ``parent_hint`` is supplied (e.g. the live source
  385. file's parent), the sidecar is placed next to it under ``__parsed__/``
  386. rather than under the global ``input_dir``; this keeps test isolation
  387. intact when the source lives outside ``INPUT_DIR``. On collision with an
  388. existing non-directory entry, the helper appends ``_001``..``_999`` and
  389. finally a unix timestamp suffix.
  390. """
  391. if parent_hint is not None:
  392. hint = Path(parent_hint)
  393. # ``hint`` may already point at a ``__parsed__/`` dir (e.g. when the
  394. # caller re-archived a source); reuse it in place rather than nesting.
  395. root = hint if hint.name == PARSED_DIR_NAME else hint / PARSED_DIR_NAME
  396. else:
  397. root = parsed_dir()
  398. source_name = (
  399. canonicalize_parser_hinted_basename(file_path or "document") or "document"
  400. )
  401. artifact_name = f"{source_name}.parsed"
  402. artifact_dir = root / artifact_name
  403. if not artifact_dir.exists() or artifact_dir.is_dir():
  404. return artifact_dir
  405. for i in range(1, 1000):
  406. candidate = root / f"{artifact_name}_{i:03d}"
  407. if not candidate.exists() or candidate.is_dir():
  408. return candidate
  409. return root / f"{artifact_name}_{int(time.time())}"
  410. # ---------------------------------------------------------------------------
  411. # Sidecar URI helpers (``full_docs.sidecar_location``)
  412. # ---------------------------------------------------------------------------
  413. #
  414. # Sidecar URI scheme conventions:
  415. # - Local: ``file:///abs/path/to/abc.parsed/`` (trailing slash required)
  416. # - Remote: ``s3://bucket/workspace/abc.parsed/`` (future; resolver returns
  417. # None today so local readers gracefully skip)
  418. # - Unknown sentinel: literal string ``"unknown_source"``
  419. def sidecar_uri_for(parsed_artifact_dir: Path | str) -> str:
  420. """Build the canonical sidecar URI for a local artifact directory.
  421. The result always ends with ``/`` so a reader can distinguish a directory
  422. from a file at the URI level. Non-ASCII characters are percent-encoded.
  423. """
  424. p = Path(parsed_artifact_dir).resolve()
  425. encoded = quote(str(p), safe="/")
  426. return f"file://{encoded}/"
  427. def resolve_sidecar_uri(uri: str | None) -> Path | None:
  428. """Decode a sidecar URI into a local filesystem Path.
  429. Returns None for the unknown sentinel, empty input, or any non-``file://``
  430. scheme (remote schemes will get their own resolvers).
  431. """
  432. if not uri or uri == SIDECAR_LOCATION_UNKNOWN:
  433. return None
  434. parts = urlsplit(uri)
  435. if parts.scheme != "file":
  436. return None
  437. path_str = unquote(parts.path)
  438. if path_str.endswith("/") and len(path_str) > 1:
  439. path_str = path_str[:-1]
  440. return Path(path_str)
  441. def sidecar_blocks_path(uri: str | None) -> str | None:
  442. """Locate the first ``*.blocks.jsonl`` file inside a sidecar URI.
  443. Returns the absolute path as a string, or None when the URI cannot be
  444. resolved locally or the directory holds no blocks file.
  445. """
  446. d = resolve_sidecar_uri(uri)
  447. if d is None or not d.is_dir():
  448. return None
  449. candidates = sorted(d.glob("*.blocks.jsonl"))
  450. return str(candidates[0]) if candidates else None
  451. def sidecar_modality_path(uri: str | None, modality: str) -> str | None:
  452. """Return the path for a sidecar modality JSON (drawings/tables/equations).
  453. Does not require the file to exist — callers check. Returns None when the
  454. sidecar URI cannot be resolved or has no blocks file to anchor the name.
  455. """
  456. blocks = sidecar_blocks_path(uri)
  457. if not blocks:
  458. return None
  459. return f"{blocks[: -len('.blocks.jsonl')]}.{modality}.json"
  460. def sidecar_assets_dir_for_uri(uri: str | None) -> Path | None:
  461. """Return the ``*.blocks.assets/`` directory Path for a sidecar URI.
  462. The directory may not exist; callers create it on first asset write.
  463. """
  464. blocks = sidecar_blocks_path(uri)
  465. if not blocks:
  466. return None
  467. return Path(f"{blocks[: -len('.blocks.jsonl')]}.blocks.assets")
  468. # ---------------------------------------------------------------------------
  469. # Source archive helpers
  470. # ---------------------------------------------------------------------------
  471. async def archive_docx_source_after_full_docs_sync(source_path: str) -> str | None:
  472. source = Path(source_path)
  473. try:
  474. target = await move_file_to_parsed_dir(source, skip_if_already_parsed=True)
  475. except Exception as e:
  476. logger.warning(
  477. f"[parse] Source archive skipped after full_docs sync: {source_path}: {e}"
  478. )
  479. return None
  480. if target is None:
  481. return None
  482. if target != source:
  483. logger.debug(
  484. f"[parse] Archived DOCX source after full_docs sync: {source} -> {target}"
  485. )
  486. return str(target)
async def archive_source_after_full_docs_sync(source_path: str) -> str | None:
    """Archive ``source_path`` after full_docs sync.

    Currently a pass-through to ``archive_docx_source_after_full_docs_sync``:
    returns the archived path as a string, or None when archiving was skipped
    or failed.
    """
    return await archive_docx_source_after_full_docs_sync(source_path)
  489. # ---------------------------------------------------------------------------
  490. # LightRAG Document blocks loader
  491. # ---------------------------------------------------------------------------
  492. async def load_lightrag_document_content(sidecar_uri: str) -> tuple[str, str]:
  493. """Load LightRAG Document blocks and return ``(merged_text, blocks_path)``.
  494. ``sidecar_uri`` is a sidecar location URI (see ``sidecar_uri_for``); this
  495. locates the ``*.blocks.jsonl`` file inside it, reads the content lines
  496. (skipping the meta header at index 0 and any non-content entries), and
  497. returns the merged body plus the absolute blocks path.
  498. """
  499. resolved = sidecar_blocks_path(sidecar_uri)
  500. if resolved is None:
  501. raise FileNotFoundError(
  502. f"LightRAG blocks file not found from sidecar uri: {sidecar_uri}"
  503. )
  504. blocks_path = Path(resolved)
  505. merged_parts: list[str] = []
  506. with blocks_path.open("r", encoding="utf-8") as f:
  507. for i, line in enumerate(f):
  508. text = line.strip()
  509. if not text:
  510. continue
  511. obj = json.loads(text)
  512. if i == 0:
  513. continue
  514. if obj.get("type") != "content":
  515. continue
  516. content = obj.get("content", "")
  517. if isinstance(content, str) and content.strip():
  518. merged_parts.append(content)
  519. return "\n\n".join(merged_parts), str(blocks_path)
  520. # ---------------------------------------------------------------------------
  521. # Payload introspection helpers (parser response normalization)
  522. # ---------------------------------------------------------------------------
  523. def get_by_path(payload: Any, path: str) -> Any:
  524. if not path:
  525. return None
  526. cur = payload
  527. for part in path.split("."):
  528. if isinstance(cur, dict) and part in cur:
  529. cur = cur[part]
  530. else:
  531. return None
  532. return cur
  533. def extract_content_list_from_payload(
  534. payload: Any,
  535. ) -> list[dict[str, Any]] | None:
  536. """Try to find a MinerU/Docling-like content list from arbitrary JSON payload."""
  537. if isinstance(payload, list):
  538. if payload and all(isinstance(x, dict) for x in payload):
  539. first = payload[0]
  540. if "type" in first or "label" in first or "text" in first:
  541. return cast(list[dict[str, Any]], payload)
  542. return None
  543. if not isinstance(payload, dict):
  544. return None
  545. # Common direct keys first
  546. for key in ("content_list", "content", "items", "result"):
  547. value = payload.get(key)
  548. if isinstance(value, list):
  549. extracted = extract_content_list_from_payload(value)
  550. if extracted is not None:
  551. return extracted
  552. elif isinstance(value, dict):
  553. extracted = extract_content_list_from_payload(value)
  554. if extracted is not None:
  555. return extracted
  556. # Deep search as fallback
  557. for value in payload.values():
  558. extracted = extract_content_list_from_payload(value)
  559. if extracted is not None:
  560. return extracted
  561. return None
  562. def normalize_parser_result_to_content_list(
  563. parser_result: str | list[dict[str, Any]] | dict[str, Any] | None,
  564. ) -> list[dict[str, Any]] | None:
  565. """Normalize parser result to structured content list if possible."""
  566. if parser_result is None:
  567. return None
  568. if isinstance(parser_result, list):
  569. return extract_content_list_from_payload(parser_result)
  570. if isinstance(parser_result, dict):
  571. return extract_content_list_from_payload(parser_result)
  572. text = str(parser_result).strip()
  573. if not text:
  574. return None
  575. try:
  576. payload = json.loads(text)
  577. return extract_content_list_from_payload(payload)
  578. except Exception:
  579. return None
  580. # Multimodal entity injection used to live here as a centralized post-pass
  581. # over all chunk_results. It has been moved into
  582. # :func:`lightrag.operate.extract_entities._process_single_content` so each
  583. # multimodal chunk injects its own entity/relation records while still under
  584. # its concurrency slot. The chunk's ``sidecar.type`` (drawing/table/equation)
  585. # is the dispatch key; see operate.py for the new logic.