Runner.run() vs Runner.run_streamed()
Runner.run() waits for the full agent response before returning anything. Runner.run_streamed() returns a RunResultStreaming object immediately; iterating its stream_events() async iterator yields events as they are generated — tokens, tool calls, agent transitions, and completion signals.
from agents import Agent, Runner
agent = Agent(
name="Assistant",
instructions="You are a helpful assistant."
)
# Blocking — waits for full response
result = await Runner.run(agent, "Explain vector databases")
print(result.final_output)
# Streaming — yields events as they happen
async with Runner.run_streamed(agent, "Explain vector databases") as stream:
async for event in stream:
print(event) # StreamEvent objects
StreamEvent Types
| Event type | When it fires | Useful for |
|---|---|---|
| RawResponsesStreamEvent | Each chunk from the underlying model API | Token-by-token streaming to the user |
| RunItemStreamEvent (tool_called) | When the agent decides to call a tool | Show 'Calling tool...' in the UI |
| RunItemStreamEvent (tool_output) | When a tool call returns its result | Show tool result or dismiss loading state |
| RunItemStreamEvent (message_output) | When the agent produces a message | Full message output (non-streaming) |
| AgentUpdatedStreamEvent | When a handoff changes the active agent | Show which agent is currently active |
Token-by-Token Streaming
from agents import Agent, Runner, RawResponsesStreamEvent
from openai.types.responses import ResponseTextDeltaEvent
import asyncio

agent = Agent(
    name="Writer",
    instructions="You are a concise technical writer."
)

async def stream_tokens():
    """Print the model's answer token-by-token as text deltas arrive."""
    # run_streamed() returns a RunResultStreaming right away; the run
    # progresses as we iterate stream_events() (no `async with` needed).
    result = Runner.run_streamed(agent, "Describe the CAP theorem briefly")
    async for event in result.stream_events():
        # Filter to raw model events only
        if not isinstance(event, RawResponsesStreamEvent):
            continue
        # Only text-delta events carry printable token chunks
        data = event.data
        if isinstance(data, ResponseTextDeltaEvent):
            print(data.delta, end="", flush=True)
    print()  # newline after the stream completes

asyncio.run(stream_tokens())
Streaming Tool Call Events
from agents import Agent, Runner, RunItemStreamEvent, function_tool
from agents.items import ToolCallItem, ToolCallOutputItem, MessageOutputItem
import asyncio

@function_tool
def get_stock_price(ticker: str) -> str:
    """Get the current stock price for a ticker symbol."""
    prices = {"AAPL": "$189.50", "GOOGL": "$175.20", "MSFT": "$420.30"}
    return prices.get(ticker.upper(), "Ticker not found")

agent = Agent(
    name="Finance Agent",
    instructions="Help users with stock price queries.",
    tools=[get_stock_price]
)

async def stream_with_tools():
    """Surface tool calls, tool results, and final messages as they stream."""
    # run_streamed() is not a context manager — iterate stream_events().
    result = Runner.run_streamed(agent, "What are the prices of AAPL and MSFT?")
    async for event in result.stream_events():
        if not isinstance(event, RunItemStreamEvent):
            continue
        item = event.item
        if isinstance(item, ToolCallItem):
            # raw_item is the underlying API tool-call object (name + JSON args)
            name = item.raw_item.name
            args = item.raw_item.arguments
            print(f"[TOOL CALL] {name}({args})")
        elif isinstance(item, ToolCallOutputItem):
            print(f"[TOOL RESULT] {item.output}")
        elif isinstance(item, MessageOutputItem):
            print(f"[AGENT] {item.raw_item.content[0].text}")

asyncio.run(stream_with_tools())
Accessing the Final Result After Streaming
async with Runner.run_streamed(agent, "Summarise AI trends in 2026") as stream:
async for event in stream:
# ... handle events ...
pass
# After the stream completes, the full result is available
result = stream.result
print("Final output:", result.final_output)
print("New items:", result.new_items)
print("Raw responses:", result.raw_responses)
FastAPI SSE Endpoint
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from agents import Agent, Runner, RawResponsesStreamEvent, RunItemStreamEvent
from agents.items import ToolCallItem, ToolCallOutputItem
from openai.types.responses import ResponseTextDeltaEvent
import json

app = FastAPI()

class ChatRequest(BaseModel):
    # The user's chat message to run through the agent.
    message: str

# NOTE(review): assumes get_stock_price (the @function_tool defined in the
# earlier tool example) is in scope in this module.
agent = Agent(
    name="Assistant",
    instructions="You are a helpful assistant.",
    tools=[get_stock_price]
)

async def sse_generator(message: str):
    """Yield SSE frames for tokens and tool lifecycle events of one agent run."""
    # run_streamed() returns immediately; the run advances as we iterate.
    result = Runner.run_streamed(agent, message)
    async for event in result.stream_events():
        if isinstance(event, RawResponsesStreamEvent):
            # Forward each text delta as a "token" frame
            if isinstance(event.data, ResponseTextDeltaEvent):
                payload = json.dumps({"type": "token", "content": event.data.delta})
                yield f"data: {payload}\n\n"
        elif isinstance(event, RunItemStreamEvent):
            if isinstance(event.item, ToolCallItem):
                payload = json.dumps({
                    "type": "tool_call",
                    "name": event.item.raw_item.name
                })
                yield f"data: {payload}\n\n"
            elif isinstance(event.item, ToolCallOutputItem):
                payload = json.dumps({"type": "tool_done"})
                yield f"data: {payload}\n\n"
    # Sentinel frame so the client knows the stream ended cleanly
    yield 'data: {"type": "done"}\n\n'

@app.post("/chat/stream")
async def chat_stream(req: ChatRequest):
    # X-Accel-Buffering: no stops Nginx from buffering the SSE stream
    return StreamingResponse(
        sse_generator(req.message),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
    )
Streaming with Multi-Agent Handoffs
from agents import Agent, Runner, AgentUpdatedStreamEvent

billing_agent = Agent(name="Billing", instructions="Handle billing questions.")
tech_agent = Agent(name="Technical", instructions="Handle technical questions.")

triage_agent = Agent(
    name="Triage",
    instructions="Route to the right specialist agent.",
    handoffs=[billing_agent, tech_agent]
)

async def stream_with_handoffs():
    """Announce each handoff as control moves between agents."""
    # run_streamed() is not a context manager — iterate stream_events().
    result = Runner.run_streamed(triage_agent, "I can't log in to my account")
    async for event in result.stream_events():
        # Detect when a handoff changes the active agent
        if isinstance(event, AgentUpdatedStreamEvent):
            print(f"[HANDOFF] Now talking to: {event.new_agent.name}")
Common Streaming Mistakes
- Not consuming stream_events() — Runner.run_streamed() returns a RunResultStreaming immediately; the run only progresses as you iterate result.stream_events(). It is not an async context manager, so wrapping it in async with fails.
- Trying to access result.final_output (or new_items / raw_responses) before the async for loop over stream_events() completes — these fields are only populated after all events have been consumed.
- Missing X-Accel-Buffering: no in Nginx deployments — SSE responses will buffer at the proxy and users see nothing in real time.
- Printing event directly without type checking — different event types have different .data structures. Always check isinstance() first.