| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164 |
- # examples/multi_agent_workflow.py
- """
- Multi-Agent Collaboration Example with Validation
- This example demonstrates and validates that multi-agent communication works correctly
- in Agency Swarm. It creates a financial analysis workflow where:
- 1. PortfolioManager (orchestrator) - Gathers market data and coordinates analysis
- 2. RiskAnalyst (specialist) - Analyzes investment risks using specialized tools
- 3. ReportGenerator (specialist) - Formats professional investment reports
- The example also utilizes the output_type parameter to improve the structure of agents responses.
- Run with: python examples/multi_agent_workflow.py
- """
- import asyncio
- import logging
- import os
- import sys
- from typing import Any
- from pydantic import BaseModel, Field
- # Configure basic logging
- logging.basicConfig(level=logging.WARNING, format="%(asctime)s - %(levelname)s - %(message)s")
- examples_root = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
- sys.path.insert(0, os.path.join(examples_root, "src"))
- sys.path.insert(0, examples_root)
- from agency_swarm import Agency, Agent, RunContextWrapper, function_tool # noqa: E402
- from examples.utils import print_send_message_exchange # noqa: E402
- # --- Structured Output Types ---
- class RiskAssessment(BaseModel):
- risk_level: str = Field(..., description="Overall risk level (Low/Moderate/High)")
- risk_score: str = Field(..., description="Risk score out of 10")
- key_risks: list[str] = Field(..., description="List of key risk factors")
- recommendation: str = Field(..., description="Risk-based recommendation")
- class InvestmentReport(BaseModel):
- executive_summary: str = Field(..., description="Brief executive summary")
- market_position: str = Field(..., description="Current market position analysis")
- risk_analysis: str = Field(..., description="Risk analysis summary")
- final_recommendation: str = Field(..., description="Final investment recommendation")
- # --- Global tracking for validation ---
- tool_calls_made = []
- agent_interactions = []
- # --- Simple Tools ---
- @function_tool()
- async def fetch_market_data(wrapper: RunContextWrapper[Any], symbol: str) -> str:
- """Fetches basic market data for a stock symbol."""
- print(f"--- TOOL: fetch_market_data called for {symbol} ---")
- tool_calls_made.append(f"fetch_market_data:{symbol}")
- await asyncio.sleep(0.3) # Simulate API call
- return f"Retrieved market data for {symbol}: Price $175.43, Market Cap $2.85T, P/E 28.5, Rating: Buy"
- @function_tool()
- async def analyze_risk_factors(wrapper: RunContextWrapper[Any], symbol: str) -> str:
- """Analyzes risk factors for a stock."""
- print(f"--- TOOL: analyze_risk_factors called for {symbol} ---")
- tool_calls_made.append(f"analyze_risk_factors:{symbol}")
- await asyncio.sleep(0.4) # Simulate analysis
- return f"Risk analysis for {symbol}: High P/E suggests overvaluation risk, Beta 1.29 indicates volatility, Strong balance sheet provides stability"
- @function_tool()
- async def format_professional_report(wrapper: RunContextWrapper[Any], content: str) -> str:
- """Formats content into a professional investment report."""
- print("--- TOOL: format_professional_report called ---")
- tool_calls_made.append("format_professional_report")
- await asyncio.sleep(0.2) # Simulate formatting
- return f"Professional report formatted with: {content[:50]}..."
- # --- Define Agents ---
- portfolio_manager = Agent(
- name="PortfolioManager",
- instructions="""You orchestrate investment research by:
- 1. Using fetch_market_data tool to get financial metrics
- 2. Delegating risk analysis to RiskAnalyst
- 3. Delegating report formatting to ReportGenerator
- 4. Compiling final investment recommendation
- Always gather data first, then delegate analysis, then request formatting.""",
- tools=[fetch_market_data],
- output_type=str, # Returns final recommendation
- )
- risk_analyst = Agent(
- name="RiskAnalyst",
- instructions="""You specialize in investment risk analysis. When given market data:
- 1. Use analyze_risk_factors tool to evaluate risks
- 2. Assess volatility, valuation, and market position
- 3. Return structured risk assessment
- Focus on identifying key risks and providing clear risk scoring.""",
- tools=[analyze_risk_factors],
- output_type=RiskAssessment, # Structured risk output
- )
- report_generator = Agent(
- name="ReportGenerator",
- instructions="""You create professional investment reports. When given analysis:
- 1. Use format_professional_report tool for formatting assistance
- 2. Structure information with clear sections
- 3. Return professional investment report
- Focus on clear, executive-ready presentation.""",
- tools=[format_professional_report],
- output_type=InvestmentReport, # Structured report output
- )
- # --- Create Agency ---
- agency = Agency(
- portfolio_manager, # Entry point and orchestrator
- communication_flows=[
- portfolio_manager > risk_analyst,
- portfolio_manager > report_generator,
- ],
- shared_instructions="Provide accurate, professional financial analysis.",
- )
- # Helper function to visualize send message arguments
- def print_send_message_history(agency: Agency, agent_name: str) -> None:
- print("Message history for inter-agent communications:")
- print_send_message_exchange(agency, owner=agent_name)
- async def run_workflow():
- print("\n--- Investment Research Platform Demo ---")
- print("Portfolio Manager orchestrates by calling specialist agents and compiling results.\n")
- # Reset tracking
- global tool_calls_made, agent_interactions
- tool_calls_made = []
- agent_interactions = []
- stock_symbol = "AAPL"
- print(f"Client Request: Analyze investment opportunity for {stock_symbol}")
- response = await agency.get_response(
- message=f"Provide comprehensive investment analysis for {stock_symbol}. Get market data, risk assessment, and professional report."
- )
- print_send_message_history(agency, "PortfolioManager")
- print("\nFinal Investment Analysis:")
- print(f"{response.final_output}")
- print(f"\nCompleted in {len(response.new_items)} agent actions.")
- if __name__ == "__main__":
- success = asyncio.run(run_workflow())
|