migrate_llm_cache.py 55 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560
  1. #!/usr/bin/env python3
  2. """
  3. LLM Cache Migration Tool for LightRAG
  4. This tool migrates LLM response cache (default:extract:* and default:summary:*)
  5. between different KV storage implementations while preserving workspace isolation.
  6. Usage:
  7. python -m lightrag.tools.migrate_llm_cache
  8. # or
  9. python lightrag/tools/migrate_llm_cache.py
  10. Supported KV Storage Types:
  11. - JsonKVStorage
  12. - RedisKVStorage
  13. - PGKVStorage
  14. - MongoKVStorage
  15. - OpenSearchKVStorage
  16. """
  17. import asyncio
  18. import os
  19. import sys
  20. import time
  21. from typing import Any, Dict, List
  22. from dataclasses import dataclass, field
  23. from dotenv import load_dotenv
  24. # Add project root to path for imports
  25. sys.path.insert(
  26. 0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  27. )
  28. from lightrag.kg import STORAGE_ENV_REQUIREMENTS
  29. from lightrag.namespace import NameSpace
  30. from lightrag.utils import setup_logger
  31. # Load environment variables
  32. # use the .env that is inside the current folder
  33. # allows to use different .env file for each lightrag instance
  34. # the OS environment variables take precedence over the .env file
  35. load_dotenv(dotenv_path=".env", override=False)
  36. # Setup logger
  37. setup_logger("lightrag", level="INFO")
  38. # Storage type configurations
  39. STORAGE_TYPES = {
  40. "1": "JsonKVStorage",
  41. "2": "RedisKVStorage",
  42. "3": "PGKVStorage",
  43. "4": "MongoKVStorage",
  44. "5": "OpenSearchKVStorage",
  45. }
  46. # Workspace environment variable mapping
  47. WORKSPACE_ENV_MAP = {
  48. "PGKVStorage": "POSTGRES_WORKSPACE",
  49. "MongoKVStorage": "MONGODB_WORKSPACE",
  50. "RedisKVStorage": "REDIS_WORKSPACE",
  51. "OpenSearchKVStorage": "OPENSEARCH_WORKSPACE",
  52. }
  53. # Default batch size for migration
  54. DEFAULT_BATCH_SIZE = 1000
  55. # Default count batch size for efficient counting
  56. DEFAULT_COUNT_BATCH_SIZE = 1000
  57. # ANSI color codes for terminal output
  58. BOLD_CYAN = "\033[1;36m"
  59. RESET = "\033[0m"
  60. @dataclass
  61. class MigrationStats:
  62. """Migration statistics and error tracking"""
  63. total_source_records: int = 0
  64. total_batches: int = 0
  65. successful_batches: int = 0
  66. failed_batches: int = 0
  67. successful_records: int = 0
  68. failed_records: int = 0
  69. errors: List[Dict[str, Any]] = field(default_factory=list)
  70. def add_error(self, batch_idx: int, error: Exception, batch_size: int):
  71. """Record batch error"""
  72. self.errors.append(
  73. {
  74. "batch": batch_idx,
  75. "error_type": type(error).__name__,
  76. "error_msg": str(error),
  77. "records_lost": batch_size,
  78. "timestamp": time.time(),
  79. }
  80. )
  81. self.failed_batches += 1
  82. self.failed_records += batch_size
  83. class MigrationTool:
  84. """LLM Cache Migration Tool"""
  85. def __init__(self):
  86. self.source_storage = None
  87. self.target_storage = None
  88. self.source_workspace = ""
  89. self.target_workspace = ""
  90. self.batch_size = DEFAULT_BATCH_SIZE
  91. def get_workspace_for_storage(self, storage_name: str) -> str:
  92. """Get workspace for a specific storage type
  93. Priority: Storage-specific env var > WORKSPACE env var > empty string
  94. Args:
  95. storage_name: Storage implementation name
  96. Returns:
  97. Workspace name
  98. """
  99. # Check storage-specific workspace
  100. if storage_name in WORKSPACE_ENV_MAP:
  101. specific_workspace = os.getenv(WORKSPACE_ENV_MAP[storage_name])
  102. if specific_workspace:
  103. return specific_workspace
  104. # Check generic WORKSPACE
  105. workspace = os.getenv("WORKSPACE", "")
  106. return workspace
  107. def check_config_ini_for_storage(self, storage_name: str) -> bool:
  108. """Check if config.ini has configuration for the storage type
  109. Args:
  110. storage_name: Storage implementation name
  111. Returns:
  112. True if config.ini has the necessary configuration
  113. """
  114. try:
  115. import configparser
  116. config = configparser.ConfigParser()
  117. config.read("config.ini", "utf-8")
  118. if storage_name == "RedisKVStorage":
  119. return config.has_option("redis", "uri")
  120. elif storage_name == "PGKVStorage":
  121. return (
  122. config.has_option("postgres", "user")
  123. and config.has_option("postgres", "password")
  124. and config.has_option("postgres", "database")
  125. )
  126. elif storage_name == "MongoKVStorage":
  127. return config.has_option("mongodb", "uri") and config.has_option(
  128. "mongodb", "database"
  129. )
  130. elif storage_name == "OpenSearchKVStorage":
  131. return config.has_option("opensearch", "hosts")
  132. return False
  133. except Exception:
  134. return False
  135. def check_env_vars(self, storage_name: str) -> bool:
  136. """Check environment variables, show warnings if missing but don't fail
  137. Args:
  138. storage_name: Storage implementation name
  139. Returns:
  140. Always returns True (warnings only, no hard failure)
  141. """
  142. required_vars = STORAGE_ENV_REQUIREMENTS.get(storage_name, [])
  143. if not required_vars:
  144. print("✓ No environment variables required")
  145. return True
  146. missing_vars = [var for var in required_vars if var not in os.environ]
  147. if missing_vars:
  148. print(
  149. f"⚠️ Warning: Missing environment variables: {', '.join(missing_vars)}"
  150. )
  151. # Check if config.ini has configuration
  152. has_config = self.check_config_ini_for_storage(storage_name)
  153. if has_config:
  154. print(" ✓ Found configuration in config.ini")
  155. else:
  156. print(f" Will attempt to use defaults for {storage_name}")
  157. return True
  158. print("✓ All required environment variables are set")
  159. return True
  160. def count_available_storage_types(self) -> int:
  161. """Count available storage types (with env vars, config.ini, or defaults)
  162. Returns:
  163. Number of available storage types
  164. """
  165. available_count = 0
  166. for storage_name in STORAGE_TYPES.values():
  167. # Check if storage requires configuration
  168. required_vars = STORAGE_ENV_REQUIREMENTS.get(storage_name, [])
  169. if not required_vars:
  170. # JsonKVStorage, MongoKVStorage etc. - no config needed
  171. available_count += 1
  172. else:
  173. # Check if has environment variables
  174. has_env = all(var in os.environ for var in required_vars)
  175. if has_env:
  176. available_count += 1
  177. else:
  178. # Check if has config.ini configuration
  179. has_config = self.check_config_ini_for_storage(storage_name)
  180. if has_config:
  181. available_count += 1
  182. return available_count
  183. def get_storage_class(self, storage_name: str):
  184. """Dynamically import and return storage class
  185. Args:
  186. storage_name: Storage implementation name
  187. Returns:
  188. Storage class
  189. """
  190. if storage_name == "JsonKVStorage":
  191. from lightrag.kg.json_kv_impl import JsonKVStorage
  192. return JsonKVStorage
  193. elif storage_name == "RedisKVStorage":
  194. from lightrag.kg.redis_impl import RedisKVStorage
  195. return RedisKVStorage
  196. elif storage_name == "PGKVStorage":
  197. from lightrag.kg.postgres_impl import PGKVStorage
  198. return PGKVStorage
  199. elif storage_name == "MongoKVStorage":
  200. from lightrag.kg.mongo_impl import MongoKVStorage
  201. return MongoKVStorage
  202. elif storage_name == "OpenSearchKVStorage":
  203. from lightrag.kg.opensearch_impl import OpenSearchKVStorage
  204. return OpenSearchKVStorage
  205. else:
  206. raise ValueError(f"Unsupported storage type: {storage_name}")
  207. async def initialize_storage(self, storage_name: str, workspace: str):
  208. """Initialize storage instance with fallback to config.ini and defaults
  209. Args:
  210. storage_name: Storage implementation name
  211. workspace: Workspace name
  212. Returns:
  213. Initialized storage instance
  214. Raises:
  215. Exception: If initialization fails
  216. """
  217. storage_class = self.get_storage_class(storage_name)
  218. # Create global config
  219. global_config = {
  220. "working_dir": os.getenv("WORKING_DIR", "./rag_storage"),
  221. "embedding_batch_num": 10,
  222. }
  223. # Initialize storage
  224. storage = storage_class(
  225. namespace=NameSpace.KV_STORE_LLM_RESPONSE_CACHE,
  226. workspace=workspace,
  227. global_config=global_config,
  228. embedding_func=None,
  229. )
  230. # Initialize the storage (may raise exception if connection fails)
  231. await storage.initialize()
  232. return storage
  233. async def get_default_caches_json(self, storage) -> Dict[str, Any]:
  234. """Get default caches from JsonKVStorage
  235. Args:
  236. storage: JsonKVStorage instance
  237. Returns:
  238. Dictionary of cache entries with default:extract:* or default:summary:* keys
  239. """
  240. # Access _data directly - it's a dict from shared_storage
  241. async with storage._storage_lock:
  242. filtered = {}
  243. for key, value in storage._data.items():
  244. if key.startswith("default:extract:") or key.startswith(
  245. "default:summary:"
  246. ):
  247. filtered[key] = value.copy()
  248. return filtered
  249. async def get_default_caches_redis(
  250. self, storage, batch_size: int = 1000
  251. ) -> Dict[str, Any]:
  252. """Get default caches from RedisKVStorage with pagination
  253. Args:
  254. storage: RedisKVStorage instance
  255. batch_size: Number of keys to process per batch
  256. Returns:
  257. Dictionary of cache entries with default:extract:* or default:summary:* keys
  258. """
  259. import json
  260. cache_data = {}
  261. # Use _get_redis_connection() context manager
  262. async with storage._get_redis_connection() as redis:
  263. for pattern in ["default:extract:*", "default:summary:*"]:
  264. # Add namespace prefix to pattern
  265. prefixed_pattern = f"{storage.final_namespace}:{pattern}"
  266. cursor = 0
  267. while True:
  268. # SCAN already implements cursor-based pagination
  269. cursor, keys = await redis.scan(
  270. cursor, match=prefixed_pattern, count=batch_size
  271. )
  272. if keys:
  273. # Process this batch using pipeline with error handling
  274. try:
  275. pipe = redis.pipeline()
  276. for key in keys:
  277. pipe.get(key)
  278. values = await pipe.execute()
  279. for key, value in zip(keys, values):
  280. if value:
  281. key_str = (
  282. key.decode() if isinstance(key, bytes) else key
  283. )
  284. # Remove namespace prefix to get original key
  285. original_key = key_str.replace(
  286. f"{storage.final_namespace}:", "", 1
  287. )
  288. cache_data[original_key] = json.loads(value)
  289. except Exception as e:
  290. # Pipeline execution failed, fall back to individual gets
  291. print(
  292. f"⚠️ Pipeline execution failed for batch, using individual gets: {e}"
  293. )
  294. for key in keys:
  295. try:
  296. value = await redis.get(key)
  297. if value:
  298. key_str = (
  299. key.decode()
  300. if isinstance(key, bytes)
  301. else key
  302. )
  303. original_key = key_str.replace(
  304. f"{storage.final_namespace}:", "", 1
  305. )
  306. cache_data[original_key] = json.loads(value)
  307. except Exception as individual_error:
  308. print(
  309. f"⚠️ Failed to get individual key {key}: {individual_error}"
  310. )
  311. continue
  312. if cursor == 0:
  313. break
  314. # Yield control periodically to avoid blocking
  315. await asyncio.sleep(0)
  316. return cache_data
  317. async def get_default_caches_pg(
  318. self, storage, batch_size: int = 1000
  319. ) -> Dict[str, Any]:
  320. """Get default caches from PGKVStorage with pagination
  321. Args:
  322. storage: PGKVStorage instance
  323. batch_size: Number of records to fetch per batch
  324. Returns:
  325. Dictionary of cache entries with default:extract:* or default:summary:* keys
  326. """
  327. from lightrag.kg.postgres_impl import namespace_to_table_name
  328. cache_data = {}
  329. table_name = namespace_to_table_name(storage.namespace)
  330. offset = 0
  331. while True:
  332. # Use LIMIT and OFFSET for pagination
  333. query = f"""
  334. SELECT id as key, original_prompt, return_value, chunk_id, cache_type, queryparam,
  335. EXTRACT(EPOCH FROM create_time)::BIGINT as create_time,
  336. EXTRACT(EPOCH FROM update_time)::BIGINT as update_time
  337. FROM {table_name}
  338. WHERE workspace = $1
  339. AND (id LIKE 'default:extract:%' OR id LIKE 'default:summary:%')
  340. ORDER BY id
  341. LIMIT $2 OFFSET $3
  342. """
  343. results = await storage.db.query(
  344. query, [storage.workspace, batch_size, offset], multirows=True
  345. )
  346. if not results:
  347. break
  348. for row in results:
  349. # Map PostgreSQL fields to cache format
  350. cache_entry = {
  351. "return": row.get("return_value", ""),
  352. "cache_type": row.get("cache_type"),
  353. "original_prompt": row.get("original_prompt", ""),
  354. "chunk_id": row.get("chunk_id"),
  355. "queryparam": row.get("queryparam"),
  356. "create_time": row.get("create_time", 0),
  357. "update_time": row.get("update_time", 0),
  358. }
  359. cache_data[row["key"]] = cache_entry
  360. # If we got fewer results than batch_size, we're done
  361. if len(results) < batch_size:
  362. break
  363. offset += batch_size
  364. # Yield control periodically
  365. await asyncio.sleep(0)
  366. return cache_data
  367. async def get_default_caches_mongo(
  368. self, storage, batch_size: int = 1000
  369. ) -> Dict[str, Any]:
  370. """Get default caches from MongoKVStorage with cursor-based pagination
  371. Args:
  372. storage: MongoKVStorage instance
  373. batch_size: Number of documents to process per batch
  374. Returns:
  375. Dictionary of cache entries with default:extract:* or default:summary:* keys
  376. """
  377. cache_data = {}
  378. # MongoDB query with regex - use _data not collection
  379. query = {"_id": {"$regex": "^default:(extract|summary):"}}
  380. # Use cursor without to_list() - process in batches
  381. cursor = storage._data.find(query).batch_size(batch_size)
  382. async for doc in cursor:
  383. # Process each document as it comes
  384. doc_copy = doc.copy()
  385. key = doc_copy.pop("_id")
  386. # Filter ALL MongoDB/database-specific fields
  387. # Following .clinerules: "Always filter deprecated/incompatible fields during deserialization"
  388. for field_name in ["namespace", "workspace", "_id", "content"]:
  389. doc_copy.pop(field_name, None)
  390. cache_data[key] = doc_copy.copy()
  391. # Periodically yield control (every batch_size documents)
  392. if len(cache_data) % batch_size == 0:
  393. await asyncio.sleep(0)
  394. return cache_data
  395. async def get_default_caches_opensearch(
  396. self, storage, batch_size: int = 1000
  397. ) -> Dict[str, Any]:
  398. """Get default caches from OpenSearchKVStorage."""
  399. cache_data = {}
  400. async for hits in storage._iter_raw_docs(batch_size=batch_size):
  401. for hit in hits:
  402. key = hit["_id"]
  403. if key.startswith("default:extract:") or key.startswith(
  404. "default:summary:"
  405. ):
  406. cache_data[key] = hit["_source"].copy()
  407. return cache_data
  408. async def get_default_caches(self, storage, storage_name: str) -> Dict[str, Any]:
  409. """Get default caches from any storage type
  410. Args:
  411. storage: Storage instance
  412. storage_name: Storage type name
  413. Returns:
  414. Dictionary of cache entries
  415. """
  416. if storage_name == "JsonKVStorage":
  417. return await self.get_default_caches_json(storage)
  418. elif storage_name == "RedisKVStorage":
  419. return await self.get_default_caches_redis(storage)
  420. elif storage_name == "PGKVStorage":
  421. return await self.get_default_caches_pg(storage)
  422. elif storage_name == "MongoKVStorage":
  423. return await self.get_default_caches_mongo(storage)
  424. elif storage_name == "OpenSearchKVStorage":
  425. return await self.get_default_caches_opensearch(storage)
  426. else:
  427. raise ValueError(f"Unsupported storage type: {storage_name}")
  428. async def count_default_caches_json(self, storage) -> int:
  429. """Count default caches in JsonKVStorage - O(N) but very fast in-memory
  430. Args:
  431. storage: JsonKVStorage instance
  432. Returns:
  433. Total count of cache records
  434. """
  435. async with storage._storage_lock:
  436. return sum(
  437. 1
  438. for key in storage._data.keys()
  439. if key.startswith("default:extract:")
  440. or key.startswith("default:summary:")
  441. )
  442. async def count_default_caches_redis(self, storage) -> int:
  443. """Count default caches in RedisKVStorage using SCAN with progress display
  444. Args:
  445. storage: RedisKVStorage instance
  446. Returns:
  447. Total count of cache records
  448. """
  449. count = 0
  450. print("Scanning Redis keys...", end="", flush=True)
  451. async with storage._get_redis_connection() as redis:
  452. for pattern in ["default:extract:*", "default:summary:*"]:
  453. prefixed_pattern = f"{storage.final_namespace}:{pattern}"
  454. cursor = 0
  455. while True:
  456. cursor, keys = await redis.scan(
  457. cursor, match=prefixed_pattern, count=DEFAULT_COUNT_BATCH_SIZE
  458. )
  459. count += len(keys)
  460. # Show progress
  461. print(
  462. f"\rScanning Redis keys... found {count:,} records",
  463. end="",
  464. flush=True,
  465. )
  466. if cursor == 0:
  467. break
  468. print() # New line after progress
  469. return count
  470. async def count_default_caches_pg(self, storage) -> int:
  471. """Count default caches in PostgreSQL using COUNT(*) with progress indicator
  472. Args:
  473. storage: PGKVStorage instance
  474. Returns:
  475. Total count of cache records
  476. """
  477. from lightrag.kg.postgres_impl import namespace_to_table_name
  478. table_name = namespace_to_table_name(storage.namespace)
  479. query = f"""
  480. SELECT COUNT(*) as count
  481. FROM {table_name}
  482. WHERE workspace = $1
  483. AND (id LIKE 'default:extract:%' OR id LIKE 'default:summary:%')
  484. """
  485. print("Counting PostgreSQL records...", end="", flush=True)
  486. start_time = time.time()
  487. result = await storage.db.query(query, [storage.workspace])
  488. elapsed = time.time() - start_time
  489. if elapsed > 1:
  490. print(f" (took {elapsed:.1f}s)", end="")
  491. print() # New line
  492. return result["count"] if result else 0
  493. async def count_default_caches_mongo(self, storage) -> int:
  494. """Count default caches in MongoDB using count_documents with progress indicator
  495. Args:
  496. storage: MongoKVStorage instance
  497. Returns:
  498. Total count of cache records
  499. """
  500. query = {"_id": {"$regex": "^default:(extract|summary):"}}
  501. print("Counting MongoDB documents...", end="", flush=True)
  502. start_time = time.time()
  503. count = await storage._data.count_documents(query)
  504. elapsed = time.time() - start_time
  505. if elapsed > 1:
  506. print(f" (took {elapsed:.1f}s)", end="")
  507. print() # New line
  508. return count
  509. async def count_default_caches_opensearch(self, storage) -> int:
  510. """Count default caches in OpenSearch using PIT pagination."""
  511. count = 0
  512. print("Scanning OpenSearch documents...", end="", flush=True)
  513. start_time = time.time()
  514. async for hits in storage._iter_raw_docs(batch_size=DEFAULT_COUNT_BATCH_SIZE):
  515. for hit in hits:
  516. key = hit["_id"]
  517. if key.startswith("default:extract:") or key.startswith(
  518. "default:summary:"
  519. ):
  520. count += 1
  521. elapsed = time.time() - start_time
  522. if elapsed > 1:
  523. print(f" (took {elapsed:.1f}s)", end="")
  524. print()
  525. return count
  526. async def count_default_caches(self, storage, storage_name: str) -> int:
  527. """Count default caches from any storage type efficiently
  528. Args:
  529. storage: Storage instance
  530. storage_name: Storage type name
  531. Returns:
  532. Total count of cache records
  533. """
  534. if storage_name == "JsonKVStorage":
  535. return await self.count_default_caches_json(storage)
  536. elif storage_name == "RedisKVStorage":
  537. return await self.count_default_caches_redis(storage)
  538. elif storage_name == "PGKVStorage":
  539. return await self.count_default_caches_pg(storage)
  540. elif storage_name == "MongoKVStorage":
  541. return await self.count_default_caches_mongo(storage)
  542. elif storage_name == "OpenSearchKVStorage":
  543. return await self.count_default_caches_opensearch(storage)
  544. else:
  545. raise ValueError(f"Unsupported storage type: {storage_name}")
  546. async def stream_default_caches_json(self, storage, batch_size: int):
  547. """Stream default caches from JsonKVStorage - yields batches
  548. Args:
  549. storage: JsonKVStorage instance
  550. batch_size: Size of each batch to yield
  551. Yields:
  552. Dictionary batches of cache entries
  553. Note:
  554. This method creates a snapshot of matching items while holding the lock,
  555. then releases the lock before yielding batches. This prevents deadlock
  556. when the target storage (also JsonKVStorage) tries to acquire the same
  557. lock during upsert operations.
  558. """
  559. # Create a snapshot of matching items while holding the lock
  560. async with storage._storage_lock:
  561. matching_items = [
  562. (key, value)
  563. for key, value in storage._data.items()
  564. if key.startswith("default:extract:")
  565. or key.startswith("default:summary:")
  566. ]
  567. # Now iterate over snapshot without holding lock
  568. batch = {}
  569. for key, value in matching_items:
  570. batch[key] = value.copy()
  571. if len(batch) >= batch_size:
  572. yield batch
  573. batch = {}
  574. # Yield remaining items
  575. if batch:
  576. yield batch
  577. async def stream_default_caches_redis(self, storage, batch_size: int):
  578. """Stream default caches from RedisKVStorage - yields batches
  579. Args:
  580. storage: RedisKVStorage instance
  581. batch_size: Size of each batch to yield
  582. Yields:
  583. Dictionary batches of cache entries
  584. """
  585. import json
  586. async with storage._get_redis_connection() as redis:
  587. for pattern in ["default:extract:*", "default:summary:*"]:
  588. prefixed_pattern = f"{storage.final_namespace}:{pattern}"
  589. cursor = 0
  590. while True:
  591. cursor, keys = await redis.scan(
  592. cursor, match=prefixed_pattern, count=batch_size
  593. )
  594. if keys:
  595. try:
  596. pipe = redis.pipeline()
  597. for key in keys:
  598. pipe.get(key)
  599. values = await pipe.execute()
  600. batch = {}
  601. for key, value in zip(keys, values):
  602. if value:
  603. key_str = (
  604. key.decode() if isinstance(key, bytes) else key
  605. )
  606. original_key = key_str.replace(
  607. f"{storage.final_namespace}:", "", 1
  608. )
  609. batch[original_key] = json.loads(value)
  610. if batch:
  611. yield batch
  612. except Exception as e:
  613. print(f"⚠️ Pipeline execution failed for batch: {e}")
  614. # Fall back to individual gets
  615. batch = {}
  616. for key in keys:
  617. try:
  618. value = await redis.get(key)
  619. if value:
  620. key_str = (
  621. key.decode()
  622. if isinstance(key, bytes)
  623. else key
  624. )
  625. original_key = key_str.replace(
  626. f"{storage.final_namespace}:", "", 1
  627. )
  628. batch[original_key] = json.loads(value)
  629. except Exception as individual_error:
  630. print(
  631. f"⚠️ Failed to get individual key {key}: {individual_error}"
  632. )
  633. continue
  634. if batch:
  635. yield batch
  636. if cursor == 0:
  637. break
  638. await asyncio.sleep(0)
  639. async def stream_default_caches_pg(self, storage, batch_size: int):
  640. """Stream default caches from PostgreSQL - yields batches
  641. Args:
  642. storage: PGKVStorage instance
  643. batch_size: Size of each batch to yield
  644. Yields:
  645. Dictionary batches of cache entries
  646. """
  647. from lightrag.kg.postgres_impl import namespace_to_table_name
  648. table_name = namespace_to_table_name(storage.namespace)
  649. offset = 0
  650. while True:
  651. query = f"""
  652. SELECT id as key, original_prompt, return_value, chunk_id, cache_type, queryparam,
  653. EXTRACT(EPOCH FROM create_time)::BIGINT as create_time,
  654. EXTRACT(EPOCH FROM update_time)::BIGINT as update_time
  655. FROM {table_name}
  656. WHERE workspace = $1
  657. AND (id LIKE 'default:extract:%' OR id LIKE 'default:summary:%')
  658. ORDER BY id
  659. LIMIT $2 OFFSET $3
  660. """
  661. results = await storage.db.query(
  662. query, [storage.workspace, batch_size, offset], multirows=True
  663. )
  664. if not results:
  665. break
  666. batch = {}
  667. for row in results:
  668. cache_entry = {
  669. "return": row.get("return_value", ""),
  670. "cache_type": row.get("cache_type"),
  671. "original_prompt": row.get("original_prompt", ""),
  672. "chunk_id": row.get("chunk_id"),
  673. "queryparam": row.get("queryparam"),
  674. "create_time": row.get("create_time", 0),
  675. "update_time": row.get("update_time", 0),
  676. }
  677. batch[row["key"]] = cache_entry
  678. if batch:
  679. yield batch
  680. if len(results) < batch_size:
  681. break
  682. offset += batch_size
  683. await asyncio.sleep(0)
  684. async def stream_default_caches_mongo(self, storage, batch_size: int):
  685. """Stream default caches from MongoDB - yields batches
  686. Args:
  687. storage: MongoKVStorage instance
  688. batch_size: Size of each batch to yield
  689. Yields:
  690. Dictionary batches of cache entries
  691. """
  692. query = {"_id": {"$regex": "^default:(extract|summary):"}}
  693. cursor = storage._data.find(query).batch_size(batch_size)
  694. batch = {}
  695. async for doc in cursor:
  696. doc_copy = doc.copy()
  697. key = doc_copy.pop("_id")
  698. # Filter MongoDB/database-specific fields
  699. for field_name in ["namespace", "workspace", "_id", "content"]:
  700. doc_copy.pop(field_name, None)
  701. batch[key] = doc_copy.copy()
  702. if len(batch) >= batch_size:
  703. yield batch
  704. batch = {}
  705. # Yield remaining items
  706. if batch:
  707. yield batch
  708. async def stream_default_caches_opensearch(self, storage, batch_size: int):
  709. """Stream default caches from OpenSearchKVStorage - yields batches."""
  710. batch = {}
  711. async for hits in storage._iter_raw_docs(batch_size=batch_size):
  712. for hit in hits:
  713. key = hit["_id"]
  714. if key.startswith("default:extract:") or key.startswith(
  715. "default:summary:"
  716. ):
  717. batch[key] = hit["_source"].copy()
  718. if len(batch) >= batch_size:
  719. yield batch
  720. batch = {}
  721. if batch:
  722. yield batch
  723. async def stream_default_caches(
  724. self, storage, storage_name: str, batch_size: int = None
  725. ):
  726. """Stream default caches from any storage type - unified interface
  727. Args:
  728. storage: Storage instance
  729. storage_name: Storage type name
  730. batch_size: Size of each batch to yield (defaults to self.batch_size)
  731. Yields:
  732. Dictionary batches of cache entries
  733. """
  734. if batch_size is None:
  735. batch_size = self.batch_size
  736. if storage_name == "JsonKVStorage":
  737. async for batch in self.stream_default_caches_json(storage, batch_size):
  738. yield batch
  739. elif storage_name == "RedisKVStorage":
  740. async for batch in self.stream_default_caches_redis(storage, batch_size):
  741. yield batch
  742. elif storage_name == "PGKVStorage":
  743. async for batch in self.stream_default_caches_pg(storage, batch_size):
  744. yield batch
  745. elif storage_name == "MongoKVStorage":
  746. async for batch in self.stream_default_caches_mongo(storage, batch_size):
  747. yield batch
  748. elif storage_name == "OpenSearchKVStorage":
  749. async for batch in self.stream_default_caches_opensearch(
  750. storage, batch_size
  751. ):
  752. yield batch
  753. else:
  754. raise ValueError(f"Unsupported storage type: {storage_name}")
  755. async def count_cache_types(self, cache_data: Dict[str, Any]) -> Dict[str, int]:
  756. """Count cache entries by type
  757. Args:
  758. cache_data: Dictionary of cache entries
  759. Returns:
  760. Dictionary with counts for each cache type
  761. """
  762. counts = {
  763. "extract": 0,
  764. "summary": 0,
  765. }
  766. for key in cache_data.keys():
  767. if key.startswith("default:extract:"):
  768. counts["extract"] += 1
  769. elif key.startswith("default:summary:"):
  770. counts["summary"] += 1
  771. return counts
  772. def print_header(self):
  773. """Print tool header"""
  774. print("\n" + "=" * 50)
  775. print("LLM Cache Migration Tool - LightRAG")
  776. print("=" * 50)
  777. def print_storage_types(self):
  778. """Print available storage types"""
  779. print("\nSupported KV Storage Types:")
  780. for key, value in STORAGE_TYPES.items():
  781. print(f"[{key}] {value}")
  782. def format_workspace(self, workspace: str) -> str:
  783. """Format workspace name with highlighting
  784. Args:
  785. workspace: Workspace name (may be empty)
  786. Returns:
  787. Formatted workspace string with ANSI color codes
  788. """
  789. if workspace:
  790. return f"{BOLD_CYAN}{workspace}{RESET}"
  791. else:
  792. return f"{BOLD_CYAN}(default){RESET}"
  793. def format_storage_name(self, storage_name: str) -> str:
  794. """Format storage type name with highlighting
  795. Args:
  796. storage_name: Storage type name
  797. Returns:
  798. Formatted storage name string with ANSI color codes
  799. """
  800. return f"{BOLD_CYAN}{storage_name}{RESET}"
  801. async def setup_storage(
  802. self,
  803. storage_type: str,
  804. use_streaming: bool = False,
  805. exclude_storage_name: str = None,
  806. ) -> tuple:
  807. """Setup and initialize storage with config.ini fallback support
  808. Args:
  809. storage_type: Type label (source/target)
  810. use_streaming: If True, only count records without loading. If False, load all data (legacy mode)
  811. exclude_storage_name: Storage type to exclude from selection (e.g., to prevent selecting same as source)
  812. Returns:
  813. Tuple of (storage_instance, storage_name, workspace, total_count)
  814. Returns (None, None, None, 0) if user chooses to exit
  815. """
  816. print(f"\n=== {storage_type} Storage Setup ===")
  817. # Filter and remap available storage types if exclusion is specified
  818. if exclude_storage_name:
  819. # Get available storage types (excluding source)
  820. available_list = [
  821. (k, v) for k, v in STORAGE_TYPES.items() if v != exclude_storage_name
  822. ]
  823. # Remap to sequential numbering (1, 2, 3...)
  824. remapped_types = {
  825. str(i + 1): name for i, (_, name) in enumerate(available_list)
  826. }
  827. # Print available types with new sequential numbers
  828. print(
  829. f"\nAvailable Storage Types for Target (source: {exclude_storage_name} excluded):"
  830. )
  831. for key, value in remapped_types.items():
  832. print(f"[{key}] {value}")
  833. available_types = remapped_types
  834. else:
  835. # For source storage, use original numbering
  836. available_types = STORAGE_TYPES.copy()
  837. self.print_storage_types()
  838. # Generate dynamic prompt based on number of options
  839. num_options = len(available_types)
  840. if num_options == 1:
  841. prompt_range = "1"
  842. else:
  843. prompt_range = f"1-{num_options}"
  844. # Custom input handling with exit support
  845. while True:
  846. choice = input(
  847. f"\nSelect {storage_type} storage type ({prompt_range}) (Press Enter to exit): "
  848. ).strip()
  849. # Check for exit
  850. if choice == "" or choice == "0":
  851. print("\n✓ Migration cancelled by user")
  852. return None, None, None, 0
  853. # Check if choice is valid
  854. if choice in available_types:
  855. break
  856. print(
  857. f"✗ Invalid choice. Please enter one of: {', '.join(available_types.keys())}"
  858. )
  859. storage_name = available_types[choice]
  860. # Check configuration (warnings only, doesn't block)
  861. print("\nChecking configuration...")
  862. self.check_env_vars(storage_name)
  863. # Get workspace
  864. workspace = self.get_workspace_for_storage(storage_name)
  865. # Initialize storage (real validation point)
  866. print(f"\nInitializing {storage_type} storage...")
  867. try:
  868. storage = await self.initialize_storage(storage_name, workspace)
  869. workspace = storage.workspace
  870. print(f"- Storage Type: {storage_name}")
  871. print(f"- Workspace: {workspace if workspace else '(default)'}")
  872. print("- Connection Status: ✓ Success")
  873. # Show configuration source for transparency
  874. if storage_name == "RedisKVStorage":
  875. config_source = (
  876. "environment variable"
  877. if "REDIS_URI" in os.environ
  878. else "config.ini or default"
  879. )
  880. print(f"- Configuration Source: {config_source}")
  881. elif storage_name == "PGKVStorage":
  882. config_source = (
  883. "environment variables"
  884. if all(
  885. var in os.environ
  886. for var in STORAGE_ENV_REQUIREMENTS[storage_name]
  887. )
  888. else "config.ini or defaults"
  889. )
  890. print(f"- Configuration Source: {config_source}")
  891. elif storage_name == "MongoKVStorage":
  892. config_source = (
  893. "environment variables"
  894. if all(
  895. var in os.environ
  896. for var in STORAGE_ENV_REQUIREMENTS[storage_name]
  897. )
  898. else "config.ini or defaults"
  899. )
  900. print(f"- Configuration Source: {config_source}")
  901. elif storage_name == "OpenSearchKVStorage":
  902. config_source = (
  903. "environment variables"
  904. if all(
  905. var in os.environ
  906. for var in STORAGE_ENV_REQUIREMENTS[storage_name]
  907. )
  908. else "config.ini or defaults"
  909. )
  910. print(f"- Configuration Source: {config_source}")
  911. except Exception as e:
  912. print(f"✗ Initialization failed: {e}")
  913. print(f"\nFor {storage_name}, you can configure using:")
  914. print(" 1. Environment variables (highest priority)")
  915. # Show specific environment variable requirements
  916. if storage_name in STORAGE_ENV_REQUIREMENTS:
  917. for var in STORAGE_ENV_REQUIREMENTS[storage_name]:
  918. print(f" - {var}")
  919. print(" 2. config.ini file (medium priority)")
  920. if storage_name == "RedisKVStorage":
  921. print(" [redis]")
  922. print(" uri = redis://localhost:6379")
  923. elif storage_name == "PGKVStorage":
  924. print(" [postgres]")
  925. print(" host = localhost")
  926. print(" port = 5432")
  927. print(" user = postgres")
  928. print(" password = yourpassword")
  929. print(" database = lightrag")
  930. elif storage_name == "MongoKVStorage":
  931. print(" [mongodb]")
  932. print(" uri = mongodb://root:root@localhost:27017/")
  933. print(" database = LightRAG")
  934. elif storage_name == "OpenSearchKVStorage":
  935. print(" [opensearch]")
  936. print(" hosts = localhost:9200")
  937. return None, None, None, 0
  938. # Count cache records efficiently
  939. print(f"\n{'Counting' if use_streaming else 'Loading'} cache records...")
  940. try:
  941. if use_streaming:
  942. # Use efficient counting without loading data
  943. total_count = await self.count_default_caches(storage, storage_name)
  944. print(f"- Total: {total_count:,} records")
  945. else:
  946. # Legacy mode: load all data
  947. cache_data = await self.get_default_caches(storage, storage_name)
  948. counts = await self.count_cache_types(cache_data)
  949. total_count = len(cache_data)
  950. print(f"- default:extract: {counts['extract']:,} records")
  951. print(f"- default:summary: {counts['summary']:,} records")
  952. print(f"- Total: {total_count:,} records")
  953. except Exception as e:
  954. print(f"✗ {'Counting' if use_streaming else 'Loading'} failed: {e}")
  955. return None, None, None, 0
  956. return storage, storage_name, workspace, total_count
  957. async def migrate_caches(
  958. self, source_data: Dict[str, Any], target_storage, target_storage_name: str
  959. ) -> MigrationStats:
  960. """Migrate caches in batches with error tracking (Legacy mode - loads all data)
  961. Args:
  962. source_data: Source cache data
  963. target_storage: Target storage instance
  964. target_storage_name: Target storage type name
  965. Returns:
  966. MigrationStats object with migration results and errors
  967. """
  968. stats = MigrationStats()
  969. stats.total_source_records = len(source_data)
  970. if stats.total_source_records == 0:
  971. print("\nNo records to migrate")
  972. return stats
  973. # Convert to list for batching
  974. items = list(source_data.items())
  975. stats.total_batches = (
  976. stats.total_source_records + self.batch_size - 1
  977. ) // self.batch_size
  978. print("\n=== Starting Migration ===")
  979. for batch_idx in range(stats.total_batches):
  980. start_idx = batch_idx * self.batch_size
  981. end_idx = min((batch_idx + 1) * self.batch_size, stats.total_source_records)
  982. batch_items = items[start_idx:end_idx]
  983. batch_data = dict(batch_items)
  984. # Determine current cache type for display
  985. current_key = batch_items[0][0]
  986. cache_type = "extract" if "extract" in current_key else "summary"
  987. try:
  988. # Attempt to write batch
  989. await target_storage.upsert(batch_data)
  990. # Success - update stats
  991. stats.successful_batches += 1
  992. stats.successful_records += len(batch_data)
  993. # Calculate progress
  994. progress = (end_idx / stats.total_source_records) * 100
  995. bar_length = 20
  996. filled_length = int(bar_length * end_idx // stats.total_source_records)
  997. bar = "█" * filled_length + "░" * (bar_length - filled_length)
  998. print(
  999. f"Batch {batch_idx + 1}/{stats.total_batches}: {bar} "
  1000. f"{end_idx:,}/{stats.total_source_records:,} ({progress:.0f}%) - "
  1001. f"default:{cache_type} ✓"
  1002. )
  1003. except Exception as e:
  1004. # Error - record and continue
  1005. stats.add_error(batch_idx + 1, e, len(batch_data))
  1006. print(
  1007. f"Batch {batch_idx + 1}/{stats.total_batches}: ✗ FAILED - "
  1008. f"{type(e).__name__}: {str(e)}"
  1009. )
  1010. # Final persist
  1011. print("\nPersisting data to disk...")
  1012. try:
  1013. await target_storage.index_done_callback()
  1014. print("✓ Data persisted successfully")
  1015. except Exception as e:
  1016. print(f"✗ Persist failed: {e}")
  1017. stats.add_error(0, e, 0) # batch 0 = persist error
  1018. return stats
  1019. async def migrate_caches_streaming(
  1020. self,
  1021. source_storage,
  1022. source_storage_name: str,
  1023. target_storage,
  1024. target_storage_name: str,
  1025. total_records: int,
  1026. ) -> MigrationStats:
  1027. """Migrate caches using streaming approach - minimal memory footprint
  1028. Args:
  1029. source_storage: Source storage instance
  1030. source_storage_name: Source storage type name
  1031. target_storage: Target storage instance
  1032. target_storage_name: Target storage type name
  1033. total_records: Total number of records to migrate
  1034. Returns:
  1035. MigrationStats object with migration results and errors
  1036. """
  1037. stats = MigrationStats()
  1038. stats.total_source_records = total_records
  1039. if stats.total_source_records == 0:
  1040. print("\nNo records to migrate")
  1041. return stats
  1042. # Calculate total batches
  1043. stats.total_batches = (total_records + self.batch_size - 1) // self.batch_size
  1044. print("\n=== Starting Streaming Migration ===")
  1045. print(
  1046. f"💡 Memory-optimized mode: Processing {self.batch_size:,} records at a time\n"
  1047. )
  1048. batch_idx = 0
  1049. # Stream batches from source and write to target immediately
  1050. async for batch in self.stream_default_caches(
  1051. source_storage, source_storage_name
  1052. ):
  1053. batch_idx += 1
  1054. # Determine current cache type for display
  1055. if batch:
  1056. first_key = next(iter(batch.keys()))
  1057. cache_type = "extract" if "extract" in first_key else "summary"
  1058. else:
  1059. cache_type = "unknown"
  1060. try:
  1061. # Write batch to target storage
  1062. await target_storage.upsert(batch)
  1063. # Success - update stats
  1064. stats.successful_batches += 1
  1065. stats.successful_records += len(batch)
  1066. # Calculate progress with known total
  1067. progress = (stats.successful_records / total_records) * 100
  1068. bar_length = 20
  1069. filled_length = int(
  1070. bar_length * stats.successful_records // total_records
  1071. )
  1072. bar = "█" * filled_length + "░" * (bar_length - filled_length)
  1073. print(
  1074. f"Batch {batch_idx}/{stats.total_batches}: {bar} "
  1075. f"{stats.successful_records:,}/{total_records:,} ({progress:.1f}%) - "
  1076. f"default:{cache_type} ✓"
  1077. )
  1078. except Exception as e:
  1079. # Error - record and continue
  1080. stats.add_error(batch_idx, e, len(batch))
  1081. print(
  1082. f"Batch {batch_idx}/{stats.total_batches}: ✗ FAILED - "
  1083. f"{type(e).__name__}: {str(e)}"
  1084. )
  1085. # Final persist
  1086. print("\nPersisting data to disk...")
  1087. try:
  1088. await target_storage.index_done_callback()
  1089. print("✓ Data persisted successfully")
  1090. except Exception as e:
  1091. print(f"✗ Persist failed: {e}")
  1092. stats.add_error(0, e, 0) # batch 0 = persist error
  1093. return stats
  1094. def print_migration_report(self, stats: MigrationStats):
  1095. """Print comprehensive migration report
  1096. Args:
  1097. stats: MigrationStats object with migration results
  1098. """
  1099. print("\n" + "=" * 60)
  1100. print("Migration Complete - Final Report")
  1101. print("=" * 60)
  1102. # Overall statistics
  1103. print("\n📊 Statistics:")
  1104. print(f" Total source records: {stats.total_source_records:,}")
  1105. print(f" Total batches: {stats.total_batches:,}")
  1106. print(f" Successful batches: {stats.successful_batches:,}")
  1107. print(f" Failed batches: {stats.failed_batches:,}")
  1108. print(f" Successfully migrated: {stats.successful_records:,}")
  1109. print(f" Failed to migrate: {stats.failed_records:,}")
  1110. # Success rate
  1111. success_rate = (
  1112. (stats.successful_records / stats.total_source_records * 100)
  1113. if stats.total_source_records > 0
  1114. else 0
  1115. )
  1116. print(f" Success rate: {success_rate:.2f}%")
  1117. # Error details
  1118. if stats.errors:
  1119. print(f"\n⚠️ Errors encountered: {len(stats.errors)}")
  1120. print("\nError Details:")
  1121. print("-" * 60)
  1122. # Group errors by type
  1123. error_types = {}
  1124. for error in stats.errors:
  1125. err_type = error["error_type"]
  1126. error_types[err_type] = error_types.get(err_type, 0) + 1
  1127. print("\nError Summary:")
  1128. for err_type, count in sorted(error_types.items(), key=lambda x: -x[1]):
  1129. print(f" - {err_type}: {count} occurrence(s)")
  1130. print("\nFirst 5 errors:")
  1131. for i, error in enumerate(stats.errors[:5], 1):
  1132. print(f"\n {i}. Batch {error['batch']}")
  1133. print(f" Type: {error['error_type']}")
  1134. print(f" Message: {error['error_msg']}")
  1135. print(f" Records lost: {error['records_lost']:,}")
  1136. if len(stats.errors) > 5:
  1137. print(f"\n ... and {len(stats.errors) - 5} more errors")
  1138. print("\n" + "=" * 60)
  1139. print("⚠️ WARNING: Migration completed with errors!")
  1140. print(" Please review the error details above.")
  1141. print("=" * 60)
  1142. else:
  1143. print("\n" + "=" * 60)
  1144. print("✓ SUCCESS: All records migrated successfully!")
  1145. print("=" * 60)
  1146. async def run(self):
  1147. """Run the migration tool with streaming approach and early validation"""
  1148. try:
  1149. # Initialize shared storage (REQUIRED for storage classes to work)
  1150. from lightrag.kg.shared_storage import initialize_share_data
  1151. initialize_share_data(workers=1)
  1152. # Print header
  1153. self.print_header()
  1154. # Setup source storage with streaming (only count, don't load all data)
  1155. (
  1156. self.source_storage,
  1157. source_storage_name,
  1158. self.source_workspace,
  1159. source_count,
  1160. ) = await self.setup_storage("Source", use_streaming=True)
  1161. # Check if user cancelled (setup_storage returns None for all fields)
  1162. if self.source_storage is None:
  1163. return
  1164. # Check if there are at least 2 storage types available
  1165. available_count = self.count_available_storage_types()
  1166. if available_count <= 1:
  1167. print("\n" + "=" * 60)
  1168. print("⚠️ Warning: Migration Not Possible")
  1169. print("=" * 60)
  1170. print(f"Only {available_count} storage type(s) available.")
  1171. print("Migration requires at least 2 different storage types.")
  1172. print("\nTo enable migration, configure additional storage:")
  1173. print(" 1. Set environment variables, OR")
  1174. print(" 2. Update config.ini file")
  1175. print("\nSupported storage types:")
  1176. for name in STORAGE_TYPES.values():
  1177. if name != source_storage_name:
  1178. print(f" - {name}")
  1179. if name in STORAGE_ENV_REQUIREMENTS:
  1180. for var in STORAGE_ENV_REQUIREMENTS[name]:
  1181. print(f" Required: {var}")
  1182. print("=" * 60)
  1183. # Cleanup
  1184. await self.source_storage.finalize()
  1185. return
  1186. if source_count == 0:
  1187. print("\n⚠️ Source storage has no cache records to migrate")
  1188. # Cleanup
  1189. await self.source_storage.finalize()
  1190. return
  1191. # Setup target storage with streaming (only count, don't load all data)
  1192. # Exclude source storage type from target selection
  1193. (
  1194. self.target_storage,
  1195. target_storage_name,
  1196. self.target_workspace,
  1197. target_count,
  1198. ) = await self.setup_storage(
  1199. "Target", use_streaming=True, exclude_storage_name=source_storage_name
  1200. )
  1201. if not self.target_storage:
  1202. print("\n✗ Target storage setup failed")
  1203. # Cleanup source
  1204. await self.source_storage.finalize()
  1205. return
  1206. # Show migration summary
  1207. print("\n" + "=" * 50)
  1208. print("Migration Confirmation")
  1209. print("=" * 50)
  1210. print(
  1211. f"Source: {self.format_storage_name(source_storage_name)} (workspace: {self.format_workspace(self.source_workspace)}) - {source_count:,} records"
  1212. )
  1213. print(
  1214. f"Target: {self.format_storage_name(target_storage_name)} (workspace: {self.format_workspace(self.target_workspace)}) - {target_count:,} records"
  1215. )
  1216. print(f"Batch Size: {self.batch_size:,} records/batch")
  1217. print("Memory Mode: Streaming (memory-optimized)")
  1218. if target_count > 0:
  1219. print(
  1220. f"\n⚠️ Warning: Target storage already has {target_count:,} records"
  1221. )
  1222. print("Migration will overwrite records with the same keys")
  1223. # Confirm migration
  1224. confirm = input("\nContinue? (y/n): ").strip().lower()
  1225. if confirm != "y":
  1226. print("\n✗ Migration cancelled")
  1227. # Cleanup
  1228. await self.source_storage.finalize()
  1229. await self.target_storage.finalize()
  1230. return
  1231. # Perform streaming migration with error tracking
  1232. stats = await self.migrate_caches_streaming(
  1233. self.source_storage,
  1234. source_storage_name,
  1235. self.target_storage,
  1236. target_storage_name,
  1237. source_count,
  1238. )
  1239. # Print comprehensive migration report
  1240. self.print_migration_report(stats)
  1241. # Cleanup
  1242. await self.source_storage.finalize()
  1243. await self.target_storage.finalize()
  1244. except KeyboardInterrupt:
  1245. print("\n\n✗ Migration interrupted by user")
  1246. except Exception as e:
  1247. print(f"\n✗ Migration failed: {e}")
  1248. import traceback
  1249. traceback.print_exc()
  1250. finally:
  1251. # Ensure cleanup
  1252. if self.source_storage:
  1253. try:
  1254. await self.source_storage.finalize()
  1255. except Exception:
  1256. pass
  1257. if self.target_storage:
  1258. try:
  1259. await self.target_storage.finalize()
  1260. except Exception:
  1261. pass
  1262. # Finalize shared storage
  1263. try:
  1264. from lightrag.kg.shared_storage import finalize_share_data
  1265. finalize_share_data()
  1266. except Exception:
  1267. pass
  1268. async def main():
  1269. """Main entry point"""
  1270. tool = MigrationTool()
  1271. await tool.run()
  1272. if __name__ == "__main__":
  1273. asyncio.run(main())