routing.py 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901
  1. from __future__ import annotations
  2. import fnmatch
  3. import os
  4. import re
  5. from dataclasses import dataclass
  6. from pathlib import Path
  7. from typing import Any
  8. from lightrag.constants import (
  9. DEFAULT_CHUNK_P_SIZE,
  10. DEFAULT_R_SEPARATORS,
  11. DEFAULT_SENTENCE_SPLIT_REGEX,
  12. FULL_DOCS_FORMAT_LIGHTRAG,
  13. FULL_DOCS_FORMAT_PENDING_PARSE,
  14. FULL_DOCS_FORMAT_RAW,
  15. PARSER_ENGINE_DOCLING,
  16. PARSER_ENGINE_LEGACY,
  17. PARSER_ENGINE_MINERU,
  18. PARSER_ENGINE_NATIVE,
  19. PARSER_ENGINE_SUFFIX_CAPABILITIES,
  20. PROCESS_OPTION_CHUNK_CHARS,
  21. PROCESS_OPTION_CHUNK_FIXED,
  22. PROCESS_OPTION_CHUNK_VECTOR,
  23. PROCESS_OPTION_CHUNK_PARAGRAH,
  24. PROCESS_OPTION_CHUNK_RECURSIVE,
  25. PROCESS_OPTION_EQUATIONS,
  26. PROCESS_OPTION_IMAGES,
  27. PROCESS_OPTION_SKIP_KG,
  28. PROCESS_OPTION_TABLES,
  29. ProcessChunkingOption,
  30. SUPPORTED_PARSER_ENGINES,
  31. SUPPORTED_PROCESS_OPTIONS,
  32. )
  33. from lightrag.utils import logger, parse_optional_float
  34. import json
  35. from collections.abc import Mapping
  36. from copy import deepcopy
  37. _PARSER_RULE_SPLIT_RE = re.compile(r"[;,]")
  38. _PARSER_ENGINE_ENDPOINT_ENV = {
  39. PARSER_ENGINE_DOCLING: "DOCLING_ENDPOINT",
  40. }
  41. _VALID_MINERU_API_MODES = {"official", "local"}
  42. # Trailing parser-hint pattern: matches ``.[engine].ext`` at end of basename.
  43. # Group 1 captures the raw engine token (still needs normalize_parser_engine
  44. # and SUPPORTED_PARSER_ENGINES validation); group 2 captures ``.ext`` so it
  45. # can be reattached when stripping the hint.
  46. _PARSER_HINT_RE = re.compile(r"\.\[([^\]]*)\](\.[^.]+)$")
class ParserRoutingConfigError(ValueError):
    """Raised when LIGHTRAG_PARSER contains an invalid routing rule.

    Raised by ``validate_parser_routing_config`` with every detected
    problem joined into one message. Because it subclasses ``ValueError``,
    existing ``except ValueError`` handlers also catch it.
    """


