| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901 |
- from __future__ import annotations
- import fnmatch
- import os
- import re
- from dataclasses import dataclass
- from pathlib import Path
- from typing import Any
- from lightrag.constants import (
- DEFAULT_CHUNK_P_SIZE,
- DEFAULT_R_SEPARATORS,
- DEFAULT_SENTENCE_SPLIT_REGEX,
- FULL_DOCS_FORMAT_LIGHTRAG,
- FULL_DOCS_FORMAT_PENDING_PARSE,
- FULL_DOCS_FORMAT_RAW,
- PARSER_ENGINE_DOCLING,
- PARSER_ENGINE_LEGACY,
- PARSER_ENGINE_MINERU,
- PARSER_ENGINE_NATIVE,
- PARSER_ENGINE_SUFFIX_CAPABILITIES,
- PROCESS_OPTION_CHUNK_CHARS,
- PROCESS_OPTION_CHUNK_FIXED,
- PROCESS_OPTION_CHUNK_VECTOR,
- PROCESS_OPTION_CHUNK_PARAGRAH,
- PROCESS_OPTION_CHUNK_RECURSIVE,
- PROCESS_OPTION_EQUATIONS,
- PROCESS_OPTION_IMAGES,
- PROCESS_OPTION_SKIP_KG,
- PROCESS_OPTION_TABLES,
- ProcessChunkingOption,
- SUPPORTED_PARSER_ENGINES,
- SUPPORTED_PROCESS_OPTIONS,
- )
- from lightrag.utils import logger, parse_optional_float
- import json
- from collections.abc import Mapping
- from copy import deepcopy
- _PARSER_RULE_SPLIT_RE = re.compile(r"[;,]")
- _PARSER_ENGINE_ENDPOINT_ENV = {
- PARSER_ENGINE_DOCLING: "DOCLING_ENDPOINT",
- }
- _VALID_MINERU_API_MODES = {"official", "local"}
- # Trailing parser-hint pattern: matches ``.[engine].ext`` at end of basename.
- # Group 1 captures the raw engine token (still needs normalize_parser_engine
- # and SUPPORTED_PARSER_ENGINES validation); group 2 captures ``.ext`` so it
- # can be reattached when stripping the hint.
- _PARSER_HINT_RE = re.compile(r"\.\[([^\]]*)\](\.[^.]+)$")
- class ParserRoutingConfigError(ValueError):
- """Raised when LIGHTRAG_PARSER contains an invalid routing rule."""
- class FilenameParserHintError(ValueError):
- """Raised when a filename parser hint is invalid for ingestion."""
- def normalize_parser_engine(engine: Any) -> str:
- """Normalize engine hints such as mineru-iet to mineru."""
- return str(engine or "").strip().split("-", 1)[0].lower()
- # ---------------------------------------------------------------------------
- # Per-file processing options (i/t/e/!/F/R/V/P)
- # ---------------------------------------------------------------------------
- @dataclass(frozen=True)
- class ProcessOptions:
- """Decoded view of a ``process_options`` string.
- The ``raw`` string is preserved verbatim (with duplicates and ordering)
- for storage / audit purposes; boolean flags reflect the deduped logical
- state used by the pipeline.
- """
- raw: str = ""
- images: bool = False
- tables: bool = False
- equations: bool = False
- skip_kg: bool = False
- chunking: ProcessChunkingOption = PROCESS_OPTION_CHUNK_FIXED
- @property
- def chunking_explicit(self) -> bool:
- """True iff ``raw`` actually contains a chunking selector char.
- Distinguishes "user explicitly opted into a chunking strategy"
- from "no chunking selector supplied — pipeline used the default".
- ``chunking`` itself is unreliable for this question because it
- falls back to :data:`PROCESS_OPTION_CHUNK_FIXED` in both cases.
- Used by ``process_single_document`` to decide whether to
- dispatch via the new file-chunker contract or to honor the
- legacy externally-supplied :attr:`LightRAG.chunking_func`.
- """
- return any(c in PROCESS_OPTION_CHUNK_CHARS for c in self.raw)
- _PROCESS_OPTION_DEFAULT = ProcessOptions()
- def sanitize_process_options(options: Any) -> str:
- """Strip non-supported characters / hyphen / whitespace from an options string.
- Returns the raw token sequence as-is (no dedup, no reorder) so the
- canonical user intent is preserved on disk. Invalid characters are
- silently dropped — the caller is expected to have already validated.
- """
- if not options:
- return ""
- return "".join(ch for ch in str(options) if ch in SUPPORTED_PROCESS_OPTIONS)
- def validate_process_options(
- options: str, *, label: str = "process options"
- ) -> list[str]:
- """Return a list of error messages for an options string; empty if valid."""
- errors: list[str] = []
- if not options:
- return errors
- seen_chunkers: list[str] = []
- for ch in options:
- if ch in (" ", "-"):
- continue
- if ch not in SUPPORTED_PROCESS_OPTIONS:
- errors.append(f"{label} contains unsupported character {ch!r}")
- continue
- if ch in PROCESS_OPTION_CHUNK_CHARS and ch not in seen_chunkers:
- seen_chunkers.append(ch)
- if len(seen_chunkers) > 1:
- errors.append(
- f"{label} specifies multiple chunking modes "
- f"({'/'.join(seen_chunkers)}); pick one of "
- f"{PROCESS_OPTION_CHUNK_FIXED}/{PROCESS_OPTION_CHUNK_RECURSIVE}/{PROCESS_OPTION_CHUNK_VECTOR}/{PROCESS_OPTION_CHUNK_PARAGRAH}"
- )
- return errors
- def parse_process_options(options: Any) -> ProcessOptions:
- """Decode a process-options string into a :class:`ProcessOptions` view."""
- raw = sanitize_process_options(options)
- if not raw:
- return _PROCESS_OPTION_DEFAULT
- chars = set(raw)
- chunking: ProcessChunkingOption = PROCESS_OPTION_CHUNK_FIXED
- # Pick the first chunking selector encountered; validate_process_options
- # already filters duplicates upstream.
- for ch in raw:
- if ch in PROCESS_OPTION_CHUNK_CHARS:
- chunking = ch # type: ignore[assignment]
- break
- return ProcessOptions(
- raw=raw,
- images=PROCESS_OPTION_IMAGES in chars,
- tables=PROCESS_OPTION_TABLES in chars,
- equations=PROCESS_OPTION_EQUATIONS in chars,
- skip_kg=PROCESS_OPTION_SKIP_KG in chars,
- chunking=chunking,
- )
- # ---------------------------------------------------------------------------
- # Per-chunker parameter snapshot (chunk_options) — counterpart to the
- # F/R/V/P selector in ``ProcessOptions``. ``process_options`` chooses
- # the strategy; ``chunk_options`` carries the parameters the chosen
- # strategy reads.
- #
- # Storage shape: the per-document snapshot persisted to
- # ``full_docs[doc_id]['chunk_options']`` carries ONLY the sub-dict of
- # the chunking strategy selected by ``process_options`` — the other
- # strategies' parameters are dropped because they are never consumed
- # during processing. Reparsing a document overwrites both
- # ``process_options`` and ``chunk_options`` together.
- # ---------------------------------------------------------------------------
- # Strategy selector (F/R/V/P) → snapshot sub-dict key. Single source
- # of truth for the slim ``chunk_options`` shape — used by
- # :func:`resolve_chunk_options` to pick which strategy block to keep
- # and by :func:`slim_chunk_options` to project caller-supplied dicts
- # down to the selected strategy.
- _CHUNK_STRATEGY_KEYS: dict[str, str] = {
- PROCESS_OPTION_CHUNK_FIXED: "fixed_token",
- PROCESS_OPTION_CHUNK_RECURSIVE: "recursive_character",
- PROCESS_OPTION_CHUNK_VECTOR: "semantic_vector",
- PROCESS_OPTION_CHUNK_PARAGRAH: "paragraph_semantic",
- }
- def chunk_strategy_key(process_options: Any) -> str:
- """Return the ``chunk_options`` sub-dict key for ``process_options``.
- Accepts a raw options string or a :class:`ProcessOptions` value.
- Falls back to ``"fixed_token"`` when no chunking selector is
- present — F is the default strategy used both by the file-chunker
- dispatcher (when ``chunking_explicit`` is False the legacy
- ``chunking_func`` runs, which defaults to fixed-token chunking
- that reads from the same sub-dict).
- """
- if isinstance(process_options, ProcessOptions):
- strategy = process_options.chunking
- else:
- strategy = parse_process_options(process_options).chunking
- return _CHUNK_STRATEGY_KEYS.get(strategy, "fixed_token")
- def slim_chunk_options(
- snapshot: Mapping[str, Any] | None,
- process_options: Any = "",
- ) -> dict[str, Any]:
- """Project a (possibly full) chunker snapshot down to the active strategy.
- Keeps the top-level ``chunk_token_size`` and the one strategy
- sub-dict picked by :func:`chunk_strategy_key`; everything else is
- discarded. Idempotent: a slim snapshot whose key already matches
- ``process_options`` passes through unchanged (deep-copied for
- isolation). When the matching strategy block is absent from the
- input, an empty dict is used so downstream consumers always see a
- dict-shaped slot.
- Strategy-specific default backfill: for ``paragraph_semantic`` we
- guarantee a populated ``chunk_token_size`` slot before returning
- (caller-supplied value > ``CHUNK_P_SIZE`` env >
- ``DEFAULT_CHUNK_P_SIZE``). This is the single chokepoint that
- every enqueue path runs through — both the
- ``resolve_chunk_options`` path (built from addon_params) AND the
- direct ``chunk_options=`` kwarg path (caller supplies the dict)
- flow through here, so the backfill cannot be bypassed by runtime
- addon_params mutation or by passing an explicit ``chunk_options``
- that omits the P slot. P must NOT inherit the top-level
- ``chunk_token_size`` (global ``CHUNK_SIZE`` / legacy ctor) —
- paragraph-semantic merging needs more headroom than the global
- default.
- """
- key = chunk_strategy_key(process_options)
- src: Mapping[str, Any] = snapshot or {}
- result: dict[str, Any] = {}
- if "chunk_token_size" in src:
- result["chunk_token_size"] = deepcopy(src["chunk_token_size"])
- result[key] = deepcopy(dict(src.get(key) or {}))
- if key == "paragraph_semantic" and "chunk_token_size" not in result[key]:
- p_size_raw = os.getenv("CHUNK_P_SIZE")
- result[key]["chunk_token_size"] = (
- int(p_size_raw) if p_size_raw is not None else DEFAULT_CHUNK_P_SIZE
- )
- return result
- def _env_optional_str(key: str) -> str | None:
- """Return the env value as a string, collapsing empty / 'None' to None."""
- raw = os.getenv(key)
- if raw is None:
- return None
- stripped = raw.strip()
- if not stripped or stripped.lower() == "none":
- return None
- return raw
- def _env_bool(key: str, default: bool = False) -> bool:
- raw = os.getenv(key)
- if raw is None:
- return default
- return raw.strip().lower() in ("1", "true", "yes", "on", "t", "y")
- def default_chunker_config() -> dict[str, Any]:
- """Snapshot the **strategy-specific** env-driven defaults for every shipped chunker.
- Builds a per-strategy sub-dict whose keys mirror each strategy's
- keyword-only signature (so :func:`resolve_chunk_options` can splat
- them straight into the chunker call).
- Provenance / precedence note: this function reads only
- *strategy-specific* env vars (``CHUNK_F_SIZE``,
- ``CHUNK_F_OVERLAP_SIZE``, ``CHUNK_R_SIZE``, ``CHUNK_R_OVERLAP_SIZE``,
- ``CHUNK_R_SEPARATORS``, ``CHUNK_V_SIZE``, ``CHUNK_V_*``,
- ``CHUNK_P_SIZE``, ``CHUNK_P_OVERLAP_SIZE``,
- ``CHUNK_F_SPLIT_BY_CHARACTER``…). It does **not** read the legacy
- top-level envs ``CHUNK_SIZE`` / ``CHUNK_OVERLAP_SIZE``, and it
- deliberately **omits** ``chunk_overlap_token_size`` from a strategy
- sub-dict when its own env var is unset — leaving the slot empty is
- the signal that lets
- :meth:`LightRAG._apply_chunk_size_overlay` apply the legacy
- constructor field (``LightRAG(chunk_overlap_token_size=…)``) and
- finally the legacy ``CHUNK_OVERLAP_SIZE`` env in that order. Same
- rationale for top-level ``chunk_token_size`` — overlay fills it from
- ``LightRAG(chunk_token_size=…)`` then ``CHUNK_SIZE`` env. Net
- precedence (high → low): ``addon_params`` explicit > strategy env
- > legacy ctor field > legacy env.
- Read at instance-creation time via
- :func:`lightrag.addon_params.default_addon_params`; users can mutate
- ``addon_params['chunker']`` at runtime to change the defaults applied
- to subsequently enqueued documents (already-enqueued docs hold a
- frozen ``full_docs[doc_id]['chunk_options']`` snapshot).
- """
- config: dict[str, Any] = {
- "fixed_token": {
- "split_by_character": _env_optional_str("CHUNK_F_SPLIT_BY_CHARACTER"),
- "split_by_character_only": _env_bool(
- "CHUNK_F_SPLIT_BY_CHARACTER_ONLY", False
- ),
- },
- "recursive_character": {
- # Default separators include CJK sentence-ending punctuation
- # so Chinese / mixed-language documents split at semantic
- # boundaries instead of falling through to character-level
- # splitting. See ``constants.DEFAULT_R_SEPARATORS`` for
- # cascade order rationale.
- "separators": json.loads(
- os.getenv("CHUNK_R_SEPARATORS", json.dumps(list(DEFAULT_R_SEPARATORS)))
- ),
- },
- "semantic_vector": {
- "breakpoint_threshold_type": os.getenv(
- "CHUNK_V_BREAKPOINT_THRESHOLD_TYPE", "percentile"
- ),
- "breakpoint_threshold_amount": parse_optional_float(
- os.getenv("CHUNK_V_BREAKPOINT_THRESHOLD_AMOUNT")
- ),
- "buffer_size": int(os.getenv("CHUNK_V_BUFFER_SIZE", "1")),
- # Default extends LangChain's English-only sentence splitter
- # with CJK terminators so SemanticChunker can actually find
- # sentence boundaries on Chinese input. Override per
- # deployment if you need a different language mix.
- "sentence_split_regex": os.getenv(
- "CHUNK_V_SENTENCE_SPLIT_REGEX", DEFAULT_SENTENCE_SPLIT_REGEX
- ),
- },
- "paragraph_semantic": {},
- }
- # Strategy-specific overlap envs only — leave the slot absent when
- # unset so overlay can detect provenance and fill from the legacy
- # tier (constructor field → CHUNK_OVERLAP_SIZE env).
- f_overlap_raw = os.getenv("CHUNK_F_OVERLAP_SIZE")
- if f_overlap_raw is not None:
- config["fixed_token"]["chunk_overlap_token_size"] = int(f_overlap_raw)
- r_overlap_raw = os.getenv("CHUNK_R_OVERLAP_SIZE")
- if r_overlap_raw is not None:
- config["recursive_character"]["chunk_overlap_token_size"] = int(r_overlap_raw)
- p_overlap_raw = os.getenv("CHUNK_P_OVERLAP_SIZE")
- if p_overlap_raw is not None:
- config["paragraph_semantic"]["chunk_overlap_token_size"] = int(p_overlap_raw)
- # P strategy carries its own ``chunk_token_size`` override so the
- # paragraph-semantic merge target can diverge from the global
- # ``CHUNK_SIZE`` (e.g. heading-aligned chunks may want a larger
- # ceiling). Unlike R/V, the slot is ALWAYS populated — when
- # ``CHUNK_P_SIZE`` is unset we use ``DEFAULT_CHUNK_P_SIZE`` (2000)
- # rather than letting the dispatcher fall back to the global
- # ``CHUNK_SIZE`` (1200): paragraph-semantic merging needs more
- # headroom than the global default to keep related paragraphs
- # together, and silently inheriting the smaller global ceiling
- # defeats the strategy's purpose.
- p_size_raw = os.getenv("CHUNK_P_SIZE")
- config["paragraph_semantic"]["chunk_token_size"] = (
- int(p_size_raw) if p_size_raw is not None else DEFAULT_CHUNK_P_SIZE
- )
- # F/R/V strategies likewise carry their own optional ``chunk_token_size``
- # overrides (fixed-token may want a deployment-specific window, recursive
- # character splitting a smaller target, semantic-vector clustering a larger
- # advisory ceiling). Same slot-absent convention as P: leave the slot
- # absent when the env is unset so the strategy inherits the top-level
- # ``chunk_token_size`` fallback at consumption time.
- f_size_raw = os.getenv("CHUNK_F_SIZE")
- if f_size_raw is not None:
- config["fixed_token"]["chunk_token_size"] = int(f_size_raw)
- r_size_raw = os.getenv("CHUNK_R_SIZE")
- if r_size_raw is not None:
- config["recursive_character"]["chunk_token_size"] = int(r_size_raw)
- v_size_raw = os.getenv("CHUNK_V_SIZE")
- if v_size_raw is not None:
- config["semantic_vector"]["chunk_token_size"] = int(v_size_raw)
- return config
- def resolve_chunk_options(
- addon_params: Mapping[str, Any] | None,
- *,
- process_options: Any = "",
- split_by_character: str | None = None,
- split_by_character_only: bool = False,
- ) -> dict[str, Any]:
- """Build a per-document slim ``chunk_options`` snapshot.
- Reads the chunker config from ``addon_params['chunker']``, falling
- back to a freshly built :func:`default_chunker_config` when the
- addon-params mapping is missing or hasn't been populated, then
- keeps only the parameters of the strategy selected by
- ``process_options`` (the other strategies' sub-dicts are dropped —
- they would never be consumed during processing). See
- :func:`slim_chunk_options` for the projection rules and
- :func:`chunk_strategy_key` for the strategy → sub-dict mapping
- (default F → ``fixed_token``).
- The F runtime args from ``LightRAG.ainsert`` overlay the
- ``fixed_token`` sub-dict when (and only when) the active strategy
- is F — for R/V/P these args have no slot to land in and are
- silently dropped:
- - ``split_by_character`` overrides the env when **non-None**.
- ``None`` (signature default) means "use the env / addon_params
- default".
- - ``split_by_character_only`` overrides the env when **True**.
- ``False`` (signature default) means "use the env / addon_params
- default" — there's no clean way to distinguish "unset" from
- "explicit False" with a positional default, so the env wins
- unless the caller actively opts in.
- The returned snapshot is an independent deep copy: mutating it has
- no effect on subsequent resolutions.
- """
- src: Mapping[str, Any] | None = None
- if isinstance(addon_params, Mapping):
- candidate = addon_params.get("chunker")
- if isinstance(candidate, Mapping):
- src = candidate
- if src is None:
- src = default_chunker_config()
- snapshot = slim_chunk_options(src, process_options)
- if chunk_strategy_key(process_options) == "fixed_token":
- fixed = snapshot["fixed_token"]
- if split_by_character is not None:
- fixed["split_by_character"] = split_by_character
- if split_by_character_only:
- fixed["split_by_character_only"] = True
- # P-strategy ``chunk_token_size`` backfill lives in
- # ``slim_chunk_options`` — that's the single chokepoint shared by
- # every enqueue path (this function AND the direct
- # ``chunk_options=`` kwarg path in ``_chunk_options_at``).
- return snapshot
- def split_engine_and_options(bracket_inner: str) -> tuple[str | None, str]:
- """Decompose a bracket-hint inner string into ``(engine, options)``.
- Format rules (see docs/FileProcessingPipeline-zh.md):
- - ``ENGINE-OPTIONS``: first ``-``-separated segment is the engine
- candidate; the remainder is the options string.
- - ``ENGINE``: matches a supported engine name as a whole.
- - ``-OPTIONS``: leading ``-`` marks an options-only hint.
- """
- inner = (bracket_inner or "").strip()
- if not inner:
- return None, ""
- if inner.startswith("-"):
- return None, inner[1:].strip()
- if "-" in inner:
- head, _, tail = inner.partition("-")
- engine_candidate = normalize_parser_engine(head)
- if engine_candidate in SUPPORTED_PARSER_ENGINES:
- return engine_candidate, tail.strip()
- return None, ""
- engine_candidate = normalize_parser_engine(inner)
- if engine_candidate in SUPPORTED_PARSER_ENGINES:
- return engine_candidate, ""
- return None, ""
- def parser_suffix(file_path: str | Path) -> str:
- return Path(file_path).suffix.lower().lstrip(".")
- def parser_engine_supports_suffix(engine: str, suffix: str) -> bool:
- return suffix.lower().lstrip(".") in PARSER_ENGINE_SUFFIX_CAPABILITIES.get(
- engine, frozenset()
- )
- def parser_engine_endpoint_configured(engine: str) -> bool:
- if engine == PARSER_ENGINE_MINERU:
- mode = os.getenv("MINERU_API_MODE", "local").strip().lower()
- if mode == "official":
- return bool(os.getenv("MINERU_API_TOKEN", "").strip())
- if mode == "local":
- return bool(os.getenv("MINERU_LOCAL_ENDPOINT", "").strip())
- return False
- endpoint_env = _PARSER_ENGINE_ENDPOINT_ENV.get(engine)
- if endpoint_env:
- return bool(os.getenv(endpoint_env, "").strip())
- return True
- def parser_engine_endpoint_requirement(engine: str) -> str | None:
- if engine == PARSER_ENGINE_MINERU:
- mode = os.getenv("MINERU_API_MODE", "local").strip().lower()
- if mode == "official":
- return "MINERU_API_TOKEN"
- if mode == "local":
- return "MINERU_LOCAL_ENDPOINT"
- allowed = ", ".join(sorted(_VALID_MINERU_API_MODES))
- return f"valid MINERU_API_MODE ({allowed})"
- return _PARSER_ENGINE_ENDPOINT_ENV.get(engine)
- def _engine_is_usable(
- engine: str,
- suffix: str,
- *,
- require_external_endpoint: bool,
- ) -> bool:
- if engine not in SUPPORTED_PARSER_ENGINES:
- return False
- if not parser_engine_supports_suffix(engine, suffix):
- return False
- if require_external_endpoint and not parser_engine_endpoint_configured(engine):
- return False
- return True
- def _filename_hint_match(
- file_path: str | Path,
- ) -> tuple[re.Match[str], str, str] | None:
- """Locate a supported ``[hint]`` segment in a basename.
- Returns ``(match, engine_or_empty, options)`` when the bracket inner is a
- recognised hint per the spec; otherwise ``None``. This low-level helper
- stays non-throwing because scan grouping and basename canonicalization need
- a best-effort classifier. Ingestion entrypoints must call
- :func:`resolve_file_parser_directives`, which validates malformed hints and
- raises instead of falling back.
- """
- basename = Path(file_path).name
- m = _PARSER_HINT_RE.search(basename)
- if not m:
- return None
- inner = m.group(1).strip()
- if inner.startswith("-") and not inner[1:].strip():
- return None
- if (
- "-" in inner
- and not inner.startswith("-")
- and not inner.partition("-")[2].strip()
- ):
- return None
- engine, options = split_engine_and_options(inner)
- if options:
- option_errors = validate_process_options(options)
- if option_errors:
- logger.warning(
- f"[parser_routing] ignoring filename hint {m.group(0)!r} in "
- f"{basename!r}: {'; '.join(option_errors)}"
- )
- return None
- if engine in SUPPORTED_PARSER_ENGINES:
- return m, engine, options
- if engine is None and options:
- return m, "", options
- return None
- def _validate_filename_hint_for_resolution(
- file_path: str | Path,
- *,
- require_external_endpoint: bool,
- ) -> None:
- """Fail fast for malformed filename hints on ingestion entrypoints."""
- basename = Path(file_path).name
- m = _PARSER_HINT_RE.search(basename)
- if not m:
- return
- inner = m.group(1)
- errors: list[str] = []
- if not inner.strip():
- errors.append(f"filename hint {m.group(0)!r} is empty")
- raise FilenameParserHintError(
- f"Invalid filename parser hint in {basename!r}: " + "; ".join(errors)
- )
- engine: str | None = None
- options = ""
- if inner.startswith("-"):
- options = inner[1:].strip()
- if not options:
- errors.append(f"filename hint {m.group(0)!r} has empty process options")
- else:
- errors.extend(
- validate_process_options(
- options,
- label=f"filename hint {m.group(0)!r} options",
- )
- )
- elif "-" in inner:
- engine_name, _, options = inner.partition("-")
- engine = normalize_parser_engine(engine_name)
- if engine not in SUPPORTED_PARSER_ENGINES:
- supported = ", ".join(sorted(SUPPORTED_PARSER_ENGINES))
- errors.append(
- f"filename hint {m.group(0)!r} uses unsupported parser engine "
- f"{engine_name.strip()!r}; supported engines: {supported}"
- )
- elif not options.strip():
- errors.append(f"filename hint {m.group(0)!r} has empty process options")
- else:
- errors.extend(
- validate_process_options(
- options,
- label=f"filename hint {m.group(0)!r} options",
- )
- )
- else:
- engine = normalize_parser_engine(inner)
- if engine not in SUPPORTED_PARSER_ENGINES:
- supported = ", ".join(sorted(SUPPORTED_PARSER_ENGINES))
- message = (
- f"filename hint {m.group(0)!r} uses unsupported parser engine "
- f"{inner.strip()!r}; supported engines: {supported}"
- )
- if all(ch in SUPPORTED_PROCESS_OPTIONS or ch == " " for ch in inner):
- message += (
- "; options-only filename hints must start with '-' "
- f"(use '[-{inner.strip()}]' instead)"
- )
- errors.append(message)
- if engine in SUPPORTED_PARSER_ENGINES:
- suffix = parser_suffix(file_path)
- if not parser_engine_supports_suffix(engine, suffix):
- supported_suffixes = ", ".join(
- sorted(PARSER_ENGINE_SUFFIX_CAPABILITIES.get(engine, frozenset()))
- )
- errors.append(
- f"filename hint {m.group(0)!r} uses parser engine {engine!r} "
- f"for unsupported suffix {suffix!r}; supported suffixes: "
- f"{supported_suffixes}"
- )
- endpoint_req = parser_engine_endpoint_requirement(engine)
- if (
- require_external_endpoint
- and endpoint_req
- and not parser_engine_endpoint_configured(engine)
- ):
- errors.append(
- f"filename hint {m.group(0)!r} requires {endpoint_req} "
- "to be configured"
- )
- if errors:
- raise FilenameParserHintError(
- f"Invalid filename parser hint in {basename!r}: " + "; ".join(errors)
- )
- def filename_parser_hint(file_path: str | Path) -> str | None:
- """Return the engine inferred from a filename hint, or ``None``."""
- found = _filename_hint_match(file_path)
- if not found:
- return None
- _, engine, _ = found
- return engine or None
- def filename_process_options(file_path: str | Path) -> str:
- """Return the raw process-options string from a filename hint."""
- found = _filename_hint_match(file_path)
- if not found:
- return ""
- return found[2]
- def filename_parser_directives(file_path: str | Path) -> tuple[str | None, str]:
- """Return ``(engine, options)`` decoded from a filename hint."""
- found = _filename_hint_match(file_path)
- if not found:
- return None, ""
- _, engine, options = found
- return (engine or None), options
- def canonicalize_parser_hinted_basename(file_path: str | Path) -> str:
- """Return basename with a supported parser hint removed.
- Only the final ``.[engine].ext`` (or ``.[engine-options].ext`` /
- ``.[-options].ext``) segment is stripped, exactly once, and only when the
- bracket content is a recognised hint. Nested hints such as
- ``name.[native].[mineru].pdf`` therefore become ``name.[native].pdf`` —
- additional outer hints are not unwrapped.
- """
- basename = Path(file_path).name
- found = _filename_hint_match(file_path)
- if not found:
- return basename
- m, _, _ = found
- return f"{basename[: m.start()]}{m.group(2)}"
- def parser_rules_from_env() -> str:
- return os.getenv("LIGHTRAG_PARSER", "").strip()
- def _iter_parser_rule_items(rules: str) -> list[tuple[int, str]]:
- return [
- (index, item.strip())
- for index, item in enumerate(_PARSER_RULE_SPLIT_RE.split(rules), start=1)
- if item.strip()
- ]
- def _rule_pattern_matches_engine_capability(pattern: str, engine: str) -> bool:
- supported_suffixes = PARSER_ENGINE_SUFFIX_CAPABILITIES.get(engine, frozenset())
- return any(fnmatch.fnmatch(suffix, pattern) for suffix in supported_suffixes)
- def _rule_engine_and_options(engine_hint: str) -> tuple[str, str]:
- """Split a ``LIGHTRAG_PARSER`` rule's RHS (``engine[-options]``).
- Returns ``(normalized_engine, options_str)``. Unlike the filename hint
- splitter this always treats the first ``-`` as the engine/options
- boundary, since ``LIGHTRAG_PARSER`` rules cannot be options-only.
- """
- head, _, tail = engine_hint.partition("-")
- return normalize_parser_engine(head), tail.strip()
- def validate_parser_routing_config(parser_rules: str | None = None) -> None:
- """Validate LIGHTRAG_PARSER syntax and required external parser endpoints."""
- rules = parser_rules_from_env() if parser_rules is None else parser_rules.strip()
- if not rules:
- return
- errors: list[str] = []
- for index, item in _iter_parser_rule_items(rules):
- label = f"rule {index} ({item!r})"
- if ":" not in item:
- errors.append(f"{label} must use '<suffix-pattern>:<engine>'")
- continue
- pattern, engine_hint = item.split(":", 1)
- pattern = pattern.strip().lower()
- engine_hint = engine_hint.strip()
- engine, options_str = _rule_engine_and_options(engine_hint)
- if not pattern:
- errors.append(f"{label} has an empty suffix pattern")
- continue
- if "." in pattern:
- errors.append(
- f"{label} matches suffixes without dots; use 'pdf', not '*.pdf'"
- )
- continue
- if not engine_hint:
- errors.append(f"{label} has an empty parser engine")
- continue
- if engine not in SUPPORTED_PARSER_ENGINES:
- supported = ", ".join(sorted(SUPPORTED_PARSER_ENGINES))
- errors.append(
- f"{label} uses unsupported parser engine {engine_hint!r}; "
- f"supported engines: {supported}"
- )
- continue
- if not _rule_pattern_matches_engine_capability(pattern, engine):
- supported_suffixes = ", ".join(
- sorted(PARSER_ENGINE_SUFFIX_CAPABILITIES.get(engine, frozenset()))
- )
- errors.append(
- f"{label} does not match any suffix supported by {engine}; "
- f"supported suffixes: {supported_suffixes}"
- )
- endpoint_req = parser_engine_endpoint_requirement(engine)
- if endpoint_req and not parser_engine_endpoint_configured(engine):
- errors.append(f"{label} requires {endpoint_req} to be configured")
- if options_str:
- errors.extend(
- f"{label}: {msg}"
- for msg in validate_process_options(
- options_str, label="process options"
- )
- )
- if errors:
- raise ParserRoutingConfigError(
- "Invalid LIGHTRAG_PARSER configuration: " + "; ".join(errors)
- )
- def _matching_rule_directives(
- file_path: str | Path,
- *,
- parser_rules: str | None,
- require_external_endpoint: bool,
- ) -> tuple[str | None, str]:
- """Find the first matching ``LIGHTRAG_PARSER`` rule for ``file_path``.
- Returns ``(engine, options_str)`` where ``engine`` is ``None`` when no
- usable rule is found. ``options_str`` is empty when a rule matched but
- has no ``-options`` suffix.
- """
- suffix = parser_suffix(file_path)
- rules = parser_rules_from_env() if parser_rules is None else parser_rules.strip()
- if not rules:
- return None, ""
- for _, item in _iter_parser_rule_items(rules):
- if ":" not in item:
- continue
- pattern, engine_hint = item.split(":", 1)
- pattern = pattern.strip().lower()
- engine, options_str = _rule_engine_and_options(engine_hint.strip())
- if not fnmatch.fnmatch(suffix, pattern):
- continue
- if _engine_is_usable(
- engine,
- suffix,
- require_external_endpoint=require_external_endpoint,
- ):
- return engine, options_str
- return None, ""
- def resolve_file_parser_engine(
- file_path: str | Path,
- *,
- parser_rules: str | None = None,
- require_external_endpoint: bool = True,
- ) -> str:
- """Resolve the extraction engine for a source file before content extraction."""
- engine, _ = resolve_file_parser_directives(
- file_path,
- parser_rules=parser_rules,
- require_external_endpoint=require_external_endpoint,
- )
- return engine
- def resolve_file_parser_directives(
- file_path: str | Path,
- *,
- parser_rules: str | None = None,
- require_external_endpoint: bool = True,
- ) -> tuple[str, str]:
- """Resolve ``(engine, process_options)`` for a source file before extraction.
- Resolution order (mirrors :func:`resolve_file_parser_engine`):
- 1. Filename ``[hint]`` — engine and / or options take precedence.
- 2. ``LIGHTRAG_PARSER`` rules — first matching rule provides defaults
- for whichever of engine / options the filename hint did not
- specify.
- 3. Default engine ``legacy`` with empty options.
- """
- suffix = parser_suffix(file_path)
- _validate_filename_hint_for_resolution(
- file_path,
- require_external_endpoint=require_external_endpoint,
- )
- hinted_engine, hinted_options = filename_parser_directives(file_path)
- if hinted_engine and not _engine_is_usable(
- hinted_engine, suffix, require_external_endpoint=require_external_endpoint
- ):
- # Hinted engine cannot handle this file (e.g. wrong suffix or missing
- # endpoint); fall back to rule-based resolution but keep the hinted
- # options if any.
- hinted_engine = None
- rule_engine, rule_options = _matching_rule_directives(
- file_path,
- parser_rules=parser_rules,
- require_external_endpoint=require_external_endpoint,
- )
- engine = hinted_engine or rule_engine or PARSER_ENGINE_LEGACY
- options_str = hinted_options or rule_options
- return engine, sanitize_process_options(options_str)
- def resolve_stored_document_parser_engine(
- file_path: str | Path,
- content_data: dict[str, Any] | None,
- ) -> str:
- """Resolve parser engine for a full_docs row during pipeline processing."""
- if content_data:
- doc_format = content_data.get("parse_format", FULL_DOCS_FORMAT_RAW)
- if doc_format == FULL_DOCS_FORMAT_LIGHTRAG and content_data.get(
- "sidecar_location"
- ):
- return PARSER_ENGINE_NATIVE
- if doc_format != FULL_DOCS_FORMAT_PENDING_PARSE:
- return PARSER_ENGINE_LEGACY
- suffix = parser_suffix(file_path)
- pending_engine = normalize_parser_engine(content_data.get("parse_engine"))
- if pending_engine in SUPPORTED_PARSER_ENGINES and parser_engine_supports_suffix(
- pending_engine, suffix
- ):
- return pending_engine
- return resolve_file_parser_engine(file_path)
|