multi_agent_workflow.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. # examples/multi_agent_workflow.py
  2. """
  3. Multi-Agent Collaboration Example with Validation
  4. This example demonstrates and validates that multi-agent communication works correctly
  5. in Agency Swarm. It creates a financial analysis workflow where:
  6. 1. PortfolioManager (orchestrator) - Gathers market data and coordinates analysis
  7. 2. RiskAnalyst (specialist) - Analyzes investment risks using specialized tools
  8. 3. ReportGenerator (specialist) - Formats professional investment reports
  9. The example also utilizes the output_type parameter to improve the structure of agents' responses.
  10. Run with: python examples/multi_agent_workflow.py
  11. """
  12. import asyncio
  13. import logging
  14. import os
  15. import sys
  16. from typing import Any
  17. from pydantic import BaseModel, Field
  18. # Configure basic logging
  19. logging.basicConfig(level=logging.WARNING, format="%(asctime)s - %(levelname)s - %(message)s")
  20. examples_root = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
  21. sys.path.insert(0, os.path.join(examples_root, "src"))
  22. sys.path.insert(0, examples_root)
  23. from agency_swarm import Agency, Agent, RunContextWrapper, function_tool # noqa: E402
  24. from examples.utils import print_send_message_exchange # noqa: E402
  25. # --- Structured Output Types ---
  26. class RiskAssessment(BaseModel):
  27. risk_level: str = Field(..., description="Overall risk level (Low/Moderate/High)")
  28. risk_score: str = Field(..., description="Risk score out of 10")
  29. key_risks: list[str] = Field(..., description="List of key risk factors")
  30. recommendation: str = Field(..., description="Risk-based recommendation")
  31. class InvestmentReport(BaseModel):
  32. executive_summary: str = Field(..., description="Brief executive summary")
  33. market_position: str = Field(..., description="Current market position analysis")
  34. risk_analysis: str = Field(..., description="Risk analysis summary")
  35. final_recommendation: str = Field(..., description="Final investment recommendation")
  36. # --- Global tracking for validation ---
  37. tool_calls_made = []
  38. agent_interactions = []
  39. # --- Simple Tools ---
  40. @function_tool()
  41. async def fetch_market_data(wrapper: RunContextWrapper[Any], symbol: str) -> str:
  42. """Fetches basic market data for a stock symbol."""
  43. print(f"--- TOOL: fetch_market_data called for {symbol} ---")
  44. tool_calls_made.append(f"fetch_market_data:{symbol}")
  45. await asyncio.sleep(0.3) # Simulate API call
  46. return f"Retrieved market data for {symbol}: Price $175.43, Market Cap $2.85T, P/E 28.5, Rating: Buy"
  47. @function_tool()
  48. async def analyze_risk_factors(wrapper: RunContextWrapper[Any], symbol: str) -> str:
  49. """Analyzes risk factors for a stock."""
  50. print(f"--- TOOL: analyze_risk_factors called for {symbol} ---")
  51. tool_calls_made.append(f"analyze_risk_factors:{symbol}")
  52. await asyncio.sleep(0.4) # Simulate analysis
  53. return f"Risk analysis for {symbol}: High P/E suggests overvaluation risk, Beta 1.29 indicates volatility, Strong balance sheet provides stability"
  54. @function_tool()
  55. async def format_professional_report(wrapper: RunContextWrapper[Any], content: str) -> str:
  56. """Formats content into a professional investment report."""
  57. print("--- TOOL: format_professional_report called ---")
  58. tool_calls_made.append("format_professional_report")
  59. await asyncio.sleep(0.2) # Simulate formatting
  60. return f"Professional report formatted with: {content[:50]}..."
  61. # --- Define Agents ---
  62. portfolio_manager = Agent(
  63. name="PortfolioManager",
  64. instructions="""You orchestrate investment research by:
  65. 1. Using fetch_market_data tool to get financial metrics
  66. 2. Delegating risk analysis to RiskAnalyst
  67. 3. Delegating report formatting to ReportGenerator
  68. 4. Compiling final investment recommendation
  69. Always gather data first, then delegate analysis, then request formatting.""",
  70. tools=[fetch_market_data],
  71. output_type=str, # Returns final recommendation
  72. )
  73. risk_analyst = Agent(
  74. name="RiskAnalyst",
  75. instructions="""You specialize in investment risk analysis. When given market data:
  76. 1. Use analyze_risk_factors tool to evaluate risks
  77. 2. Assess volatility, valuation, and market position
  78. 3. Return structured risk assessment
  79. Focus on identifying key risks and providing clear risk scoring.""",
  80. tools=[analyze_risk_factors],
  81. output_type=RiskAssessment, # Structured risk output
  82. )
  83. report_generator = Agent(
  84. name="ReportGenerator",
  85. instructions="""You create professional investment reports. When given analysis:
  86. 1. Use format_professional_report tool for formatting assistance
  87. 2. Structure information with clear sections
  88. 3. Return professional investment report
  89. Focus on clear, executive-ready presentation.""",
  90. tools=[format_professional_report],
  91. output_type=InvestmentReport, # Structured report output
  92. )
  93. # --- Create Agency ---
  94. agency = Agency(
  95. portfolio_manager, # Entry point and orchestrator
  96. communication_flows=[
  97. portfolio_manager > risk_analyst,
  98. portfolio_manager > report_generator,
  99. ],
  100. shared_instructions="Provide accurate, professional financial analysis.",
  101. )
  102. # Helper function to visualize send message arguments
  103. def print_send_message_history(agency: Agency, agent_name: str) -> None:
  104. print("Message history for inter-agent communications:")
  105. print_send_message_exchange(agency, owner=agent_name)
  106. async def run_workflow():
  107. print("\n--- Investment Research Platform Demo ---")
  108. print("Portfolio Manager orchestrates by calling specialist agents and compiling results.\n")
  109. # Reset tracking
  110. global tool_calls_made, agent_interactions
  111. tool_calls_made = []
  112. agent_interactions = []
  113. stock_symbol = "AAPL"
  114. print(f"Client Request: Analyze investment opportunity for {stock_symbol}")
  115. response = await agency.get_response(
  116. message=f"Provide comprehensive investment analysis for {stock_symbol}. Get market data, risk assessment, and professional report."
  117. )
  118. print_send_message_history(agency, "PortfolioManager")
  119. print("\nFinal Investment Analysis:")
  120. print(f"{response.final_output}")
  121. print(f"\nCompleted in {len(response.new_items)} agent actions.")
  122. if __name__ == "__main__":
  123. success = asyncio.run(run_workflow())