json_kv_impl.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489
  1. import os
  2. from dataclasses import dataclass
  3. from typing import Any, final
  4. from lightrag.base import (
  5. BaseKVStorage,
  6. )
  7. from lightrag.file_atomic import reap_orphan_tmp_files
  8. from lightrag.utils import (
  9. _cooperative_yield,
  10. load_json,
  11. logger,
  12. write_json,
  13. )
  14. from lightrag.exceptions import StorageNotInitializedError
  15. from .shared_storage import (
  16. get_namespace_data,
  17. get_namespace_lock,
  18. get_data_init_lock,
  19. get_update_flag,
  20. set_all_update_flags,
  21. clear_all_update_flags,
  22. try_initialize_namespace,
  23. )
@final
@dataclass
class JsonKVStorage(BaseKVStorage):
    """JSON-file-backed KV storage with **shared in-memory state across processes**.

    This class uses a *fundamentally different* cross-process model from
    ``NanoVectorDBStorage`` / ``FaissVectorDBStorage`` / ``NetworkXStorage``
    (which keep one in-memory copy per process and reconcile via file
    reloads). Compare carefully before changing either side.

    Storage model:
        ``self._data`` is **not** a per-process dict — it is the value
        returned by ``get_namespace_data(namespace, workspace=...)``, i.e.
        a reference into ``shared_storage._shared_dicts``. In multi-
        process mode this is a ``multiprocessing.Manager().dict()`` proxy
        that every worker sees the **same instance** of; in single-
        process mode it degrades to a plain ``dict``. Either way, a
        mutation in any process is *immediately* visible to every other
        process — there is no reload needed.

        The on-disk file at
        ``working_dir/[workspace/]kv_store_<namespace>.json`` exists for
        durability only. It is the source of truth at startup and the
        target of ``index_done_callback`` flushes, but is **not** part of
        the steady-state read/write path.

    First-time load (``initialize``):
        ``try_initialize_namespace`` is a global init lock that returns
        ``True`` to exactly one process per ``(namespace, workspace)``.
        That process reads the JSON file and populates ``self._data``
        under ``_storage_lock``. Other processes skip the load — they
        will see the data through the same shared ``self._data`` proxy.

    Cross-process sync protocol (note: reversed semantics vs file-backed
    classes):
        Anyone writing (``upsert`` / ``delete`` / ``drop``):
            1. Mutate ``self._data`` under ``_storage_lock`` (same lock,
               same dict, all processes see the change immediately).
            2. Call ``set_all_update_flags`` to mark **every** process's
               ``storage_updated`` flag ``True``. Here ``True`` means
               *"there is dirty data that still needs to be flushed"*,
               not *"there is fresher data on disk that I need to
               reload"* as in the file-backed implementations.

        Commit (``index_done_callback``):
            1. Under ``_storage_lock``, if ``storage_updated.value`` is
               ``True``, snapshot ``self._data`` and write it to disk
               via ``write_json`` (atomic).
            2. ``clear_all_update_flags`` — wipe every process's flag
               back to ``False``. Because the in-memory state is already
               consistent across processes, there is nothing for the
               *other* processes to do; the clear is just a
               "the dirty data has been persisted" signal.

    Lock scope:
        ``_storage_lock`` is a per-``(namespace, workspace)`` keyed lock
        spanning intra-process coroutines **and** inter-process workers.
        Unlike the file-backed classes (which only lock reload/commit
        critical sections), this class **holds the lock over every
        ``self._data`` access** — read or write — because the underlying
        ``Manager().dict()`` is not free-threaded across processes.

        Two places intentionally deviate from a plain "do everything
        under the lock without interruption" pattern for latency
        reasons:

        * ``upsert`` performs its per-key timestamp prep loop inside
          the lock but yields to the event loop via
          ``_cooperative_yield`` between keys (safe: ``NamespaceLock``
          is non-reentrant, so siblings blocked on it stay blocked).
        * ``JsonDocStatusStorage.upsert`` prepares its caller-supplied
          dict outside the lock (it only mutates the input, not the
          shared store).

    Who can write:
        Pipeline ``busy`` still serializes the document ingest / purge
        flows, but the *file-flush trigger* is symmetric: any process
        whose ``storage_updated.value`` is ``True`` when
        ``index_done_callback`` fires will perform the write. In a
        single-writer pipeline this is always the same process; if you
        ever permit multiple writers, two processes may race to flush
        the same in-memory state — that race is safe (both flush the
        same shared dict, ``write_json`` is atomic per file) but
        wasteful, and the ``clear_all_update_flags`` after each flush
        means subsequent re-flushes are no-ops.

    Caveats vs file-backed implementations:
        * **No reload path.** If something writes to the on-disk file
          out of band, this class will not pick it up until restart.
          The file is only ever written by ``index_done_callback`` and
          read once in ``initialize``.
        * **No ``_get_*`` entry method.** Adding one would be wrong —
          there's nothing to "get fresher than" since the in-memory
          state is already the shared, authoritative view.
        * **``write_json`` may sanitize.** If sanitization happens, the
          on-disk JSON differs from what was in memory; the callback
          re-reads the cleaned file back into ``self._data`` under the
          same lock so the shared view stays consistent with disk.

    Non-pipeline write paths:
        * ``drop`` — destructive, **not** serialized by this storage
          class. Currently gated by the API layer
          (``/documents/clear``); any new caller must hold the pipeline
          ``busy`` reservation.
        * ``upsert`` / ``delete`` invoked from non-pipeline admin flows
          (cache management, etc.) — safe under the shared-lock model,
          but consumers should still respect the pipeline gate to avoid
          interleaving with batched ingest work.
    """
  120. def __post_init__(self):
  121. working_dir = self.global_config["working_dir"]
  122. if self.workspace:
  123. # Include workspace in the file path for data isolation
  124. workspace_dir = os.path.join(working_dir, self.workspace)
  125. else:
  126. # Default behavior when workspace is empty
  127. workspace_dir = working_dir
  128. self.workspace = ""
  129. os.makedirs(workspace_dir, exist_ok=True)
  130. self._file_name = os.path.join(workspace_dir, f"kv_store_{self.namespace}.json")
  131. self._data = None
  132. self._storage_lock = None
  133. self.storage_updated = None
  134. reap_orphan_tmp_files(self._file_name, self.workspace or "_")
  135. async def initialize(self):
  136. """Bind to the shared namespace dict and load from disk on first init.
  137. ``try_initialize_namespace`` is a global init lock that returns
  138. ``True`` for exactly one process per ``(namespace, workspace)``;
  139. that process reads the JSON file and populates the shared
  140. ``self._data`` under ``_storage_lock``. Subsequent processes
  141. skip the file read — they will see the same shared dict via
  142. ``get_namespace_data``.
  143. For ``*_cache`` namespaces an extra
  144. ``_migrate_legacy_cache_structure`` pass runs against the loaded
  145. data and may rewrite the on-disk file if a migration was applied.
  146. """
  147. self._storage_lock = get_namespace_lock(
  148. self.namespace, workspace=self.workspace
  149. )
  150. self.storage_updated = await get_update_flag(
  151. self.namespace, workspace=self.workspace
  152. )
  153. async with get_data_init_lock():
  154. # check need_init must before get_namespace_data
  155. need_init = await try_initialize_namespace(
  156. self.namespace, workspace=self.workspace
  157. )
  158. self._data = await get_namespace_data(
  159. self.namespace, workspace=self.workspace
  160. )
  161. if need_init:
  162. loaded_data = load_json(self._file_name) or {}
  163. async with self._storage_lock:
  164. # Migrate legacy cache structure if needed
  165. if self.namespace.endswith("_cache"):
  166. loaded_data = await self._migrate_legacy_cache_structure(
  167. loaded_data
  168. )
  169. self._data.update(loaded_data)
  170. data_count = len(loaded_data)
  171. logger.info(
  172. f"[{self.workspace}] Process {os.getpid()} KV load {self.namespace} with {data_count} records"
  173. )
  174. async def index_done_callback(self) -> None:
  175. """Flush dirty in-memory state to disk and clear all dirty flags.
  176. Commit point in the shared-memory protocol (see class docstring,
  177. *Cross-process sync protocol*). Steps:
  178. 1. Under ``_storage_lock``, check this process's
  179. ``storage_updated.value``. If ``False``, nothing to do —
  180. return.
  181. 2. Snapshot ``self._data`` (converting from ``Manager.dict``
  182. proxy to a plain ``dict`` so the JSON encoder doesn't trip
  183. over the proxy) and write it via ``write_json``.
  184. 3. If ``write_json`` reports sanitization was applied, the
  185. on-disk file no longer matches what was in memory — reload
  186. the cleaned data back into ``self._data`` under the same
  187. lock so the shared view stays consistent.
  188. 4. ``clear_all_update_flags`` — wipe every process's
  189. ``storage_updated`` flag back to ``False``, signaling
  190. that the dirty data has been persisted.
  191. Note the **semantic difference** from the file-backed classes'
  192. commit: there is no ``set_all_update_flags`` here. The shared
  193. dict is already consistent across processes; the only thing
  194. ``index_done_callback`` does globally is *clear* the dirty
  195. flags.
  196. """
  197. async with self._storage_lock:
  198. if self.storage_updated.value:
  199. data_dict = (
  200. dict(self._data) if hasattr(self._data, "_getvalue") else self._data
  201. )
  202. # Calculate data count - all data is now flattened
  203. data_count = len(data_dict)
  204. logger.debug(
  205. f"[{self.workspace}] Process {os.getpid()} KV writting {data_count} records to {self.namespace}"
  206. )
  207. # Write JSON and check if sanitization was applied
  208. needs_reload = write_json(data_dict, self._file_name)
  209. # If data was sanitized, reload cleaned data to update shared memory
  210. if needs_reload:
  211. logger.info(
  212. f"[{self.workspace}] Reloading sanitized data into shared memory for {self.namespace}"
  213. )
  214. cleaned_data = load_json(self._file_name)
  215. if cleaned_data is not None:
  216. self._data.clear()
  217. self._data.update(cleaned_data)
  218. await clear_all_update_flags(self.namespace, workspace=self.workspace)
  219. async def get_by_id(self, id: str) -> dict[str, Any] | None:
  220. async with self._storage_lock:
  221. result = self._data.get(id)
  222. if result:
  223. # Create a copy to avoid modifying the original data
  224. result = dict(result)
  225. # Ensure time fields are present, provide default values for old data
  226. result.setdefault("create_time", 0)
  227. result.setdefault("update_time", 0)
  228. # Ensure _id field contains the clean ID
  229. result["_id"] = id
  230. return result
  231. async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]:
  232. async with self._storage_lock:
  233. results = []
  234. for id in ids:
  235. data = self._data.get(id, None)
  236. if data:
  237. # Create a copy to avoid modifying the original data
  238. result = {k: v for k, v in data.items()}
  239. # Ensure time fields are present, provide default values for old data
  240. result.setdefault("create_time", 0)
  241. result.setdefault("update_time", 0)
  242. # Ensure _id field contains the clean ID
  243. result["_id"] = id
  244. results.append(result)
  245. else:
  246. results.append(None)
  247. return results
  248. async def filter_keys(self, keys: set[str]) -> set[str]:
  249. async with self._storage_lock:
  250. return set(keys) - set(self._data.keys())
  251. async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
  252. """Insert or update KV records in shared memory; mark all processes dirty.
  253. Two side effects under ``_storage_lock``:
  254. 1. Stamp ``create_time`` / ``update_time`` / ``_id`` on each
  255. value, then ``self._data.update(data)``. Because
  256. ``self._data`` is the shared ``Manager.dict()`` proxy, the
  257. update is visible to all processes immediately — no
  258. reload needed.
  259. 2. ``set_all_update_flags`` — flip every process's
  260. ``storage_updated.value`` to ``True``. Here ``True``
  261. means *"there is dirty data that still needs to be
  262. flushed to disk"*, **not** *"there is fresher data on
  263. disk"* as in the file-backed classes (see class docstring
  264. for the contrast).
  265. Persistence is deferred to the next ``index_done_callback`` (the
  266. pipeline calls this via ``_insert_done()`` after each batch).
  267. Note: the per-key prep loop calls ``_cooperative_yield`` inside
  268. the lock. That is safe because ``NamespaceLock`` is non-
  269. reentrant — siblings waiting on this lock stay blocked across
  270. the yield; only unrelated coroutines benefit from the yield.
  271. """
  272. if not data:
  273. return
  274. import time
  275. current_time = int(time.time()) # Get current Unix timestamp
  276. logger.debug(
  277. f"[{self.workspace}] Inserting {len(data)} records to {self.namespace}"
  278. )
  279. if self._storage_lock is None:
  280. raise StorageNotInitializedError("JsonKVStorage")
  281. async with self._storage_lock:
  282. # Add timestamps to data based on whether key exists.
  283. # The loop reads self._data (k in self._data) so it must stay inside
  284. # the lock. _cooperative_yield is safe here: NamespaceLock is
  285. # non-reentrant, so other coroutines waiting on this lock will block
  286. # until we release it; the yield only benefits unrelated coroutines.
  287. for i, (k, v) in enumerate(data.items(), start=1):
  288. # For text_chunks namespace, ensure llm_cache_list field exists
  289. if self.namespace.endswith("text_chunks"):
  290. if "llm_cache_list" not in v:
  291. v["llm_cache_list"] = []
  292. # Add timestamps based on whether key exists
  293. if k in self._data: # Key exists, only update update_time
  294. v["update_time"] = current_time
  295. else: # New key, set both create_time and update_time
  296. v["create_time"] = current_time
  297. v["update_time"] = current_time
  298. v["_id"] = k
  299. await _cooperative_yield(i)
  300. self._data.update(data)
  301. await set_all_update_flags(self.namespace, workspace=self.workspace)
  302. async def delete(self, ids: list[str]) -> None:
  303. """Remove records from shared memory; mark all processes dirty if any deleted.
  304. Under ``_storage_lock``: ``self._data.pop(doc_id, None)`` for
  305. each id. Only calls ``set_all_update_flags`` if at least one key
  306. was actually present (avoids creating spurious dirty state for
  307. no-op deletes).
  308. See class docstring for the shared-memory + dirty-flag protocol
  309. and the semantic contrast vs file-backed classes.
  310. Args:
  311. ids: List of document IDs to be deleted from storage
  312. """
  313. async with self._storage_lock:
  314. any_deleted = False
  315. for doc_id in ids:
  316. result = self._data.pop(doc_id, None)
  317. if result is not None:
  318. any_deleted = True
  319. if any_deleted:
  320. await set_all_update_flags(self.namespace, workspace=self.workspace)
  321. async def is_empty(self) -> bool:
  322. """Check if the storage is empty
  323. Returns:
  324. bool: True if storage contains no data, False otherwise
  325. """
  326. async with self._storage_lock:
  327. return len(self._data) == 0
  328. async def drop(self) -> dict[str, str]:
  329. """Clear shared memory and immediately persist the empty state.
  330. This method will:
  331. 1. Clear the shared ``self._data`` dict under
  332. ``_storage_lock`` (visible to all processes immediately).
  333. 2. ``set_all_update_flags`` so every process knows there is
  334. dirty state pending persistence.
  335. 3. Call ``index_done_callback`` synchronously to flush the
  336. empty state to disk and clear the dirty flags.
  337. Caller contract:
  338. ``drop`` is destructive and **not** serialized by this
  339. storage class. The caller must hold the pipeline ``busy``
  340. reservation (the ``/documents/clear`` endpoint does this)
  341. before invoking it — running ``drop`` concurrently with an
  342. active document pipeline will wipe out in-flight work and
  343. silently lose data. See class docstring,
  344. *Non-pipeline write paths*.
  345. Returns:
  346. dict[str, str]: Operation status and message
  347. - On success: {"status": "success", "message": "data dropped"}
  348. - On failure: {"status": "error", "message": "<error details>"}
  349. """
  350. try:
  351. async with self._storage_lock:
  352. self._data.clear()
  353. await set_all_update_flags(self.namespace, workspace=self.workspace)
  354. await self.index_done_callback()
  355. logger.info(
  356. f"[{self.workspace}] Process {os.getpid()} drop {self.namespace}"
  357. )
  358. return {"status": "success", "message": "data dropped"}
  359. except Exception as e:
  360. logger.error(f"[{self.workspace}] Error dropping {self.namespace}: {e}")
  361. return {"status": "error", "message": str(e)}
  362. async def _migrate_legacy_cache_structure(self, data: dict) -> dict:
  363. """Migrate legacy nested cache structure to flattened structure
  364. Args:
  365. data: Original data dictionary that may contain legacy structure
  366. Returns:
  367. Migrated data dictionary with flattened cache keys (sanitized if needed)
  368. """
  369. from lightrag.utils import generate_cache_key
  370. # Early return if data is empty
  371. if not data:
  372. return data
  373. # Check first entry to see if it's already in new format
  374. first_key = next(iter(data.keys()))
  375. if ":" in first_key and len(first_key.split(":")) == 3:
  376. # Already in flattened format, return as-is
  377. return data
  378. migrated_data = {}
  379. migration_count = 0
  380. for key, value in data.items():
  381. # Check if this is a legacy nested cache structure
  382. if isinstance(value, dict) and all(
  383. isinstance(v, dict) and "return" in v for v in value.values()
  384. ):
  385. # This looks like a legacy cache mode with nested structure
  386. mode = key
  387. for cache_hash, cache_entry in value.items():
  388. cache_type = cache_entry.get("cache_type", "extract")
  389. flattened_key = generate_cache_key(mode, cache_type, cache_hash)
  390. migrated_data[flattened_key] = cache_entry
  391. migration_count += 1
  392. else:
  393. # Keep non-cache data or already flattened cache data as-is
  394. migrated_data[key] = value
  395. if migration_count > 0:
  396. logger.info(
  397. f"[{self.workspace}] Migrated {migration_count} legacy cache entries to flattened structure"
  398. )
  399. # Persist migrated data immediately and check if sanitization was applied
  400. needs_reload = write_json(migrated_data, self._file_name)
  401. # If data was sanitized during write, reload cleaned data
  402. if needs_reload:
  403. logger.info(
  404. f"[{self.workspace}] Reloading sanitized migration data for {self.namespace}"
  405. )
  406. cleaned_data = load_json(self._file_name)
  407. if cleaned_data is not None:
  408. return cleaned_data # Return cleaned data to update shared memory
  409. return migrated_data
  410. async def finalize(self):
  411. """On shutdown, flush ``*_cache`` namespaces to disk.
  412. Cache namespaces are routinely written to during query/extract
  413. without triggering an immediate ``index_done_callback`` (caches
  414. churn fast and the pipeline doesn't always end at a natural
  415. commit point). This hook ensures whatever dirty cache state is
  416. in shared memory at process exit gets persisted, so the next
  417. run can pick it up.
  418. Non-cache namespaces don't need this — their writes already
  419. flow through pipeline-driven ``_insert_done()`` commits.
  420. """
  421. if self.namespace.endswith("_cache"):
  422. await self.index_done_callback()