streaming.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. """
  2. Agency Swarm Streaming Demo
  3. Simple demonstration of real-time streaming in Agency Swarm v1.x.
  4. Shows how to handle streaming events and filter text vs tool call data.
  5. """
  6. import asyncio
  7. import logging
  8. import os
  9. import sys
  10. # Path setup for standalone examples
  11. sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "src")))
  12. from agency_swarm import Agency, Agent, function_tool
  13. # ===== DEBUG CONFIGURATION =====
  14. # Set to True to see ALL raw events for frontend integration
  15. # Set to "verbose" to see raw events
  16. DEBUG_MODE = False
  17. logging.basicConfig(level=logging.WARNING)
  18. # --- Logging Configuration --- #
  19. def print_event_info(event, event_count):
  20. # Extract key fields
  21. agent_name = getattr(event, "agent", None)
  22. caller_agent = getattr(event, "callerAgent", None)
  23. event_type = getattr(event, "type", None)
  24. call_id = getattr(event, "call_id", None)
  25. item_id = getattr(event, "item_id", None)
  26. # For data events, get the nested type
  27. if hasattr(event, "data") and hasattr(event.data, "type"):
  28. data_type = event.data.type
  29. else:
  30. data_type = None
  31. # Format the output
  32. print(f"\n[EVENT #{event_count}]")
  33. print(f" agent: {agent_name}")
  34. print(f" callerAgent: {caller_agent}")
  35. if call_id:
  36. print(f" call_id: {call_id}")
  37. if item_id:
  38. print(f" item_id: {item_id}")
  39. print(f" event.type: {event_type}")
  40. if data_type:
  41. print(f" data.type: {data_type}")
  42. # Show raw event only if verbose
  43. if DEBUG_MODE == "verbose":
  44. print(f" Raw: {event}")
  45. # --- Simple Tool --- #
  46. @function_tool
  47. def get_weather(location: str) -> str:
  48. """Get weather information for a location."""
  49. return f"The weather in {location} is sunny, 22°C with light winds."
  50. # --- Agent Setup --- #
  51. def create_demo_agency():
  52. """Create a demo agency for the TUI."""
  53. ceo = Agent(
  54. name="CEO",
  55. description="Chief Executive Officer - oversees all operations",
  56. instructions="You are the CEO. When asked about weather, delegate to Worker with a specific location (use London if not specified).",
  57. tools=[],
  58. )
  59. worker = Agent(
  60. name="Worker",
  61. description="Worker - performs tasks and writes weather reports",
  62. instructions="You handle weather tasks. Use the get_weather tool which returns weather reports.",
  63. tools=[get_weather],
  64. )
  65. # Create agency with communication flows (v1.x pattern)
  66. agency = Agency(
  67. ceo, # Entry point agent (positional argument)
  68. communication_flows=[ceo > worker],
  69. name="TerminalDemoAgency",
  70. )
  71. return agency
  72. agency = create_demo_agency()
  73. # --- Streaming Handler --- #
  74. async def stream_response(message: str):
  75. """Stream a response and handle events properly."""
  76. print(f"\nStreaming: {message}")
  77. print("Response: ", end="", flush=True)
  78. full_text = ""
  79. event_count = 0
  80. async for event in agency.get_response_stream(message):
  81. event_count += 1
  82. # Debug logging for frontend developers
  83. if DEBUG_MODE:
  84. print_event_info(event, event_count)
  85. # Normal streaming logic
  86. if hasattr(event, "data"):
  87. data = event.data
  88. # Only capture actual response text, not tool call arguments
  89. if hasattr(data, "delta") and hasattr(data, "type"):
  90. if data.type == "response.output_text.delta":
  91. # Stream the actual response text in real-time
  92. delta_text = data.delta
  93. if delta_text:
  94. if not DEBUG_MODE:
  95. print(delta_text, end="", flush=True)
  96. full_text += delta_text
  97. # Skip tool call deltas (we don't want to show those to users)
  98. elif data.type == "response.function_call_arguments.delta":
  99. continue
  100. # Handle validation errors
  101. elif isinstance(event, dict):
  102. event_type = event.get("event", event.get("type"))
  103. if event_type == "error":
  104. print(f"\n❌ Error: {event.get('content', event.get('data', 'Unknown error'))}")
  105. break
  106. if DEBUG_MODE:
  107. print(f"\nTotal events received: {event_count}")
  108. print("\n✅ Stream complete")
  109. print(f"Total: {len(full_text)} characters streamed")
  110. return full_text
  111. # --- Main Demo --- #
  112. async def main():
  113. """Run simple streaming demo."""
  114. print("Agency Swarm Streaming Demo")
  115. print("=" * 40)
  116. print("Watch text stream in real-time.")
  117. await stream_response("What's the weather in London?")
  118. print("\nDemo complete!")
  119. if __name__ == "__main__":
  120. asyncio.run(main())