| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164 |
- """
- Agency Swarm Streaming Demo
- Simple demonstration of real-time streaming in Agency Swarm v1.x.
- Shows how to handle streaming events and filter text vs tool call data.
- """
- import asyncio
- import logging
- import os
- import sys
- # Path setup for standalone examples
- sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "src")))
- from agency_swarm import Agency, Agent, function_tool
- # ===== DEBUG CONFIGURATION =====
- # Set to True to see ALL raw events for frontend integration
- # Set to "verbose" to see raw events
- DEBUG_MODE = False
- logging.basicConfig(level=logging.WARNING)
- # --- Logging Configuration --- #
- def print_event_info(event, event_count):
- # Extract key fields
- agent_name = getattr(event, "agent", None)
- caller_agent = getattr(event, "callerAgent", None)
- event_type = getattr(event, "type", None)
- call_id = getattr(event, "call_id", None)
- item_id = getattr(event, "item_id", None)
- # For data events, get the nested type
- if hasattr(event, "data") and hasattr(event.data, "type"):
- data_type = event.data.type
- else:
- data_type = None
- # Format the output
- print(f"\n[EVENT #{event_count}]")
- print(f" agent: {agent_name}")
- print(f" callerAgent: {caller_agent}")
- if call_id:
- print(f" call_id: {call_id}")
- if item_id:
- print(f" item_id: {item_id}")
- print(f" event.type: {event_type}")
- if data_type:
- print(f" data.type: {data_type}")
- # Show raw event only if verbose
- if DEBUG_MODE == "verbose":
- print(f" Raw: {event}")
- # --- Simple Tool --- #
- @function_tool
- def get_weather(location: str) -> str:
- """Get weather information for a location."""
- return f"The weather in {location} is sunny, 22°C with light winds."
- # --- Agent Setup --- #
- def create_demo_agency():
- """Create a demo agency for the TUI."""
- ceo = Agent(
- name="CEO",
- description="Chief Executive Officer - oversees all operations",
- instructions="You are the CEO. When asked about weather, delegate to Worker with a specific location (use London if not specified).",
- tools=[],
- )
- worker = Agent(
- name="Worker",
- description="Worker - performs tasks and writes weather reports",
- instructions="You handle weather tasks. Use the get_weather tool which returns weather reports.",
- tools=[get_weather],
- )
- # Create agency with communication flows (v1.x pattern)
- agency = Agency(
- ceo, # Entry point agent (positional argument)
- communication_flows=[ceo > worker],
- name="TerminalDemoAgency",
- )
- return agency
- agency = create_demo_agency()
- # --- Streaming Handler --- #
- async def stream_response(message: str):
- """Stream a response and handle events properly."""
- print(f"\nStreaming: {message}")
- print("Response: ", end="", flush=True)
- full_text = ""
- event_count = 0
- async for event in agency.get_response_stream(message):
- event_count += 1
- # Debug logging for frontend developers
- if DEBUG_MODE:
- print_event_info(event, event_count)
- # Normal streaming logic
- if hasattr(event, "data"):
- data = event.data
- # Only capture actual response text, not tool call arguments
- if hasattr(data, "delta") and hasattr(data, "type"):
- if data.type == "response.output_text.delta":
- # Stream the actual response text in real-time
- delta_text = data.delta
- if delta_text:
- if not DEBUG_MODE:
- print(delta_text, end="", flush=True)
- full_text += delta_text
- # Skip tool call deltas (we don't want to show those to users)
- elif data.type == "response.function_call_arguments.delta":
- continue
- # Handle validation errors
- elif isinstance(event, dict):
- event_type = event.get("event", event.get("type"))
- if event_type == "error":
- print(f"\n❌ Error: {event.get('content', event.get('data', 'Unknown error'))}")
- break
- if DEBUG_MODE:
- print(f"\nTotal events received: {event_count}")
- print("\n✅ Stream complete")
- print(f"Total: {len(full_text)} characters streamed")
- return full_text
- # --- Main Demo --- #
- async def main():
- """Run simple streaming demo."""
- print("Agency Swarm Streaming Demo")
- print("=" * 40)
- print("Watch text stream in real-time.")
- await stream_response("What's the weather in London?")
- print("\nDemo complete!")
- if __name__ == "__main__":
- asyncio.run(main())
|