# json_doc_status_impl.py
import os
from dataclasses import dataclass
from operator import itemgetter
from typing import Any, Union, final

from lightrag.base import (
    DocProcessingStatus,
    DocStatus,
    DocStatusStorage,
)
from lightrag.file_atomic import reap_orphan_tmp_files
from lightrag.utils import (
    _cooperative_yield,
    load_json,
    logger,
    write_json,
    get_pinyin_sort_key,
)
from lightrag.exceptions import StorageNotInitializedError

from .shared_storage import (
    get_namespace_data,
    get_namespace_lock,
    get_data_init_lock,
    get_update_flag,
    set_all_update_flags,
    clear_all_update_flags,
    try_initialize_namespace,
)


  27. @final
  28. @dataclass
  29. class JsonDocStatusStorage(DocStatusStorage):
  30. """JSON-file-backed document-status storage, sharing memory across processes.
  31. Uses the **same shared-memory + dirty-flag protocol** as
  32. ``JsonKVStorage`` — see that class's docstring for the canonical
  33. description of:
  34. * how ``self._data`` is a cross-process
  35. ``multiprocessing.Manager().dict()`` proxy obtained via
  36. ``get_namespace_data``;
  37. * how ``try_initialize_namespace`` ensures exactly one process
  38. reads the JSON file on first init;
  39. * how ``set_all_update_flags`` marks dirty state (semantics
  40. *reversed* from the file-backed classes
  41. ``NanoVectorDBStorage`` / ``FaissVectorDBStorage`` /
  42. ``NetworkXStorage``);
  43. * how ``index_done_callback`` flushes and calls
  44. ``clear_all_update_flags``;
  45. * why ``_storage_lock`` wraps **every** ``self._data`` access
  46. (not just commit / reload).
  47. Differences from ``JsonKVStorage`` (in this class only):
  48. * ``upsert`` calls ``index_done_callback`` synchronously after
  49. mutating shared memory, so doc-status changes hit disk
  50. immediately rather than being deferred to the pipeline's
  51. batched ``_insert_done()``. Rationale: doc-status is the
  52. recovery anchor for the ingest pipeline — if the process
  53. crashes after an in-memory upsert but before the next batch
  54. commit, the doc must still be visible as PENDING/PROCESSING
  55. on restart. The other writes (``delete``, ``drop``) follow
  56. the standard deferred-commit pattern.
  57. * Pre-upsert preparation (``chunks_list`` default) runs
  58. *outside* the lock because it only mutates the caller-
  59. supplied dict, not the shared store.
  60. * Read methods are richer (``get_docs_by_status`` /
  61. ``get_docs_by_track_id`` / ``get_docs_paginated`` /
  62. ``get_doc_by_file_path`` / etc.), but they all follow the
  63. same "acquire ``_storage_lock``, scan ``self._data``, copy
  64. values out before returning" template.
  65. Non-pipeline write paths:
  66. * ``drop`` — destructive, **not** serialized; the caller must
  67. hold the pipeline ``busy`` reservation (the
  68. ``/documents/clear`` endpoint does this).
  69. """
  70. def __post_init__(self):
  71. working_dir = self.global_config["working_dir"]
  72. if self.workspace:
  73. # Include workspace in the file path for data isolation
  74. workspace_dir = os.path.join(working_dir, self.workspace)
  75. else:
  76. # Default behavior when workspace is empty
  77. workspace_dir = working_dir
  78. self.workspace = ""
  79. os.makedirs(workspace_dir, exist_ok=True)
  80. self._file_name = os.path.join(workspace_dir, f"kv_store_{self.namespace}.json")
  81. self._data = None
  82. self._storage_lock = None
  83. self.storage_updated = None
  84. reap_orphan_tmp_files(self._file_name, self.workspace or "_")
  85. async def initialize(self):
  86. """Bind to the shared namespace dict and load from disk on first init.
  87. Same protocol as ``JsonKVStorage.initialize``: a global init
  88. lock (``try_initialize_namespace``) elects one process to read
  89. the JSON file into the shared ``self._data``; other processes
  90. skip the read and see the same shared dict.
  91. """
  92. self._storage_lock = get_namespace_lock(
  93. self.namespace, workspace=self.workspace
  94. )
  95. self.storage_updated = await get_update_flag(
  96. self.namespace, workspace=self.workspace
  97. )
  98. async with get_data_init_lock():
  99. # check need_init must before get_namespace_data
  100. need_init = await try_initialize_namespace(
  101. self.namespace, workspace=self.workspace
  102. )
  103. self._data = await get_namespace_data(
  104. self.namespace, workspace=self.workspace
  105. )
  106. if need_init:
  107. loaded_data = load_json(self._file_name) or {}
  108. async with self._storage_lock:
  109. self._data.update(loaded_data)
  110. logger.info(
  111. f"[{self.workspace}] Process {os.getpid()} doc status load {self.namespace} with {len(loaded_data)} records"
  112. )
  113. async def filter_keys(self, keys: set[str]) -> set[str]:
  114. """Return keys that should be processed (not in storage or not successfully processed)"""
  115. if self._storage_lock is None:
  116. raise StorageNotInitializedError("JsonDocStatusStorage")
  117. async with self._storage_lock:
  118. return set(keys) - set(self._data.keys())
  119. async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]:
  120. ordered_results: list[dict[str, Any] | None] = []
  121. if self._storage_lock is None:
  122. raise StorageNotInitializedError("JsonDocStatusStorage")
  123. async with self._storage_lock:
  124. for id in ids:
  125. data = self._data.get(id, None)
  126. if data:
  127. ordered_results.append(data.copy())
  128. else:
  129. ordered_results.append(None)
  130. return ordered_results
  131. async def get_status_counts(self) -> dict[str, int]:
  132. """Get counts of documents in each status"""
  133. counts = {status.value: 0 for status in DocStatus}
  134. if self._storage_lock is None:
  135. raise StorageNotInitializedError("JsonDocStatusStorage")
  136. async with self._storage_lock:
  137. for doc in self._data.values():
  138. counts[doc["status"]] += 1
  139. return counts
  140. async def get_docs_by_status(
  141. self, status: DocStatus
  142. ) -> dict[str, DocProcessingStatus]:
  143. """Get all documents with a specific status"""
  144. return await self.get_docs_by_statuses([status])
  145. async def get_docs_by_statuses(
  146. self, statuses: list[DocStatus]
  147. ) -> dict[str, DocProcessingStatus]:
  148. """Get all documents matching any of the given statuses in a single pass.
  149. Acquires the storage lock once and scans the in-memory dict once,
  150. filtering against a set of status values. More efficient than N separate
  151. get_docs_by_status() calls, which would acquire the lock N times and scan
  152. the data N times.
  153. """
  154. if not statuses:
  155. return {}
  156. status_values = {s.value for s in statuses}
  157. result = {}
  158. async with self._storage_lock:
  159. for k, v in self._data.items():
  160. if v["status"] not in status_values:
  161. continue
  162. try:
  163. data = v.copy()
  164. data.pop("content", None)
  165. if not data.get("file_path"):
  166. data["file_path"] = "no-file-path"
  167. if "metadata" not in data:
  168. data["metadata"] = {}
  169. if "error_msg" not in data:
  170. data["error_msg"] = None
  171. result[k] = DocProcessingStatus(**data)
  172. except (KeyError, TypeError) as e:
  173. logger.error(
  174. f"[{self.workspace}] Missing required field for document {k}: {e}"
  175. )
  176. continue
  177. return result
  178. async def get_docs_by_track_id(
  179. self, track_id: str
  180. ) -> dict[str, DocProcessingStatus]:
  181. """Get all documents with a specific track_id"""
  182. result = {}
  183. async with self._storage_lock:
  184. for k, v in self._data.items():
  185. if v.get("track_id") == track_id:
  186. try:
  187. # Make a copy of the data to avoid modifying the original
  188. data = v.copy()
  189. # Remove deprecated content field if it exists
  190. data.pop("content", None)
  191. # Normalize missing or null file_path
  192. if not data.get("file_path"):
  193. data["file_path"] = "no-file-path"
  194. # Ensure new fields exist with default values
  195. if "metadata" not in data:
  196. data["metadata"] = {}
  197. if "error_msg" not in data:
  198. data["error_msg"] = None
  199. result[k] = DocProcessingStatus(**data)
  200. except KeyError as e:
  201. logger.error(
  202. f"[{self.workspace}] Missing required field for document {k}: {e}"
  203. )
  204. continue
  205. return result
  206. async def index_done_callback(self) -> None:
  207. """Flush dirty shared memory to disk and clear all dirty flags.
  208. Identical commit protocol to ``JsonKVStorage.index_done_callback``
  209. (snapshot the shared dict → ``write_json`` → if sanitization
  210. happened reload the cleaned data → ``clear_all_update_flags``).
  211. See ``JsonKVStorage`` docstring for details.
  212. """
  213. async with self._storage_lock:
  214. if self.storage_updated.value:
  215. data_dict = (
  216. dict(self._data) if hasattr(self._data, "_getvalue") else self._data
  217. )
  218. logger.debug(
  219. f"[{self.workspace}] Process {os.getpid()} doc status writting {len(data_dict)} records to {self.namespace}"
  220. )
  221. # Write JSON and check if sanitization was applied
  222. needs_reload = write_json(data_dict, self._file_name)
  223. # If data was sanitized, reload cleaned data to update shared memory
  224. if needs_reload:
  225. logger.info(
  226. f"[{self.workspace}] Reloading sanitized data into shared memory for {self.namespace}"
  227. )
  228. cleaned_data = load_json(self._file_name)
  229. if cleaned_data is not None:
  230. self._data.clear()
  231. self._data.update(cleaned_data)
  232. await clear_all_update_flags(self.namespace, workspace=self.workspace)
  233. async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
  234. """Insert/update doc-status records and **persist immediately**.
  235. Differs from ``JsonKVStorage.upsert`` in that it calls
  236. ``index_done_callback`` synchronously at the end, so changes
  237. are flushed to disk before this coroutine returns. Rationale:
  238. doc-status is the recovery anchor for the ingest pipeline — if
  239. the process crashes after an in-memory upsert but before the
  240. next batch commit, the doc must still be visible as
  241. PENDING/PROCESSING on restart.
  242. Steps:
  243. 1. Pre-process the caller's dict (default ``chunks_list``)
  244. **outside** the lock — only mutates the caller-supplied
  245. value dicts, not shared state.
  246. 2. Under ``_storage_lock``, ``self._data.update(data)`` and
  247. ``set_all_update_flags`` to mark every process dirty.
  248. 3. Await ``index_done_callback`` for an immediate flush.
  249. See ``JsonKVStorage`` class docstring for the shared-memory +
  250. dirty-flag protocol that underpins step 2.
  251. """
  252. if not data:
  253. return
  254. logger.debug(
  255. f"[{self.workspace}] Inserting {len(data)} records to {self.namespace}"
  256. )
  257. if self._storage_lock is None:
  258. raise StorageNotInitializedError("JsonDocStatusStorage")
  259. # Prepare data outside the lock: this only mutates the caller-supplied
  260. # dict values, not shared storage state, so no lock needed here.
  261. for i, (doc_id, doc_data) in enumerate(data.items(), start=1):
  262. if "chunks_list" not in doc_data:
  263. doc_data["chunks_list"] = []
  264. await _cooperative_yield(i)
  265. async with self._storage_lock:
  266. self._data.update(data)
  267. await set_all_update_flags(self.namespace, workspace=self.workspace)
  268. await self.index_done_callback()
  269. async def is_empty(self) -> bool:
  270. """Check if the storage is empty
  271. Returns:
  272. bool: True if storage is empty, False otherwise
  273. Raises:
  274. StorageNotInitializedError: If storage is not initialized
  275. """
  276. if self._storage_lock is None:
  277. raise StorageNotInitializedError("JsonDocStatusStorage")
  278. async with self._storage_lock:
  279. return len(self._data) == 0
  280. async def get_by_id(self, id: str) -> Union[dict[str, Any], None]:
  281. async with self._storage_lock:
  282. return self._data.get(id)
  283. async def get_docs_paginated(
  284. self,
  285. status_filter: DocStatus | None = None,
  286. status_filters: list[DocStatus] | None = None,
  287. page: int = 1,
  288. page_size: int = 50,
  289. sort_field: str = "updated_at",
  290. sort_direction: str = "desc",
  291. ) -> tuple[list[tuple[str, DocProcessingStatus]], int]:
  292. """Get documents with pagination support
  293. Args:
  294. status_filter: Filter by document status, None for all statuses
  295. page: Page number (1-based)
  296. page_size: Number of documents per page (10-200)
  297. sort_field: Field to sort by ('created_at', 'updated_at', 'id')
  298. sort_direction: Sort direction ('asc' or 'desc')
  299. Returns:
  300. Tuple of (list of (doc_id, DocProcessingStatus) tuples, total_count)
  301. """
  302. status_filter_values = self.resolve_status_filter_values(
  303. status_filter=status_filter,
  304. status_filters=status_filters,
  305. )
  306. # Validate parameters
  307. if page < 1:
  308. page = 1
  309. if page_size < 10:
  310. page_size = 10
  311. elif page_size > 200:
  312. page_size = 200
  313. if sort_field not in ["created_at", "updated_at", "id", "file_path"]:
  314. sort_field = "updated_at"
  315. if sort_direction.lower() not in ["asc", "desc"]:
  316. sort_direction = "desc"
  317. # For JSON storage, we load all data and sort/filter in memory
  318. all_docs = []
  319. async with self._storage_lock:
  320. for doc_id, doc_data in self._data.items():
  321. # Apply status filter
  322. if (
  323. status_filter_values is not None
  324. and doc_data.get("status") not in status_filter_values
  325. ):
  326. continue
  327. try:
  328. # Prepare document data
  329. data = doc_data.copy()
  330. data.pop("content", None)
  331. if not data.get("file_path"):
  332. data["file_path"] = "no-file-path"
  333. if "metadata" not in data:
  334. data["metadata"] = {}
  335. if "error_msg" not in data:
  336. data["error_msg"] = None
  337. doc_status = DocProcessingStatus(**data)
  338. # Add sort key for sorting
  339. if sort_field == "id":
  340. doc_status._sort_key = doc_id
  341. elif sort_field == "file_path":
  342. # Use pinyin sorting for file_path field to support Chinese characters
  343. file_path_value = getattr(doc_status, sort_field, "")
  344. doc_status._sort_key = get_pinyin_sort_key(file_path_value)
  345. else:
  346. doc_status._sort_key = getattr(doc_status, sort_field, "")
  347. all_docs.append((doc_id, doc_status))
  348. except KeyError as e:
  349. logger.error(
  350. f"[{self.workspace}] Error processing document {doc_id}: {e}"
  351. )
  352. continue
  353. # Sort documents
  354. reverse_sort = sort_direction.lower() == "desc"
  355. all_docs.sort(
  356. key=lambda x: getattr(x[1], "_sort_key", ""), reverse=reverse_sort
  357. )
  358. # Remove sort key from documents
  359. for doc_id, doc in all_docs:
  360. if hasattr(doc, "_sort_key"):
  361. delattr(doc, "_sort_key")
  362. total_count = len(all_docs)
  363. # Apply pagination
  364. start_idx = (page - 1) * page_size
  365. end_idx = start_idx + page_size
  366. paginated_docs = all_docs[start_idx:end_idx]
  367. return paginated_docs, total_count
  368. async def get_all_status_counts(self) -> dict[str, int]:
  369. """Get counts of documents in each status for all documents
  370. Returns:
  371. Dictionary mapping status names to counts, including 'all' field
  372. """
  373. counts = await self.get_status_counts()
  374. # Add 'all' field with total count
  375. total_count = sum(counts.values())
  376. counts["all"] = total_count
  377. return counts
  378. async def delete(self, doc_ids: list[str]) -> None:
  379. """Remove doc-status records from shared memory.
  380. Unlike ``upsert``, ``delete`` does **not** force an immediate
  381. flush — it follows the standard deferred-commit pattern.
  382. Persistence happens at the next ``index_done_callback``
  383. (driven by the pipeline's ``_insert_done()`` at end of batch).
  384. Only calls ``set_all_update_flags`` if at least one key was
  385. actually present (avoids creating spurious dirty state for
  386. no-op deletes).
  387. Args:
  388. doc_ids: List of document IDs to be deleted from storage
  389. """
  390. async with self._storage_lock:
  391. any_deleted = False
  392. for doc_id in doc_ids:
  393. result = self._data.pop(doc_id, None)
  394. if result is not None:
  395. any_deleted = True
  396. if any_deleted:
  397. await set_all_update_flags(self.namespace, workspace=self.workspace)
  398. async def get_doc_by_file_path(self, file_path: str) -> Union[dict[str, Any], None]:
  399. """Get document by file path
  400. Args:
  401. file_path: The file path to search for
  402. Returns:
  403. Union[dict[str, Any], None]: Document data if found, None otherwise
  404. Returns the same format as get_by_ids method
  405. """
  406. if self._storage_lock is None:
  407. raise StorageNotInitializedError("JsonDocStatusStorage")
  408. async with self._storage_lock:
  409. for doc_id, doc_data in self._data.items():
  410. if doc_data.get("file_path") == file_path:
  411. # Return complete document data, consistent with get_by_ids method
  412. return doc_data
  413. return None
  414. async def get_doc_by_file_basename(
  415. self, basename: str
  416. ) -> Union[tuple[str, dict[str, Any]], None]:
  417. """Find an existing record whose canonical basename matches.
  418. The caller is responsible for passing an already-canonical basename.
  419. Stored ``file_path`` values are canonicalized by the business layer, so
  420. this lookup intentionally performs an exact match only.
  421. """
  422. if not basename:
  423. return None
  424. if self._storage_lock is None:
  425. raise StorageNotInitializedError("JsonDocStatusStorage")
  426. if basename == "unknown_source":
  427. return None
  428. async with self._storage_lock:
  429. for doc_id, doc_data in self._data.items():
  430. if doc_data.get("file_path") == basename:
  431. return doc_id, doc_data
  432. return None
  433. async def get_doc_by_content_hash(
  434. self, content_hash: str
  435. ) -> Union[tuple[str, dict[str, Any]], None]:
  436. """Find an existing record whose content_hash field matches."""
  437. if not content_hash:
  438. return None
  439. if self._storage_lock is None:
  440. raise StorageNotInitializedError("JsonDocStatusStorage")
  441. async with self._storage_lock:
  442. for doc_id, doc_data in self._data.items():
  443. if doc_data.get("content_hash") == content_hash:
  444. return doc_id, doc_data
  445. return None
  446. async def drop(self) -> dict[str, str]:
  447. """Clear shared memory and immediately persist the empty state.
  448. This method will:
  449. 1. Clear the shared ``self._data`` dict under
  450. ``_storage_lock`` (visible to all processes immediately).
  451. 2. ``set_all_update_flags`` so every process knows there is
  452. dirty state pending persistence.
  453. 3. Call ``index_done_callback`` synchronously to flush the
  454. empty state to disk and clear the dirty flags.
  455. Caller contract:
  456. ``drop`` is destructive and **not** serialized by this
  457. storage class. The caller must hold the pipeline ``busy``
  458. reservation (the ``/documents/clear`` endpoint does this)
  459. before invoking it. See class docstring,
  460. *Non-pipeline write paths*.
  461. Returns:
  462. dict[str, str]: Operation status and message
  463. - On success: {"status": "success", "message": "data dropped"}
  464. - On failure: {"status": "error", "message": "<error details>"}
  465. """
  466. try:
  467. async with self._storage_lock:
  468. self._data.clear()
  469. await set_all_update_flags(self.namespace, workspace=self.workspace)
  470. await self.index_done_callback()
  471. logger.info(
  472. f"[{self.workspace}] Process {os.getpid()} drop {self.namespace}"
  473. )
  474. return {"status": "success", "message": "data dropped"}
  475. except Exception as e:
  476. logger.error(f"[{self.workspace}] Error dropping {self.namespace}: {e}")
  477. return {"status": "error", "message": str(e)}