class FilenameParserHintError(ValueError):
    """Raised when a filename parser hint is invalid for ingestion.

    Raised by ``_validate_filename_hint_for_resolution`` for:

    - empty hints;
    - unsupported engines;
    - invalid or empty process options;
    - suffixes the engine cannot parse;
    - a missing endpoint configuration, when one is required.

    Subclasses ``ValueError``.
    """
  51. def normalize_parser_engine(engine: Any) -> str:
  52. """Normalize engine hints such as mineru-iet to mineru."""
  53. return str(engine or "").strip().split("-", 1)[0].lower()
  54. # ---------------------------------------------------------------------------
  55. # Per-file processing options (i/t/e/!/F/R/V/P)
  56. # ---------------------------------------------------------------------------
  57. @dataclass(frozen=True)
  58. class ProcessOptions:
  59. """Decoded view of a ``process_options`` string.
  60. The ``raw`` string is preserved verbatim (with duplicates and ordering)
  61. for storage / audit purposes; boolean flags reflect the deduped logical
  62. state used by the pipeline.
  63. """
  64. raw: str = ""
  65. images: bool = False
  66. tables: bool = False
  67. equations: bool = False
  68. skip_kg: bool = False
  69. chunking: ProcessChunkingOption = PROCESS_OPTION_CHUNK_FIXED
  70. @property
  71. def chunking_explicit(self) -> bool:
  72. """True iff ``raw`` actually contains a chunking selector char.
  73. Distinguishes "user explicitly opted into a chunking strategy"
  74. from "no chunking selector supplied — pipeline used the default".
  75. ``chunking`` itself is unreliable for this question because it
  76. falls back to :data:`PROCESS_OPTION_CHUNK_FIXED` in both cases.
  77. Used by ``process_single_document`` to decide whether to
  78. dispatch via the new file-chunker contract or to honor the
  79. legacy externally-supplied :attr:`LightRAG.chunking_func`.
  80. """
  81. return any(c in PROCESS_OPTION_CHUNK_CHARS for c in self.raw)
  82. _PROCESS_OPTION_DEFAULT = ProcessOptions()
  83. def sanitize_process_options(options: Any) -> str:
  84. """Strip non-supported characters / hyphen / whitespace from an options string.
  85. Returns the raw token sequence as-is (no dedup, no reorder) so the
  86. canonical user intent is preserved on disk. Invalid characters are
  87. silently dropped — the caller is expected to have already validated.
  88. """
  89. if not options:
  90. return ""
  91. return "".join(ch for ch in str(options) if ch in SUPPORTED_PROCESS_OPTIONS)
  92. def validate_process_options(
  93. options: str, *, label: str = "process options"
  94. ) -> list[str]:
  95. """Return a list of error messages for an options string; empty if valid."""
  96. errors: list[str] = []
  97. if not options:
  98. return errors
  99. seen_chunkers: list[str] = []
  100. for ch in options:
  101. if ch in (" ", "-"):
  102. continue
  103. if ch not in SUPPORTED_PROCESS_OPTIONS:
  104. errors.append(f"{label} contains unsupported character {ch!r}")
  105. continue
  106. if ch in PROCESS_OPTION_CHUNK_CHARS and ch not in seen_chunkers:
  107. seen_chunkers.append(ch)
  108. if len(seen_chunkers) > 1:
  109. errors.append(
  110. f"{label} specifies multiple chunking modes "
  111. f"({'/'.join(seen_chunkers)}); pick one of "
  112. f"{PROCESS_OPTION_CHUNK_FIXED}/{PROCESS_OPTION_CHUNK_RECURSIVE}/{PROCESS_OPTION_CHUNK_VECTOR}/{PROCESS_OPTION_CHUNK_PARAGRAH}"
  113. )
  114. return errors
  115. def parse_process_options(options: Any) -> ProcessOptions:
  116. """Decode a process-options string into a :class:`ProcessOptions` view."""
  117. raw = sanitize_process_options(options)
  118. if not raw:
  119. return _PROCESS_OPTION_DEFAULT
  120. chars = set(raw)
  121. chunking: ProcessChunkingOption = PROCESS_OPTION_CHUNK_FIXED
  122. # Pick the first chunking selector encountered; validate_process_options
  123. # already filters duplicates upstream.
  124. for ch in raw:
  125. if ch in PROCESS_OPTION_CHUNK_CHARS:
  126. chunking = ch # type: ignore[assignment]
  127. break
  128. return ProcessOptions(
  129. raw=raw,
  130. images=PROCESS_OPTION_IMAGES in chars,
  131. tables=PROCESS_OPTION_TABLES in chars,
  132. equations=PROCESS_OPTION_EQUATIONS in chars,
  133. skip_kg=PROCESS_OPTION_SKIP_KG in chars,
  134. chunking=chunking,
  135. )
  136. # ---------------------------------------------------------------------------
  137. # Per-chunker parameter snapshot (chunk_options) — counterpart to the
  138. # F/R/V/P selector in ``ProcessOptions``. ``process_options`` chooses
  139. # the strategy; ``chunk_options`` carries the parameters the chosen
  140. # strategy reads.
  141. #
  142. # Storage shape: the per-document snapshot persisted to
  143. # ``full_docs[doc_id]['chunk_options']`` carries ONLY the sub-dict of
  144. # the chunking strategy selected by ``process_options`` — the other
  145. # strategies' parameters are dropped because they are never consumed
  146. # during processing. Reparsing a document overwrites both
  147. # ``process_options`` and ``chunk_options`` together.
  148. # ---------------------------------------------------------------------------
