| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216 |
- ---
- title: "Streaming"
- description: "Implementing streaming in Agency Swarm."
- icon: "ellipsis"
- ---
- Streaming enables agents to return outputs immediately, significantly improving user experience. Instead of waiting for the entire response to be generated, the user can see the response being generated in real-time.
- ## Streaming Responses
- In Agency Swarm, streaming is handled through the `get_response_stream` method. The framework returns [StreamEvent](https://openai.github.io/openai-agents-python/ref/stream_events/#agents.stream_events.StreamEvent) objects as they are returned by OpenAI, providing direct access to the underlying streaming events.
- ```python
- async def stream_response(message: str):
- """Stream a response and handle events properly."""
- full_text = ""
- async for event in agency.get_response_stream(message):
- # Handle streaming events with data
- if hasattr(event, "data"):
- data = event.data
- # Only capture actual response text, not tool call arguments
- if hasattr(data, "delta") and hasattr(data, "type"):
- if data.type == "response.output_text.delta":
- # Stream the actual response text in real-time
- delta_text = data.delta
- if delta_text:
- print(delta_text, end="", flush=True)
- full_text += delta_text
- # Skip tool call deltas (we don't want to show those to users)
- elif data.type == "response.function_call_arguments.delta":
- continue
- # Handle validation errors
- elif isinstance(event, dict):
- event_type = event.get("event", event.get("type"))
- if event_type == "error":
- print(f"\n❌ Error: {event.get('content', event.get('data', 'Unknown error'))}")
- break
- print("\n✅ Stream complete")
- return full_text
- # Usage
- await stream_response("I want you to build me a website")
- ```
- ## Streaming from Tools
- Tools can inject events into the parent SSE stream while they execute. This is especially useful when a tool runs a sub-agent or a long-running operation and you want the client to receive live progress updates alongside the normal agent output.
- ### Accessing the Streaming Context
- Inside any `BaseTool.run()` method, two properties give you access to the live stream:
- | Field | Type | What it is |
- |---|---|---|
- | `self.context.streaming_context` | `StreamingContext \| None` | The queue that feeds the parent stream. `None` when the request is not streaming. |
- | `self.tool_call_id` | `str \| None` | The unique ID of this specific tool invocation. Use it to let clients correlate events with the tool call. |
- ```python
- streaming_ctx = self.context.streaming_context
- tool_call_id = self.tool_call_id or ""
- ```
- `streaming_ctx` is `None` when the request is not streaming, so always guard before calling `put_event`.
- Any object passed to `streaming_ctx.put_event(event)` is forwarded into the same stream your client is already consuming. There are three ways to use this, depending on how informative you want the events to be.
- ### Method 1: Raw values
- The simplest option — emit any plain value directly. Useful for quick progress strings during development.
- ```python
- class MyTool(BaseTool):
- """Does something and reports progress."""
- async def run(self):
- streaming_ctx = self.context.streaming_context
- if streaming_ctx:
- await streaming_ctx.put_event("step 1/3: fetching data…")
- data = await _fetch()
- if streaming_ctx:
- await streaming_ctx.put_event("step 2/3: processing…")
- result = await _process(data)
- if streaming_ctx:
- await streaming_ctx.put_event("step 3/3: done.")
- return result
- ```
- On the consumer side:
- ```python
- async for event in agency.get_response_stream(message):
- if isinstance(event, str):
- print(f"[tool] {event}")
- ```
- ### Method 2: Existing SDK event types
- While neither the OpenAI API nor the `agents` SDK contains event types specifically designed for streaming output from tool execution, you can construct a [`RawResponsesStreamEvent`](https://openai.github.io/openai-agents-python/ref/stream_events/#agents.stream_events.RawResponsesStreamEvent) wrapping a `ResponseTextDeltaEvent` directly inside the tool. The internal tracking fields (`item_id`, `content_index`, etc.) just need placeholder values — the consumer's existing delta-handling code treats it identically to a normal LLM text delta.
- ```python
- from agents import RawResponsesStreamEvent
- from openai.types.responses import ResponseTextDeltaEvent
- from agency_swarm.tools import BaseTool
- from pydantic import Field
- class MyTool(BaseTool):
- """Does something and streams progress as text deltas."""
- task: str = Field(..., description="The task to perform")
- async def run(self):
- streaming_ctx = self.context.streaming_context
- async def emit_delta(text: str) -> None:
- if streaming_ctx:
- await streaming_ctx.put_event(
- RawResponsesStreamEvent(
- data=ResponseTextDeltaEvent(
- content_index=0,
- output_index=0,
- sequence_number=0,
- item_id="tool_progress",
- logprobs=[],
- delta=text,
- type="response.output_text.delta",
- )
- )
- )
- await emit_delta("Starting…")
- result = await _perform_task(self.task)
- await emit_delta(" Done.")
- return result
- ```
- On the consumer side, these events are indistinguishable from regular LLM text deltas unless you inspect the `item_id` you set on the event:
- ```python
- from agents import RawResponsesStreamEvent
- from openai.types.responses import ResponseTextDeltaEvent
- async for event in agency.get_response_stream(message):
- if isinstance(event, RawResponsesStreamEvent) and isinstance(event.data, ResponseTextDeltaEvent):
- if event.data.item_id == "tool_progress":
- print(f"[tool] {event.data.delta}", end="", flush=True)
- else:
- print(event.data.delta, end="", flush=True)
- ```
- ### Method 3: Custom event model
- For the most informative output you can construct your own class that includes structured metadata — status, tool call ID for correlation, error details, etc. No built-in SDK type covers tool execution progress, so this is the recommended approach for production use, but it will require explicit checks on the consumer side to separate these events from standard SDK events.
- ```python
- from typing import Literal
- from pydantic import BaseModel, Field
- from agency_swarm.tools import BaseTool
- class ToolProgressEvent(BaseModel):
- """Streaming progress event emitted by a tool invocation."""
- call_id: str
- status: Literal["in_progress", "completed", "failed"] = "in_progress"
- delta: str | None = None
- error: str | None = None
- class LongRunningTool(BaseTool):
- """A tool that reports live progress through the parent stream."""
- task: str = Field(..., description="The task to perform")
- async def run(self):
- streaming_ctx = self.context.streaming_context
- tool_call_id = self.tool_call_id or ""
- async def emit(status: str, delta: str | None = None, error: str | None = None) -> None:
- if streaming_ctx:
- await streaming_ctx.put_event(
- ToolProgressEvent(call_id=tool_call_id, status=status, delta=delta, error=error)
- )
- await emit("in_progress", delta="Starting…")
- try:
- result = await _perform_task(self.task)
- await emit("completed")
- return result
- except Exception as exc:
- await emit("failed", error=str(exc))
- raise
- ```
- On the consumer side, filter by type:
- ```python
- async for event in agency.get_response_stream(message):
- if isinstance(event, ToolProgressEvent):
- print(f"[{event.status}] {event.delta or ''}")
- ```
- <Warning>
- Regardless of the chosen method, events emitted via `put_event()` are only visible to the **client consuming the stream** — the calling agent never sees them. From the agent's perspective, the tool returns only its final `return` value, which is what gets added to the conversation history and passed back to the model.
- </Warning>
|