# lightrag_ag2_multiagent_demo.py
"""LightRAG + AG2 Multi-Agent Demo.

Demonstrates how AG2 agents can use LightRAG's knowledge graph retrieval
as a tool. Multiple specialized agents collaborate to answer complex
questions over indexed documents.

Architecture:
    User -> AG2 GroupChat (Researcher + Analyst + Writer) -> LightRAG queries

    - Researcher: uses LightRAG hybrid search to gather facts
    - Analyst: uses LightRAG naive (vector) search for complementary results
    - Writer: synthesizes findings into a final answer

Requires:
    pip install lightrag-hku "ag2[openai]>=0.11.4,<1.0"
    export OPENAI_API_KEY="..."
    (optional) export OPENAI_MODEL="..."   # defaults to gpt-4o-mini

Usage:
    python examples/lightrag_ag2_multiagent_demo.py
"""
import asyncio
import json
import os
import shutil
import threading
import traceback

from autogen import (
    AssistantAgent,
    GroupChat,
    GroupChatManager,
    LLMConfig,
    UserProxyAgent,
)
from lightrag import LightRAG, QueryParam
from lightrag.llm.openai import gpt_4o_mini_complete, openai_embed
# --- Configuration ---
# Scratch directory for LightRAG's storage files. setup_lightrag() deletes and
# recreates it on every run, and main() removes it again on exit, so never point
# this at a directory holding data you want to keep.
WORKING_DIR = "./ag2_demo_workdir"
# The small corpus that setup_lightrag() inserts into LightRAG at startup; the
# agents' lightrag_query tool calls retrieve from what was indexed from this text.
SAMPLE_TEXT = """
Artificial intelligence has transformed multiple industries. Machine learning,
a subset of AI, enables systems to learn from data without explicit programming.
Deep learning, using neural networks with many layers, has achieved breakthroughs
in computer vision, natural language processing, and speech recognition.
Transformer architectures, introduced in the 2017 paper "Attention Is All You Need"
by Vaswani et al., revolutionized NLP. Models like GPT and BERT are built on
transformers. GPT (Generative Pre-trained Transformer) uses decoder-only architecture
for text generation, while BERT (Bidirectional Encoder Representations) uses
encoder-only architecture for understanding tasks.
Retrieval-Augmented Generation (RAG) combines the strengths of retrieval systems
and generative models. Instead of relying solely on parametric knowledge, RAG
systems retrieve relevant documents from a knowledge base and use them as context
for generation. This approach reduces hallucination and enables models to access
up-to-date information.
Knowledge graphs represent information as entities and relationships. When combined
with RAG, knowledge graphs enable structured reasoning over document collections.
LightRAG implements this approach with dual-level retrieval: local search focuses
on specific entities, while global search captures broader themes and relationships.
"""
  52. # --- LightRAG Setup ---
  53. async def setup_lightrag() -> LightRAG:
  54. """Initialize LightRAG and index sample documents."""
  55. if os.path.exists(WORKING_DIR):
  56. shutil.rmtree(WORKING_DIR)
  57. os.makedirs(WORKING_DIR, exist_ok=True)
  58. rag = LightRAG(
  59. working_dir=WORKING_DIR,
  60. embedding_func=openai_embed,
  61. llm_model_func=gpt_4o_mini_complete,
  62. )
  63. await rag.initialize_storages()
  64. await rag.ainsert(SAMPLE_TEXT)
  65. print("LightRAG initialized and documents indexed.\n")
  66. return rag
  67. # --- Async Bridge ---
  68. # AG2 runs tools in a background thread without an event loop.
  69. # We maintain a dedicated event loop in a separate thread for LightRAG async calls.
  70. _bg_loop: asyncio.AbstractEventLoop = None
  71. def _start_background_loop(loop: asyncio.AbstractEventLoop):
  72. asyncio.set_event_loop(loop)
  73. loop.run_forever()
  74. def _run_async(coro):
  75. """Submit a coroutine to the background event loop and wait for the result."""
  76. future = asyncio.run_coroutine_threadsafe(coro, _bg_loop)
  77. return future.result(timeout=120)