# Strategy selector character (F/R/V/P) → ``chunk_options`` sub-dict key.
# This is the single source of truth for the slim ``chunk_options`` shape.
# It is read only through :func:`chunk_strategy_key`. Both
# :func:`resolve_chunk_options` and :func:`slim_chunk_options` go through
# that helper to decide which strategy block to keep. Selectors missing
# from this mapping fall back to ``"fixed_token"`` in that helper.
_CHUNK_STRATEGY_KEYS: dict[str, str] = {
    PROCESS_OPTION_CHUNK_FIXED: "fixed_token",
    PROCESS_OPTION_CHUNK_RECURSIVE: "recursive_character",
    PROCESS_OPTION_CHUNK_VECTOR: "semantic_vector",
    PROCESS_OPTION_CHUNK_PARAGRAH: "paragraph_semantic",
}
  160. def chunk_strategy_key(process_options: Any) -> str:
  161. """Return the ``chunk_options`` sub-dict key for ``process_options``.
  162. Accepts a raw options string or a :class:`ProcessOptions` value.
  163. Falls back to ``"fixed_token"`` when no chunking selector is
  164. present — F is the default strategy used both by the file-chunker
  165. dispatcher (when ``chunking_explicit`` is False the legacy
  166. ``chunking_func`` runs, which defaults to fixed-token chunking
  167. that reads from the same sub-dict).
  168. """
  169. if isinstance(process_options, ProcessOptions):
  170. strategy = process_options.chunking
  171. else:
  172. strategy = parse_process_options(process_options).chunking
  173. return _CHUNK_STRATEGY_KEYS.get(strategy, "fixed_token")
  174. def slim_chunk_options(
  175. snapshot: Mapping[str, Any] | None,
  176. process_options: Any = "",
  177. ) -> dict[str, Any]:
  178. """Project a (possibly full) chunker snapshot down to the active strategy.
  179. Keeps the top-level ``chunk_token_size`` and the one strategy
  180. sub-dict picked by :func:`chunk_strategy_key`; everything else is
  181. discarded. Idempotent: a slim snapshot whose key already matches
  182. ``process_options`` passes through unchanged (deep-copied for
  183. isolation). When the matching strategy block is absent from the
  184. input, an empty dict is used so downstream consumers always see a
  185. dict-shaped slot.
  186. Strategy-specific default backfill: for ``paragraph_semantic`` we
  187. guarantee a populated ``chunk_token_size`` slot before returning
  188. (caller-supplied value > ``CHUNK_P_SIZE`` env >
  189. ``DEFAULT_CHUNK_P_SIZE``). This is the single chokepoint that
  190. every enqueue path runs through — both the
  191. ``resolve_chunk_options`` path (built from addon_params) AND the
  192. direct ``chunk_options=`` kwarg path (caller supplies the dict)
  193. flow through here, so the backfill cannot be bypassed by runtime
  194. addon_params mutation or by passing an explicit ``chunk_options``
  195. that omits the P slot. P must NOT inherit the top-level
  196. ``chunk_token_size`` (global ``CHUNK_SIZE`` / legacy ctor) —
  197. paragraph-semantic merging needs more headroom than the global
  198. default.
  199. """
  200. key = chunk_strategy_key(process_options)
  201. src: Mapping[str, Any] = snapshot or {}
  202. result: dict[str, Any] = {}
  203. if "chunk_token_size" in src:
  204. result["chunk_token_size"] = deepcopy(src["chunk_token_size"])
  205. result[key] = deepcopy(dict(src.get(key) or {}))
  206. if key == "paragraph_semantic" and "chunk_token_size" not in result[key]:
  207. p_size_raw = os.getenv("CHUNK_P_SIZE")
  208. result[key]["chunk_token_size"] = (
  209. int(p_size_raw) if p_size_raw is not None else DEFAULT_CHUNK_P_SIZE
  210. )
  211. return result
  212. def _env_optional_str(key: str) -> str | None:
  213. """Return the env value as a string, collapsing empty / 'None' to None."""
  214. raw = os.getenv(key)
  215. if raw is None:
  216. return None
  217. stripped = raw.strip()
  218. if not stripped or stripped.lower() == "none":
  219. return None
  220. return raw
  221. def _env_bool(key: str, default: bool = False) -> bool:
  222. raw = os.getenv(key)
  223. if raw is None:
  224. return default
  225. return raw.strip().lower() in ("1", "true", "yes", "on", "t", "y")
  226. def default_chunker_config() -> dict[str, Any]:
  227. """Snapshot the **strategy-specific** env-driven defaults for every shipped chunker.
  228. Builds a per-strategy sub-dict whose keys mirror each strategy's
  229. keyword-only signature (so :func:`resolve_chunk_options` can splat
  230. them straight into the chunker call).
  231. Provenance / precedence note: this function reads only
  232. *strategy-specific* env vars (``CHUNK_F_SIZE``,
  233. ``CHUNK_F_OVERLAP_SIZE``, ``CHUNK_R_SIZE``, ``CHUNK_R_OVERLAP_SIZE``,
  234. ``CHUNK_R_SEPARATORS``, ``CHUNK_V_SIZE``, ``CHUNK_V_*``,
  235. ``CHUNK_P_SIZE``, ``CHUNK_P_OVERLAP_SIZE``,
  236. ``CHUNK_F_SPLIT_BY_CHARACTER``…). It does **not** read the legacy
  237. top-level envs ``CHUNK_SIZE`` / ``CHUNK_OVERLAP_SIZE``, and it
  238. deliberately **omits** ``chunk_overlap_token_size`` from a strategy
  239. sub-dict when its own env var is unset — leaving the slot empty is
  240. the signal that lets
  241. :meth:`LightRAG._apply_chunk_size_overlay` apply the legacy
  242. constructor field (``LightRAG(chunk_overlap_token_size=…)``) and
  243. finally the legacy ``CHUNK_OVERLAP_SIZE`` env in that order. Same
  244. rationale for top-level ``chunk_token_size`` — overlay fills it from
  245. ``LightRAG(chunk_token_size=…)`` then ``CHUNK_SIZE`` env. Net
  246. precedence (high → low): ``addon_params`` explicit > strategy env
  247. > legacy ctor field > legacy env.
  248. Read at instance-creation time via
  249. :func:`lightrag.addon_params.default_addon_params`; users can mutate
  250. ``addon_params['chunker']`` at runtime to change the defaults applied
  251. to subsequently enqueued documents (already-enqueued docs hold a
  252. frozen ``full_docs[doc_id]['chunk_options']`` snapshot).
  253. """
  254. config: dict[str, Any] = {
  255. "fixed_token": {
  256. "split_by_character": _env_optional_str("CHUNK_F_SPLIT_BY_CHARACTER"),
  257. "split_by_character_only": _env_bool(
  258. "CHUNK_F_SPLIT_BY_CHARACTER_ONLY", False
  259. ),
  260. },
  261. "recursive_character": {
  262. # Default separators include CJK sentence-ending punctuation
  263. # so Chinese / mixed-language documents split at semantic
  264. # boundaries instead of falling through to character-level
  265. # splitting. See ``constants.DEFAULT_R_SEPARATORS`` for
  266. # cascade order rationale.
  267. "separators": json.loads(
  268. os.getenv("CHUNK_R_SEPARATORS", json.dumps(list(DEFAULT_R_SEPARATORS)))
  269. ),
  270. },
  271. "semantic_vector": {
  272. "breakpoint_threshold_type": os.getenv(
  273. "CHUNK_V_BREAKPOINT_THRESHOLD_TYPE", "percentile"
  274. ),
  275. "breakpoint_threshold_amount": parse_optional_float(
  276. os.getenv("CHUNK_V_BREAKPOINT_THRESHOLD_AMOUNT")
  277. ),
  278. "buffer_size": int(os.getenv("CHUNK_V_BUFFER_SIZE", "1")),
  279. # Default extends LangChain's English-only sentence splitter
  280. # with CJK terminators so SemanticChunker can actually find
  281. # sentence boundaries on Chinese input. Override per
  282. # deployment if you need a different language mix.
  283. "sentence_split_regex": os.getenv(
  284. "CHUNK_V_SENTENCE_SPLIT_REGEX", DEFAULT_SENTENCE_SPLIT_REGEX
  285. ),
  286. },
  287. "paragraph_semantic": {},
  288. }
  289. # Strategy-specific overlap envs only — leave the slot absent when
  290. # unset so overlay can detect provenance and fill from the legacy
  291. # tier (constructor field → CHUNK_OVERLAP_SIZE env).
  292. f_overlap_raw = os.getenv("CHUNK_F_OVERLAP_SIZE")
  293. if f_overlap_raw is not None:
  294. config["fixed_token"]["chunk_overlap_token_size"] = int(f_overlap_raw)
  295. r_overlap_raw = os.getenv("CHUNK_R_OVERLAP_SIZE")
  296. if r_overlap_raw is not None:
  297. config["recursive_character"]["chunk_overlap_token_size"] = int(r_overlap_raw)
  298. p_overlap_raw = os.getenv("CHUNK_P_OVERLAP_SIZE")
  299. if p_overlap_raw is not None:
  300. config["paragraph_semantic"]["chunk_overlap_token_size"] = int(p_overlap_raw)
  301. # P strategy carries its own ``chunk_token_size`` override so the
  302. # paragraph-semantic merge target can diverge from the global
  303. # ``CHUNK_SIZE`` (e.g. heading-aligned chunks may want a larger
  304. # ceiling). Unlike R/V, the slot is ALWAYS populated — when
  305. # ``CHUNK_P_SIZE`` is unset we use ``DEFAULT_CHUNK_P_SIZE`` (2000)
  306. # rather than letting the dispatcher fall back to the global
  307. # ``CHUNK_SIZE`` (1200): paragraph-semantic merging needs more
  308. # headroom than the global default to keep related paragraphs
  309. # together, and silently inheriting the smaller global ceiling
  310. # defeats the strategy's purpose.
  311. p_size_raw = os.getenv("CHUNK_P_SIZE")
  312. config["paragraph_semantic"]["chunk_token_size"] = (
  313. int(p_size_raw) if p_size_raw is not None else DEFAULT_CHUNK_P_SIZE
  314. )
  315. # F/R/V strategies likewise carry their own optional ``chunk_token_size``
  316. # overrides (fixed-token may want a deployment-specific window, recursive
  317. # character splitting a smaller target, semantic-vector clustering a larger
  318. # advisory ceiling). Same slot-absent convention as P: leave the slot
  319. # absent when the env is unset so the strategy inherits the top-level
  320. # ``chunk_token_size`` fallback at consumption time.
  321. f_size_raw = os.getenv("CHUNK_F_SIZE")
  322. if f_size_raw is not None:
  323. config["fixed_token"]["chunk_token_size"] = int(f_size_raw)
  324. r_size_raw = os.getenv("CHUNK_R_SIZE")
  325. if r_size_raw is not None:
  326. config["recursive_character"]["chunk_token_size"] = int(r_size_raw)
  327. v_size_raw = os.getenv("CHUNK_V_SIZE")
  328. if v_size_raw is not None:
  329. config["semantic_vector"]["chunk_token_size"] = int(v_size_raw)
  330. return config
  331. def resolve_chunk_options(
  332. addon_params: Mapping[str, Any] | None,
  333. *,
  334. process_options: Any = "",
  335. split_by_character: str | None = None,
  336. split_by_character_only: bool = False,
  337. ) -> dict[str, Any]:
  338. """Build a per-document slim ``chunk_options`` snapshot.
  339. Reads the chunker config from ``addon_params['chunker']``, falling
  340. back to a freshly built :func:`default_chunker_config` when the
  341. addon-params mapping is missing or hasn't been populated, then
  342. keeps only the parameters of the strategy selected by
  343. ``process_options`` (the other strategies' sub-dicts are dropped —
  344. they would never be consumed during processing). See
  345. :func:`slim_chunk_options` for the projection rules and
  346. :func:`chunk_strategy_key` for the strategy → sub-dict mapping
  347. (default F → ``fixed_token``).
  348. The F runtime args from ``LightRAG.ainsert`` overlay the
  349. ``fixed_token`` sub-dict when (and only when) the active strategy
  350. is F — for R/V/P these args have no slot to land in and are
  351. silently dropped:
  352. - ``split_by_character`` overrides the env when **non-None**.
  353. ``None`` (signature default) means "use the env / addon_params
  354. default".
  355. - ``split_by_character_only`` overrides the env when **True**.
  356. ``False`` (signature default) means "use the env / addon_params
  357. default" — there's no clean way to distinguish "unset" from
  358. "explicit False" with a positional default, so the env wins
  359. unless the caller actively opts in.
  360. The returned snapshot is an independent deep copy: mutating it has
  361. no effect on subsequent resolutions.
  362. """
  363. src: Mapping[str, Any] | None = None
  364. if isinstance(addon_params, Mapping):
  365. candidate = addon_params.get("chunker")
  366. if isinstance(candidate, Mapping):
  367. src = candidate
  368. if src is None:
  369. src = default_chunker_config()
  370. snapshot = slim_chunk_options(src, process_options)
  371. if chunk_strategy_key(process_options) == "fixed_token":
  372. fixed = snapshot["fixed_token"]
  373. if split_by_character is not None:
  374. fixed["split_by_character"] = split_by_character
  375. if split_by_character_only:
  376. fixed["split_by_character_only"] = True
  377. # P-strategy ``chunk_token_size`` backfill lives in
  378. # ``slim_chunk_options`` — that's the single chokepoint shared by
  379. # every enqueue path (this function AND the direct
  380. # ``chunk_options=`` kwarg path in ``_chunk_options_at``).
  381. return snapshot
  382. def split_engine_and_options(bracket_inner: str) -> tuple[str | None, str]:
  383. """Decompose a bracket-hint inner string into ``(engine, options)``.
  384. Format rules (see docs/FileProcessingPipeline-zh.md):
  385. - ``ENGINE-OPTIONS``: first ``-``-separated segment is the engine
  386. candidate; the remainder is the options string.
  387. - ``ENGINE``: matches a supported engine name as a whole.
  388. - ``-OPTIONS``: leading ``-`` marks an options-only hint.
  389. """
  390. inner = (bracket_inner or "").strip()
  391. if not inner:
  392. return None, ""
  393. if inner.startswith("-"):
  394. return None, inner[1:].strip()
  395. if "-" in inner:
  396. head, _, tail = inner.partition("-")
  397. engine_candidate = normalize_parser_engine(head)
  398. if engine_candidate in SUPPORTED_PARSER_ENGINES:
  399. return engine_candidate, tail.strip()
  400. return None, ""
  401. engine_candidate = normalize_parser_engine(inner)
  402. if engine_candidate in SUPPORTED_PARSER_ENGINES:
  403. return engine_candidate, ""
  404. return None, ""
  405. def parser_suffix(file_path: str | Path) -> str:
  406. return Path(file_path).suffix.lower().lstrip(".")
  407. def parser_engine_supports_suffix(engine: str, suffix: str) -> bool:
  408. return suffix.lower().lstrip(".") in PARSER_ENGINE_SUFFIX_CAPABILITIES.get(
  409. engine, frozenset()
  410. )
  411. def parser_engine_endpoint_configured(engine: str) -> bool:
  412. if engine == PARSER_ENGINE_MINERU:
  413. mode = os.getenv("MINERU_API_MODE", "local").strip().lower()
  414. if mode == "official":
  415. return bool(os.getenv("MINERU_API_TOKEN", "").strip())
  416. if mode == "local":
  417. return bool(os.getenv("MINERU_LOCAL_ENDPOINT", "").strip())
  418. return False
  419. endpoint_env = _PARSER_ENGINE_ENDPOINT_ENV.get(engine)
  420. if endpoint_env:
  421. return bool(os.getenv(endpoint_env, "").strip())
  422. return True
  423. def parser_engine_endpoint_requirement(engine: str) -> str | None:
  424. if engine == PARSER_ENGINE_MINERU:
  425. mode = os.getenv("MINERU_API_MODE", "local").strip().lower()
  426. if mode == "official":
  427. return "MINERU_API_TOKEN"
  428. if mode == "local":
  429. return "MINERU_LOCAL_ENDPOINT"
  430. allowed = ", ".join(sorted(_VALID_MINERU_API_MODES))
  431. return f"valid MINERU_API_MODE ({allowed})"
  432. return _PARSER_ENGINE_ENDPOINT_ENV.get(engine)
  433. def _engine_is_usable(
  434. engine: str,
  435. suffix: str,
  436. *,
  437. require_external_endpoint: bool,
  438. ) -> bool:
  439. if engine not in SUPPORTED_PARSER_ENGINES:
  440. return False
  441. if not parser_engine_supports_suffix(engine, suffix):
  442. return False
  443. if require_external_endpoint and not parser_engine_endpoint_configured(engine):
  444. return False
  445. return True
  446. def _filename_hint_match(
  447. file_path: str | Path,
  448. ) -> tuple[re.Match[str], str, str] | None:
  449. """Locate a supported ``[hint]`` segment in a basename.
  450. Returns ``(match, engine_or_empty, options)`` when the bracket inner is a
  451. recognised hint per the spec; otherwise ``None``. This low-level helper
  452. stays non-throwing because scan grouping and basename canonicalization need
  453. a best-effort classifier. Ingestion entrypoints must call
  454. :func:`resolve_file_parser_directives`, which validates malformed hints and
  455. raises instead of falling back.
  456. """
  457. basename = Path(file_path).name
  458. m = _PARSER_HINT_RE.search(basename)
  459. if not m:
  460. return None
  461. inner = m.group(1).strip()
  462. if inner.startswith("-") and not inner[1:].strip():
  463. return None
  464. if (
  465. "-" in inner
  466. and not inner.startswith("-")
  467. and not inner.partition("-")[2].strip()
  468. ):
  469. return None
  470. engine, options = split_engine_and_options(inner)
  471. if options:
  472. option_errors = validate_process_options(options)
  473. if option_errors:
  474. logger.warning(
  475. f"[parser_routing] ignoring filename hint {m.group(0)!r} in "
  476. f"{basename!r}: {'; '.join(option_errors)}"
  477. )
  478. return None
  479. if engine in SUPPORTED_PARSER_ENGINES:
  480. return m, engine, options
  481. if engine is None and options:
  482. return m, "", options
  483. return None
  484. def _validate_filename_hint_for_resolution(
  485. file_path: str | Path,
  486. *,
  487. require_external_endpoint: bool,
  488. ) -> None:
  489. """Fail fast for malformed filename hints on ingestion entrypoints."""
  490. basename = Path(file_path).name
  491. m = _PARSER_HINT_RE.search(basename)
  492. if not m:
  493. return
  494. inner = m.group(1)
  495. errors: list[str] = []
  496. if not inner.strip():
  497. errors.append(f"filename hint {m.group(0)!r} is empty")
  498. raise FilenameParserHintError(
  499. f"Invalid filename parser hint in {basename!r}: " + "; ".join(errors)
  500. )
  501. engine: str | None = None
  502. options = ""
  503. if inner.startswith("-"):
  504. options = inner[1:].strip()
  505. if not options:
  506. errors.append(f"filename hint {m.group(0)!r} has empty process options")
  507. else:
  508. errors.extend(
  509. validate_process_options(
  510. options,
  511. label=f"filename hint {m.group(0)!r} options",
  512. )
  513. )
  514. elif "-" in inner:
  515. engine_name, _, options = inner.partition("-")
  516. engine = normalize_parser_engine(engine_name)
  517. if engine not in SUPPORTED_PARSER_ENGINES:
  518. supported = ", ".join(sorted(SUPPORTED_PARSER_ENGINES))
  519. errors.append(
  520. f"filename hint {m.group(0)!r} uses unsupported parser engine "
  521. f"{engine_name.strip()!r}; supported engines: {supported}"
  522. )
  523. elif not options.strip():
  524. errors.append(f"filename hint {m.group(0)!r} has empty process options")
  525. else:
  526. errors.extend(
  527. validate_process_options(
  528. options,
  529. label=f"filename hint {m.group(0)!r} options",
  530. )
  531. )
  532. else:
  533. engine = normalize_parser_engine(inner)
  534. if engine not in SUPPORTED_PARSER_ENGINES:
  535. supported = ", ".join(sorted(SUPPORTED_PARSER_ENGINES))
  536. message = (
  537. f"filename hint {m.group(0)!r} uses unsupported parser engine "
  538. f"{inner.strip()!r}; supported engines: {supported}"
  539. )
  540. if all(ch in SUPPORTED_PROCESS_OPTIONS or ch == " " for ch in inner):
  541. message += (
  542. "; options-only filename hints must start with '-' "
  543. f"(use '[-{inner.strip()}]' instead)"
  544. )
  545. errors.append(message)
  546. if engine in SUPPORTED_PARSER_ENGINES:
  547. suffix = parser_suffix(file_path)
  548. if not parser_engine_supports_suffix(engine, suffix):
  549. supported_suffixes = ", ".join(
  550. sorted(PARSER_ENGINE_SUFFIX_CAPABILITIES.get(engine, frozenset()))
  551. )
  552. errors.append(
  553. f"filename hint {m.group(0)!r} uses parser engine {engine!r} "
  554. f"for unsupported suffix {suffix!r}; supported suffixes: "
  555. f"{supported_suffixes}"
  556. )
  557. endpoint_req = parser_engine_endpoint_requirement(engine)
  558. if (
  559. require_external_endpoint
  560. and endpoint_req
  561. and not parser_engine_endpoint_configured(engine)
  562. ):
  563. errors.append(
  564. f"filename hint {m.group(0)!r} requires {endpoint_req} "
  565. "to be configured"
  566. )
  567. if errors:
  568. raise FilenameParserHintError(
  569. f"Invalid filename parser hint in {basename!r}: " + "; ".join(errors)
  570. )
  571. def filename_parser_hint(file_path: str | Path) -> str | None:
  572. """Return the engine inferred from a filename hint, or ``None``."""
  573. found = _filename_hint_match(file_path)
  574. if not found:
  575. return None
  576. _, engine, _ = found
  577. return engine or None
  578. def filename_process_options(file_path: str | Path) -> str:
  579. """Return the raw process-options string from a filename hint."""
  580. found = _filename_hint_match(file_path)
  581. if not found:
  582. return ""
  583. return found[2]
  584. def filename_parser_directives(file_path: str | Path) -> tuple[str | None, str]:
  585. """Return ``(engine, options)`` decoded from a filename hint."""
  586. found = _filename_hint_match(file_path)
  587. if not found:
  588. return None, ""
  589. _, engine, options = found
  590. return (engine or None), options
  591. def canonicalize_parser_hinted_basename(file_path: str | Path) -> str:
  592. """Return basename with a supported parser hint removed.
  593. Only the final ``.[engine].ext`` (or ``.[engine-options].ext`` /
  594. ``.[-options].ext``) segment is stripped, exactly once, and only when the
  595. bracket content is a recognised hint. Nested hints such as
  596. ``name.[native].[mineru].pdf`` therefore become ``name.[native].pdf`` —
  597. additional outer hints are not unwrapped.
  598. """
  599. basename = Path(file_path).name
  600. found = _filename_hint_match(file_path)
  601. if not found:
  602. return basename
  603. m, _, _ = found
  604. return f"{basename[: m.start()]}{m.group(2)}"
  605. def parser_rules_from_env() -> str:
  606. return os.getenv("LIGHTRAG_PARSER", "").strip()
  607. def _iter_parser_rule_items(rules: str) -> list[tuple[int, str]]:
  608. return [
  609. (index, item.strip())
  610. for index, item in enumerate(_PARSER_RULE_SPLIT_RE.split(rules), start=1)
  611. if item.strip()
  612. ]
  613. def _rule_pattern_matches_engine_capability(pattern: str, engine: str) -> bool:
  614. supported_suffixes = PARSER_ENGINE_SUFFIX_CAPABILITIES.get(engine, frozenset())
  615. return any(fnmatch.fnmatch(suffix, pattern) for suffix in supported_suffixes)
  616. def _rule_engine_and_options(engine_hint: str) -> tuple[str, str]:
  617. """Split a ``LIGHTRAG_PARSER`` rule's RHS (``engine[-options]``).
  618. Returns ``(normalized_engine, options_str)``. Unlike the filename hint
  619. splitter this always treats the first ``-`` as the engine/options
  620. boundary, since ``LIGHTRAG_PARSER`` rules cannot be options-only.
  621. """
  622. head, _, tail = engine_hint.partition("-")
  623. return normalize_parser_engine(head), tail.strip()
  624. def validate_parser_routing_config(parser_rules: str | None = None) -> None:
  625. """Validate LIGHTRAG_PARSER syntax and required external parser endpoints."""
  626. rules = parser_rules_from_env() if parser_rules is None else parser_rules.strip()
  627. if not rules:
  628. return
  629. errors: list[str] = []
  630. for index, item in _iter_parser_rule_items(rules):
  631. label = f"rule {index} ({item!r})"
  632. if ":" not in item:
  633. errors.append(f"{label} must use '<suffix-pattern>:<engine>'")
  634. continue
  635. pattern, engine_hint = item.split(":", 1)
  636. pattern = pattern.strip().lower()
  637. engine_hint = engine_hint.strip()
  638. engine, options_str = _rule_engine_and_options(engine_hint)
  639. if not pattern:
  640. errors.append(f"{label} has an empty suffix pattern")
  641. continue
  642. if "." in pattern:
  643. errors.append(
  644. f"{label} matches suffixes without dots; use 'pdf', not '*.pdf'"
  645. )
  646. continue
  647. if not engine_hint:
  648. errors.append(f"{label} has an empty parser engine")
  649. continue
  650. if engine not in SUPPORTED_PARSER_ENGINES:
  651. supported = ", ".join(sorted(SUPPORTED_PARSER_ENGINES))
  652. errors.append(
  653. f"{label} uses unsupported parser engine {engine_hint!r}; "
  654. f"supported engines: {supported}"
  655. )
  656. continue
  657. if not _rule_pattern_matches_engine_capability(pattern, engine):
  658. supported_suffixes = ", ".join(
  659. sorted(PARSER_ENGINE_SUFFIX_CAPABILITIES.get(engine, frozenset()))
  660. )
  661. errors.append(
  662. f"{label} does not match any suffix supported by {engine}; "
  663. f"supported suffixes: {supported_suffixes}"
  664. )
  665. endpoint_req = parser_engine_endpoint_requirement(engine)
  666. if endpoint_req and not parser_engine_endpoint_configured(engine):
  667. errors.append(f"{label} requires {endpoint_req} to be configured")
  668. if options_str:
  669. errors.extend(
  670. f"{label}: {msg}"
  671. for msg in validate_process_options(
  672. options_str, label="process options"
  673. )
  674. )
  675. if errors:
  676. raise ParserRoutingConfigError(
  677. "Invalid LIGHTRAG_PARSER configuration: " + "; ".join(errors)
  678. )
  679. def _matching_rule_directives(
  680. file_path: str | Path,
  681. *,
  682. parser_rules: str | None,
  683. require_external_endpoint: bool,
  684. ) -> tuple[str | None, str]:
  685. """Find the first matching ``LIGHTRAG_PARSER`` rule for ``file_path``.
  686. Returns ``(engine, options_str)`` where ``engine`` is ``None`` when no
  687. usable rule is found. ``options_str`` is empty when a rule matched but
  688. has no ``-options`` suffix.
  689. """
  690. suffix = parser_suffix(file_path)
  691. rules = parser_rules_from_env() if parser_rules is None else parser_rules.strip()
  692. if not rules:
  693. return None, ""
  694. for _, item in _iter_parser_rule_items(rules):
  695. if ":" not in item:
  696. continue
  697. pattern, engine_hint = item.split(":", 1)
  698. pattern = pattern.strip().lower()
  699. engine, options_str = _rule_engine_and_options(engine_hint.strip())
  700. if not fnmatch.fnmatch(suffix, pattern):
  701. continue
  702. if _engine_is_usable(
  703. engine,
  704. suffix,
  705. require_external_endpoint=require_external_endpoint,
  706. ):
  707. return engine, options_str
  708. return None, ""
  709. def resolve_file_parser_engine(
  710. file_path: str | Path,
  711. *,
  712. parser_rules: str | None = None,
  713. require_external_endpoint: bool = True,
  714. ) -> str:
  715. """Resolve the extraction engine for a source file before content extraction."""
  716. engine, _ = resolve_file_parser_directives(
  717. file_path,
  718. parser_rules=parser_rules,
  719. require_external_endpoint=require_external_endpoint,
  720. )
  721. return engine
  722. def resolve_file_parser_directives(
  723. file_path: str | Path,
  724. *,
  725. parser_rules: str | None = None,
  726. require_external_endpoint: bool = True,
  727. ) -> tuple[str, str]:
  728. """Resolve ``(engine, process_options)`` for a source file before extraction.
  729. Resolution order (mirrors :func:`resolve_file_parser_engine`):
  730. 1. Filename ``[hint]`` — engine and / or options take precedence.
  731. 2. ``LIGHTRAG_PARSER`` rules — first matching rule provides defaults
  732. for whichever of engine / options the filename hint did not
  733. specify.
  734. 3. Default engine ``legacy`` with empty options.
  735. """
  736. suffix = parser_suffix(file_path)
  737. _validate_filename_hint_for_resolution(
  738. file_path,
  739. require_external_endpoint=require_external_endpoint,
  740. )
  741. hinted_engine, hinted_options = filename_parser_directives(file_path)
  742. if hinted_engine and not _engine_is_usable(
  743. hinted_engine, suffix, require_external_endpoint=require_external_endpoint
  744. ):
  745. # Hinted engine cannot handle this file (e.g. wrong suffix or missing
  746. # endpoint); fall back to rule-based resolution but keep the hinted
  747. # options if any.
  748. hinted_engine = None
  749. rule_engine, rule_options = _matching_rule_directives(
  750. file_path,
  751. parser_rules=parser_rules,
  752. require_external_endpoint=require_external_endpoint,
  753. )
  754. engine = hinted_engine or rule_engine or PARSER_ENGINE_LEGACY
  755. options_str = hinted_options or rule_options
  756. return engine, sanitize_process_options(options_str)
  757. def resolve_stored_document_parser_engine(
  758. file_path: str | Path,
  759. content_data: dict[str, Any] | None,
  760. ) -> str:
  761. """Resolve parser engine for a full_docs row during pipeline processing."""
  762. if content_data:
  763. doc_format = content_data.get("parse_format", FULL_DOCS_FORMAT_RAW)
  764. if doc_format == FULL_DOCS_FORMAT_LIGHTRAG and content_data.get(
  765. "sidecar_location"
  766. ):
  767. return PARSER_ENGINE_NATIVE
  768. if doc_format != FULL_DOCS_FORMAT_PENDING_PARSE:
  769. return PARSER_ENGINE_LEGACY
  770. suffix = parser_suffix(file_path)
  771. pending_engine = normalize_parser_engine(content_data.get("parse_engine"))
  772. if pending_engine in SUPPORTED_PARSER_ENGINES and parser_engine_supports_suffix(
  773. pending_engine, suffix
  774. ):
  775. return pending_engine
  776. return resolve_file_parser_engine(file_path)