_common.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. """Shared helpers for ``lightrag/parser/external/<engine>/`` packages.
  2. Currently consumed by the docling subpackage; expected to be reused when
  3. mineru is migrated under ``parser/external/mineru/``.
  4. These are pure functions with no engine-specific knowledge. Engine-specific
  5. logic (endpoint signature, options signature, cache validation policy) lives
  6. in each engine's own ``cache.py``.
  7. """
  8. from __future__ import annotations
  9. import hashlib
  10. import json
  11. import os
  12. import shutil
  13. from pathlib import Path
  14. from typing import Any
  15. from lightrag.constants import PARSED_DIR_SUFFIX
  16. from lightrag.utils import logger
  17. def compute_size_and_hash(path: Path) -> tuple[int, str]:
  18. """Single-read computation of ``(size_bytes, "sha256:<hex>")``.
  19. Manifest writes use this so the recorded size and hash are guaranteed to
  20. describe the same byte stream; using two ``open()`` calls would risk a
  21. TOCTOU mismatch if the file changed in between.
  22. """
  23. h = hashlib.sha256()
  24. size = 0
  25. with path.open("rb") as f:
  26. for chunk in iter(lambda: f.read(1 << 20), b""):
  27. h.update(chunk)
  28. size += len(chunk)
  29. return size, f"sha256:{h.hexdigest()}"
  30. def clear_dir_contents(directory: Path) -> None:
  31. """Delete everything inside ``directory`` but keep ``directory`` itself."""
  32. if not directory.exists():
  33. return
  34. for entry in directory.iterdir():
  35. try:
  36. if entry.is_dir() and not entry.is_symlink():
  37. shutil.rmtree(entry, ignore_errors=True)
  38. else:
  39. entry.unlink()
  40. except OSError:
  41. continue
  42. def raw_dir_for_parsed_dir(parsed_dir: Path, *, suffix: str) -> Path:
  43. """Sibling raw dir for a ``*.parsed`` dir.
  44. ``foo.parsed/`` with ``suffix=".docling_raw"`` → ``foo.docling_raw/``.
  45. ``suffix`` must start with ``.`` and be engine-specific (the caller
  46. binds it via ``functools.partial`` or a thin wrapper).
  47. """
  48. if not suffix.startswith("."):
  49. raise ValueError(f"raw dir suffix must start with '.', got {suffix!r}")
  50. stem = parsed_dir.name
  51. if stem.endswith(PARSED_DIR_SUFFIX):
  52. stem = stem[: -len(PARSED_DIR_SUFFIX)]
  53. return parsed_dir.parent / f"{stem}{suffix}"
  54. def env_bool(name: str, default: bool) -> bool:
  55. raw = os.getenv(name, "").strip().lower()
  56. if raw in {"1", "true", "yes", "on"}:
  57. return True
  58. if raw in {"0", "false", "no", "off"}:
  59. return False
  60. return default
  61. def env_int(name: str, default: int) -> int:
  62. raw = os.getenv(name, "").strip()
  63. if not raw:
  64. return default
  65. try:
  66. return int(raw)
  67. except ValueError:
  68. logger.warning(
  69. "[external_parser] %s=%r is not an integer; using %s", name, raw, default
  70. )
  71. return default
  72. def env_json(name: str, default: Any) -> Any:
  73. """Parse a JSON env var; on parse error log a warning and return default."""
  74. raw = os.getenv(name, "").strip()
  75. if not raw:
  76. return default
  77. try:
  78. return json.loads(raw)
  79. except json.JSONDecodeError:
  80. logger.warning(
  81. "[external_parser] %s=%r is not valid JSON; using default", name, raw
  82. )
  83. return default
  84. def response_error_detail(resp: Any, *, limit: int = 1000) -> str:
  85. """Return a compact response body snippet for HTTP error reporting."""
  86. try:
  87. payload = resp.json() if getattr(resp, "text", "") else None
  88. except Exception:
  89. payload = None
  90. if payload is not None:
  91. try:
  92. detail = json.dumps(payload, ensure_ascii=False, sort_keys=True)
  93. except TypeError:
  94. detail = repr(payload)
  95. else:
  96. detail = str(getattr(resp, "text", "") or "").strip()
  97. detail = " ".join(detail.split())
  98. if not detail:
  99. return "empty response body"
  100. if len(detail) > limit:
  101. return f"{detail[:limit]}...<truncated>"
  102. return detail
  103. def raise_for_status_with_detail(resp: Any, operation: str) -> None:
  104. """Raise an HTTP error that preserves service-provided response details.
  105. Treats any non-2xx response as an error, matching httpx's
  106. ``raise_for_status`` status handling (which also raises on 1xx/3xx,
  107. not just 4xx/5xx) while attaching a compact response-body snippet to
  108. the message for faster diagnosis.
  109. """
  110. status_code = int(getattr(resp, "status_code", 0) or 0)
  111. if 200 <= status_code < 300:
  112. return
  113. detail = response_error_detail(resp)
  114. raise RuntimeError(f"{operation} failed: HTTP {status_code} {detail}")
# Names exported by ``from <this module> import *``; kept in alphabetical
# order. Every helper defined above is listed here.
__all__ = [
    "clear_dir_contents",
    "compute_size_and_hash",
    "env_bool",
    "env_int",
    "env_json",
    "raise_for_status_with_detail",
    "raw_dir_for_parsed_dir",
    "response_error_detail",
]