# shared_storage.py (65 KB)
  1. import os
  2. import sys
  3. import asyncio
  4. import multiprocessing as mp
  5. from multiprocessing.synchronize import Lock as ProcessLock
  6. from multiprocessing import Manager
  7. import time
  8. import logging
  9. from contextvars import ContextVar
  10. from typing import Any, Dict, List, Optional, Union, TypeVar, Generic
  11. from lightrag.exceptions import PipelineNotInitializedError
  12. DEBUG_LOCKS = False
  13. # Define a direct print function for critical logs that must be visible in all processes
  14. def direct_log(message, enable_output: bool = True, level: str = "DEBUG"):
  15. """
  16. Log a message directly to stderr to ensure visibility in all processes,
  17. including the Gunicorn master process.
  18. Args:
  19. message: The message to log
  20. level: Log level for message (control the visibility of the message by comparing with the current logger level)
  21. enable_output: Enable or disable log message (Force to turn off the message,)
  22. """
  23. if not enable_output:
  24. return
  25. # Get the current logger level from the lightrag logger
  26. try:
  27. from lightrag.utils import logger
  28. current_level = logger.getEffectiveLevel()
  29. except ImportError:
  30. # Fallback if lightrag.utils is not available
  31. current_level = 20 # INFO
  32. # Convert string level to numeric level for comparison
  33. level_mapping = {
  34. "DEBUG": 10, # DEBUG
  35. "INFO": 20, # INFO
  36. "WARNING": 30, # WARNING
  37. "ERROR": 40, # ERROR
  38. "CRITICAL": 50, # CRITICAL
  39. }
  40. message_level = level_mapping.get(level.upper(), logging.DEBUG)
  41. if message_level >= current_level:
  42. print(f"{level}: {message}", file=sys.stderr, flush=True)
  43. T = TypeVar("T")
  44. LockType = Union[ProcessLock, asyncio.Lock]
  45. _is_multiprocess = None
  46. _workers = None
  47. _manager = None
  48. # Global singleton data for multi-process keyed locks
  49. _lock_registry: Optional[Dict[str, mp.synchronize.Lock]] = None
  50. _lock_registry_count: Optional[Dict[str, int]] = None
  51. _lock_cleanup_data: Optional[Dict[str, time.time]] = None
  52. _registry_guard = None
  53. # Timeout for keyed locks in seconds (Default 300)
  54. CLEANUP_KEYED_LOCKS_AFTER_SECONDS = 300
  55. # Cleanup pending list threshold for triggering cleanup (Default 500)
  56. CLEANUP_THRESHOLD = 500
  57. # Minimum interval between cleanup operations in seconds (Default 30)
  58. MIN_CLEANUP_INTERVAL_SECONDS = 30
  59. # Track the earliest cleanup time for efficient cleanup triggering (multiprocess locks only)
  60. _earliest_mp_cleanup_time: Optional[float] = None
  61. # Track the last cleanup time to enforce minimum interval (multiprocess locks only)
  62. _last_mp_cleanup_time: Optional[float] = None
  63. _initialized = None
  64. # Default workspace for backward compatibility
  65. _default_workspace: Optional[str] = None
  66. # shared data for storage across processes
  67. _shared_dicts: Optional[Dict[str, Any]] = None
  68. _init_flags: Optional[Dict[str, bool]] = None # namespace -> initialized
  69. _update_flags: Optional[Dict[str, bool]] = None # namespace -> updated
  70. # locks for mutex access
  71. _internal_lock: Optional[LockType] = None
  72. _data_init_lock: Optional[LockType] = None
  73. # Manager for all keyed locks
  74. _storage_keyed_lock: Optional["KeyedUnifiedLock"] = None
  75. # async locks for coroutine synchronization in multiprocess mode
  76. _async_locks: Optional[Dict[str, asyncio.Lock]] = None
  77. _debug_n_locks_acquired: int = 0
  78. def get_final_namespace(namespace: str, workspace: str | None = None):
  79. global _default_workspace
  80. if workspace is None:
  81. workspace = _default_workspace
  82. if workspace is None:
  83. direct_log(
  84. f"Error: Invoke namespace operation without workspace, pid={os.getpid()}",
  85. level="ERROR",
  86. )
  87. raise ValueError("Invoke namespace operation without workspace")
  88. final_namespace = f"{workspace}:{namespace}" if workspace else f"{namespace}"
  89. return final_namespace
  90. def inc_debug_n_locks_acquired():
  91. global _debug_n_locks_acquired
  92. if DEBUG_LOCKS:
  93. _debug_n_locks_acquired += 1
  94. print(f"DEBUG: Keyed Lock acquired, total: {_debug_n_locks_acquired:>5}")
  95. def dec_debug_n_locks_acquired():
  96. global _debug_n_locks_acquired
  97. if DEBUG_LOCKS:
  98. if _debug_n_locks_acquired > 0:
  99. _debug_n_locks_acquired -= 1
  100. print(f"DEBUG: Keyed Lock released, total: {_debug_n_locks_acquired:>5}")
  101. else:
  102. raise RuntimeError("Attempting to release lock when no locks are acquired")
  103. def get_debug_n_locks_acquired():
  104. global _debug_n_locks_acquired
  105. return _debug_n_locks_acquired
  106. class UnifiedLock(Generic[T]):
  107. """Provide a unified lock interface type for asyncio.Lock and multiprocessing.Lock"""
  108. def __init__(
  109. self,
  110. lock: Union[ProcessLock, asyncio.Lock],
  111. is_async: bool,
  112. name: str = "unnamed",
  113. enable_logging: bool = True,
  114. async_lock: Optional[asyncio.Lock] = None,
  115. ):
  116. self._lock = lock
  117. self._is_async = is_async
  118. self._pid = os.getpid() # for debug only
  119. self._name = name # for debug only
  120. self._enable_logging = enable_logging # for debug only
  121. self._async_lock = async_lock # auxiliary lock for coroutine synchronization
  122. async def __aenter__(self) -> "UnifiedLock[T]":
  123. try:
  124. # If in multiprocess mode and async lock exists, acquire it first
  125. if not self._is_async and self._async_lock is not None:
  126. await self._async_lock.acquire()
  127. direct_log(
  128. f"== Lock == Process {self._pid}: Acquired async lock '{self._name}",
  129. level="DEBUG",
  130. enable_output=self._enable_logging,
  131. )
  132. # Acquire the main lock
  133. # Note: self._lock should never be None here as the check has been moved
  134. # to get_internal_lock() and get_data_init_lock() functions
  135. if self._is_async:
  136. await self._lock.acquire()
  137. else:
  138. self._lock.acquire()
  139. direct_log(
  140. f"== Lock == Process {self._pid}: Acquired lock {self._name} (async={self._is_async})",
  141. level="INFO",
  142. enable_output=self._enable_logging,
  143. )
  144. return self
  145. except Exception as e:
  146. # If main lock acquisition fails, release the async lock if it was acquired
  147. if (
  148. not self._is_async
  149. and self._async_lock is not None
  150. and self._async_lock.locked()
  151. ):
  152. self._async_lock.release()
  153. direct_log(
  154. f"== Lock == Process {self._pid}: Failed to acquire lock '{self._name}': {e}",
  155. level="ERROR",
  156. enable_output=True,
  157. )
  158. raise
  159. async def __aexit__(self, exc_type, exc_val, exc_tb):
  160. main_lock_released = False
  161. async_lock_released = False
  162. try:
  163. # Release main lock first
  164. if self._lock is not None:
  165. if self._is_async:
  166. self._lock.release()
  167. else:
  168. self._lock.release()
  169. direct_log(
  170. f"== Lock == Process {self._pid}: Released lock {self._name} (async={self._is_async})",
  171. level="INFO",
  172. enable_output=self._enable_logging,
  173. )
  174. main_lock_released = True
  175. # Then release async lock if in multiprocess mode
  176. if not self._is_async and self._async_lock is not None:
  177. self._async_lock.release()
  178. direct_log(
  179. f"== Lock == Process {self._pid}: Released async lock {self._name}",
  180. level="DEBUG",
  181. enable_output=self._enable_logging,
  182. )
  183. async_lock_released = True
  184. except Exception as e:
  185. direct_log(
  186. f"== Lock == Process {self._pid}: Failed to release lock '{self._name}': {e}",
  187. level="ERROR",
  188. enable_output=True,
  189. )
  190. # If main lock release failed but async lock hasn't been attempted yet, try to release it
  191. if (
  192. not main_lock_released
  193. and not async_lock_released
  194. and not self._is_async
  195. and self._async_lock is not None
  196. ):
  197. try:
  198. direct_log(
  199. f"== Lock == Process {self._pid}: Attempting to release async lock after main lock failure",
  200. level="DEBUG",
  201. enable_output=self._enable_logging,
  202. )
  203. self._async_lock.release()
  204. direct_log(
  205. f"== Lock == Process {self._pid}: Successfully released async lock after main lock failure",
  206. level="INFO",
  207. enable_output=self._enable_logging,
  208. )
  209. except Exception as inner_e:
  210. direct_log(
  211. f"== Lock == Process {self._pid}: Failed to release async lock after main lock failure: {inner_e}",
  212. level="ERROR",
  213. enable_output=True,
  214. )
  215. raise
  216. def __enter__(self) -> "UnifiedLock[T]":
  217. """For backward compatibility"""
  218. try:
  219. if self._is_async:
  220. raise RuntimeError("Use 'async with' for shared_storage lock")
  221. # Acquire the main lock
  222. # Note: self._lock should never be None here as the check has been moved
  223. # to get_internal_lock() and get_data_init_lock() functions
  224. direct_log(
  225. f"== Lock == Process {self._pid}: Acquiring lock {self._name} (sync)",
  226. level="DEBUG",
  227. enable_output=self._enable_logging,
  228. )
  229. self._lock.acquire()
  230. direct_log(
  231. f"== Lock == Process {self._pid}: Acquired lock {self._name} (sync)",
  232. level="INFO",
  233. enable_output=self._enable_logging,
  234. )
  235. return self
  236. except Exception as e:
  237. direct_log(
  238. f"== Lock == Process {self._pid}: Failed to acquire lock '{self._name}' (sync): {e}",
  239. level="ERROR",
  240. enable_output=True,
  241. )
  242. raise
  243. def __exit__(self, exc_type, exc_val, exc_tb):
  244. """For backward compatibility"""
  245. try:
  246. if self._is_async:
  247. raise RuntimeError("Use 'async with' for shared_storage lock")
  248. direct_log(
  249. f"== Lock == Process {self._pid}: Releasing lock '{self._name}' (sync)",
  250. level="DEBUG",
  251. enable_output=self._enable_logging,
  252. )
  253. self._lock.release()
  254. direct_log(
  255. f"== Lock == Process {self._pid}: Released lock {self._name} (sync)",
  256. level="INFO",
  257. enable_output=self._enable_logging,
  258. )
  259. except Exception as e:
  260. direct_log(
  261. f"== Lock == Process {self._pid}: Failed to release lock '{self._name}' (sync): {e}",
  262. level="ERROR",
  263. enable_output=True,
  264. )
  265. raise
  266. def locked(self) -> bool:
  267. if self._is_async:
  268. return self._lock.locked()
  269. else:
  270. return self._lock.locked()
  271. def _get_combined_key(factory_name: str, key: str) -> str:
  272. """Return the combined key for the factory and key."""
  273. return f"{factory_name}:{key}"
  274. def _perform_lock_cleanup(
  275. lock_type: str,
  276. cleanup_data: Dict[str, float],
  277. lock_registry: Optional[Dict[str, Any]],
  278. lock_count: Optional[Dict[str, int]],
  279. earliest_cleanup_time: Optional[float],
  280. last_cleanup_time: Optional[float],
  281. current_time: float,
  282. threshold_check: bool = True,
  283. ) -> tuple[int, Optional[float], Optional[float]]:
  284. """
  285. Generic lock cleanup function to unify cleanup logic for both multiprocess and async locks.
  286. Args:
  287. lock_type: Lock type identifier ("mp" or "async")
  288. cleanup_data: Cleanup data dictionary
  289. lock_registry: Lock registry dictionary (can be None for async locks)
  290. lock_count: Lock count dictionary (can be None for async locks)
  291. earliest_cleanup_time: Earliest cleanup time
  292. last_cleanup_time: Last cleanup time
  293. current_time: Current time
  294. threshold_check: Whether to check threshold condition (default True, set to False in cleanup_expired_locks)
  295. Returns:
  296. tuple: (cleaned_count, new_earliest_time, new_last_cleanup_time)
  297. """
  298. if len(cleanup_data) == 0:
  299. return 0, earliest_cleanup_time, last_cleanup_time
  300. # If threshold check is needed and threshold not reached, return directly
  301. if threshold_check and len(cleanup_data) < CLEANUP_THRESHOLD:
  302. return 0, earliest_cleanup_time, last_cleanup_time
  303. # Time rollback detection
  304. if last_cleanup_time is not None and current_time < last_cleanup_time:
  305. direct_log(
  306. f"== {lock_type} Lock == Time rollback detected, resetting cleanup time",
  307. level="WARNING",
  308. enable_output=False,
  309. )
  310. last_cleanup_time = None
  311. # Check cleanup conditions
  312. has_expired_locks = (
  313. earliest_cleanup_time is not None
  314. and current_time - earliest_cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS
  315. )
  316. interval_satisfied = (
  317. last_cleanup_time is None
  318. or current_time - last_cleanup_time > MIN_CLEANUP_INTERVAL_SECONDS
  319. )
  320. if not (has_expired_locks and interval_satisfied):
  321. return 0, earliest_cleanup_time, last_cleanup_time
  322. try:
  323. cleaned_count = 0
  324. new_earliest_time = None
  325. # Calculate total count before cleanup
  326. total_cleanup_len = len(cleanup_data)
  327. # Perform cleanup operation
  328. for cleanup_key, cleanup_time in list(cleanup_data.items()):
  329. if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS:
  330. # Remove from cleanup data
  331. cleanup_data.pop(cleanup_key, None)
  332. # Remove from lock registry if exists
  333. if lock_registry is not None:
  334. lock_registry.pop(cleanup_key, None)
  335. if lock_count is not None:
  336. lock_count.pop(cleanup_key, None)
  337. cleaned_count += 1
  338. else:
  339. # Track the earliest time among remaining locks
  340. if new_earliest_time is None or cleanup_time < new_earliest_time:
  341. new_earliest_time = cleanup_time
  342. # Update state only after successful cleanup
  343. if cleaned_count > 0:
  344. new_last_cleanup_time = current_time
  345. # Log cleanup results
  346. next_cleanup_in = max(
  347. (new_earliest_time + CLEANUP_KEYED_LOCKS_AFTER_SECONDS - current_time)
  348. if new_earliest_time
  349. else float("inf"),
  350. MIN_CLEANUP_INTERVAL_SECONDS,
  351. )
  352. if lock_type == "async":
  353. direct_log(
  354. f"== {lock_type} Lock == Cleaned up {cleaned_count}/{total_cleanup_len} expired {lock_type} locks, "
  355. f"next cleanup in {next_cleanup_in:.1f}s",
  356. enable_output=False,
  357. level="INFO",
  358. )
  359. else:
  360. direct_log(
  361. f"== {lock_type} Lock == Cleaned up {cleaned_count}/{total_cleanup_len} expired locks, "
  362. f"next cleanup in {next_cleanup_in:.1f}s",
  363. enable_output=False,
  364. level="INFO",
  365. )
  366. return cleaned_count, new_earliest_time, new_last_cleanup_time
  367. else:
  368. return 0, earliest_cleanup_time, last_cleanup_time
  369. except Exception as e:
  370. direct_log(
  371. f"== {lock_type} Lock == Cleanup failed: {e}",
  372. level="ERROR",
  373. enable_output=True,
  374. )
  375. return 0, earliest_cleanup_time, last_cleanup_time
  376. def _get_or_create_shared_raw_mp_lock(
  377. factory_name: str, key: str
  378. ) -> Optional[mp.synchronize.Lock]:
  379. """Return the *singleton* manager.Lock() proxy for keyed lock, creating if needed."""
  380. if not _is_multiprocess:
  381. return None
  382. with _registry_guard:
  383. combined_key = _get_combined_key(factory_name, key)
  384. raw = _lock_registry.get(combined_key)
  385. count = _lock_registry_count.get(combined_key)
  386. if raw is None:
  387. raw = _manager.Lock()
  388. _lock_registry[combined_key] = raw
  389. count = 0
  390. else:
  391. if count is None:
  392. raise RuntimeError(
  393. f"Shared-Data lock registry for {factory_name} is corrupted for key {key}"
  394. )
  395. if (
  396. count == 0 and combined_key in _lock_cleanup_data
  397. ): # Reusing an key waiting for cleanup, remove it from cleanup list
  398. _lock_cleanup_data.pop(combined_key)
  399. count += 1
  400. _lock_registry_count[combined_key] = count
  401. return raw
  402. def _release_shared_raw_mp_lock(factory_name: str, key: str):
  403. """Release the *singleton* manager.Lock() proxy for *key*."""
  404. if not _is_multiprocess:
  405. return
  406. global _earliest_mp_cleanup_time, _last_mp_cleanup_time
  407. with _registry_guard:
  408. combined_key = _get_combined_key(factory_name, key)
  409. raw = _lock_registry.get(combined_key)
  410. count = _lock_registry_count.get(combined_key)
  411. if raw is None and count is None:
  412. return
  413. elif raw is None or count is None:
  414. raise RuntimeError(
  415. f"Shared-Data lock registry for {factory_name} is corrupted for key {key}"
  416. )
  417. count -= 1
  418. if count < 0:
  419. raise RuntimeError(
  420. f"Attempting to release lock for {key} more times than it was acquired"
  421. )
  422. _lock_registry_count[combined_key] = count
  423. current_time = time.time()
  424. if count == 0:
  425. _lock_cleanup_data[combined_key] = current_time
  426. # Update earliest multiprocess cleanup time (only when earlier)
  427. if (
  428. _earliest_mp_cleanup_time is None
  429. or current_time < _earliest_mp_cleanup_time
  430. ):
  431. _earliest_mp_cleanup_time = current_time
  432. # Use generic cleanup function
  433. cleaned_count, new_earliest_time, new_last_cleanup_time = _perform_lock_cleanup(
  434. lock_type="mp",
  435. cleanup_data=_lock_cleanup_data,
  436. lock_registry=_lock_registry,
  437. lock_count=_lock_registry_count,
  438. earliest_cleanup_time=_earliest_mp_cleanup_time,
  439. last_cleanup_time=_last_mp_cleanup_time,
  440. current_time=current_time,
  441. threshold_check=True,
  442. )
  443. # Update global state if cleanup was performed
  444. if cleaned_count > 0:
  445. _earliest_mp_cleanup_time = new_earliest_time
  446. _last_mp_cleanup_time = new_last_cleanup_time
  447. class KeyedUnifiedLock:
  448. """
  449. Manager for unified keyed locks, supporting both single and multi-process
  450. • Keeps only a table of async keyed locks locally
  451. • Fetches the multi-process keyed lock on every acquire
  452. • Builds a fresh `UnifiedLock` each time, so `enable_logging`
  453. (or future options) can vary per call.
  454. • Supports dynamic namespaces specified at lock usage time
  455. """
  456. def __init__(self, *, default_enable_logging: bool = True) -> None:
  457. self._default_enable_logging = default_enable_logging
  458. self._async_lock: Dict[str, asyncio.Lock] = {} # local keyed locks
  459. self._async_lock_count: Dict[
  460. str, int
  461. ] = {} # local keyed locks referenced count
  462. self._async_lock_cleanup_data: Dict[
  463. str, time.time
  464. ] = {} # local keyed locks timeout
  465. self._mp_locks: Dict[
  466. str, mp.synchronize.Lock
  467. ] = {} # multi-process lock proxies
  468. self._earliest_async_cleanup_time: Optional[float] = (
  469. None # track earliest async cleanup time
  470. )
  471. self._last_async_cleanup_time: Optional[float] = (
  472. None # track last async cleanup time for minimum interval
  473. )
  474. def __call__(
  475. self, namespace: str, keys: list[str], *, enable_logging: Optional[bool] = None
  476. ):
  477. """
  478. Ergonomic helper so you can write:
  479. async with storage_keyed_lock("namespace", ["key1", "key2"]):
  480. ...
  481. """
  482. if enable_logging is None:
  483. enable_logging = self._default_enable_logging
  484. return _KeyedLockContext(
  485. self,
  486. namespace=namespace,
  487. keys=keys,
  488. enable_logging=enable_logging,
  489. )
  490. def _get_or_create_async_lock(self, combined_key: str) -> asyncio.Lock:
  491. async_lock = self._async_lock.get(combined_key)
  492. count = self._async_lock_count.get(combined_key, 0)
  493. if async_lock is None:
  494. async_lock = asyncio.Lock()
  495. self._async_lock[combined_key] = async_lock
  496. elif count == 0 and combined_key in self._async_lock_cleanup_data:
  497. self._async_lock_cleanup_data.pop(combined_key)
  498. count += 1
  499. self._async_lock_count[combined_key] = count
  500. return async_lock
  501. def _release_async_lock(self, combined_key: str):
  502. count = self._async_lock_count.get(combined_key, 0)
  503. count -= 1
  504. current_time = time.time()
  505. if count == 0:
  506. self._async_lock_cleanup_data[combined_key] = current_time
  507. # Update earliest async cleanup time (only when earlier)
  508. if (
  509. self._earliest_async_cleanup_time is None
  510. or current_time < self._earliest_async_cleanup_time
  511. ):
  512. self._earliest_async_cleanup_time = current_time
  513. self._async_lock_count[combined_key] = count
  514. # Use generic cleanup function
  515. cleaned_count, new_earliest_time, new_last_cleanup_time = _perform_lock_cleanup(
  516. lock_type="async",
  517. cleanup_data=self._async_lock_cleanup_data,
  518. lock_registry=self._async_lock,
  519. lock_count=self._async_lock_count,
  520. earliest_cleanup_time=self._earliest_async_cleanup_time,
  521. last_cleanup_time=self._last_async_cleanup_time,
  522. current_time=current_time,
  523. threshold_check=True,
  524. )
  525. # Update instance state if cleanup was performed
  526. if cleaned_count > 0:
  527. self._earliest_async_cleanup_time = new_earliest_time
  528. self._last_async_cleanup_time = new_last_cleanup_time
  529. def _get_lock_for_key(
  530. self, namespace: str, key: str, enable_logging: bool = False
  531. ) -> UnifiedLock:
  532. # 1. Create combined key for this namespace:key combination
  533. combined_key = _get_combined_key(namespace, key)
  534. # 2. get (or create) the per‑process async gate for this combined key
  535. # Is synchronous, so no need to acquire a lock
  536. async_lock = self._get_or_create_async_lock(combined_key)
  537. # 3. fetch the shared raw lock
  538. raw_lock = _get_or_create_shared_raw_mp_lock(namespace, key)
  539. is_multiprocess = raw_lock is not None
  540. if not is_multiprocess:
  541. raw_lock = async_lock
  542. # 4. build a *fresh* UnifiedLock with the chosen logging flag
  543. if is_multiprocess:
  544. return UnifiedLock(
  545. lock=raw_lock,
  546. is_async=False, # manager.Lock is synchronous
  547. name=combined_key,
  548. enable_logging=enable_logging,
  549. async_lock=async_lock, # prevents event‑loop blocking
  550. )
  551. else:
  552. return UnifiedLock(
  553. lock=raw_lock,
  554. is_async=True,
  555. name=combined_key,
  556. enable_logging=enable_logging,
  557. async_lock=None, # No need for async lock in single process mode
  558. )
  559. def _release_lock_for_key(self, namespace: str, key: str):
  560. combined_key = _get_combined_key(namespace, key)
  561. self._release_async_lock(combined_key)
  562. _release_shared_raw_mp_lock(namespace, key)
  563. def cleanup_expired_locks(self) -> Dict[str, Any]:
  564. """
  565. Cleanup expired locks for both async and multiprocess locks following the same
  566. conditions as _release_shared_raw_mp_lock and _release_async_lock functions.
  567. Only performs cleanup when both has_expired_locks and interval_satisfied conditions are met
  568. to avoid too frequent cleanup operations.
  569. Since async and multiprocess locks work together, this method cleans up
  570. both types of expired locks and returns comprehensive statistics.
  571. Returns:
  572. Dict containing cleanup statistics and current status:
  573. {
  574. "process_id": 12345,
  575. "cleanup_performed": {
  576. "mp_cleaned": 5,
  577. "async_cleaned": 3
  578. },
  579. "current_status": {
  580. "total_mp_locks": 10,
  581. "pending_mp_cleanup": 2,
  582. "total_async_locks": 8,
  583. "pending_async_cleanup": 1
  584. }
  585. }
  586. """
  587. global _lock_registry, _lock_registry_count, _lock_cleanup_data
  588. global _registry_guard, _earliest_mp_cleanup_time, _last_mp_cleanup_time
  589. cleanup_stats = {"mp_cleaned": 0, "async_cleaned": 0}
  590. current_time = time.time()
  591. # 1. Cleanup multiprocess locks using generic function
  592. if (
  593. _is_multiprocess
  594. and _lock_registry is not None
  595. and _registry_guard is not None
  596. ):
  597. try:
  598. with _registry_guard:
  599. if _lock_cleanup_data is not None:
  600. # Use generic cleanup function without threshold check
  601. cleaned_count, new_earliest_time, new_last_cleanup_time = (
  602. _perform_lock_cleanup(
  603. lock_type="mp",
  604. cleanup_data=_lock_cleanup_data,
  605. lock_registry=_lock_registry,
  606. lock_count=_lock_registry_count,
  607. earliest_cleanup_time=_earliest_mp_cleanup_time,
  608. last_cleanup_time=_last_mp_cleanup_time,
  609. current_time=current_time,
  610. threshold_check=False, # Force cleanup in cleanup_expired_locks
  611. )
  612. )
  613. # Update global state if cleanup was performed
  614. if cleaned_count > 0:
  615. _earliest_mp_cleanup_time = new_earliest_time
  616. _last_mp_cleanup_time = new_last_cleanup_time
  617. cleanup_stats["mp_cleaned"] = cleaned_count
  618. except Exception as e:
  619. direct_log(
  620. f"Error during multiprocess lock cleanup: {e}",
  621. level="ERROR",
  622. enable_output=True,
  623. )
  624. # 2. Cleanup async locks using generic function
  625. try:
  626. # Use generic cleanup function without threshold check
  627. cleaned_count, new_earliest_time, new_last_cleanup_time = (
  628. _perform_lock_cleanup(
  629. lock_type="async",
  630. cleanup_data=self._async_lock_cleanup_data,
  631. lock_registry=self._async_lock,
  632. lock_count=self._async_lock_count,
  633. earliest_cleanup_time=self._earliest_async_cleanup_time,
  634. last_cleanup_time=self._last_async_cleanup_time,
  635. current_time=current_time,
  636. threshold_check=False, # Force cleanup in cleanup_expired_locks
  637. )
  638. )
  639. # Update instance state if cleanup was performed
  640. if cleaned_count > 0:
  641. self._earliest_async_cleanup_time = new_earliest_time
  642. self._last_async_cleanup_time = new_last_cleanup_time
  643. cleanup_stats["async_cleaned"] = cleaned_count
  644. except Exception as e:
  645. direct_log(
  646. f"Error during async lock cleanup: {e}",
  647. level="ERROR",
  648. enable_output=True,
  649. )
  650. # 3. Get current status after cleanup
  651. current_status = self.get_lock_status()
  652. return {
  653. "process_id": os.getpid(),
  654. "cleanup_performed": cleanup_stats,
  655. "current_status": current_status,
  656. }
  657. def get_lock_status(self) -> Dict[str, int]:
  658. """
  659. Get current status of both async and multiprocess locks.
  660. Returns comprehensive lock counts for both types of locks since
  661. they work together in the keyed lock system.
  662. Returns:
  663. Dict containing lock counts:
  664. {
  665. "total_mp_locks": 10,
  666. "pending_mp_cleanup": 2,
  667. "total_async_locks": 8,
  668. "pending_async_cleanup": 1
  669. }
  670. """
  671. global _lock_registry_count, _lock_cleanup_data, _registry_guard
  672. status = {
  673. "total_mp_locks": 0,
  674. "pending_mp_cleanup": 0,
  675. "total_async_locks": 0,
  676. "pending_async_cleanup": 0,
  677. }
  678. try:
  679. # Count multiprocess locks
  680. if _is_multiprocess and _lock_registry_count is not None:
  681. if _registry_guard is not None:
  682. with _registry_guard:
  683. status["total_mp_locks"] = len(_lock_registry_count)
  684. if _lock_cleanup_data is not None:
  685. status["pending_mp_cleanup"] = len(_lock_cleanup_data)
  686. # Count async locks
  687. status["total_async_locks"] = len(self._async_lock_count)
  688. status["pending_async_cleanup"] = len(self._async_lock_cleanup_data)
  689. except Exception as e:
  690. direct_log(
  691. f"Error getting keyed lock status: {e}",
  692. level="ERROR",
  693. enable_output=True,
  694. )
  695. return status
  696. class _KeyedLockContext:
  697. def __init__(
  698. self,
  699. parent: KeyedUnifiedLock,
  700. namespace: str,
  701. keys: list[str],
  702. enable_logging: bool,
  703. ) -> None:
  704. self._parent = parent
  705. self._namespace = namespace
  706. # The sorting is critical to ensure proper lock and release order
  707. # to avoid deadlocks
  708. self._keys = sorted(keys)
  709. self._enable_logging = (
  710. enable_logging
  711. if enable_logging is not None
  712. else parent._default_enable_logging
  713. )
  714. self._ul: Optional[List[Dict[str, Any]]] = None # set in __aenter__
  715. # ----- enter -----
  716. async def __aenter__(self):
  717. if self._ul is not None:
  718. raise RuntimeError("KeyedUnifiedLock already acquired in current context")
  719. self._ul = []
  720. try:
  721. # Acquire locks for all keys in the namespace
  722. for key in self._keys:
  723. lock = None
  724. entry = None
  725. try:
  726. # 1. Get lock object (reference count is incremented here)
  727. lock = self._parent._get_lock_for_key(
  728. self._namespace, key, enable_logging=self._enable_logging
  729. )
  730. # 2. Immediately create and add entry to list (critical for rollback to work)
  731. entry = {
  732. "key": key,
  733. "lock": lock,
  734. "entered": False,
  735. "debug_inc": False,
  736. "ref_incremented": True, # Mark that reference count has been incremented
  737. }
  738. self._ul.append(
  739. entry
  740. ) # Add immediately after _get_lock_for_key for rollback to work
  741. # 3. Try to acquire the lock
  742. # Use try-finally to ensure state is updated atomically
  743. lock_acquired = False
  744. try:
  745. await lock.__aenter__()
  746. lock_acquired = True # Lock successfully acquired
  747. finally:
  748. if lock_acquired:
  749. entry["entered"] = True
  750. inc_debug_n_locks_acquired()
  751. entry["debug_inc"] = True
  752. except asyncio.CancelledError:
  753. # Lock acquisition was cancelled
  754. # The finally block above ensures entry["entered"] is correct
  755. direct_log(
  756. f"Lock acquisition cancelled for key {key}",
  757. level="WARNING",
  758. enable_output=self._enable_logging,
  759. )
  760. raise
  761. except Exception as e:
  762. # Other exceptions, log and re-raise
  763. direct_log(
  764. f"Lock acquisition failed for key {key}: {e}",
  765. level="ERROR",
  766. enable_output=True,
  767. )
  768. raise
  769. return self
  770. except BaseException:
  771. # Critical: if any exception occurs (including CancelledError) during lock acquisition,
  772. # we must rollback all already acquired locks to prevent lock leaks
  773. # Use shield to ensure rollback completes
  774. await asyncio.shield(self._rollback_acquired_locks())
  775. raise
  776. async def _rollback_acquired_locks(self):
  777. """Rollback all acquired locks in case of exception during __aenter__"""
  778. if not self._ul:
  779. return
  780. async def rollback_single_entry(entry):
  781. """Rollback a single lock acquisition"""
  782. key = entry["key"]
  783. lock = entry["lock"]
  784. debug_inc = entry["debug_inc"]
  785. entered = entry["entered"]
  786. ref_incremented = entry.get(
  787. "ref_incremented", True
  788. ) # Default to True for safety
  789. errors = []
  790. # 1. If lock was acquired, release it
  791. if entered:
  792. try:
  793. await lock.__aexit__(None, None, None)
  794. except Exception as e:
  795. errors.append(("lock_exit", e))
  796. direct_log(
  797. f"Lock rollback error for key {key}: {e}",
  798. level="ERROR",
  799. enable_output=True,
  800. )
  801. # 2. Release reference count (if it was incremented)
  802. if ref_incremented:
  803. try:
  804. self._parent._release_lock_for_key(self._namespace, key)
  805. except Exception as e:
  806. errors.append(("ref_release", e))
  807. direct_log(
  808. f"Lock rollback reference release error for key {key}: {e}",
  809. level="ERROR",
  810. enable_output=True,
  811. )
  812. # 3. Decrement debug counter
  813. if debug_inc:
  814. try:
  815. dec_debug_n_locks_acquired()
  816. except Exception as e:
  817. errors.append(("debug_dec", e))
  818. direct_log(
  819. f"Lock rollback counter decrementing error for key {key}: {e}",
  820. level="ERROR",
  821. enable_output=True,
  822. )
  823. return errors
  824. # Release already acquired locks in reverse order
  825. for entry in reversed(self._ul):
  826. # Use shield to protect each lock's rollback
  827. try:
  828. await asyncio.shield(rollback_single_entry(entry))
  829. except Exception as e:
  830. # Log but continue rolling back other locks
  831. direct_log(
  832. f"Lock rollback unexpected error for {entry['key']}: {e}",
  833. level="ERROR",
  834. enable_output=True,
  835. )
  836. self._ul = None
  837. # ----- exit -----
  838. async def __aexit__(self, exc_type, exc, tb):
  839. if self._ul is None:
  840. return
  841. async def release_all_locks():
  842. """Release all locks with comprehensive error handling, protected from cancellation"""
  843. async def release_single_entry(entry, exc_type, exc, tb):
  844. """Release a single lock with full protection"""
  845. key = entry["key"]
  846. lock = entry["lock"]
  847. debug_inc = entry["debug_inc"]
  848. entered = entry["entered"]
  849. errors = []
  850. # 1. Release the lock
  851. if entered:
  852. try:
  853. await lock.__aexit__(exc_type, exc, tb)
  854. except Exception as e:
  855. errors.append(("lock_exit", e))
  856. direct_log(
  857. f"Lock release error for key {key}: {e}",
  858. level="ERROR",
  859. enable_output=True,
  860. )
  861. # 2. Release reference count
  862. try:
  863. self._parent._release_lock_for_key(self._namespace, key)
  864. except Exception as e:
  865. errors.append(("ref_release", e))
  866. direct_log(
  867. f"Lock release reference error for key {key}: {e}",
  868. level="ERROR",
  869. enable_output=True,
  870. )
  871. # 3. Decrement debug counter
  872. if debug_inc:
  873. try:
  874. dec_debug_n_locks_acquired()
  875. except Exception as e:
  876. errors.append(("debug_dec", e))
  877. direct_log(
  878. f"Lock release counter decrementing error for key {key}: {e}",
  879. level="ERROR",
  880. enable_output=True,
  881. )
  882. return errors
  883. all_errors = []
  884. # Release locks in reverse order
  885. # This entire loop is protected by the outer shield
  886. for entry in reversed(self._ul):
  887. try:
  888. errors = await release_single_entry(entry, exc_type, exc, tb)
  889. for error_type, error in errors:
  890. all_errors.append((entry["key"], error_type, error))
  891. except Exception as e:
  892. all_errors.append((entry["key"], "unexpected", e))
  893. direct_log(
  894. f"Lock release unexpected error for {entry['key']}: {e}",
  895. level="ERROR",
  896. enable_output=True,
  897. )
  898. return all_errors
  899. # CRITICAL: Protect the entire release process with shield
  900. # This ensures that even if cancellation occurs, all locks are released
  901. try:
  902. all_errors = await asyncio.shield(release_all_locks())
  903. except Exception as e:
  904. direct_log(
  905. f"Critical error during __aexit__ cleanup: {e}",
  906. level="ERROR",
  907. enable_output=True,
  908. )
  909. all_errors = []
  910. finally:
  911. # Always clear the lock list, even if shield was cancelled
  912. self._ul = None
  913. # If there were release errors and no other exception, raise the first release error
  914. if all_errors and exc_type is None:
  915. raise all_errors[0][2] # (key, error_type, error)
  916. def get_internal_lock(enable_logging: bool = False) -> UnifiedLock:
  917. """return unified storage lock for data consistency"""
  918. if _internal_lock is None:
  919. raise RuntimeError(
  920. "Shared data not initialized. Call initialize_share_data() before using locks!"
  921. )
  922. async_lock = _async_locks.get("internal_lock") if _is_multiprocess else None
  923. return UnifiedLock(
  924. lock=_internal_lock,
  925. is_async=not _is_multiprocess,
  926. name="internal_lock",
  927. enable_logging=enable_logging,
  928. async_lock=async_lock,
  929. )
  930. # Workspace based storage_lock is implemented by get_storage_keyed_lock instead.
  931. # Workspace based pipeline_status_lock is implemented by get_storage_keyed_lock instead.
  932. # No need to implement graph_db_lock:
  933. # data integrity is ensured by entity level keyed-lock and allowing only one process to hold pipeline at a time.
  934. def get_storage_keyed_lock(
  935. keys: str | list[str], namespace: str = "default", enable_logging: bool = False
  936. ) -> _KeyedLockContext:
  937. """Return unified storage keyed lock for ensuring atomic operations across different namespaces"""
  938. global _storage_keyed_lock
  939. if _storage_keyed_lock is None:
  940. raise RuntimeError("Shared-Data is not initialized")
  941. if isinstance(keys, str):
  942. keys = [keys]
  943. return _storage_keyed_lock(namespace, keys, enable_logging=enable_logging)
  944. def get_data_init_lock(enable_logging: bool = False) -> UnifiedLock:
  945. """return unified data initialization lock for ensuring atomic data initialization"""
  946. if _data_init_lock is None:
  947. raise RuntimeError(
  948. "Shared data not initialized. Call initialize_share_data() before using locks!"
  949. )
  950. async_lock = _async_locks.get("data_init_lock") if _is_multiprocess else None
  951. return UnifiedLock(
  952. lock=_data_init_lock,
  953. is_async=not _is_multiprocess,
  954. name="data_init_lock",
  955. enable_logging=enable_logging,
  956. async_lock=async_lock,
  957. )
  958. def cleanup_keyed_lock() -> Dict[str, Any]:
  959. """
  960. Force cleanup of expired keyed locks and return comprehensive status information.
  961. This function actively cleans up expired locks for both async and multiprocess locks,
  962. then returns detailed statistics about the cleanup operation and current lock status.
  963. Returns:
  964. Same as cleanup_expired_locks in KeyedUnifiedLock
  965. """
  966. global _storage_keyed_lock
  967. # Check if shared storage is initialized
  968. if not _initialized or _storage_keyed_lock is None:
  969. return {
  970. "process_id": os.getpid(),
  971. "cleanup_performed": {"mp_cleaned": 0, "async_cleaned": 0},
  972. "current_status": {
  973. "total_mp_locks": 0,
  974. "pending_mp_cleanup": 0,
  975. "total_async_locks": 0,
  976. "pending_async_cleanup": 0,
  977. },
  978. }
  979. return _storage_keyed_lock.cleanup_expired_locks()
  980. def get_keyed_lock_status() -> Dict[str, Any]:
  981. """
  982. Get current status of keyed locks without performing cleanup.
  983. This function provides a read-only view of the current lock counts
  984. for both multiprocess and async locks, including pending cleanup counts.
  985. Returns:
  986. Same as get_lock_status in KeyedUnifiedLock
  987. """
  988. global _storage_keyed_lock
  989. # Check if shared storage is initialized
  990. if not _initialized or _storage_keyed_lock is None:
  991. return {
  992. "process_id": os.getpid(),
  993. "total_mp_locks": 0,
  994. "pending_mp_cleanup": 0,
  995. "total_async_locks": 0,
  996. "pending_async_cleanup": 0,
  997. }
  998. status = _storage_keyed_lock.get_lock_status()
  999. status["process_id"] = os.getpid()
  1000. return status
  1001. def initialize_share_data(workers: int = 1):
  1002. """
  1003. Initialize shared storage data for single or multi-process mode.
  1004. When used with Gunicorn's preload feature, this function is called once in the
  1005. master process before forking worker processes, allowing all workers to share
  1006. the same initialized data.
  1007. In single-process mode, this function is called in FASTAPI lifespan function.
  1008. The function determines whether to use cross-process shared variables for data storage
  1009. based on the number of workers. If workers=1, it uses thread locks and local dictionaries.
  1010. If workers>1, it uses process locks and shared dictionaries managed by multiprocessing.Manager.
  1011. Args:
  1012. workers (int): Number of worker processes. If 1, single-process mode is used.
  1013. If > 1, multi-process mode with shared memory is used.
  1014. """
  1015. global \
  1016. _manager, \
  1017. _workers, \
  1018. _is_multiprocess, \
  1019. _lock_registry, \
  1020. _lock_registry_count, \
  1021. _lock_cleanup_data, \
  1022. _registry_guard, \
  1023. _internal_lock, \
  1024. _data_init_lock, \
  1025. _shared_dicts, \
  1026. _init_flags, \
  1027. _initialized, \
  1028. _update_flags, \
  1029. _async_locks, \
  1030. _storage_keyed_lock, \
  1031. _earliest_mp_cleanup_time, \
  1032. _last_mp_cleanup_time
  1033. # Check if already initialized
  1034. if _initialized:
  1035. direct_log(
  1036. f"Process {os.getpid()} Shared-Data already initialized (multiprocess={_is_multiprocess})"
  1037. )
  1038. return
  1039. _workers = workers
  1040. if workers > 1:
  1041. _is_multiprocess = True
  1042. _manager = Manager()
  1043. _lock_registry = _manager.dict()
  1044. _lock_registry_count = _manager.dict()
  1045. _lock_cleanup_data = _manager.dict()
  1046. _registry_guard = _manager.RLock()
  1047. _internal_lock = _manager.Lock()
  1048. _data_init_lock = _manager.Lock()
  1049. _shared_dicts = _manager.dict()
  1050. _init_flags = _manager.dict()
  1051. _update_flags = _manager.dict()
  1052. _storage_keyed_lock = KeyedUnifiedLock()
  1053. # Initialize async locks for multiprocess mode
  1054. _async_locks = {
  1055. "internal_lock": asyncio.Lock(),
  1056. "graph_db_lock": asyncio.Lock(),
  1057. "data_init_lock": asyncio.Lock(),
  1058. }
  1059. direct_log(
  1060. f"Process {os.getpid()} Shared-Data created for Multiple Process (workers={workers})"
  1061. )
  1062. else:
  1063. _is_multiprocess = False
  1064. _internal_lock = asyncio.Lock()
  1065. _data_init_lock = asyncio.Lock()
  1066. _shared_dicts = {}
  1067. _init_flags = {}
  1068. _update_flags = {}
  1069. _async_locks = None # No need for async locks in single process mode
  1070. _storage_keyed_lock = KeyedUnifiedLock()
  1071. direct_log(f"Process {os.getpid()} Shared-Data created for Single Process")
  1072. # Initialize multiprocess cleanup times
  1073. _earliest_mp_cleanup_time = None
  1074. _last_mp_cleanup_time = None
  1075. # Mark as initialized
  1076. _initialized = True
  1077. async def initialize_pipeline_status(workspace: str | None = None):
  1078. """
  1079. Initialize pipeline_status share data with default values.
  1080. This function could be called before during FASTAPI lifespan for each worker.
  1081. Args:
  1082. workspace: Optional workspace identifier for pipeline_status of specific workspace.
  1083. If None or empty string, uses the default workspace set by
  1084. set_default_workspace().
  1085. """
  1086. pipeline_namespace = await get_namespace_data(
  1087. "pipeline_status", first_init=True, workspace=workspace
  1088. )
  1089. async with get_internal_lock():
  1090. # Check if already initialized by checking for required fields
  1091. if "busy" in pipeline_namespace:
  1092. return
  1093. # Create a shared list object for history_messages
  1094. history_messages = _manager.list() if _is_multiprocess else []
  1095. pipeline_namespace.update(
  1096. {
  1097. "autoscanned": False, # Auto-scan started
  1098. "busy": False, # Control concurrent processes
  1099. # Destructive subset of ``busy``: clear / delete jobs that
  1100. # DROP storages or remove input files. Concurrent enqueue
  1101. # would race against the drop and silently lose the
  1102. # accepted document, so reservation and the enqueue
  1103. # last-line guard reject when this is True. ``busy`` on
  1104. # its own (the processing loop) remains compatible with
  1105. # concurrent enqueue via request_pending.
  1106. "destructive_busy": False,
  1107. "scanning": False, # /documents/scan task running (whole lifecycle)
  1108. # Exclusive subset of ``scanning``: only True during the
  1109. # scan's *classification* phase, when run_scanning_process
  1110. # is reading doc_status to classify files (PROCESSED →
  1111. # archive, FAILED-without-full_docs → retry-as-new, etc.)
  1112. # and possibly deleting stale stubs. After classification
  1113. # the scan transitions to its processing phase (which
  1114. # behaves like any other busy processing run) and clears
  1115. # this flag, allowing concurrent uploads to land in
  1116. # doc_status while the scan-driven processing finishes.
  1117. "scanning_exclusive": False,
  1118. # Counter of upload/insert endpoints that have passed the
  1119. # idle preflight but whose background enqueue has not yet
  1120. # run. Closes the preflight-to-background race: scan
  1121. # refuses to start while this is > 0 so the bg task is
  1122. # guaranteed to see scanning=False at enqueue time.
  1123. "pending_enqueues": 0,
  1124. "job_name": "-", # Current job name (indexing files/indexing texts)
  1125. "job_start": None, # Job start time
  1126. "docs": 0, # Total number of documents to be indexed
  1127. "batchs": 0, # Number of batches for processing documents
  1128. "cur_batch": 0, # Current processing batch
  1129. "request_pending": False, # Flag for pending request for processing
  1130. "latest_message": "", # Latest message from pipeline processing
  1131. "history_messages": history_messages, # 使用共享列表对象
  1132. }
  1133. )
  1134. final_namespace = get_final_namespace("pipeline_status", workspace)
  1135. direct_log(
  1136. f"Process {os.getpid()} Pipeline namespace '{final_namespace}' initialized"
  1137. )
  1138. async def get_update_flag(namespace: str, workspace: str | None = None):
  1139. """
  1140. Create a namespace's update flag for a workers.
  1141. Returen the update flag to caller for referencing or reset.
  1142. """
  1143. global _update_flags
  1144. if _update_flags is None:
  1145. raise ValueError("Try to create namespace before Shared-Data is initialized")
  1146. final_namespace = get_final_namespace(namespace, workspace)
  1147. async with get_internal_lock():
  1148. if final_namespace not in _update_flags:
  1149. if _is_multiprocess and _manager is not None:
  1150. _update_flags[final_namespace] = _manager.list()
  1151. else:
  1152. _update_flags[final_namespace] = []
  1153. direct_log(
  1154. f"Process {os.getpid()} initialized updated flags for namespace: [{final_namespace}]"
  1155. )
  1156. if _is_multiprocess and _manager is not None:
  1157. new_update_flag = _manager.Value("b", False)
  1158. else:
  1159. # Create a simple mutable object to store boolean value for compatibility with mutiprocess
  1160. class MutableBoolean:
  1161. def __init__(self, initial_value=False):
  1162. self.value = initial_value
  1163. new_update_flag = MutableBoolean(False)
  1164. _update_flags[final_namespace].append(new_update_flag)
  1165. return new_update_flag
  1166. async def set_all_update_flags(namespace: str, workspace: str | None = None):
  1167. """Set all update flag of namespace indicating all workers need to reload data from files"""
  1168. global _update_flags
  1169. if _update_flags is None:
  1170. raise ValueError("Try to create namespace before Shared-Data is initialized")
  1171. final_namespace = get_final_namespace(namespace, workspace)
  1172. async with get_internal_lock():
  1173. if final_namespace not in _update_flags:
  1174. raise ValueError(f"Namespace {final_namespace} not found in update flags")
  1175. # Update flags for both modes
  1176. for i in range(len(_update_flags[final_namespace])):
  1177. _update_flags[final_namespace][i].value = True
  1178. async def clear_all_update_flags(namespace: str, workspace: str | None = None):
  1179. """Clear all update flag of namespace indicating all workers need to reload data from files"""
  1180. global _update_flags
  1181. if _update_flags is None:
  1182. raise ValueError("Try to create namespace before Shared-Data is initialized")
  1183. final_namespace = get_final_namespace(namespace, workspace)
  1184. async with get_internal_lock():
  1185. if final_namespace not in _update_flags:
  1186. raise ValueError(f"Namespace {final_namespace} not found in update flags")
  1187. # Update flags for both modes
  1188. for i in range(len(_update_flags[final_namespace])):
  1189. _update_flags[final_namespace][i].value = False
  1190. async def get_all_update_flags_status(workspace: str | None = None) -> Dict[str, list]:
  1191. """
  1192. Get update flags status for all namespaces.
  1193. Returns:
  1194. Dict[str, list]: A dictionary mapping namespace names to lists of update flag statuses
  1195. """
  1196. if _update_flags is None:
  1197. return {}
  1198. if workspace is None:
  1199. workspace = get_default_workspace()
  1200. result = {}
  1201. async with get_internal_lock():
  1202. for namespace, flags in _update_flags.items():
  1203. # Check if namespace has a workspace prefix (contains ':')
  1204. if ":" in namespace:
  1205. # Namespace has workspace prefix like "space1:pipeline_status"
  1206. # Only include if workspace matches the prefix
  1207. # Use rsplit to split from the right since workspace can contain colons
  1208. namespace_split = namespace.rsplit(":", 1)
  1209. if not workspace or namespace_split[0] != workspace:
  1210. continue
  1211. else:
  1212. # Namespace has no workspace prefix like "pipeline_status"
  1213. # Only include if we're querying the default (empty) workspace
  1214. if workspace:
  1215. continue
  1216. worker_statuses = []
  1217. for flag in flags:
  1218. if _is_multiprocess:
  1219. worker_statuses.append(flag.value)
  1220. else:
  1221. worker_statuses.append(flag)
  1222. result[namespace] = worker_statuses
  1223. return result
  1224. async def try_initialize_namespace(
  1225. namespace: str, workspace: str | None = None
  1226. ) -> bool:
  1227. """
  1228. Returns True if the current worker(process) gets initialization permission for loading data later.
  1229. The worker does not get the permission is prohibited to load data from files.
  1230. """
  1231. global _init_flags, _manager
  1232. if _init_flags is None:
  1233. raise ValueError("Try to create nanmespace before Shared-Data is initialized")
  1234. final_namespace = get_final_namespace(namespace, workspace)
  1235. async with get_internal_lock():
  1236. if final_namespace not in _init_flags:
  1237. _init_flags[final_namespace] = True
  1238. direct_log(
  1239. f"Process {os.getpid()} ready to initialize storage namespace: [{final_namespace}]"
  1240. )
  1241. return True
  1242. direct_log(
  1243. f"Process {os.getpid()} storage namespace already initialized: [{final_namespace}]"
  1244. )
  1245. return False
  1246. async def get_namespace_data(
  1247. namespace: str, first_init: bool = False, workspace: str | None = None
  1248. ) -> Dict[str, Any]:
  1249. """get the shared data reference for specific namespace
  1250. Args:
  1251. namespace: The namespace to retrieve
  1252. first_init: If True, allows pipeline_status namespace to create namespace if it doesn't exist.
  1253. Prevent getting pipeline_status namespace without initialize_pipeline_status().
  1254. This parameter is used internally by initialize_pipeline_status().
  1255. workspace: Workspace identifier (may be empty string for global namespace)
  1256. """
  1257. if _shared_dicts is None:
  1258. direct_log(
  1259. f"Error: Try to getnanmespace before it is initialized, pid={os.getpid()}",
  1260. level="ERROR",
  1261. )
  1262. raise ValueError("Shared dictionaries not initialized")
  1263. final_namespace = get_final_namespace(namespace, workspace)
  1264. async with get_internal_lock():
  1265. if final_namespace not in _shared_dicts:
  1266. # Special handling for pipeline_status namespace
  1267. if (
  1268. final_namespace.endswith(":pipeline_status")
  1269. or final_namespace == "pipeline_status"
  1270. ) and not first_init:
  1271. # Check if pipeline_status should have been initialized but wasn't
  1272. # This helps users to call initialize_pipeline_status() before get_namespace_data()
  1273. raise PipelineNotInitializedError(final_namespace)
  1274. # For other namespaces or when allow_create=True, create them dynamically
  1275. if _is_multiprocess and _manager is not None:
  1276. _shared_dicts[final_namespace] = _manager.dict()
  1277. else:
  1278. _shared_dicts[final_namespace] = {}
  1279. return _shared_dicts[final_namespace]
  1280. class NamespaceLock:
  1281. """
  1282. Reusable namespace lock wrapper that creates a fresh context on each use.
  1283. This class solves the lock re-entrance and concurrent coroutine issues by using
  1284. contextvars.ContextVar to provide per-coroutine storage. Each coroutine gets its
  1285. own independent lock context, preventing state interference between concurrent
  1286. coroutines using the same NamespaceLock instance.
  1287. Example:
  1288. lock = NamespaceLock("my_namespace", "workspace1")
  1289. # Can be used multiple times safely
  1290. async with lock:
  1291. await do_something()
  1292. # Can even be used concurrently without deadlock
  1293. await asyncio.gather(
  1294. coroutine_1(lock), # Each gets its own context
  1295. coroutine_2(lock) # No state interference
  1296. )
  1297. """
  1298. def __init__(
  1299. self, namespace: str, workspace: str | None = None, enable_logging: bool = False
  1300. ):
  1301. self._namespace = namespace
  1302. self._workspace = workspace
  1303. self._enable_logging = enable_logging
  1304. # Use ContextVar to provide per-coroutine storage for lock context
  1305. # This ensures each coroutine has its own independent context
  1306. self._ctx_var: ContextVar[Optional[_KeyedLockContext]] = ContextVar(
  1307. "lock_ctx", default=None
  1308. )
  1309. async def __aenter__(self):
  1310. """Create a fresh context each time we enter"""
  1311. # Check if this coroutine already has an active lock context
  1312. if self._ctx_var.get() is not None:
  1313. raise RuntimeError(
  1314. "NamespaceLock already acquired in current coroutine context"
  1315. )
  1316. final_namespace = get_final_namespace(self._namespace, self._workspace)
  1317. ctx = get_storage_keyed_lock(
  1318. ["default_key"],
  1319. namespace=final_namespace,
  1320. enable_logging=self._enable_logging,
  1321. )
  1322. # Acquire the lock first, then store context only after successful acquisition
  1323. # This prevents the ContextVar from being set if acquisition fails (e.g., due to cancellation),
  1324. # which would permanently brick the lock
  1325. result = await ctx.__aenter__()
  1326. self._ctx_var.set(ctx)
  1327. return result
  1328. async def __aexit__(self, exc_type, exc_val, exc_tb):
  1329. """Exit the current context and clean up"""
  1330. # Retrieve this coroutine's context
  1331. ctx = self._ctx_var.get()
  1332. if ctx is None:
  1333. raise RuntimeError("NamespaceLock exited without being entered")
  1334. result = await ctx.__aexit__(exc_type, exc_val, exc_tb)
  1335. # Clear this coroutine's context
  1336. self._ctx_var.set(None)
  1337. return result
  1338. def get_namespace_lock(
  1339. namespace: str, workspace: str | None = None, enable_logging: bool = False
  1340. ) -> NamespaceLock:
  1341. """Get a reusable namespace lock wrapper.
  1342. This function returns a NamespaceLock instance that can be used multiple times
  1343. safely, even in concurrent scenarios. Each use creates a fresh lock context
  1344. internally, preventing lock re-entrance errors.
  1345. Args:
  1346. namespace: The namespace to get the lock for.
  1347. workspace: Workspace identifier (may be empty string for global namespace)
  1348. enable_logging: Whether to enable lock operation logging
  1349. Returns:
  1350. NamespaceLock: A reusable lock wrapper that can be used with 'async with'
  1351. Example:
  1352. lock = get_namespace_lock("pipeline_status", workspace="space1")
  1353. # Can be used multiple times
  1354. async with lock:
  1355. await do_something()
  1356. async with lock:
  1357. await do_something_else()
  1358. """
  1359. return NamespaceLock(namespace, workspace, enable_logging)
  1360. def finalize_share_data():
  1361. """
  1362. Release shared resources and clean up.
  1363. This function should be called when the application is shutting down
  1364. to properly release shared resources and avoid memory leaks.
  1365. In multi-process mode, it shuts down the Manager and releases all shared objects.
  1366. In single-process mode, it simply resets the global variables.
  1367. """
  1368. global \
  1369. _manager, \
  1370. _is_multiprocess, \
  1371. _internal_lock, \
  1372. _data_init_lock, \
  1373. _shared_dicts, \
  1374. _init_flags, \
  1375. _initialized, \
  1376. _update_flags, \
  1377. _async_locks, \
  1378. _default_workspace
  1379. # Check if already initialized
  1380. if not _initialized:
  1381. direct_log(
  1382. f"Process {os.getpid()} storage data not initialized, nothing to finalize"
  1383. )
  1384. return
  1385. direct_log(
  1386. f"Process {os.getpid()} finalizing storage data (multiprocess={_is_multiprocess})"
  1387. )
  1388. # In multi-process mode, shut down the Manager
  1389. if _is_multiprocess and _manager is not None:
  1390. try:
  1391. # Clear shared resources before shutting down Manager
  1392. if _shared_dicts is not None:
  1393. # Clear pipeline status history messages first if exists
  1394. try:
  1395. pipeline_status = _shared_dicts.get("pipeline_status", {})
  1396. if "history_messages" in pipeline_status:
  1397. pipeline_status["history_messages"].clear()
  1398. except Exception:
  1399. pass # Ignore any errors during history messages cleanup
  1400. _shared_dicts.clear()
  1401. if _init_flags is not None:
  1402. _init_flags.clear()
  1403. if _update_flags is not None:
  1404. # Clear each namespace's update flags list and Value objects
  1405. try:
  1406. for namespace in _update_flags:
  1407. flags_list = _update_flags[namespace]
  1408. if isinstance(flags_list, list):
  1409. # Clear Value objects in the list
  1410. for flag in flags_list:
  1411. if hasattr(
  1412. flag, "value"
  1413. ): # Check if it's a Value object
  1414. flag.value = False
  1415. flags_list.clear()
  1416. except Exception:
  1417. pass # Ignore any errors during update flags cleanup
  1418. _update_flags.clear()
  1419. # Shut down the Manager - this will automatically clean up all shared resources
  1420. _manager.shutdown()
  1421. direct_log(f"Process {os.getpid()} Manager shutdown complete")
  1422. except Exception as e:
  1423. direct_log(
  1424. f"Process {os.getpid()} Error shutting down Manager: {e}", level="ERROR"
  1425. )
  1426. # Reset global variables
  1427. _manager = None
  1428. _initialized = None
  1429. _is_multiprocess = None
  1430. _shared_dicts = None
  1431. _init_flags = None
  1432. _internal_lock = None
  1433. _data_init_lock = None
  1434. _update_flags = None
  1435. _async_locks = None
  1436. _default_workspace = None
  1437. direct_log(f"Process {os.getpid()} storage data finalization complete")
  1438. def set_default_workspace(workspace: str | None = None):
  1439. """
  1440. Set default workspace for namespace operations for backward compatibility.
  1441. This allows get_namespace_data(),get_namespace_lock() or initialize_pipeline_status() to
  1442. automatically use the correct workspace when called without workspace parameters,
  1443. maintaining compatibility with legacy code that doesn't pass workspace explicitly.
  1444. Args:
  1445. workspace: Workspace identifier (may be empty string for global namespace)
  1446. """
  1447. global _default_workspace
  1448. if workspace is None:
  1449. workspace = ""
  1450. _default_workspace = workspace
  1451. direct_log(
  1452. f"Default workspace set to: '{_default_workspace}' (empty means global)",
  1453. level="DEBUG",
  1454. )
  1455. def get_default_workspace() -> str:
  1456. """
  1457. Get default workspace for backward compatibility.
  1458. Returns:
  1459. The default workspace string. Empty string means global namespace. None means not set.
  1460. """
  1461. global _default_workspace
  1462. return _default_workspace
  1463. def get_pipeline_status_lock(
  1464. enable_logging: bool = False, workspace: str = None
  1465. ) -> NamespaceLock:
  1466. """Return unified storage lock for pipeline status data consistency.
  1467. This function is for compatibility with legacy code only.
  1468. """
  1469. global _default_workspace
  1470. actual_workspace = workspace if workspace else _default_workspace
  1471. return get_namespace_lock(
  1472. "pipeline_status", workspace=actual_workspace, enable_logging=enable_logging
  1473. )