file_atomic.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  """Shared atomic file-write helpers.

  Why this lives at the package root rather than under ``lightrag/kg/``:
  ``lightrag.utils.write_json`` needs ``atomic_write`` to gain crash safety,
  and several ``lightrag/kg/*`` modules need both. Hosting the helpers under
  ``lightrag/kg/`` would create a ``utils -> kg -> utils`` import cycle.
  Keeping this module dependency-free (stdlib only) avoids that.

  Semantics
  ---------
  ``atomic_write`` writes through a per-writer ``.tmp.<pid>.<tid>.<ns>`` sibling
  and renames it into place with ``os.replace``. The rename is atomic when source
  and destination are on the same filesystem, on both POSIX (``rename(2)``) and
  Windows (``MoveFileEx`` with ``MOVEFILE_REPLACE_EXISTING``). Two failure modes
  are handled differently:

  - A Python exception (``write_fn`` raised, ``os.replace`` failed, etc.):
    the cleanup handler in ``atomic_write`` runs. It removes the in-flight tmp
    best-effort, and the exception propagates. The destination still holds
    the prior snapshot, or remains absent if there was none.
  - A process-level kill (SIGKILL, OOM, hard reboot) between writing the tmp
    and the rename: no cleanup code gets a chance to run, so the tmp survives
    as an orphan. ``reap_orphan_tmp_files`` removes it on a later startup,
    once the file is older than the age threshold.

  What is *not* preserved across the inode swap done by ``os.replace``: owner,
  group, ACLs, xattrs, hard-link relationships, and any symlink-target identity.
  The mode bits (rwx) are preserved explicitly — see ``_preserve_mode``.
  """
  24. from __future__ import annotations
  25. import glob
  26. import logging
  27. import os
  28. import stat
  29. import threading
  30. import time
  31. from typing import Callable
  logger = logging.getLogger("lightrag")

  # Age threshold, in seconds, beyond which a leftover ``<file>.tmp.*`` sibling
  # is considered abandoned by a killed writer and gets removed at startup.
  # One hour is a deliberately generous margin: even very large graph snapshots
  # are expected to finish writing in minutes, so a tmp file this old cannot
  # plausibly still belong to a live writer in another process.
  TMP_REAP_AGE_SECONDS = 60 * 60
  def tmp_path_for(file_name: str) -> str:
      """Build a per-writer temporary sibling path for ``file_name``.

      The result has the shape ``<file_name>.tmp.<pid>.<tid>.<ns>``. It combines:

      - the process id,
      - the calling thread's identifier,
      - a nanosecond clock reading.

      Together these keep concurrent writers from colliding on a single tmp
      file. Those writers may be separate processes sharing one working
      directory, or several threads inside one process. A collision would
      otherwise surface as a "no such file" error when one writer renames the
      tmp that another writer already moved.
      """
      pid = os.getpid()
      thread_id = threading.get_ident()
      stamp_ns = time.time_ns()
      return ".".join((file_name, "tmp", str(pid), str(thread_id), str(stamp_ns)))
  45. def _preserve_mode(tmp: str, dst: str, workspace: str) -> None:
  46. """Carry ``dst``'s existing mode bits onto ``tmp`` before the rename.
  47. Without this, ``os.replace`` swaps the inode and the new file inherits
  48. umask defaults — any intentional restriction (e.g. chmod 0600) on the
  49. prior snapshot would be silently widened.
  50. """
  51. if not os.path.exists(dst):
  52. return
  53. try:
  54. os.chmod(tmp, stat.S_IMODE(os.stat(dst).st_mode))
  55. except OSError as exc:
  56. logger.warning(f"[{workspace}] Could not preserve mode of {dst}: {exc}")
  def reap_orphan_tmp_files(
      file_name: str,
      workspace: str = "_",
      age_seconds: int = TMP_REAP_AGE_SECONDS,
      extra_patterns: tuple[str, ...] = (),
  ) -> None:
      """Delete stale tmp siblings of ``file_name`` left behind by hard kills.

      The default pattern is ``glob.escape(file_name) + ".tmp.*"``, which
      matches the suffix shape produced by ``tmp_path_for``. ``extra_patterns``
      accepts already-built glob patterns. It is intended for migrating away
      from legacy naming schemes, e.g. Faiss's previous fixed ``<meta>.tmp``
      suffix, which the default pattern's trailing ``.*`` will not match.

      ``glob.escape`` is required because ``file_name`` is composed from
      ``working_dir + namespace`` and can legitimately contain glob
      metacharacters (workspace ``[v2]``, ``*``, ``?``). Concatenating naively
      would silently miss the real orphan, or widen the match to tmp files of
      unrelated storage types.

      Args:
          file_name: Destination path whose orphaned tmp siblings are swept.
          workspace: Label used only to prefix log messages.
          age_seconds: Minimum age, measured from the file's mtime, before a
              matching tmp file is deleted. Younger files are left alone,
              since they may belong to a write that is still in progress.
          extra_patterns: Additional ready-made glob patterns to sweep.

      Candidates that cannot be stat'ed are skipped silently. A candidate that
      is already gone at delete time is treated as success. Any other deletion
      failure is logged as a warning and does not stop the sweep.
      """
      patterns = [glob.escape(file_name) + ".tmp.*", *extra_patterns]
      now = time.time()
      for pattern in patterns:
          for path in glob.glob(pattern):
              try:
                  age = now - os.path.getmtime(path)
              except OSError:
                  # Disappeared (or became unreadable) after globbing: skip it.
                  continue
              if age < age_seconds:
                  continue
              try:
                  os.remove(path)
              except FileNotFoundError:
                  # Benign race: another process (or thread) sweeping the same
                  # working directory reaped this orphan first. The goal, the
                  # file being gone, is met, so this does not warrant a warning.
                  continue
              except OSError as exc:
                  logger.warning(
                      f"[{workspace}] Failed to reap orphan tmp file {path}: {exc}"
                  )
                  continue
              logger.info(
                  f"[{workspace}] Reaped orphan tmp file: {path} (age {age:.0f}s)"
              )
  def atomic_write(
      file_name: str,
      write_fn: Callable[[str], None],
      workspace: str = "_",
  ) -> None:
      """Run ``write_fn(tmp_path)``, then atomically replace ``file_name`` with it.

      The steps are:

      1. Pick a unique tmp sibling via ``tmp_path_for``.
      2. Let ``write_fn`` produce the file contents at that path.
      3. Copy the existing destination's mode bits onto the tmp.
      4. ``os.replace`` the tmp over ``file_name``.

      ``write_fn`` must write to the path it receives. It must not assume that
      path equals ``file_name``: Faiss/Nano callers rely on the tmp path being
      a real sibling.

      Args:
          file_name: Final destination path.
          write_fn: Callable that writes the complete new contents to the tmp
              path passed to it.
          workspace: Label used only to prefix log messages.

      Raises:
          Whatever ``write_fn``, mode preservation, or ``os.replace`` raises.
          In that case the tmp is removed best-effort before re-raising, and
          the destination file is not touched.
      """
      tmp = tmp_path_for(file_name)
      try:
          write_fn(tmp)
          _preserve_mode(tmp, file_name, workspace)
          os.replace(tmp, file_name)
      except BaseException:
          # BaseException so that KeyboardInterrupt / SystemExit also clean up
          # the tmp; the original exception is always re-raised below.
          try:
              # EAFP: remove directly instead of exists()+remove(), which cost
              # an extra stat and left a gap between the check and the action.
              os.remove(tmp)
          except FileNotFoundError:
              # write_fn failed before creating the tmp, so there is nothing
              # to clean up.
              pass
          except OSError as exc:
              logger.warning(
                  f"[{workspace}] Failed to remove tmp after failed atomic write: {exc}"
              )
          raise