--- title: "Streaming" description: "Implementing streaming in Agency Swarm." icon: "ellipsis" --- Streaming enables agents to return outputs immediately, significantly improving user experience. Instead of waiting for the entire response to be generated, the user can see the response being generated in real-time. ## Streaming Responses In Agency Swarm, streaming is handled through the `get_response_stream` method. The framework returns [StreamEvent](https://openai.github.io/openai-agents-python/ref/stream_events/#agents.stream_events.StreamEvent) objects as they are returned by OpenAI, providing direct access to the underlying streaming events. ```python async def stream_response(message: str): """Stream a response and handle events properly.""" full_text = "" async for event in agency.get_response_stream(message): # Handle streaming events with data if hasattr(event, "data"): data = event.data # Only capture actual response text, not tool call arguments if hasattr(data, "delta") and hasattr(data, "type"): if data.type == "response.output_text.delta": # Stream the actual response text in real-time delta_text = data.delta if delta_text: print(delta_text, end="", flush=True) full_text += delta_text # Skip tool call deltas (we don't want to show those to users) elif data.type == "response.function_call_arguments.delta": continue # Handle validation errors elif isinstance(event, dict): event_type = event.get("event", event.get("type")) if event_type == "error": print(f"\n❌ Error: {event.get('content', event.get('data', 'Unknown error'))}") break print("\n✅ Stream complete") return full_text # Usage await stream_response("I want you to build me a website") ``` ## Streaming from Tools Tools can inject events into the parent SSE stream while they execute. This is especially useful when a tool runs a sub-agent or a long-running operation and you want the client to receive live progress updates alongside the normal agent output. ### Accessing the Streaming Context Inside any `BaseTool.run()` method, two properties give you access to the live stream: | Field | Type | What it is | |---|---|---| | `self.context.streaming_context` | `StreamingContext \| None` | The queue that feeds the parent stream. `None` when the request is not streaming. | | `self.tool_call_id` | `str \| None` | The unique ID of this specific tool invocation. Use it to let clients correlate events with the tool call. | ```python streaming_ctx = self.context.streaming_context tool_call_id = self.tool_call_id or "" ``` `streaming_ctx` is `None` when the request is not streaming, so always guard before calling `put_event`. Any object passed to `streaming_ctx.put_event(event)` is forwarded into the same stream your client is already consuming. There are three ways to use this, depending on how informative you want the events to be. ### Method 1: Raw values The simplest option — emit any plain value directly. Useful for quick progress strings during development. ```python class MyTool(BaseTool): """Does something and reports progress.""" async def run(self): streaming_ctx = self.context.streaming_context if streaming_ctx: await streaming_ctx.put_event("step 1/3: fetching data…") data = await _fetch() if streaming_ctx: await streaming_ctx.put_event("step 2/3: processing…") result = await _process(data) if streaming_ctx: await streaming_ctx.put_event("step 3/3: done.") return result ``` On the consumer side: ```python async for event in agency.get_response_stream(message): if isinstance(event, str): print(f"[tool] {event}") ``` ### Method 2: Existing SDK event types While neither the OpenAI API nor the `agents` SDK contains event types specifically designed for streaming output from tool execution, you can construct a [`RawResponsesStreamEvent`](https://openai.github.io/openai-agents-python/ref/stream_events/#agents.stream_events.RawResponsesStreamEvent) wrapping a `ResponseTextDeltaEvent` directly inside the tool. The internal tracking fields (`item_id`, `content_index`, etc.) just need placeholder values — the consumer's existing delta-handling code treats it identically to a normal LLM text delta. ```python from agents import RawResponsesStreamEvent from openai.types.responses import ResponseTextDeltaEvent from agency_swarm.tools import BaseTool from pydantic import Field class MyTool(BaseTool): """Does something and streams progress as text deltas.""" task: str = Field(..., description="The task to perform") async def run(self): streaming_ctx = self.context.streaming_context async def emit_delta(text: str) -> None: if streaming_ctx: await streaming_ctx.put_event( RawResponsesStreamEvent( data=ResponseTextDeltaEvent( content_index=0, output_index=0, sequence_number=0, item_id="tool_progress", logprobs=[], delta=text, type="response.output_text.delta", ) ) ) await emit_delta("Starting…") result = await _perform_task(self.task) await emit_delta(" Done.") return result ``` On the consumer side, these events are indistinguishable from regular LLM text deltas unless you inspect the `item_id` you set on the event: ```python from agents import RawResponsesStreamEvent from openai.types.responses import ResponseTextDeltaEvent async for event in agency.get_response_stream(message): if isinstance(event, RawResponsesStreamEvent) and isinstance(event.data, ResponseTextDeltaEvent): if event.data.item_id == "tool_progress": print(f"[tool] {event.data.delta}", end="", flush=True) else: print(event.data.delta, end="", flush=True) ``` ### Method 3: Custom event model For the most informative output you can construct your own class that includes structured metadata — status, tool call ID for correlation, error details, etc. No built-in SDK type covers tool execution progress, so this is the recommended approach for production use, but it will require explicit checks on the consumer side to separate these events from standard SDK events. ```python from typing import Literal from pydantic import BaseModel, Field from agency_swarm.tools import BaseTool class ToolProgressEvent(BaseModel): """Streaming progress event emitted by a tool invocation.""" call_id: str status: Literal["in_progress", "completed", "failed"] = "in_progress" delta: str | None = None error: str | None = None class LongRunningTool(BaseTool): """A tool that reports live progress through the parent stream.""" task: str = Field(..., description="The task to perform") async def run(self): streaming_ctx = self.context.streaming_context tool_call_id = self.tool_call_id or "" async def emit(status: str, delta: str | None = None, error: str | None = None) -> None: if streaming_ctx: await streaming_ctx.put_event( ToolProgressEvent(call_id=tool_call_id, status=status, delta=delta, error=error) ) await emit("in_progress", delta="Starting…") try: result = await _perform_task(self.task) await emit("completed") return result except Exception as exc: await emit("failed", error=str(exc)) raise ``` On the consumer side, filter by type: ```python async for event in agency.get_response_stream(message): if isinstance(event, ToolProgressEvent): print(f"[{event.status}] {event.delta or ''}") ``` Regardless of the chosen method, events emitted via `put_event()` are only visible to the **client consuming the stream** — the calling agent never sees them. From the agent's perspective, the tool returns only its final `return` value, which is what gets added to the conversation history and passed back to the model.