Why Streaming Matters in Agent UIs

A LangGraph agent making three tool calls can easily take 15-30 seconds to complete. Without streaming, users stare at a spinner and have no idea if anything is happening. With streaming, they see the agent reasoning, watch tool calls fire, and read the final response as it generates — turning an anxious wait into an observable process.

LangGraph has strong streaming support, but the API surface is wide and the docs leave several real-world questions unanswered: which streaming method to use, how to filter out noise, and how to wire it into a web framework. This guide covers all three.

The Two Streaming Methods

  • astream(): yields full node state snapshots after each node completes. Best for simple cases where you want to see what changed after each step.
  • astream_events(): yields granular events (tokens, tool calls, node starts/ends). Best for production UIs that need full control over what the user sees.

Start with astream() to get something working, then switch to astream_events() when you need per-token streaming or fine-grained event filtering.

astream(): Node-by-Node State Updates

from langgraph.graph import StateGraph, END
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import HumanMessage
from typing import TypedDict, Annotated
import operator, asyncio
 
class State(TypedDict):
    messages: Annotated[list, operator.add]
 
llm = ChatAnthropic(model="claude-haiku-4-5-20251001")
 
def agent_node(state: State) -> dict:
    response = llm.invoke(state["messages"])
    return {"messages": [response]}
 
graph = StateGraph(State)
graph.add_node("agent", agent_node)
graph.set_entry_point("agent")
graph.add_edge("agent", END)
app = graph.compile()
 
async def stream_nodes():
    input_state = {"messages": [HumanMessage(content="What is LangGraph?")]}
 
    async for chunk in app.astream(input_state):
        # chunk is a dict: {node_name: state_update}
        for node_name, state_update in chunk.items():
            print(f"[{node_name}] state updated:")
            last_msg = state_update["messages"][-1]
            print(f"  {last_msg.content[:120]}...")
 
asyncio.run(stream_nodes())
 

astream() yields one chunk per node completion. You see the full state diff after each node — useful for debugging and for showing step-by-step progress, but you do not get individual tokens.

astream_events(): Full Granularity

astream_events() yields a stream of typed events. The event structure is consistent:

# Each event has this shape:
{
    "event": "on_chat_model_stream",  # event type
    "name": "ChatAnthropic",           # component name
    "run_id": "...",
    "tags": [...],
    "metadata": {...},
    "data": {"chunk": AIMessageChunk(...)}
}
 

The most useful event types for agent UIs:

  • on_chat_model_stream: fires on each new token from the LLM. Append to the streaming message buffer.
  • on_tool_start: fires before a tool call executes. Show "Calling tool: {name}..." in the UI.
  • on_tool_end: fires after a tool call returns. Show the tool result or hide the loading indicator.
  • on_chain_start: fires when a node begins executing. Show the current step indicator.
  • on_chain_end: fires when a node finishes. Mark the step complete.
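In a real UI you typically route these events through a small dispatch table rather than a growing if/elif chain. A sketch of that pattern (the handler functions here are illustrative, not part of any LangGraph API):

```python
from typing import Callable, Dict

def make_dispatcher(handlers: Dict[str, Callable[[dict], None]]) -> Callable[[dict], None]:
    """Return a function that routes each event dict to the handler
    registered for its 'event' key; unregistered event types are ignored."""
    def dispatch(event: dict) -> None:
        handler = handlers.get(event.get("event"))
        if handler is not None:
            handler(event)
    return dispatch

# Usage: collect a simple activity log from a fake event stream
log: list = []
dispatch = make_dispatcher({
    "on_tool_start": lambda e: log.append(f"tool start: {e['name']}"),
    "on_chat_model_stream": lambda e: log.append("token"),
})

for event in [
    {"event": "on_tool_start", "name": "search_web", "data": {}},
    {"event": "on_chat_model_stream", "data": {}},
    {"event": "on_chain_end", "data": {}},  # no handler registered, ignored
]:
    dispatch(event)
```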

Streaming Tokens to the User

import asyncio
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import HumanMessage
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
 
class State(TypedDict):
    messages: Annotated[list, operator.add]
 
llm = ChatAnthropic(model="claude-haiku-4-5-20251001")
 
def agent_node(state: State) -> dict:
    response = llm.invoke(state["messages"])
    return {"messages": [response]}
 
graph = StateGraph(State)
graph.add_node("agent", agent_node)
graph.set_entry_point("agent")
graph.add_edge("agent", END)
app = graph.compile()
 
async def stream_tokens():
    input_state = {"messages": [HumanMessage(content="Explain embeddings briefly")]}
 
    async for event in app.astream_events(input_state, version="v2"):
        kind = event["event"]
 
        # Stream tokens as they arrive
        if kind == "on_chat_model_stream":
            chunk = event["data"]["chunk"]
            if chunk.content:
                print(chunk.content, end="", flush=True)
 
    print()  # newline at end
 
asyncio.run(stream_tokens())
 
Always pass version="v2" to astream_events(). The older v1 event format is deprecated, and its event names and payload shapes differ from v2, so handlers written against one format will not work with the other.

Streaming Tool Calls with Status Updates

from langchain_core.tools import tool
from langchain_anthropic import ChatAnthropic
from langgraph.prebuilt import create_react_agent
import asyncio
 
@tool
def search_web(query: str) -> str:
    """Search the web for information."""
    return f"Results for '{query}': [simulated search result]"
 
@tool
def get_weather(city: str) -> str:
    """Get current weather for a city."""
    return f"Weather in {city}: 18C, partly cloudy"
 
llm = ChatAnthropic(model="claude-haiku-4-5-20251001")
agent = create_react_agent(llm, tools=[search_web, get_weather])
 
async def stream_with_tool_status():
    user_input = "What's the weather in London and what are the top AI news today?"
    input_state = {"messages": [{"role": "user", "content": user_input}]}
 
    async for event in agent.astream_events(input_state, version="v2"):
        kind = event["event"]
        name = event.get("name", "")
 
        if kind == "on_tool_start":
            tool_input = event["data"].get("input", {})
            print(f"\n[TOOL] Calling {name}({tool_input})...")
 
        elif kind == "on_tool_end":
            print(f"[TOOL] {name} complete")
 
        elif kind == "on_chat_model_stream":
            chunk = event["data"]["chunk"]
            if chunk.content:
                print(chunk.content, end="", flush=True)
 
asyncio.run(stream_with_tool_status())
 

FastAPI Server-Sent Events (SSE) Integration

This is the pattern for wiring LangGraph streaming into a web API that a frontend can consume:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import json, asyncio
 
app = FastAPI()
 
class ChatRequest(BaseModel):
    message: str
    thread_id: str = "default"
 
async def event_generator(message: str, thread_id: str):
    # `agent` is the create_react_agent instance from the previous section
    input_state = {"messages": [{"role": "user", "content": message}]}
    # thread_id only persists state if the agent was compiled with a checkpointer
    config = {"configurable": {"thread_id": thread_id}}
 
    async for event in agent.astream_events(input_state, config=config, version="v2"):
        kind = event["event"]
 
        if kind == "on_chat_model_stream":
            chunk = event["data"]["chunk"]
            if chunk.content:
                data = json.dumps({"type": "token", "content": chunk.content})
                yield f"data: {data}\n\n"
 
        elif kind == "on_tool_start":
            data = json.dumps({"type": "tool_start", "name": event.get("name", "")})
            yield f"data: {data}\n\n"
 
        elif kind == "on_tool_end":
            data = json.dumps({"type": "tool_end", "name": event.get("name", "")})
            yield f"data: {data}\n\n"
 
    yield 'data: {"type": "done"}\n\n'
 
@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    return StreamingResponse(
        event_generator(request.message, request.thread_id),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
    )
 
The X-Accel-Buffering: no header is critical if you deploy behind Nginx. Without it, Nginx buffers the SSE stream and users see nothing until the buffer flushes.
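On the client side, each SSE message from this endpoint arrives as a `data: {...}` line followed by a blank line. A minimal parser for those payloads (a hypothetical helper, not part of the server above):

```python
import json
from typing import Optional

def parse_sse_line(line: str) -> Optional[dict]:
    """Decode one line of an SSE stream: return the JSON payload of a
    'data:' line, or None for blank lines and anything else."""
    line = line.strip()
    if not line.startswith("data:"):
        return None
    payload = line[len("data:"):].strip()
    try:
        return json.loads(payload)
    except json.JSONDecodeError:
        return None

# Usage against the event shapes the endpoint above emits
token_event = parse_sse_line('data: {"type": "token", "content": "hi"}')
blank = parse_sse_line("")
```

A browser frontend would normally use EventSource or fetch with a ReadableStream instead, but the same line format applies.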

Filtering Events to Specific Nodes

In complex graphs with multiple LLM nodes, you may want to stream tokens only from specific nodes — not all of them. Filter by tags:

# When building your graph, tag specific nodes
llm_with_tag = llm.with_config(tags=["stream_this"])
 
def agent_node(state):
    response = llm_with_tag.invoke(state["messages"])
    return {"messages": [response]}
 
# When streaming, filter by tag
async for event in app.astream_events(input_state, version="v2"):
    if event["event"] == "on_chat_model_stream":
        if "stream_this" in event.get("tags", []):
            chunk = event["data"]["chunk"]
            if chunk.content:
                print(chunk.content, end="", flush=True)
 

Common Streaming Mistakes

  • Using stream() instead of astream() in async contexts — stream() is synchronous and will block your event loop.
  • Forgetting version="v2" in astream_events() — v1 format is different and deprecated.
  • Not handling chunk.content being a list — Anthropic models sometimes return content as a list of content blocks, not a plain string. Check type(chunk.content) and handle both.
  • Using a checkpointer but not passing a config with thread_id — without it, the checkpointer cannot associate state with a conversation, so nothing is saved or resumed correctly.
  • Missing X-Accel-Buffering header behind Nginx — SSE will buffer and users see nothing in real time.
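The third mistake above can be handled once with a small normalizer. A sketch assuming Anthropic-style content blocks of the form {"type": "text", "text": ...} (a hypothetical helper, not a LangChain API):

```python
def chunk_text(content) -> str:
    """Flatten a chunk's content to plain text, whether it is a string
    or a list of content blocks (as Anthropic models may return).
    Non-text blocks such as tool_use are skipped."""
    if isinstance(content, str):
        return content
    parts = []
    for block in content:
        if isinstance(block, str):
            parts.append(block)
        elif isinstance(block, dict) and block.get("type") == "text":
            parts.append(block.get("text", ""))
    return "".join(parts)

# Drop-in replacement for the bare chunk.content prints in the examples above:
# text = chunk_text(chunk.content)
# if text:
#     print(text, end="", flush=True)
```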