test_file_atomic.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. """Tests for ``lightrag.file_atomic`` — the shared atomic-write helpers.
  2. These tests cover the helper in isolation. End-to-end coverage of the
  3. individual storage backends that build on it lives in
  4. ``test_networkx_atomic_write.py``, ``test_atomic_write_write_json.py``,
  5. ``test_atomic_write_faiss.py``, and ``test_atomic_write_nano.py``.
  6. """
  7. import os
  8. import stat
  9. import sys
  10. import threading
  11. import time
  12. from unittest.mock import patch
  13. import pytest
  14. from lightrag.file_atomic import (
  15. TMP_REAP_AGE_SECONDS,
  16. atomic_write,
  17. reap_orphan_tmp_files,
  18. tmp_path_for,
  19. )
  20. @pytest.mark.offline
  21. def test_tmp_path_for_unique_across_concurrent_writers():
  22. """Mirror the production call pattern — each real writer calls
  23. ``tmp_path_for`` once per atomic_write. Across N concurrent writers,
  24. every tmp path must be distinct so no writer's ``os.replace`` can hit a
  25. sibling that another writer already renamed away.
  26. Same-thread back-to-back calls inside one ns tick are intentionally not
  27. tested: production never does that (write_fn does real IO between calls)
  28. and ``time.time_ns()`` resolution on some platforms (notably macOS) is
  29. coarse enough that a tight loop will collide."""
  30. paths: list[str] = []
  31. lock = threading.Lock()
  32. barrier = threading.Barrier(16)
  33. def collect():
  34. barrier.wait()
  35. p = tmp_path_for("/tmp/x")
  36. with lock:
  37. paths.append(p)
  38. threads = [threading.Thread(target=collect) for _ in range(16)]
  39. for t in threads:
  40. t.start()
  41. for t in threads:
  42. t.join()
  43. assert len(paths) == len(set(paths)), "tmp_path_for must be unique across writers"
  44. for p in paths:
  45. assert p.startswith("/tmp/x.tmp.")
  46. @pytest.mark.offline
  47. def test_atomic_write_publishes_file_via_replace(tmp_path):
  48. dst = str(tmp_path / "out.txt")
  49. def writer(tmp):
  50. with open(tmp, "w") as f:
  51. f.write("hello")
  52. atomic_write(dst, writer)
  53. assert open(dst).read() == "hello"
  54. assert [p for p in os.listdir(tmp_path) if ".tmp." in p] == []
  55. @pytest.mark.offline
  56. def test_atomic_write_write_fn_exception_cleans_tmp_and_preserves_prior(tmp_path):
  57. """If ``write_fn`` raises, the prior destination must survive and the
  58. tmp must be removed by ``atomic_write``'s ``finally``."""
  59. dst = str(tmp_path / "out.txt")
  60. def commit_v1(tmp):
  61. with open(tmp, "w") as f:
  62. f.write("v1")
  63. atomic_write(dst, commit_v1)
  64. def boom(tmp):
  65. with open(tmp, "w") as f:
  66. f.write("partial")
  67. raise RuntimeError("boom")
  68. with pytest.raises(RuntimeError, match="boom"):
  69. atomic_write(dst, boom)
  70. assert open(dst).read() == "v1"
  71. leftovers = [p for p in os.listdir(tmp_path) if ".tmp." in p]
  72. assert leftovers == [], f"write_fn exception must clean tmp, got {leftovers}"
  73. @pytest.mark.offline
  74. def test_atomic_write_replace_exception_cleans_tmp_and_preserves_prior(tmp_path):
  75. """If ``os.replace`` raises, the prior destination must survive and the
  76. tmp must be removed."""
  77. dst = str(tmp_path / "out.txt")
  78. def commit(tmp, payload):
  79. with open(tmp, "w") as f:
  80. f.write(payload)
  81. atomic_write(dst, lambda tmp: commit(tmp, "v1"))
  82. with patch(
  83. "lightrag.file_atomic.os.replace",
  84. side_effect=OSError("simulated crash"),
  85. ):
  86. with pytest.raises(OSError, match="simulated crash"):
  87. atomic_write(dst, lambda tmp: commit(tmp, "v2"))
  88. assert open(dst).read() == "v1"
  89. leftovers = [p for p in os.listdir(tmp_path) if ".tmp." in p]
  90. assert leftovers == [], f"os.replace exception must clean tmp, got {leftovers}"
  91. @pytest.mark.offline
  92. @pytest.mark.skipif(sys.platform == "win32", reason="POSIX chmod semantics")
  93. def test_atomic_write_preserves_existing_mode(tmp_path):
  94. """The inode swap done by ``os.replace`` would otherwise inherit fresh
  95. tmp permissions and silently widen a 0600 destination."""
  96. dst = str(tmp_path / "secret.txt")
  97. atomic_write(dst, lambda tmp: open(tmp, "w").write("seed"))
  98. os.chmod(dst, 0o600)
  99. assert stat.S_IMODE(os.stat(dst).st_mode) == 0o600
  100. atomic_write(dst, lambda tmp: open(tmp, "w").write("updated"))
  101. assert stat.S_IMODE(os.stat(dst).st_mode) == 0o600
  102. assert open(dst).read() == "updated"
  103. @pytest.mark.offline
  104. def test_reap_orphan_tmp_files_respects_age_and_locality(tmp_path):
  105. """Aged tmp siblings get reaped; fresh ones (potentially belonging to a
  106. live concurrent writer) and unrelated paths are left alone."""
  107. dst = str(tmp_path / "data.json")
  108. old_tmp = f"{dst}.tmp.111.222.333"
  109. young_tmp = f"{dst}.tmp.444.555.666"
  110. unrelated = str(tmp_path / "other.json.tmp.999")
  111. for p in (old_tmp, young_tmp, unrelated):
  112. with open(p, "w") as fh:
  113. fh.write("partial")
  114. aged_mtime = time.time() - (TMP_REAP_AGE_SECONDS + 60)
  115. os.utime(old_tmp, (aged_mtime, aged_mtime))
  116. reap_orphan_tmp_files(dst)
  117. assert not os.path.exists(old_tmp)
  118. assert os.path.exists(young_tmp)
  119. assert os.path.exists(unrelated)
  120. @pytest.mark.offline
  121. def test_reap_orphan_tmp_files_handles_glob_metacharacters(tmp_path):
  122. """``file_name`` is composed from workspace + namespace, both of which
  123. can legitimately contain glob metacharacters on POSIX. The reaper must
  124. match literally — not miss the real orphan because ``[v2]`` parses as a
  125. character class, nor widen its pattern to match unrelated siblings."""
  126. dst = str(tmp_path / "data_[v2].json")
  127. real_orphan = f"{dst}.tmp.111.222.333"
  128. decoy = str(tmp_path / "data_v.json.tmp.unrelated")
  129. for p in (real_orphan, decoy):
  130. with open(p, "w") as fh:
  131. fh.write("partial")
  132. aged_mtime = time.time() - (TMP_REAP_AGE_SECONDS + 60)
  133. for p in (real_orphan, decoy):
  134. os.utime(p, (aged_mtime, aged_mtime))
  135. reap_orphan_tmp_files(dst)
  136. assert not os.path.exists(real_orphan)
  137. assert os.path.exists(decoy), "Reaper must not match siblings of an unrelated path"
  138. @pytest.mark.offline
  139. def test_reap_orphan_tmp_files_extra_patterns_clean_legacy_residue(tmp_path):
  140. """The default ``.tmp.*`` pattern intentionally does not match a bare
  141. trailing ``.tmp`` (the historical Faiss meta suffix). ``extra_patterns``
  142. is the migration path for those residues."""
  143. import glob
  144. dst = str(tmp_path / "meta.json")
  145. legacy_tmp = f"{dst}.tmp"
  146. with open(legacy_tmp, "w") as fh:
  147. fh.write("legacy partial")
  148. aged_mtime = time.time() - (TMP_REAP_AGE_SECONDS + 60)
  149. os.utime(legacy_tmp, (aged_mtime, aged_mtime))
  150. # Default pattern leaves the legacy residue.
  151. reap_orphan_tmp_files(dst)
  152. assert os.path.exists(legacy_tmp)
  153. # Explicit migration pattern clears it.
  154. reap_orphan_tmp_files(dst, extra_patterns=(glob.escape(dst) + ".tmp",))
  155. assert not os.path.exists(legacy_tmp)