How to stream text and partial structured responses from PydanticAI agents
Why streaming matters for agents
A PydanticAI agent running a multi-step task can take 10-30 seconds. Without streaming, users stare at a blank screen. With streaming, they see progress — tokens arriving, tool calls executing, partial answers forming. This guide covers both text streaming and structured response streaming.
Text streaming
Use agent.run_stream() as an async context manager. It yields a StreamedRunResult, which exposes stream_text(), an async iterator over the text as it arrives.
import asyncio

from pydantic_ai import Agent

agent = Agent('openai:gpt-4o-mini')

async def main():
    async with agent.run_stream('Write a short poem about the sea.') as result:
        async for chunk in result.stream_text(delta=True):
            print(chunk, end='', flush=True)
        print()  # newline at end
        # Access usage stats after streaming completes
        usage = result.usage()
        print(f'Tokens: {usage.total_tokens}')

asyncio.run(main())
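The contract between the two streaming modes can be sketched without any model calls. In this stdlib-only sketch, `fake_deltas` (illustrative, not part of pydantic-ai) stands in for the token stream:

```python
import asyncio

async def fake_deltas():
    # Stands in for stream_text(delta=True): each yield is only the new text
    for token in ['The ', 'sea ', 'sings.']:
        yield token

async def main() -> str:
    accumulated = ''
    async for chunk in fake_deltas():
        accumulated += chunk  # what each yield of delta=False would contain
        print(f'accumulated: {accumulated!r}')
    return accumulated

print(asyncio.run(main()))  # The sea sings.
```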
Pass delta=True to stream_text() to get incremental chunks. Without it, each yield contains the full text accumulated so far, which is useful for updating a text box that re-renders on each yield.

Streaming with tools
Tool calls happen between text chunks: the stream pauses while a tool runs, then resumes with the next text chunk. With stream_text(delta=True) you receive the text generated both before and after each tool call.
from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')

@agent.tool_plain
def get_weather(city: str) -> str:
    return f'The weather in {city} is sunny, 22C.'

async def stream_with_tools(query: str):
    async with agent.run_stream(query) as result:
        async for chunk in result.stream_text(delta=True):
            print(chunk, end='', flush=True)
        # After streaming, inspect all messages including tool calls
        for msg in result.all_messages():
            print(f' [{msg.__class__.__name__}]')
Structured response streaming
When your agent has a result_type (a Pydantic model), you can stream partially validated output as the model generates the JSON. Use stream() instead of stream_text(): it yields objects whose fields fill in as the JSON arrives.

from typing import Optional

from pydantic import BaseModel
from pydantic_ai import Agent

class MovieReview(BaseModel):
    # Defaults let partially validated instances exist while fields are missing
    title: Optional[str] = None
    rating: Optional[int] = None
    summary: Optional[str] = None
    recommended: Optional[bool] = None

agent = Agent('openai:gpt-4o', result_type=MovieReview)

async def stream_structured():
    async with agent.run_stream(
        'Review the movie Inception in 2 sentences.'
    ) as result:
        async for partial in result.stream(debounce_by=0.05):
            # partial is a MovieReview whose fields fill in as JSON arrives
            if partial.title:
                print(f'Title so far: {partial.title}')
        # Final validated result
        final: MovieReview = await result.get_data()
        print(final.model_dump_json(indent=2))
Partial structured results are not fully validated: fields may be None or incomplete mid-stream. Only call result.get_data() for the final, fully validated object.

FastAPI SSE endpoint
Server-Sent Events (SSE) is the standard way to stream LLM output to a browser. Here is a complete FastAPI endpoint.
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic_ai import Agent
import json

app = FastAPI()
agent = Agent('openai:gpt-4o-mini')

@app.get('/stream')
async def stream_endpoint(query: str):
    async def generate():
        async with agent.run_stream(query) as result:
            async for chunk in result.stream_text(delta=True):
                # SSE format: 'data: ...\n\n'
                yield f'data: {json.dumps({"chunk": chunk})}\n\n'
        yield 'data: [DONE]\n\n'
    return StreamingResponse(
        generate(),
        media_type='text/event-stream',
        headers={
            'Cache-Control': 'no-cache',
            'X-Accel-Buffering': 'no',  # disable Nginx buffering
        },
    )
// Browser client
const es = new EventSource('/stream?query=Tell+me+a+story');
es.onmessage = (e) => {
  if (e.data === '[DONE]') { es.close(); return; }
  const { chunk } = JSON.parse(e.data);
  document.getElementById('output').textContent += chunk;
};
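Outside the browser (e.g. in tests or a CLI client), the same wire format is easy to decode line by line. A sketch assuming the 'data: {...}' framing used by the endpoint above (parse_sse_line is an illustrative helper, not a library function):

```python
import json

def parse_sse_line(line: str):
    """Parse one SSE line; return the text chunk, or None for [DONE] or non-data lines."""
    if not line.startswith('data: '):
        return None  # comments, blank keep-alives, other fields
    payload = line[len('data: '):].strip()
    if payload == '[DONE]':
        return None
    return json.loads(payload)['chunk']

print(parse_sse_line('data: {"chunk": "Once upon"}'))  # Once upon
```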
Streaming with message history (multi-turn)
from pydantic_ai import Agent
from pydantic_ai.messages import ModelMessagesTypeAdapter

agent = Agent('openai:gpt-4o-mini')
history = []

async def chat_stream(user_message: str):
    global history
    async with agent.run_stream(
        user_message,
        message_history=history,
    ) as result:
        async for chunk in result.stream_text(delta=True):
            print(chunk, end='', flush=True)
        print()
        history = result.all_messages()  # update history for next turn
        # To persist history across processes, serialize it, e.g.:
        # stored = ModelMessagesTypeAdapter.dump_json(history)
Common issues
| Problem | Cause | Fix |
|---|---|---|
| No output until response completes | Not using run_stream() — using run() instead | Replace agent.run() with async with agent.run_stream() |
| Nginx buffers the stream | Proxy buffering enabled | Add X-Accel-Buffering: no response header |
| get_data() raises ValidationError | Calling get_data() before stream finishes | Always await get_data() after the async for loop |
| Tool output not visible in stream | Tools run silently between text chunks | Inspect result.all_messages() after streaming for tool call details |
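The Nginx fix from the table can also be applied on the proxy side rather than per-response. A minimal sketch, assuming a standard proxy_pass setup with an upstream named app (both names illustrative):

```nginx
location /stream {
    proxy_pass http://app;
    # Equivalent to the application sending X-Accel-Buffering: no
    proxy_buffering off;
    proxy_cache off;
}
```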