copy_llm_cache_to_another_storage.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. """
  2. Sometimes you need to switch storage backends but want to save LLM tokens and time.
  3. This handy script copies the LLM response cache from one storage backend to another.
  4. (Only the JSON and PostgreSQL KV storage implementations are supported.)
  5. """
  6. import asyncio
  7. import logging
  8. import os
  9. from dotenv import load_dotenv
  10. from lightrag.kg.postgres_impl import PostgreSQLDB, PGKVStorage
  11. from lightrag.kg.json_kv_impl import JsonKVStorage
  12. from lightrag.namespace import NameSpace
  13. load_dotenv()
  14. ROOT_DIR = os.environ.get("ROOT_DIR")
  15. WORKING_DIR = f"{ROOT_DIR}/dickens"
  16. logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.INFO)
  17. if not os.path.exists(WORKING_DIR):
  18. os.mkdir(WORKING_DIR)
  19. # AGE
  20. os.environ["AGE_GRAPH_NAME"] = "chinese"
  21. postgres_db = PostgreSQLDB(
  22. config={
  23. "host": "localhost",
  24. "port": 15432,
  25. "user": "rag",
  26. "password": "rag",
  27. "database": "r2",
  28. }
  29. )
  30. async def copy_from_postgres_to_json():
  31. await postgres_db.initdb()
  32. from_llm_response_cache = PGKVStorage(
  33. namespace=NameSpace.KV_STORE_LLM_RESPONSE_CACHE,
  34. global_config={"embedding_batch_num": 6},
  35. embedding_func=None,
  36. db=postgres_db,
  37. )
  38. to_llm_response_cache = JsonKVStorage(
  39. namespace=NameSpace.KV_STORE_LLM_RESPONSE_CACHE,
  40. global_config={"working_dir": WORKING_DIR},
  41. embedding_func=None,
  42. )
  43. # Get all cache data using the new flattened structure
  44. all_data = await from_llm_response_cache.get_all()
  45. # Convert flattened data to hierarchical structure for JsonKVStorage
  46. kv = {}
  47. for flattened_key, cache_entry in all_data.items():
  48. # Parse flattened key: {mode}:{cache_type}:{hash}
  49. parts = flattened_key.split(":", 2)
  50. if len(parts) == 3:
  51. mode, cache_type, hash_value = parts
  52. if mode not in kv:
  53. kv[mode] = {}
  54. kv[mode][hash_value] = cache_entry
  55. print(f"Copying {flattened_key} -> {mode}[{hash_value}]")
  56. else:
  57. print(f"Skipping invalid key format: {flattened_key}")
  58. await to_llm_response_cache.upsert(kv)
  59. await to_llm_response_cache.index_done_callback()
  60. print("Mission accomplished!")
  61. async def copy_from_json_to_postgres():
  62. await postgres_db.initdb()
  63. from_llm_response_cache = JsonKVStorage(
  64. namespace=NameSpace.KV_STORE_LLM_RESPONSE_CACHE,
  65. global_config={"working_dir": WORKING_DIR},
  66. embedding_func=None,
  67. )
  68. to_llm_response_cache = PGKVStorage(
  69. namespace=NameSpace.KV_STORE_LLM_RESPONSE_CACHE,
  70. global_config={"embedding_batch_num": 6},
  71. embedding_func=None,
  72. db=postgres_db,
  73. )
  74. # Get all cache data from JsonKVStorage (hierarchical structure)
  75. all_data = await from_llm_response_cache.get_all()
  76. # Convert hierarchical data to flattened structure for PGKVStorage
  77. flattened_data = {}
  78. for mode, mode_data in all_data.items():
  79. print(f"Processing mode: {mode}")
  80. for hash_value, cache_entry in mode_data.items():
  81. # Determine cache_type from cache entry or use default
  82. cache_type = cache_entry.get("cache_type", "extract")
  83. # Create flattened key: {mode}:{cache_type}:{hash}
  84. flattened_key = f"{mode}:{cache_type}:{hash_value}"
  85. flattened_data[flattened_key] = cache_entry
  86. print(f"\tConverting {mode}[{hash_value}] -> {flattened_key}")
  87. # Upsert the flattened data
  88. await to_llm_response_cache.upsert(flattened_data)
  89. print("Mission accomplished!")
  90. if __name__ == "__main__":
  91. asyncio.run(copy_from_json_to_postgres())