lightrag_gemini_workspace_demo.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. """
  2. LightRAG Data Isolation Demo: Workspace Management
  3. This example demonstrates how to maintain multiple isolated knowledge bases
  4. within a single application using LightRAG's 'workspace' feature.
  5. Key Concepts:
  6. - Workspace Isolation: Each RAG instance is assigned a unique workspace name,
  7. which ensures that Knowledge Graphs, Vector Databases, and Chunks are
  8. stored in separate, non-conflicting directories.
  9. - Independent Configuration: Different workspaces can utilize different
  10. entity type guidance and document sets simultaneously.
  11. Prerequisites:
  12. 1. Set the following environment variables:
  13. - GEMINI_API_KEY: Your Google Gemini API key.
  14. 2. Ensure your data directory contains:
  15. - Data/book-small.txt
  16. - Data/HR_policies.txt
  17. Usage:
  18. python lightrag_gemini_workspace_demo.py
  19. """
  20. import os
  21. import asyncio
  22. import numpy as np
  23. from lightrag import LightRAG, QueryParam
  24. from lightrag.llm.gemini import gemini_model_complete, gemini_embed
  25. from lightrag.utils import wrap_embedding_func_with_attrs
  26. async def llm_model_func(
  27. prompt, system_prompt=None, history_messages=[], keyword_extraction=False, **kwargs
  28. ) -> str:
  29. """Wrapper for Gemini LLM completion."""
  30. return await gemini_model_complete(
  31. prompt,
  32. system_prompt=system_prompt,
  33. history_messages=history_messages,
  34. api_key=os.getenv("GEMINI_API_KEY"),
  35. model_name="gemini-2.0-flash-exp",
  36. **kwargs,
  37. )
  38. @wrap_embedding_func_with_attrs(
  39. embedding_dim=768, max_token_size=2048, model_name="models/text-embedding-004"
  40. )
  41. async def embedding_func(texts: list[str]) -> np.ndarray:
  42. """Wrapper for Gemini embedding model."""
  43. return await gemini_embed.func(
  44. texts, api_key=os.getenv("GEMINI_API_KEY"), model="models/text-embedding-004"
  45. )
  46. async def initialize_rag(
  47. workspace: str = "default_workspace",
  48. ) -> LightRAG:
  49. """
  50. Initializes a LightRAG instance with data isolation.
  51. Entity type guidance can be customized by passing
  52. addon_params={'entity_types_guidance': '...'} to LightRAG.
  53. """
  54. rag = LightRAG(
  55. workspace=workspace,
  56. llm_model_name="gemini-2.0-flash",
  57. llm_model_func=llm_model_func,
  58. embedding_func=embedding_func,
  59. embedding_func_max_async=4,
  60. embedding_batch_num=8,
  61. llm_model_max_async=2,
  62. )
  63. await rag.initialize_storages()
  64. return rag
  65. async def main():
  66. rag_1 = None
  67. rag_2 = None
  68. try:
  69. # 1. Initialize Isolated Workspaces
  70. # Instance 1: Dedicated to literary analysis
  71. # Instance 2: Dedicated to corporate HR documentation
  72. print("Initializing isolated LightRAG workspaces...")
  73. rag_1 = await initialize_rag("rag_workspace_book")
  74. rag_2 = await initialize_rag("rag_workspace_hr")
  75. # 2. Populate Workspace 1 (Literature)
  76. book_path = "Data/book-small.txt"
  77. if os.path.exists(book_path):
  78. with open(book_path, "r", encoding="utf-8") as f:
  79. print(f"Indexing {book_path} into Literature Workspace...")
  80. await rag_1.ainsert(f.read())
  81. # 3. Populate Workspace 2 (Corporate)
  82. hr_path = "Data/HR_policies.txt"
  83. if os.path.exists(hr_path):
  84. with open(hr_path, "r", encoding="utf-8") as f:
  85. print(f"Indexing {hr_path} into HR Workspace...")
  86. await rag_2.ainsert(f.read())
  87. # 4. Context-Specific Querying
  88. print("\n--- Querying Literature Workspace ---")
  89. res1 = await rag_1.aquery(
  90. "What is the main theme?",
  91. param=QueryParam(mode="hybrid", stream=False),
  92. )
  93. print(f"Book Analysis: {res1[:200]}...")
  94. print("\n--- Querying HR Workspace ---")
  95. res2 = await rag_2.aquery(
  96. "What is the leave policy?", param=QueryParam(mode="hybrid")
  97. )
  98. print(f"HR Response: {res2[:200]}...")
  99. except Exception as e:
  100. print(f"An error occurred: {e}")
  101. finally:
  102. # Finalize storage to safely close DB connections and write buffers
  103. if rag_1:
  104. await rag_1.finalize_storages()
  105. if rag_2:
  106. await rag_2.finalize_storages()
  107. if __name__ == "__main__":
  108. asyncio.run(main())