---
title: "Streaming"
description: "Implementing streaming in Agency Swarm."
icon: "ellipsis"
---
Streaming lets agents return output as it is generated, so users see the response appear in real time instead of waiting for the complete answer.

## Streaming Responses

In Agency Swarm, streaming is handled through the `get_response_stream` method. The framework yields [StreamEvent](https://openai.github.io/openai-agents-python/ref/stream_events/#agents.stream_events.StreamEvent) objects as they arrive from OpenAI, giving you direct access to the underlying streaming events.
```python
# Assumes `agency` is an already-configured Agency instance.
async def stream_response(message: str):
    """Stream a response and handle events properly."""
    full_text = ""

    async for event in agency.get_response_stream(message):
        # Handle streaming events with data
        if hasattr(event, "data"):
            data = event.data

            # Only capture actual response text, not tool call arguments
            if hasattr(data, "delta") and hasattr(data, "type"):
                if data.type == "response.output_text.delta":
                    # Stream the actual response text in real-time
                    delta_text = data.delta
                    if delta_text:
                        print(delta_text, end="", flush=True)
                        full_text += delta_text
                # Skip tool call deltas (we don't want to show those to users)
                elif data.type == "response.function_call_arguments.delta":
                    continue

        # Handle validation errors
        elif isinstance(event, dict):
            event_type = event.get("event", event.get("type"))
            if event_type == "error":
                print(f"\n❌ Error: {event.get('content', event.get('data', 'Unknown error'))}")
                break

    print("\n✅ Stream complete")
    return full_text

# Usage (top-level await works in a notebook or async REPL;
# in a script, call this from inside another coroutine)
await stream_response("I want you to build me a website")
```
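
The text-filtering branch of the loop above can also be pulled into a small pure function, which keeps the consumer loop short and lets the logic be checked without a live agency. The sketch below is illustrative only: `extract_text_delta` is a hypothetical helper name, not part of Agency Swarm, and it assumes events expose the same `data.type` and `data.delta` attributes used above. The events are stand-ins built with `SimpleNamespace`.

```python
from types import SimpleNamespace
from typing import Any


def extract_text_delta(event: Any) -> str:
    """Return the user-visible text carried by a stream event, or "" if none.

    Hypothetical helper (not part of Agency Swarm). It relies only on the
    `data.type` / `data.delta` attributes used in the loop above.
    """
    data = getattr(event, "data", None)
    if getattr(data, "type", None) != "response.output_text.delta":
        # Tool-call argument deltas and all other event types are ignored.
        return ""
    return getattr(data, "delta", "") or ""


# Stand-in events for illustration; real events come from get_response_stream.
text_event = SimpleNamespace(
    data=SimpleNamespace(type="response.output_text.delta", delta="Hello")
)
tool_event = SimpleNamespace(
    data=SimpleNamespace(type="response.function_call_arguments.delta", delta='{"a"')
)

print(repr(extract_text_delta(text_event)))
print(repr(extract_text_delta(tool_event)))
```

Inside the streaming loop, `full_text += extract_text_delta(event)` could then stand in for the nested `hasattr` checks.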
## Streaming from Tools

Tools can inject events into the parent SSE stream while they execute. This is especially useful when a tool runs a sub-agent or a long-running operation and you want the client to receive live progress updates alongside the normal agent output.

### Accessing the Streaming Context

Inside any `BaseTool.run()` method, two properties are relevant to streaming:

| Property | Type | What it is |
|---|---|---|
| `self.context.streaming_context` | `StreamingContext \| None` | The queue that feeds the parent stream. `None` when the request is not streaming. |
| `self.tool_call_id` | `str \| None` | The unique ID of this specific tool invocation. Use it to let clients correlate events with the tool call. |

```python
streaming_ctx = self.context.streaming_context
tool_call_id = self.tool_call_id or ""
```
`streaming_ctx` is `None` when the request is not streaming, so always guard before calling `put_event`.

Any object passed to `streaming_ctx.put_event(event)` is forwarded into the same stream your client is already consuming. There are three ways to use this, depending on how informative you want the events to be.
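
Because the guard has to be repeated before every `put_event` call, it can be convenient to wrap it once. The following is a minimal sketch rather than framework code: `emit_if_streaming` is a hypothetical helper, and `FakeStreamingContext` is a stand-in that only mimics an awaitable `put_event` method so the example runs without Agency Swarm installed.

```python
import asyncio
from typing import Any


class FakeStreamingContext:
    """Stand-in for the real streaming context (illustration only)."""

    def __init__(self) -> None:
        self.queue: asyncio.Queue = asyncio.Queue()

    async def put_event(self, event: Any) -> None:
        await self.queue.put(event)


async def emit_if_streaming(streaming_ctx: Any, event: Any) -> bool:
    """Forward `event` if a streaming context exists; report whether it was sent."""
    if streaming_ctx is None:
        return False
    await streaming_ctx.put_event(event)
    return True


async def demo() -> tuple:
    ctx = FakeStreamingContext()
    sent_streaming = await emit_if_streaming(ctx, "step 1/3")  # streaming request
    sent_plain = await emit_if_streaming(None, "step 1/3")     # non-streaming request
    return sent_streaming, sent_plain, [ctx.queue.get_nowait()]


print(asyncio.run(demo()))
```

Within a tool's `run()` method, the same helper would be called with `self.context.streaming_context` as its first argument.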
### Method 1: Raw values

The simplest option is to emit any plain value directly. This is useful for quick progress strings during development.

```python
from agency_swarm.tools import BaseTool


class MyTool(BaseTool):
    """Does something and reports progress."""

    async def run(self):
        streaming_ctx = self.context.streaming_context

        # _fetch and _process are placeholders for your own coroutines.
        if streaming_ctx:
            await streaming_ctx.put_event("step 1/3: fetching data…")
        data = await _fetch()

        if streaming_ctx:
            await streaming_ctx.put_event("step 2/3: processing…")
        result = await _process(data)

        if streaming_ctx:
            await streaming_ctx.put_event("step 3/3: done.")
        return result
```
On the consumer side:

```python
async for event in agency.get_response_stream(message):
    if isinstance(event, str):
        print(f"[tool] {event}")
```
### Method 2: Existing SDK event types

Neither the OpenAI API nor the `agents` SDK defines an event type specifically for streaming output from tool execution. You can, however, construct a [`RawResponsesStreamEvent`](https://openai.github.io/openai-agents-python/ref/stream_events/#agents.stream_events.RawResponsesStreamEvent) that wraps a `ResponseTextDeltaEvent` directly inside the tool. The internal tracking fields (`item_id`, `content_index`, etc.) only need placeholder values, because the consumer's existing delta-handling code treats the event exactly like a normal LLM text delta.

```python
from agents import RawResponsesStreamEvent
from openai.types.responses import ResponseTextDeltaEvent
from agency_swarm.tools import BaseTool
from pydantic import Field


class MyTool(BaseTool):
    """Does something and streams progress as text deltas."""

    task: str = Field(..., description="The task to perform")

    async def run(self):
        streaming_ctx = self.context.streaming_context

        async def emit_delta(text: str) -> None:
            # Emit only when the request is actually streaming.
            if streaming_ctx:
                await streaming_ctx.put_event(
                    RawResponsesStreamEvent(
                        data=ResponseTextDeltaEvent(
                            content_index=0,
                            output_index=0,
                            sequence_number=0,
                            item_id="tool_progress",
                            logprobs=[],
                            delta=text,
                            type="response.output_text.delta",
                        )
                    )
                )

        await emit_delta("Starting…")
        result = await _perform_task(self.task)  # placeholder for your own coroutine
        await emit_delta(" Done.")
        return result
```
On the consumer side, these events are indistinguishable from regular LLM text deltas unless you inspect the `item_id` you set on the event:

```python
from agents import RawResponsesStreamEvent
from openai.types.responses import ResponseTextDeltaEvent

async for event in agency.get_response_stream(message):
    if isinstance(event, RawResponsesStreamEvent) and isinstance(event.data, ResponseTextDeltaEvent):
        if event.data.item_id == "tool_progress":
            print(f"[tool] {event.data.delta}", end="", flush=True)
        else:
            print(event.data.delta, end="", flush=True)
```
### Method 3: Custom event model

For the most informative output, you can define your own event class that carries structured metadata: status, a tool call ID for correlation, error details, and so on. No built-in SDK type covers tool execution progress, so this is the recommended approach for production use. The trade-off is that the consumer needs explicit checks to separate these events from standard SDK events.

```python
from typing import Literal

from pydantic import BaseModel, Field

from agency_swarm.tools import BaseTool


class ToolProgressEvent(BaseModel):
    """Streaming progress event emitted by a tool invocation."""

    call_id: str
    status: Literal["in_progress", "completed", "failed"] = "in_progress"
    delta: str | None = None
    error: str | None = None


class LongRunningTool(BaseTool):
    """A tool that reports live progress through the parent stream."""

    task: str = Field(..., description="The task to perform")

    async def run(self):
        streaming_ctx = self.context.streaming_context
        tool_call_id = self.tool_call_id or ""

        async def emit(status: str, delta: str | None = None, error: str | None = None) -> None:
            # Emit only when the request is actually streaming.
            if streaming_ctx:
                await streaming_ctx.put_event(
                    ToolProgressEvent(call_id=tool_call_id, status=status, delta=delta, error=error)
                )

        await emit("in_progress", delta="Starting…")
        try:
            result = await _perform_task(self.task)  # placeholder for your own coroutine
            await emit("completed")
            return result
        except Exception as exc:
            # Report the failure to the client, then let the error propagate.
            await emit("failed", error=str(exc))
            raise
```
On the consumer side, filter by type:

```python
async for event in agency.get_response_stream(message):
    if isinstance(event, ToolProgressEvent):
        print(f"[{event.status}] {event.delta or ''}")
```
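
Since each progress event carries a `call_id`, a consumer can also keep per-call state instead of only printing. The sketch below tracks the latest status reported for each tool call. It rests on several assumptions made purely for illustration: `ToolProgress` is a plain dataclass stand-in mirroring the fields of `ToolProgressEvent`, `fake_stream` simulates what `agency.get_response_stream` might yield, and `latest_status_by_call` is a hypothetical helper name.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncIterator, Dict, Optional


@dataclass
class ToolProgress:
    """Dataclass stand-in with the same fields as ToolProgressEvent above."""

    call_id: str
    status: str = "in_progress"
    delta: Optional[str] = None
    error: Optional[str] = None


async def fake_stream() -> AsyncIterator[Any]:
    """Simulated stream mixing tool progress with an unrelated event."""
    yield ToolProgress(call_id="call_1", delta="Starting…")
    yield {"type": "other"}  # anything that is not tool progress
    yield ToolProgress(call_id="call_1", status="completed")


async def latest_status_by_call(stream: AsyncIterator[Any]) -> Dict[str, str]:
    """Track the most recent status reported for each tool call."""
    statuses: Dict[str, str] = {}
    async for event in stream:
        if isinstance(event, ToolProgress):
            statuses[event.call_id] = event.status
        # All other events would go through the regular delta handling.
    return statuses


print(asyncio.run(latest_status_by_call(fake_stream())))
```

Keying on `call_id` is what lets a client show separate progress indicators when several tool calls report into the same stream.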
<Warning>
Regardless of the chosen method, events emitted via `put_event()` are visible only to the **client consuming the stream**; the calling agent never sees them. From the agent's perspective, the tool returns only its final `return` value, which is what gets added to the conversation history and passed back to the model.
</Warning>