clean_llm_query_cache.py 44 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249
  #!/usr/bin/env python3
  """
  LLM Query Cache Cleanup Tool for LightRAG

  Removes cached LLM query results from a LightRAG KV storage backend. Only
  keys of the form ``<mode>:query:...`` and ``<mode>:keywords:...`` are
  removed, where ``<mode>`` is one of mix, hybrid, local or global; other
  entries in the LLM response cache are not matched. The workspace is taken
  from the storage-specific environment variable (e.g. POSTGRES_WORKSPACE)
  or, failing that, from WORKSPACE.

  Usage:
      python -m lightrag.tools.clean_llm_query_cache
      # or
      python lightrag/tools/clean_llm_query_cache.py

  Supported KV Storage Types:
      - JsonKVStorage
      - RedisKVStorage
      - PGKVStorage
      - MongoKVStorage
      - OpenSearchKVStorage
  """
  import asyncio
  import os
  import sys
  import time
  from typing import Any, Dict, List
  from dataclasses import dataclass, field

  from dotenv import load_dotenv

  # Put the grandparent of this file's directory (the project root when the
  # file lives at lightrag/tools/) first on sys.path, so the ``lightrag``
  # imports below resolve when the file is run directly as a script.
  sys.path.insert(
      0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  )

  from lightrag.kg import STORAGE_ENV_REQUIREMENTS  # noqa: E402 (needs sys.path above)
  from lightrag.kg.shared_storage import set_all_update_flags  # noqa: E402
  from lightrag.namespace import NameSpace  # noqa: E402
  from lightrag.utils import setup_logger  # noqa: E402

  # Load ./.env (relative to the current working directory); override=False
  # keeps any value that is already set in the process environment.
  load_dotenv(dotenv_path=".env", override=False)

  # Configure the "lightrag" logger at INFO level
  setup_logger("lightrag", level="INFO")
  # Interactive menu choice ("1".."5") -> KV storage implementation name
  STORAGE_TYPES = {
      str(choice): storage_name
      for choice, storage_name in enumerate(
          (
              "JsonKVStorage",
              "RedisKVStorage",
              "PGKVStorage",
              "MongoKVStorage",
              "OpenSearchKVStorage",
          ),
          start=1,
      )
  }

  # Storage-specific environment variable that overrides WORKSPACE
  # (JsonKVStorage has no dedicated variable)
  WORKSPACE_ENV_MAP = dict(
      PGKVStorage="POSTGRES_WORKSPACE",
      MongoKVStorage="MONGODB_WORKSPACE",
      RedisKVStorage="REDIS_WORKSPACE",
      OpenSearchKVStorage="OPENSEARCH_WORKSPACE",
  )

  # Query modes whose cache entries are cleaned (key prefix "<mode>:")
  QUERY_MODES = "mix hybrid local global".split()

  # Cache entry types stored under each mode (key prefix "<mode>:<type>:")
  CACHE_TYPES = "query keywords".split()

  # Records processed per deletion batch unless overridden on the tool
  DEFAULT_BATCH_SIZE = 1_000

  # ANSI escape sequences used to highlight terminal output
  BOLD_CYAN = "\x1b[1;36m"
  BOLD_RED = "\x1b[1;31m"
  BOLD_GREEN = "\x1b[1;32m"
  RESET = "\x1b[0m"
  58. BOLD_CYAN = "\033[1;36m"
  59. BOLD_RED = "\033[1;31m"
  60. BOLD_GREEN = "\033[1;32m"
  61. RESET = "\033[0m"
  @dataclass
  class CleanupStats:
      """Counters and error log collected during one cleanup run.

      ``counts_before`` / ``counts_after`` map each query mode to a
      ``{"query": n, "keywords": n}`` breakdown; the integer fields track
      deletion progress batch by batch.
      """

      # Per-mode cache counts captured before deletion
      counts_before: Dict[str, Dict[str, int]] = field(default_factory=dict)
      # Deletion bookkeeping
      total_to_delete: int = 0
      total_batches: int = 0
      successful_batches: int = 0
      failed_batches: int = 0
      successfully_deleted: int = 0
      failed_to_delete: int = 0
      # Per-mode cache counts captured after deletion
      counts_after: Dict[str, Dict[str, int]] = field(default_factory=dict)
      # One dict per failed batch, appended by add_error()
      errors: List[Dict[str, Any]] = field(default_factory=list)

      def add_error(self, batch_idx: int, error: Exception, batch_size: int):
          """Log a failed batch and charge its records to the failure totals.

          Args:
              batch_idx: Batch number shown to the user
              error: Exception raised while processing the batch
              batch_size: Number of records the batch attempted to delete
          """
          entry = dict(
              batch=batch_idx,
              error_type=type(error).__name__,
              error_msg=str(error),
              records_lost=batch_size,
              timestamp=time.time(),
          )
          self.errors.append(entry)
          self.failed_batches += 1
          self.failed_to_delete += batch_size

      def initialize_counts(self):
          """Set zeroed query/keywords counters for every mode, before and after."""
          for mode in QUERY_MODES:
              self.counts_before[mode] = dict.fromkeys(("query", "keywords"), 0)
              self.counts_after[mode] = dict.fromkeys(("query", "keywords"), 0)
  96. class CleanupTool:
  97. """LLM Query Cache Cleanup Tool"""
  def __init__(self):
      """Start with the default batch size, the default ("") workspace and no storage."""
      self.batch_size = DEFAULT_BATCH_SIZE
      self.workspace = ""
      self.storage = None
  def get_workspace_for_storage(self, storage_name: str) -> str:
      """Resolve the workspace name to use for ``storage_name``.

      Lookup order: the storage-specific variable from WORKSPACE_ENV_MAP (only
      if it is set to a non-empty value), then ``WORKSPACE``, then "".

      Args:
          storage_name: Storage implementation name
      Returns:
          Workspace name ("" means the default workspace)
      """
      env_var = WORKSPACE_ENV_MAP.get(storage_name)
      specific_workspace = os.getenv(env_var) if env_var else None
      # An empty storage-specific value falls through to the generic variable
      return specific_workspace or os.getenv("WORKSPACE", "")
  def check_config_ini_for_storage(self, storage_name: str) -> bool:
      """Report whether ./config.ini holds connection settings for a storage.

      Args:
          storage_name: Storage implementation name
      Returns:
          True only if every required (section, option) pair for the storage
          is present. Unknown storages, a missing file, or any error while
          reading the file all yield False.
      """
      # (section, option) pairs that must all exist for each storage type
      required_options = {
          "RedisKVStorage": (("redis", "uri"),),
          "PGKVStorage": (
              ("postgres", "user"),
              ("postgres", "password"),
              ("postgres", "database"),
          ),
          "MongoKVStorage": (("mongodb", "uri"), ("mongodb", "database")),
          "OpenSearchKVStorage": (("opensearch", "hosts"),),
      }
      try:
          import configparser

          parser = configparser.ConfigParser()
          parser.read("config.ini", "utf-8")
          options = required_options.get(storage_name, ())
          return bool(options) and all(
              parser.has_option(section, option) for section, option in options
          )
      except Exception:
          # Best effort: an unreadable config simply counts as "not configured"
          return False
  def check_env_vars(self, storage_name: str) -> bool:
      """Warn about missing environment variables for a storage type.

      Missing variables never cause a failure; when some are missing, the user
      is told whether config.ini provides the settings instead.

      Args:
          storage_name: Storage implementation name
      Returns:
          Always True
      """
      required_vars = STORAGE_ENV_REQUIREMENTS.get(storage_name, [])
      if not required_vars:
          print("✓ No environment variables required")
          return True

      missing = [name for name in required_vars if name not in os.environ]
      if not missing:
          print("✓ All required environment variables are set")
          return True

      print(f"⚠️ Warning: Missing environment variables: {', '.join(missing)}")
      if self.check_config_ini_for_storage(storage_name):
          print(" ✓ Found configuration in config.ini")
      else:
          print(f" Will attempt to use defaults for {storage_name}")
      return True
  def get_storage_class(self, storage_name: str):
      """Import and return the KV storage class called ``storage_name``.

      The import happens inside the matching branch, so only the selected
      backend's module is imported.

      Args:
          storage_name: Storage implementation name
      Returns:
          The storage class (not an instance)
      Raises:
          ValueError: If ``storage_name`` is not a supported storage type
      """
      if storage_name == "JsonKVStorage":
          from lightrag.kg.json_kv_impl import JsonKVStorage as storage_cls
      elif storage_name == "RedisKVStorage":
          from lightrag.kg.redis_impl import RedisKVStorage as storage_cls
      elif storage_name == "PGKVStorage":
          from lightrag.kg.postgres_impl import PGKVStorage as storage_cls
      elif storage_name == "MongoKVStorage":
          from lightrag.kg.mongo_impl import MongoKVStorage as storage_cls
      elif storage_name == "OpenSearchKVStorage":
          from lightrag.kg.opensearch_impl import OpenSearchKVStorage as storage_cls
      else:
          raise ValueError(f"Unsupported storage type: {storage_name}")
      return storage_cls
  async def initialize_storage(self, storage_name: str, workspace: str):
      """Create and initialize the LLM-response-cache KV storage.

      Args:
          storage_name: Storage implementation name
          workspace: Workspace name passed to the storage
      Returns:
          The initialized storage instance
      Raises:
          ValueError: If the storage type is unsupported
          Exception: Whatever ``storage.initialize()`` raises (e.g. on a
              failed connection); it is not caught here
      """
      storage_cls = self.get_storage_class(storage_name)
      # Minimal global config: working dir (WORKING_DIR env var or ./rag_storage)
      global_config = dict(
          working_dir=os.getenv("WORKING_DIR", "./rag_storage"),
          embedding_batch_num=10,
      )
      storage = storage_cls(
          namespace=NameSpace.KV_STORE_LLM_RESPONSE_CACHE,
          workspace=workspace,
          global_config=global_config,
          embedding_func=None,
      )
      await storage.initialize()
      return storage
  async def count_query_caches_json(self, storage) -> Dict[str, Dict[str, int]]:
      """Count JsonKVStorage keys per mode and cache type.

      A key is counted under ``(mode, type)`` when it starts with
      ``"<mode>:<type>:"``; the scan runs while holding ``storage._storage_lock``.

      Args:
          storage: JsonKVStorage instance
      Returns:
          {mode: {"query": n, "keywords": n}} for every mode in QUERY_MODES
      """
      tally = {mode: {"query": 0, "keywords": 0} for mode in QUERY_MODES}
      # Build each "<mode>:<type>:" prefix once rather than once per key
      prefixes = [
          (f"{mode}:{kind}:", mode, kind)
          for mode in QUERY_MODES
          for kind in ("query", "keywords")
      ]
      async with storage._storage_lock:
          for key in storage._data.keys():
              for prefix, mode, kind in prefixes:
                  if key.startswith(prefix):
                      tally[mode][kind] += 1
      return tally
  async def count_query_caches_redis(self, storage) -> Dict[str, Dict[str, int]]:
      """Count query caches in RedisKVStorage by mode and cache_type.

      Keys are enumerated with SCAN using the pattern
      ``<final_namespace>:<mode>:<type>:*``. Redis documents that a full SCAN
      iteration may return the same key more than once, so the keys from each
      pattern are collected into a set and counted once each. This holds the
      matching key names in memory for one pattern at a time.

      Args:
          storage: RedisKVStorage instance
      Returns:
          {mode: {"query": n, "keywords": n}} for every mode in QUERY_MODES
      """
      counts = {mode: {"query": 0, "keywords": 0} for mode in QUERY_MODES}
      print("Scanning Redis keys...", end="", flush=True)
      async with storage._get_redis_connection() as redis:
          for mode in QUERY_MODES:
              for cache_type in CACHE_TYPES:
                  prefixed_pattern = (
                      f"{storage.final_namespace}:{mode}:{cache_type}:*"
                  )
                  # De-duplicate: SCAN may yield a key on more than one page
                  seen_keys = set()
                  cursor = 0
                  while True:
                      cursor, keys = await redis.scan(
                          cursor, match=prefixed_pattern, count=DEFAULT_BATCH_SIZE
                      )
                      seen_keys.update(keys)
                      if cursor == 0:
                          break
                  counts[mode][cache_type] = len(seen_keys)
      print()  # New line after progress
      return counts
  async def count_query_caches_pg(self, storage) -> Dict[str, Dict[str, int]]:
      """Count PostgreSQL cache rows per mode and cache type.

      Runs one ``COUNT(*)`` per (mode, type) pair, restricted to the storage's
      workspace and to ids matching ``LIKE '<mode>:<type>:%'``.

      Args:
          storage: PGKVStorage instance
      Returns:
          {mode: {"query": n, "keywords": n}} for every mode in QUERY_MODES
      """
      from lightrag.kg.postgres_impl import namespace_to_table_name

      table_name = namespace_to_table_name(storage.namespace)
      # The statement text only depends on the table, so build it once
      count_sql = f"""
          SELECT COUNT(*) as count
          FROM {table_name}
          WHERE workspace = $1
          AND id LIKE $2
      """
      counts = {mode: {"query": 0, "keywords": 0} for mode in QUERY_MODES}
      print("Counting PostgreSQL records...", end="", flush=True)
      started = time.time()
      for mode in QUERY_MODES:
          for cache_type in CACHE_TYPES:
              row = await storage.db.query(
                  count_sql, [storage.workspace, f"{mode}:{cache_type}:%"]
              )
              counts[mode][cache_type] = row["count"] if row else 0
      duration = time.time() - started
      if duration > 1:
          print(f" (took {duration:.1f}s)", end="")
      print()  # New line
      return counts
  async def count_query_caches_mongo(self, storage) -> Dict[str, Dict[str, int]]:
      """Count MongoDB cache documents per mode and cache type.

      Uses ``count_documents`` with an anchored ``_id`` regex per (mode, type).
      The filter has no workspace condition. NOTE(review): isolation
      presumably comes from ``storage._data`` being a workspace-scoped
      collection; confirm in MongoKVStorage.

      Args:
          storage: MongoKVStorage instance
      Returns:
          {mode: {"query": n, "keywords": n}} for every mode in QUERY_MODES
      """
      counts = {mode: {"query": 0, "keywords": 0} for mode in QUERY_MODES}
      print("Counting MongoDB documents...", end="", flush=True)
      started = time.time()
      for mode in QUERY_MODES:
          for cache_type in CACHE_TYPES:
              id_filter = {"_id": {"$regex": f"^{mode}:{cache_type}:"}}
              counts[mode][cache_type] = await storage._data.count_documents(
                  id_filter
              )
      duration = time.time() - started
      if duration > 1:
          print(f" (took {duration:.1f}s)", end="")
      print()  # New line
      return counts
  async def count_query_caches_opensearch(self, storage) -> Dict[str, Dict[str, int]]:
      """Count OpenSearch cache documents per mode and cache type.

      Every raw document is streamed via ``storage._iter_raw_docs`` in pages of
      DEFAULT_BATCH_SIZE, and its ``_id`` is matched against each
      ``"<mode>:<type>:"`` prefix.

      Args:
          storage: OpenSearchKVStorage instance
      Returns:
          {mode: {"query": n, "keywords": n}} for every mode in QUERY_MODES
      """
      counts = {mode: {"query": 0, "keywords": 0} for mode in QUERY_MODES}
      prefixes = [
          (f"{mode}:{kind}:", mode, kind)
          for mode in QUERY_MODES
          for kind in ("query", "keywords")
      ]
      print("Scanning OpenSearch documents...", end="", flush=True)
      started = time.time()
      async for hits in storage._iter_raw_docs(batch_size=DEFAULT_BATCH_SIZE):
          for hit in hits:
              doc_id = hit["_id"]
              for prefix, mode, kind in prefixes:
                  if doc_id.startswith(prefix):
                      counts[mode][kind] += 1
      duration = time.time() - started
      if duration > 1:
          print(f" (took {duration:.1f}s)", end="")
      print()
      return counts
  328. async def count_query_caches(
  329. self, storage, storage_name: str
  330. ) -> Dict[str, Dict[str, int]]:
  331. """Count query caches from any storage type efficiently
  332. Args:
  333. storage: Storage instance
  334. storage_name: Storage type name
  335. Returns:
  336. Dictionary with counts for each mode and cache_type
  337. """
  338. if storage_name == "JsonKVStorage":
  339. return await self.count_query_caches_json(storage)
  340. elif storage_name == "RedisKVStorage":
  341. return await self.count_query_caches_redis(storage)
  342. elif storage_name == "PGKVStorage":
  343. return await self.count_query_caches_pg(storage)
  344. elif storage_name == "MongoKVStorage":
  345. return await self.count_query_caches_mongo(storage)
  346. elif storage_name == "OpenSearchKVStorage":
  347. return await self.count_query_caches_opensearch(storage)
  348. else:
  349. raise ValueError(f"Unsupported storage type: {storage_name}")
  async def delete_query_caches_json(
      self, storage, cleanup_type: str, stats: CleanupStats
  ):
      """Delete query caches from JsonKVStorage.

      Matching keys are collected in one pass under ``storage._storage_lock``,
      then removed in batches of ``self.batch_size``. After each batch the
      namespace update flags are set so the in-memory deletions get persisted.

      Args:
          storage: JsonKVStorage instance
          cleanup_type: 'all', 'query', or 'keywords' (any other value
              matches nothing)
          stats: CleanupStats object to track progress
      """
      if cleanup_type == "all":
          cache_types = ("query", "keywords")
      elif cleanup_type in ("query", "keywords"):
          cache_types = (cleanup_type,)
      else:
          cache_types = ()
      # str.startswith accepts a tuple of prefixes; an empty tuple matches nothing
      prefixes = tuple(
          f"{mode}:{cache_type}:" for mode in QUERY_MODES for cache_type in cache_types
      )

      # Collect keys to delete
      async with storage._storage_lock:
          keys_to_delete = [
              key for key in storage._data.keys() if key.startswith(prefixes)
          ]

      # Delete in batches
      total_keys = len(keys_to_delete)
      stats.total_batches = (total_keys + self.batch_size - 1) // self.batch_size
      print("\n=== Starting Cleanup ===")
      print(
          f"💡 Processing {self.batch_size:,} records at a time from JsonKVStorage\n"
      )
      for batch_idx in range(stats.total_batches):
          start_idx = batch_idx * self.batch_size
          batch_keys = keys_to_delete[start_idx : start_idx + self.batch_size]
          try:
              async with storage._storage_lock:
                  for key in batch_keys:
                      # pop() rather than del: the lock was released after the
                      # keys were collected, so another writer may already have
                      # removed a key. That must not abort the batch halfway,
                      # which would skip the update flag below.
                      storage._data.pop(key, None)
                  # CRITICAL: Set update flag so changes persist to disk.
                  # Without this, deletions remain in-memory only and are lost on exit
                  await set_all_update_flags(
                      storage.namespace, workspace=storage.workspace
                  )
          except Exception as e:
              stats.add_error(batch_idx + 1, e, len(batch_keys))
              print(
                  f"Batch {batch_idx + 1}/{stats.total_batches}: ✗ FAILED - "
                  f"{type(e).__name__}: {str(e)}"
              )
              continue

          # Success (keys already removed by another writer count as cleaned)
          stats.successful_batches += 1
          stats.successfully_deleted += len(batch_keys)
          # Progress bar; total_keys > 0 whenever this loop runs
          progress = (stats.successfully_deleted / total_keys) * 100
          bar_length = 20
          filled_length = int(bar_length * stats.successfully_deleted // total_keys)
          bar = "█" * filled_length + "░" * (bar_length - filled_length)
          print(
              f"Batch {batch_idx + 1}/{stats.total_batches}: {bar} "
              f"{stats.successfully_deleted:,}/{total_keys:,} ({progress:.1f}%) ✓"
          )
  async def delete_query_caches_redis(
      self, storage, cleanup_type: str, stats: CleanupStats
  ):
      """Delete query caches from RedisKVStorage.

      For each ``<mode>:<type>:*`` pattern (prefixed with the storage's
      ``final_namespace``), keys are enumerated with SCAN and each non-empty
      page is deleted through one pipeline. The number recorded as deleted is
      the sum of the DEL replies: SCAN may return a key more than once, and
      another client may delete keys meanwhile, so the scanned-key count can
      exceed what was actually removed.

      Args:
          storage: RedisKVStorage instance
          cleanup_type: 'all', 'query', or 'keywords' (any other value
              matches nothing)
          stats: CleanupStats object to track progress
      """
      if cleanup_type == "all":
          cache_types = ("query", "keywords")
      elif cleanup_type in ("query", "keywords"):
          cache_types = (cleanup_type,)
      else:
          cache_types = ()
      patterns = [
          f"{mode}:{cache_type}:*" for mode in QUERY_MODES for cache_type in cache_types
      ]

      print("\n=== Starting Cleanup ===")
      print(f"💡 Processing Redis keys in batches of {self.batch_size:,}\n")
      batch_idx = 0
      total_deleted = 0
      async with storage._get_redis_connection() as redis:
          for pattern in patterns:
              prefixed_pattern = f"{storage.final_namespace}:{pattern}"
              cursor = 0
              while True:
                  cursor, keys = await redis.scan(
                      cursor, match=prefixed_pattern, count=self.batch_size
                  )
                  if keys:
                      batch_idx += 1
                      stats.total_batches += 1
                      try:
                          # Delete batch using pipeline
                          pipe = redis.pipeline()
                          for key in keys:
                              pipe.delete(key)
                          replies = await pipe.execute()
                          # DEL replies with how many keys it removed (0 for a
                          # key that is already gone)
                          deleted = sum(int(reply) for reply in replies)
                      except Exception as e:
                          stats.add_error(batch_idx, e, len(keys))
                          print(
                              f"Batch {batch_idx}: ✗ FAILED - "
                              f"{type(e).__name__}: {str(e)}"
                          )
                      else:
                          stats.successful_batches += 1
                          stats.successfully_deleted += deleted
                          total_deleted += deleted
                          print(
                              f"Batch {batch_idx}: Deleted {deleted:,} keys "
                              f"(Total: {total_deleted:,}) ✓"
                          )
                  if cursor == 0:
                      break
                  # Yield to the event loop between SCAN pages
                  await asyncio.sleep(0)
  async def delete_query_caches_pg(
      self, storage, cleanup_type: str, stats: CleanupStats
  ):
      """Delete query caches from PostgreSQL with a single DELETE statement.

      The LIKE conditions are built from the QUERY_MODES constants (not from
      user input); the workspace is bound as ``$1``. The row count of the
      DELETE is not read: on success, ``stats.total_to_delete`` (the total
      counted beforehand) is recorded as deleted.

      Args:
          storage: PGKVStorage instance
          cleanup_type: 'all', 'query', or 'keywords'
          stats: CleanupStats object to track progress
      """
      from lightrag.kg.postgres_impl import namespace_to_table_name

      table_name = namespace_to_table_name(storage.namespace)
      conditions = []
      for mode in QUERY_MODES:
          if cleanup_type in ("all", "query"):
              conditions.append(f"id LIKE '{mode}:query:%'")
          if cleanup_type in ("all", "keywords"):
              conditions.append(f"id LIKE '{mode}:keywords:%'")
      where_clause = " OR ".join(conditions)

      print("\n=== Starting Cleanup ===")
      print("💡 Executing PostgreSQL DELETE query\n")
      try:
          delete_sql = f"""
              DELETE FROM {table_name}
              WHERE workspace = $1
              AND ({where_clause})
          """
          started = time.time()
          # execute() takes its parameters as a dict (query() takes a list).
          # NOTE(review): the value presumably binds positionally to $1;
          # confirm against PostgreSQLDB.execute
          await storage.db.execute(delete_sql, {"workspace": storage.workspace})
          elapsed = time.time() - started
          # One statement == one "batch"; assume every counted row was removed
          stats.total_batches = 1
          stats.successful_batches = 1
          stats.successfully_deleted = stats.total_to_delete
          print(f"✓ Deleted {stats.successfully_deleted:,} records in {elapsed:.2f}s")
      except Exception as e:
          stats.add_error(1, e, stats.total_to_delete)
          print(f"✗ DELETE failed: {type(e).__name__}: {str(e)}")
  async def delete_query_caches_mongo(
      self, storage, cleanup_type: str, stats: CleanupStats
  ):
      """Delete query caches from MongoDB, one ``delete_many`` per pattern.

      Each (mode, cache_type) pair selected by ``cleanup_type`` is one
      "batch", matched with an anchored ``_id`` regex. Every attempted pattern
      counts toward ``stats.total_batches``, whether it succeeds or fails, as in
      the other backends. When a pattern fails, the records it would have removed are
      taken from ``stats.counts_before`` (0 if that count was not recorded),
      so the report no longer shows "0 records lost" for a failure.

      Args:
          storage: MongoKVStorage instance
          cleanup_type: 'all', 'query', or 'keywords' (any other value
              matches nothing)
          stats: CleanupStats object to track progress
      """
      targets = [
          (mode, cache_type)
          for mode in QUERY_MODES
          for cache_type in ("query", "keywords")
          if cleanup_type in ("all", cache_type)
      ]

      print("\n=== Starting Cleanup ===")
      print("💡 Executing MongoDB deleteMany operations\n")
      total_deleted = 0
      for idx, (mode, cache_type) in enumerate(targets, 1):
          stats.total_batches += 1
          pattern = f"^{mode}:{cache_type}:"
          try:
              result = await storage._data.delete_many({"_id": {"$regex": pattern}})
              deleted_count = result.deleted_count
          except Exception as e:
              # Upper bound: a failed delete_many may have removed some documents
              expected = stats.counts_before.get(mode, {}).get(cache_type, 0)
              stats.add_error(idx, e, expected)
              print(
                  f"Pattern {idx}/{len(targets)}: ✗ FAILED - "
                  f"{type(e).__name__}: {str(e)}"
              )
              continue
          stats.successful_batches += 1
          stats.successfully_deleted += deleted_count
          total_deleted += deleted_count
          print(f"Pattern {idx}/{len(targets)}: Deleted {deleted_count:,} records ✓")
      print(f"\nTotal deleted: {total_deleted:,} records")
  async def delete_query_caches_opensearch(
      self, storage, cleanup_type: str, stats: CleanupStats
  ):
      """Delete query caches from OpenSearchKVStorage.

      All raw documents are streamed once to collect matching ``_id`` values,
      which are then passed to ``storage.delete`` in batches of
      ``self.batch_size``.

      Args:
          storage: OpenSearchKVStorage instance
          cleanup_type: 'all', 'query', or 'keywords'
          stats: CleanupStats object to track progress
      """
      if cleanup_type == "all":
          wanted = ("query", "keywords")
      elif cleanup_type == "query":
          wanted = ("query",)
      elif cleanup_type == "keywords":
          wanted = ("keywords",)
      else:
          wanted = ()
      # An empty prefix tuple makes startswith() always False
      prefixes = tuple(f"{mode}:{kind}:" for mode in QUERY_MODES for kind in wanted)

      doomed = []
      async for hits in storage._iter_raw_docs(batch_size=self.batch_size):
          doomed.extend(hit["_id"] for hit in hits if hit["_id"].startswith(prefixes))

      total = len(doomed)
      size = self.batch_size
      stats.total_batches = (total + size - 1) // size
      print("\n=== Starting Cleanup ===")
      print(
          f"💡 Processing {self.batch_size:,} records at a time from OpenSearchKVStorage\n"
      )
      for number, offset in enumerate(range(0, total, size), 1):
          batch = doomed[offset : offset + size]
          try:
              await storage.delete(batch)
              stats.successful_batches += 1
              stats.successfully_deleted += len(batch)
              pct = (stats.successfully_deleted / total) * 100
              width = 20
              filled = int(width * stats.successfully_deleted // total)
              bar = "█" * filled + "░" * (width - filled)
              print(
                  f"Batch {number}/{stats.total_batches}: {bar} "
                  f"{stats.successfully_deleted:,}/{total:,} ({pct:.1f}%) ✓"
              )
          except Exception as e:
              stats.add_error(number, e, len(batch))
              print(
                  f"Batch {number}/{stats.total_batches}: ✗ FAILED - "
                  f"{type(e).__name__}: {str(e)}"
              )
  async def delete_query_caches(
      self, storage, storage_name: str, cleanup_type: str, stats: CleanupStats
  ):
      """Dispatch to the backend-specific deleter for ``storage_name``.

      Args:
          storage: Storage instance
          storage_name: Storage type name
          cleanup_type: 'all', 'query', or 'keywords'
          stats: CleanupStats object to track progress
      Raises:
          ValueError: If ``storage_name`` is not a supported storage type
      """
      for name, deleter in (
          ("JsonKVStorage", self.delete_query_caches_json),
          ("RedisKVStorage", self.delete_query_caches_redis),
          ("PGKVStorage", self.delete_query_caches_pg),
          ("MongoKVStorage", self.delete_query_caches_mongo),
          ("OpenSearchKVStorage", self.delete_query_caches_opensearch),
      ):
          if storage_name == name:
              await deleter(storage, cleanup_type, stats)
              return
      raise ValueError(f"Unsupported storage type: {storage_name}")
  def print_header(self):
      """Print the tool banner: the title between two 60-character '=' rules."""
      rule = "=" * 60
      print("\n" + rule)
      print("LLM Query Cache Cleanup Tool - LightRAG")
      print(rule)
  def print_storage_types(self):
      """Print the numbered storage menu built from STORAGE_TYPES."""
      print("\nSupported KV Storage Types:")
      for choice in STORAGE_TYPES:
          print(f"[{choice}] {STORAGE_TYPES[choice]}")
  def format_workspace(self, workspace: str) -> str:
      """Wrap a workspace name in bold-cyan ANSI codes for display.

      Args:
          workspace: Workspace name; empty means the default workspace
      Returns:
          The name (or "(default)" when empty) between BOLD_CYAN and RESET
      """
      label = workspace if workspace else "(default)"
      return f"{BOLD_CYAN}{label}{RESET}"
  def print_cache_statistics(self, counts: Dict[str, Dict[str, int]], title: str):
      """Render per-mode query/keywords counts as a box-drawn table.

      One row per mode in QUERY_MODES, followed by a totals row.

      Args:
          counts: {mode: {"query": n, "keywords": n}} for every mode
          title: Heading printed above the table
      """
      cell = "─" * 12
      top = "┌" + "┬".join([cell] * 4) + "┐"
      divider = "├" + "┼".join([cell] * 4) + "┤"
      bottom = "└" + "┴".join([cell] * 4) + "┘"

      print(f"\n{title}")
      print(top)
      print(f"│ {'Mode':<10} │ {'Query':>10} │ {'Keywords':>10} │ {'Total':>10} │")
      print(divider)
      total_query = total_keywords = 0
      for mode in QUERY_MODES:
          query_count = counts[mode]["query"]
          keywords_count = counts[mode]["keywords"]
          total_query += query_count
          total_keywords += keywords_count
          row_total = query_count + keywords_count
          print(
              f"│ {mode:<10} │ {query_count:>10,} │ {keywords_count:>10,} │ {row_total:>10,} │"
          )
      print(divider)
      grand_total = total_query + total_keywords
      print(
          f"│ {'Total':<10} │ {total_query:>10,} │ {total_keywords:>10,} │ {grand_total:>10,} │"
      )
      print(bottom)
  def calculate_total_to_delete(
      self, counts: Dict[str, Dict[str, int]], cleanup_type: str
  ) -> int:
      """Sum the counts that a given cleanup type would remove.

      Args:
          counts: {mode: {"query": n, "keywords": n}} for every mode
          cleanup_type: 'all', 'query', or 'keywords'; any other value
              selects nothing and yields 0
      Returns:
          Total number of records to delete
      """
      if cleanup_type == "all":
          wanted = ("query", "keywords")
      elif cleanup_type == "query":
          wanted = ("query",)
      elif cleanup_type == "keywords":
          wanted = ("keywords",)
      else:
          wanted = ()
      return sum(counts[mode][kind] for mode in QUERY_MODES for kind in wanted)
  def print_cleanup_report(self, stats: CleanupStats):
      """Print the final report: statistics, before/after totals, errors, verdict.

      Args:
          stats: CleanupStats object with cleanup results
      """
      rule = "=" * 60
      print("\n" + rule)
      print("Cleanup Complete - Final Report")
      print(rule)

      # Overall statistics
      print("\n📊 Statistics:")
      for label, value in (
          ("Total records to delete", stats.total_to_delete),
          ("Total batches", stats.total_batches),
          ("Successful batches", stats.successful_batches),
          ("Failed batches", stats.failed_batches),
          ("Successfully deleted", stats.successfully_deleted),
          ("Failed to delete", stats.failed_to_delete),
      ):
          print(f" {label}: {value:,}")
      if stats.total_to_delete > 0:
          success_rate = stats.successfully_deleted / stats.total_to_delete * 100
      else:
          success_rate = 0
      print(f" Success rate: {success_rate:.2f}%")

      # Before/After comparison
      print("\n📈 Before/After Comparison:")
      total_before = sum(c["query"] + c["keywords"] for c in stats.counts_before.values())
      total_after = sum(c["query"] + c["keywords"] for c in stats.counts_after.values())
      print(f" Total caches before: {total_before:,}")
      print(f" Total caches after: {total_after:,}")
      print(f" Net reduction: {total_before - total_after:,}")

      if stats.errors:
          error_count = len(stats.errors)
          print(f"\n⚠️ Errors encountered: {error_count}")
          print("\nError Details:")
          print("-" * 60)

          # Occurrences per exception type, in first-seen order
          tally = {}
          for entry in stats.errors:
              kind = entry["error_type"]
              tally[kind] = tally.get(kind, 0) + 1
          print("\nError Summary:")
          # reverse=True keeps ties in first-seen order (sort is stable)
          for kind, occurrences in sorted(
              tally.items(), key=lambda pair: pair[1], reverse=True
          ):
              print(f" - {kind}: {occurrences} occurrence(s)")

          print("\nFirst 5 errors:")
          for position, entry in enumerate(stats.errors[:5], 1):
              print(f"\n {position}. Batch {entry['batch']}")
              print(f" Type: {entry['error_type']}")
              print(f" Message: {entry['error_msg']}")
              print(f" Records lost: {entry['records_lost']:,}")
          if error_count > 5:
              print(f"\n ... and {error_count - 5} more errors")

      print("\n" + rule)
      if stats.errors:
          print(f"{BOLD_RED}⚠️ WARNING: Cleanup completed with errors!{RESET}")
          print(" Please review the error details above.")
      else:
          print(f"{BOLD_GREEN}✓ SUCCESS: All records cleaned up successfully!{RESET}")
      print(rule)
  async def setup_storage(self) -> tuple:
      """Interactively choose, check and connect to a KV storage backend.

      Returns:
          ``(storage_instance, storage_name, workspace)`` on success, where
          ``workspace`` is the value reported by the initialized storage.
          ``(None, None, None)`` when the user exits at the prompt, declines
          the JsonKVStorage shutdown confirmation, or initialization fails.
      """
      print("\n=== Storage Setup ===")
      self.print_storage_types()

      option_count = len(STORAGE_TYPES)
      prompt_range = f"1-{option_count}" if option_count != 1 else "1"

      # Keep asking until a valid key is entered; empty input or "0" exits.
      while True:
          answer = input(
              f"\nSelect storage type ({prompt_range}) (Press Enter to exit): "
          ).strip()
          if answer in ("", "0"):
              print("\n✓ Cleanup cancelled by user")
              return None, None, None
          if answer in STORAGE_TYPES:
              break
          valid_keys = ", ".join(STORAGE_TYPES.keys())
          print(f"✗ Invalid choice. Please enter one of: {valid_keys}")

      storage_name = STORAGE_TYPES[answer]

      # JsonKVStorage needs exclusive file access, so require an explicit "yes".
      if storage_name == "JsonKVStorage":
          rule = "=" * 60
          print("\n" + rule)
          print(f"{BOLD_RED}⚠️ IMPORTANT WARNING - JsonKVStorage Concurrency{RESET}")
          print(rule)
          print("\nJsonKVStorage is an in-memory database that does NOT support")
          print("concurrent access to the same file by multiple programs.")
          print("\nBefore proceeding, please ensure that:")
          print(" • LightRAG Server is completely shut down")
          print(" • No other programs are accessing the storage files")
          print("\n" + rule)
          reply = input("\nHas LightRAG Server been shut down? (yes/no): ")
          if reply.strip().lower() != "yes":
              print(
                  "\n✓ Operation cancelled - Please shut down LightRAG Server first"
              )
              return None, None, None
          print("✓ Proceeding with JsonKVStorage cleanup...")

      # Configuration check only warns; it never blocks.
      print("\nChecking configuration...")
      self.check_env_vars(storage_name)

      workspace = self.get_workspace_for_storage(storage_name)

      # Initialization is the real validation point.
      print("\nInitializing storage...")
      try:
          storage = await self.initialize_storage(storage_name, workspace)
          workspace = storage.workspace
          print(f"- Storage Type: {storage_name}")
          print(f"- Workspace: {workspace or '(default)'}")
          print("- Connection Status: ✓ Success")
      except Exception as e:
          print(f"✗ Initialization failed: {e}")
          print(f"\nFor {storage_name}, you can configure using:")
          print(" 1. Environment variables (highest priority)")
          if storage_name in STORAGE_ENV_REQUIREMENTS:
              for env_var in STORAGE_ENV_REQUIREMENTS[storage_name]:
                  print(f" - {env_var}")
          print(" 2. config.ini file (medium priority)")
          # Example config.ini sections for backends that have one.
          config_examples = {
              "RedisKVStorage": (
                  " [redis]",
                  " uri = redis://localhost:6379",
              ),
              "PGKVStorage": (
                  " [postgres]",
                  " host = localhost",
                  " port = 5432",
                  " user = postgres",
                  " password = yourpassword",
                  " database = lightrag",
              ),
              "MongoKVStorage": (
                  " [mongodb]",
                  " uri = mongodb://root:root@localhost:27017/",
                  " database = LightRAG",
              ),
              "OpenSearchKVStorage": (
                  " [opensearch]",
                  " hosts = localhost:9200",
              ),
          }
          for example_line in config_examples.get(storage_name, ()):
              print(example_line)
          return None, None, None

      return storage, storage_name, workspace
  async def run(self):
      """Run the interactive query-cache cleanup workflow.

      Flow:
      1. Initialize shared storage, then pick and connect to a storage backend.
      2. Count query/keywords cache records per mode.
      3. Let the user choose what to delete and confirm.
      4. Delete, persist, and re-count.
      5. Print the final report and the after-cleanup statistics.

      The storage backend is finalized exactly once, in the ``finally``
      clause. This covers every exit path: normal completion, user
      cancellation, counting failure, Ctrl+C, or an unexpected error.
      """
      try:
          # Initialize shared storage (REQUIRED for storage classes to work)
          from lightrag.kg.shared_storage import initialize_share_data

          initialize_share_data(workers=1)

          self.print_header()

          # setup_storage returns (None, None, None) when the user cancels or
          # initialization fails.
          self.storage, storage_name, self.workspace = await self.setup_storage()
          if self.storage is None:
              return

          print("\nCounting query cache records...")
          try:
              counts = await self.count_query_caches(self.storage, storage_name)
          except Exception as e:
              print(f"✗ Counting failed: {e}")
              return  # storage is finalized in the finally clause

          stats = CleanupStats()
          stats.initialize_counts()
          stats.counts_before = counts

          self.print_cache_statistics(
              counts, "📊 Query Cache Statistics (Before Cleanup):"
          )

          total_caches = sum(
              counts[mode]["query"] + counts[mode]["keywords"] for mode in QUERY_MODES
          )
          if total_caches == 0:
              print("\n⚠️ No query caches found in storage")
              return

          cleanup_type = self._prompt_cleanup_type(counts, stats)
          if cleanup_type is None:
              return

          if not self._confirm_cleanup(
              storage_name, cleanup_type, stats.total_to_delete, total_caches
          ):
              return

          await self.delete_query_caches(
              self.storage, storage_name, cleanup_type, stats
          )

          # Persist changes
          print("\nPersisting changes to storage...")
          try:
              await self.storage.index_done_callback()
              print("✓ Changes persisted successfully")
          except Exception as e:
              print(f"✗ Persist failed: {e}")
              stats.add_error(0, e, 0)

          # Count again to verify
          print("\nVerifying cleanup results...")
          try:
              stats.counts_after = await self.count_query_caches(
                  self.storage, storage_name
              )
          except Exception as e:
              print(f"⚠️ Verification failed: {e}")
              # The report needs a per-mode dict, so fall back to zeros. Tell
              # the user these are placeholders, not verified results.
              print(" After-cleanup counts are unavailable and will be reported as 0.")
              stats.counts_after = {
                  mode: {"query": 0, "keywords": 0} for mode in QUERY_MODES
              }

          self.print_cleanup_report(stats)
          self.print_cache_statistics(
              stats.counts_after, "\n📊 Query Cache Statistics (After Cleanup):"
          )
      except KeyboardInterrupt:
          print("\n\n✗ Cleanup interrupted by user")
      except Exception as e:
          print(f"\n✗ Cleanup failed: {e}")
          import traceback

          traceback.print_exc()
      finally:
          # Single finalization point for the storage backend. Clear the
          # attribute first so a repeated run() cannot finalize it twice.
          # getattr tolerates an instance where self.storage was never assigned.
          storage = getattr(self, "storage", None)
          if storage is not None:
              self.storage = None
              try:
                  await storage.finalize()
              except Exception as e:
                  print(f"⚠️ Failed to finalize storage: {e}")
          # Finalize shared storage (best effort; it may never have been set up)
          try:
              from lightrag.kg.shared_storage import finalize_share_data

              finalize_share_data()
          except Exception:
              pass

  def _prompt_cleanup_type(self, counts, stats):
      """Ask which caches to delete; return "all", "query", "keywords", or None if cancelled.

      Re-prompts on invalid input and when the chosen option matches no
      records. Stores the selected option's record count in
      ``stats.total_to_delete``.
      """
      print("\n=== Cleanup Options ===")
      print("[1] Delete all query caches (both query and keywords)")
      print("[2] Delete query caches only (keep keywords)")
      print("[3] Delete keywords caches only (keep query)")
      print("[0] Cancel")

      options = {"1": "all", "2": "query", "3": "keywords"}
      nothing_to_delete = {
          "all": f"\n{BOLD_RED}⚠️ No query caches found to delete!{RESET}",
          "query": f"\n{BOLD_RED}⚠️ No query caches found to delete! (Only keywords exist){RESET}",
          "keywords": f"\n{BOLD_RED}⚠️ No keywords caches found to delete! (Only query caches exist){RESET}",
      }

      while True:
          choice = input("\nSelect cleanup option (0-3): ").strip()
          if choice in ("0", ""):
              print("\n✓ Cleanup cancelled")
              return None
          cleanup_type = options.get(choice)
          if cleanup_type is None:
              print("✗ Invalid choice. Please enter 0, 1, 2, or 3")
              continue
          stats.total_to_delete = self.calculate_total_to_delete(
              counts, cleanup_type
          )
          if stats.total_to_delete != 0:
              return cleanup_type
          print(nothing_to_delete[cleanup_type])
          print(" Please select a different cleanup option.\n")

  def _confirm_cleanup(
      self, storage_name, cleanup_type, total_to_delete, total_caches
  ) -> bool:
      """Show a summary of the pending deletion; return True only if the user answers "y"."""
      print("\n" + "=" * 60)
      print("Cleanup Confirmation")
      print("=" * 60)
      print(
          f"Storage: {BOLD_CYAN}{storage_name}{RESET} "
          f"(workspace: {self.format_workspace(self.workspace)})"
      )
      print(f"Cleanup Type: {BOLD_CYAN}{cleanup_type}{RESET}")
      print(
          f"Records to Delete: {BOLD_RED}{total_to_delete:,}{RESET} / {total_caches:,}"
      )
      if cleanup_type == "all":
          print(
              f"\n{BOLD_RED}⚠️ WARNING: This will delete ALL query caches across all modes!{RESET}"
          )
      elif cleanup_type == "query":
          print("\n⚠️ This will delete query caches only (keywords will be kept)")
      elif cleanup_type == "keywords":
          print("\n⚠️ This will delete keywords caches only (query will be kept)")

      confirm = input("\nContinue with deletion? (y/n): ").strip().lower()
      if confirm != "y":
          print("\n✓ Cleanup cancelled")
          return False
      return True
  async def async_main():
      """Build a CleanupTool and await its interactive cleanup workflow."""
      await CleanupTool().run()
  def main():
      """Console entry point: drive async_main() to completion with asyncio.run."""
      workflow = async_main()
      asyncio.run(workflow)
  # Run the tool when this file is executed directly as a script.
  if __name__ == "__main__":
      main()