# --- AG2 Agent Tools ---
# Global reference to LightRAG instance (set in main).
# The lightrag_query tool built in create_agents() reads this at call time.
# It remains None until main() assigns the result of setup_lightrag(), and
# main() only finalizes LightRAG storages when it has been set.
_rag_instance: "LightRAG | None" = None
  81. def create_agents():
  82. """Create AG2 agents with LightRAG tools."""
  83. llm_config = LLMConfig(
  84. {
  85. "model": os.environ.get("OPENAI_MODEL", "gpt-4o-mini"),
  86. "api_key": os.environ["OPENAI_API_KEY"],
  87. "api_type": "openai",
  88. }
  89. )
  90. researcher = AssistantAgent(
  91. name="Researcher",
  92. system_message=(
  93. "You are a research specialist. Use the lightrag_query tool to search "
  94. "the knowledge base. Start with 'hybrid' mode for comprehensive results. "
  95. "If you need specific entity details, use 'local' mode. "
  96. "Present your findings as structured bullet points. "
  97. "Always call the tool -- do NOT answer from your own knowledge."
  98. ),
  99. llm_config=llm_config,
  100. )
  101. analyst = AssistantAgent(
  102. name="Analyst",
  103. system_message=(
  104. "You are a knowledge graph analyst. Your FIRST action MUST be calling "
  105. "the lightrag_query tool with mode='naive' to run a direct vector search. "
  106. "This gives different results from the Researcher's hybrid search. "
  107. "After receiving the naive search results, compare them with the "
  108. "Researcher's findings and highlight any additional insights. "
  109. "You MUST call the tool before writing any analysis."
  110. ),
  111. llm_config=llm_config,
  112. )
  113. writer = AssistantAgent(
  114. name="Writer",
  115. system_message=(
  116. "You are a technical writer. Synthesize the findings from the "
  117. "Researcher and Analyst into a clear, well-structured answer. "
  118. "Do NOT use the search tool -- work only with what the other agents "
  119. "have found. End your response with TERMINATE."
  120. ),
  121. llm_config=llm_config,
  122. )
  123. def is_termination(msg):
  124. return "TERMINATE" in (msg.get("content") or "")
  125. user_proxy = UserProxyAgent(
  126. name="User",
  127. human_input_mode="NEVER",
  128. max_consecutive_auto_reply=10,
  129. code_execution_config=False,
  130. is_termination_msg=is_termination,
  131. )
  132. # --- Register LightRAG as a tool ---
  133. @user_proxy.register_for_execution()
  134. @researcher.register_for_llm(
  135. description=(
  136. "Query the LightRAG knowledge base. "
  137. "mode: 'naive' (simple vector), 'local' (entity-focused), "
  138. "'global' (theme/relationship-focused), 'hybrid' (combined). "
  139. "Returns retrieved context from indexed documents."
  140. )
  141. )
  142. @analyst.register_for_llm(
  143. description=(
  144. "Query the LightRAG knowledge base. "
  145. "mode: 'naive' (simple vector), 'local' (entity-focused), "
  146. "'global' (theme/relationship-focused), 'hybrid' (combined). "
  147. "Returns retrieved context from indexed documents."
  148. )
  149. )
  150. def lightrag_query(query: str, mode: str = "hybrid") -> str:
  151. """Query LightRAG synchronously (wraps async call)."""
  152. valid_modes = {"naive", "local", "global", "hybrid"}
  153. if mode not in valid_modes:
  154. return json.dumps(
  155. {"error": f"Invalid mode '{mode}'. Use one of: {valid_modes}"}
  156. )
  157. try:
  158. result = _run_async(
  159. _rag_instance.aquery(query, param=QueryParam(mode=mode))
  160. )
  161. return json.dumps({"mode": mode, "query": query, "result": result})
  162. except Exception as e:
  163. return json.dumps({"error": str(e)})
  164. return user_proxy, researcher, analyst, writer
  165. def run_multiagent_query(user_proxy, researcher, analyst, writer, question: str):
  166. """Run a multi-agent GroupChat to answer a question using LightRAG."""
  167. # Enforce pipeline: Researcher -> Analyst -> Writer.
  168. # func_call_filter (default True) automatically routes tool calls
  169. # to/from user_proxy, so transitions only govern non-tool handoffs.
  170. # User can only start with Researcher; Researcher advances to Analyst;
  171. # Analyst advances to Writer. Writer terminates the conversation.
  172. allowed_transitions = {
  173. user_proxy: [researcher],
  174. researcher: [user_proxy, analyst],
  175. analyst: [user_proxy, writer],
  176. writer: [],
  177. }
  178. group_chat = GroupChat(
  179. agents=[user_proxy, researcher, analyst, writer],
  180. messages=[],
  181. max_round=12,
  182. allowed_or_disallowed_speaker_transitions=allowed_transitions,
  183. speaker_transitions_type="allowed",
  184. )
  185. manager = GroupChatManager(
  186. groupchat=group_chat,
  187. llm_config=LLMConfig(
  188. {
  189. "model": os.environ.get("OPENAI_MODEL", "gpt-4o-mini"),
  190. "api_key": os.environ["OPENAI_API_KEY"],
  191. "api_type": "openai",
  192. }
  193. ),
  194. is_termination_msg=lambda msg: "TERMINATE" in (msg.get("content") or ""),
  195. )
  196. print(f"Question: {question}\n{'=' * 60}\n")
  197. user_proxy.run(manager, message=question).process()
  198. print(f"\n{'=' * 60}")
  199. # --- Main ---
  200. def main():
  201. global _rag_instance, _bg_loop
  202. if not os.getenv("OPENAI_API_KEY"):
  203. print(
  204. "Error: OPENAI_API_KEY environment variable is not set.\n"
  205. "Set it by running: export OPENAI_API_KEY='your-openai-api-key'"
  206. )
  207. return
  208. # Start a background event loop for LightRAG async calls.
  209. # AG2 tools run in threads without an event loop, so we need a
  210. # persistent loop that can accept coroutines from any thread.
  211. _bg_loop = asyncio.new_event_loop()
  212. bg_thread = threading.Thread(
  213. target=_start_background_loop, args=(_bg_loop,), daemon=True
  214. )
  215. bg_thread.start()
  216. try:
  217. # Step 1: Set up LightRAG (async, runs on the background loop)
  218. _rag_instance = _run_async(setup_lightrag())
  219. # Step 2: Create AG2 agents with LightRAG tools
  220. user_proxy, researcher, analyst, writer = create_agents()
  221. # Step 3: Ask a complex question
  222. run_multiagent_query(
  223. user_proxy,
  224. researcher,
  225. analyst,
  226. writer,
  227. question=(
  228. "How do transformer architectures relate to RAG systems? "
  229. "What role do knowledge graphs play in improving retrieval quality?"
  230. ),
  231. )
  232. except Exception as e:
  233. print(f"An error occurred: {e}")
  234. finally:
  235. if _rag_instance:
  236. _run_async(_rag_instance.finalize_storages())
  237. _bg_loop.call_soon_threadsafe(_bg_loop.stop)
  238. bg_thread.join(timeout=5)
  239. shutil.rmtree(WORKING_DIR, ignore_errors=True)
  240. if __name__ == "__main__":
  241. main()
  242. print("\nDone!")