| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296 |
- """LightRAG + AG2 Multi-Agent Demo.
- Demonstrates how AG2 agents can use LightRAG's knowledge graph retrieval
- as a tool. Multiple specialized agents collaborate to answer complex
- questions over indexed documents.
- Architecture:
- User -> AG2 GroupChat (Researcher + Analyst + Writer) -> LightRAG queries
- - Researcher: uses LightRAG hybrid search to gather facts
- - Analyst: uses LightRAG naive (vector) search for complementary results
- - Writer: synthesizes findings into a final answer
- Requires:
- pip install lightrag-hku "ag2[openai]>=0.11.4,<1.0"
- export OPENAI_API_KEY="..."
- Usage:
- python examples/lightrag_ag2_multiagent_demo.py
- """
- import asyncio
- import json
- import os
- import shutil
- import threading
- from autogen import (
- AssistantAgent,
- GroupChat,
- GroupChatManager,
- LLMConfig,
- UserProxyAgent,
- )
- from lightrag import LightRAG, QueryParam
- from lightrag.llm.openai import gpt_4o_mini_complete, openai_embed
- # --- Configuration ---
- WORKING_DIR = "./ag2_demo_workdir"
- SAMPLE_TEXT = """
- Artificial intelligence has transformed multiple industries. Machine learning,
- a subset of AI, enables systems to learn from data without explicit programming.
- Deep learning, using neural networks with many layers, has achieved breakthroughs
- in computer vision, natural language processing, and speech recognition.
- Transformer architectures, introduced in the 2017 paper "Attention Is All You Need"
- by Vaswani et al., revolutionized NLP. Models like GPT and BERT are built on
- transformers. GPT (Generative Pre-trained Transformer) uses decoder-only architecture
- for text generation, while BERT (Bidirectional Encoder Representations) uses
- encoder-only architecture for understanding tasks.
- Retrieval-Augmented Generation (RAG) combines the strengths of retrieval systems
- and generative models. Instead of relying solely on parametric knowledge, RAG
- systems retrieve relevant documents from a knowledge base and use them as context
- for generation. This approach reduces hallucination and enables models to access
- up-to-date information.
- Knowledge graphs represent information as entities and relationships. When combined
- with RAG, knowledge graphs enable structured reasoning over document collections.
- LightRAG implements this approach with dual-level retrieval: local search focuses
- on specific entities, while global search captures broader themes and relationships.
- """
- # --- LightRAG Setup ---
- async def setup_lightrag() -> LightRAG:
- """Initialize LightRAG and index sample documents."""
- if os.path.exists(WORKING_DIR):
- shutil.rmtree(WORKING_DIR)
- os.makedirs(WORKING_DIR, exist_ok=True)
- rag = LightRAG(
- working_dir=WORKING_DIR,
- embedding_func=openai_embed,
- llm_model_func=gpt_4o_mini_complete,
- )
- await rag.initialize_storages()
- await rag.ainsert(SAMPLE_TEXT)
- print("LightRAG initialized and documents indexed.\n")
- return rag
- # --- Async Bridge ---
- # AG2 runs tools in a background thread without an event loop.
- # We maintain a dedicated event loop in a separate thread for LightRAG async calls.
- _bg_loop: asyncio.AbstractEventLoop = None
- def _start_background_loop(loop: asyncio.AbstractEventLoop):
- asyncio.set_event_loop(loop)
- loop.run_forever()
- def _run_async(coro):
- """Submit a coroutine to the background event loop and wait for the result."""
- future = asyncio.run_coroutine_threadsafe(coro, _bg_loop)
- return future.result(timeout=120)
- # --- AG2 Agent Tools ---
- # Global reference to LightRAG instance (set in main)
- _rag_instance: LightRAG = None
- def create_agents():
- """Create AG2 agents with LightRAG tools."""
- llm_config = LLMConfig(
- {
- "model": os.environ.get("OPENAI_MODEL", "gpt-4o-mini"),
- "api_key": os.environ["OPENAI_API_KEY"],
- "api_type": "openai",
- }
- )
- researcher = AssistantAgent(
- name="Researcher",
- system_message=(
- "You are a research specialist. Use the lightrag_query tool to search "
- "the knowledge base. Start with 'hybrid' mode for comprehensive results. "
- "If you need specific entity details, use 'local' mode. "
- "Present your findings as structured bullet points. "
- "Always call the tool -- do NOT answer from your own knowledge."
- ),
- llm_config=llm_config,
- )
- analyst = AssistantAgent(
- name="Analyst",
- system_message=(
- "You are a knowledge graph analyst. Your FIRST action MUST be calling "
- "the lightrag_query tool with mode='naive' to run a direct vector search. "
- "This gives different results from the Researcher's hybrid search. "
- "After receiving the naive search results, compare them with the "
- "Researcher's findings and highlight any additional insights. "
- "You MUST call the tool before writing any analysis."
- ),
- llm_config=llm_config,
- )
- writer = AssistantAgent(
- name="Writer",
- system_message=(
- "You are a technical writer. Synthesize the findings from the "
- "Researcher and Analyst into a clear, well-structured answer. "
- "Do NOT use the search tool -- work only with what the other agents "
- "have found. End your response with TERMINATE."
- ),
- llm_config=llm_config,
- )
- def is_termination(msg):
- return "TERMINATE" in (msg.get("content") or "")
- user_proxy = UserProxyAgent(
- name="User",
- human_input_mode="NEVER",
- max_consecutive_auto_reply=10,
- code_execution_config=False,
- is_termination_msg=is_termination,
- )
- # --- Register LightRAG as a tool ---
- @user_proxy.register_for_execution()
- @researcher.register_for_llm(
- description=(
- "Query the LightRAG knowledge base. "
- "mode: 'naive' (simple vector), 'local' (entity-focused), "
- "'global' (theme/relationship-focused), 'hybrid' (combined). "
- "Returns retrieved context from indexed documents."
- )
- )
- @analyst.register_for_llm(
- description=(
- "Query the LightRAG knowledge base. "
- "mode: 'naive' (simple vector), 'local' (entity-focused), "
- "'global' (theme/relationship-focused), 'hybrid' (combined). "
- "Returns retrieved context from indexed documents."
- )
- )
- def lightrag_query(query: str, mode: str = "hybrid") -> str:
- """Query LightRAG synchronously (wraps async call)."""
- valid_modes = {"naive", "local", "global", "hybrid"}
- if mode not in valid_modes:
- return json.dumps(
- {"error": f"Invalid mode '{mode}'. Use one of: {valid_modes}"}
- )
- try:
- result = _run_async(
- _rag_instance.aquery(query, param=QueryParam(mode=mode))
- )
- return json.dumps({"mode": mode, "query": query, "result": result})
- except Exception as e:
- return json.dumps({"error": str(e)})
- return user_proxy, researcher, analyst, writer
- def run_multiagent_query(user_proxy, researcher, analyst, writer, question: str):
- """Run a multi-agent GroupChat to answer a question using LightRAG."""
- # Enforce pipeline: Researcher -> Analyst -> Writer.
- # func_call_filter (default True) automatically routes tool calls
- # to/from user_proxy, so transitions only govern non-tool handoffs.
- # User can only start with Researcher; Researcher advances to Analyst;
- # Analyst advances to Writer. Writer terminates the conversation.
- allowed_transitions = {
- user_proxy: [researcher],
- researcher: [user_proxy, analyst],
- analyst: [user_proxy, writer],
- writer: [],
- }
- group_chat = GroupChat(
- agents=[user_proxy, researcher, analyst, writer],
- messages=[],
- max_round=12,
- allowed_or_disallowed_speaker_transitions=allowed_transitions,
- speaker_transitions_type="allowed",
- )
- manager = GroupChatManager(
- groupchat=group_chat,
- llm_config=LLMConfig(
- {
- "model": os.environ.get("OPENAI_MODEL", "gpt-4o-mini"),
- "api_key": os.environ["OPENAI_API_KEY"],
- "api_type": "openai",
- }
- ),
- is_termination_msg=lambda msg: "TERMINATE" in (msg.get("content") or ""),
- )
- print(f"Question: {question}\n{'=' * 60}\n")
- user_proxy.run(manager, message=question).process()
- print(f"\n{'=' * 60}")
- # --- Main ---
- def main():
- global _rag_instance, _bg_loop
- if not os.getenv("OPENAI_API_KEY"):
- print(
- "Error: OPENAI_API_KEY environment variable is not set.\n"
- "Set it by running: export OPENAI_API_KEY='your-openai-api-key'"
- )
- return
- # Start a background event loop for LightRAG async calls.
- # AG2 tools run in threads without an event loop, so we need a
- # persistent loop that can accept coroutines from any thread.
- _bg_loop = asyncio.new_event_loop()
- bg_thread = threading.Thread(
- target=_start_background_loop, args=(_bg_loop,), daemon=True
- )
- bg_thread.start()
- try:
- # Step 1: Set up LightRAG (async, runs on the background loop)
- _rag_instance = _run_async(setup_lightrag())
- # Step 2: Create AG2 agents with LightRAG tools
- user_proxy, researcher, analyst, writer = create_agents()
- # Step 3: Ask a complex question
- run_multiagent_query(
- user_proxy,
- researcher,
- analyst,
- writer,
- question=(
- "How do transformer architectures relate to RAG systems? "
- "What role do knowledge graphs play in improving retrieval quality?"
- ),
- )
- except Exception as e:
- print(f"An error occurred: {e}")
- finally:
- if _rag_instance:
- _run_async(_rag_instance.finalize_storages())
- _bg_loop.call_soon_threadsafe(_bg_loop.stop)
- bg_thread.join(timeout=5)
- shutil.rmtree(WORKING_DIR, ignore_errors=True)
- if __name__ == "__main__":
- main()
- print("\nDone!")
|