# How to build complex, stateful AI pipelines using the Workflow API
## Why Workflows exist
LlamaIndex's QueryEngine and AgentRunner are great for straight-line tasks — ask a question, get an answer. But real applications need branching, loops, human checkpoints, and parallel execution. Workflows were introduced to handle exactly these cases.
A Workflow is a graph of Steps. Each Step is an async function that consumes one type of Event and emits another. You define the event types and the connections between steps; LlamaIndex handles routing, concurrency, and state.
## Core concepts
| Concept | What it is |
|---|---|
| Workflow | The top-level container — a class inheriting from Workflow |
| Step | An async method decorated with @step — runs when its input event arrives |
| Event | A Pydantic model that carries data between steps |
| StartEvent | The built-in event that kicks off a workflow (sent by workflow.run()) |
| StopEvent | The built-in event that ends the workflow and returns a result |
| Context | A per-run object for sharing state between steps without global variables |
## A minimal RAG workflow
```python
import asyncio

from llama_index.core import SimpleDirectoryReader, VectorStoreIndex
from llama_index.core.workflow import (
    Context, Event, StartEvent, StopEvent, Workflow, step,
)
from llama_index.llms.openai import OpenAI


# 1. Define custom events
class RetrieveEvent(Event):
    query: str
    nodes: list


# 2. Define the workflow
class RAGWorkflow(Workflow):
    @step
    async def retrieve(self, ctx: Context, ev: StartEvent) -> RetrieveEvent:
        # The index is passed as a keyword argument to workflow.run(),
        # so it arrives here as an attribute on the StartEvent
        index = ev.index
        retriever = index.as_retriever(similarity_top_k=5)
        nodes = await retriever.aretrieve(ev.query)
        return RetrieveEvent(query=ev.query, nodes=nodes)

    @step
    async def synthesise(self, ctx: Context, ev: RetrieveEvent) -> StopEvent:
        llm = OpenAI(model='gpt-4o-mini')
        context_str = '\n'.join(n.get_content() for n in ev.nodes)
        prompt = f'Context:\n{context_str}\n\nQuestion: {ev.query}'
        response = await llm.acomplete(prompt)
        return StopEvent(result=str(response))


# 3. Run
async def main():
    docs = SimpleDirectoryReader('data').load_data()
    index = VectorStoreIndex.from_documents(docs)
    wf = RAGWorkflow(timeout=30)
    # Any extra kwargs to run() become attributes on the StartEvent
    result = await wf.run(query='What are the key findings?', index=index)
    print(result)


asyncio.run(main())
```
## Using Context to share state between steps
Context is the per-run scratchpad. Use ctx.set() and ctx.get() to pass data between steps without needing to thread it through event payloads.
```python
# In any step:
await ctx.set('user_id', 'user-123')

# In a later step:
user_id = await ctx.get('user_id', default=None)
```
Use Context for data that needs to be read by multiple steps (e.g., config, a loaded index, user metadata). Use Event payloads for data that flows directly from one step to the next.

## Parallel steps with collect_events
When you need multiple steps to run in parallel and then merge their results, emit multiple events from one step and use collect_events() to wait for all of them.
```python
from llama_index.core.workflow import (
    Context, Event, StartEvent, StopEvent, Workflow, step,
)


class WebSearchEvent(Event):
    query: str


class DatabaseSearchEvent(Event):
    query: str


class SearchResultsEvent(Event):
    source: str
    results: list


class ResearchWorkflow(Workflow):
    @step
    async def fan_out(self, ctx: Context, ev: StartEvent) -> WebSearchEvent | DatabaseSearchEvent:
        # Emit one event per source; the two search steps run in parallel.
        # Routing events to other steps is done with ctx.send_event, not
        # ctx.write_event_to_stream (which only surfaces events to callers).
        ctx.send_event(WebSearchEvent(query=ev.query))
        ctx.send_event(DatabaseSearchEvent(query=ev.query))

    @step
    async def search_web(self, ctx: Context, ev: WebSearchEvent) -> SearchResultsEvent:
        # do_web_search / do_database_search are placeholders for your backends
        return SearchResultsEvent(source='web', results=await do_web_search(ev.query))

    @step
    async def search_database(self, ctx: Context, ev: DatabaseSearchEvent) -> SearchResultsEvent:
        return SearchResultsEvent(source='database', results=await do_database_search(ev.query))

    @step
    async def merge(self, ctx: Context, ev: SearchResultsEvent) -> StopEvent | None:
        # collect_events is a method on Context; it buffers events until
        # every type in the list has arrived, then returns them all at once
        events = ctx.collect_events(ev, [SearchResultsEvent, SearchResultsEvent])
        if events is None:
            return None  # not all events have arrived yet
        combined = [r for e in events for r in e.results]
        return StopEvent(result=combined)
```

Giving each branch its own event type keeps routing unambiguous: if every step consumed the same event class, each step would also receive the events the others emit.
## Human-in-the-loop with InputRequiredEvent
Workflows support pausing for human input using the built-in InputRequiredEvent and HumanResponseEvent pair: a step returns an InputRequiredEvent, the caller answers on the handler's Context, and the step that accepts a HumanResponseEvent resumes the run.
```python
from llama_index.core.workflow import (
    Context, HumanResponseEvent, InputRequiredEvent,
    StartEvent, StopEvent, Workflow, step,
)


class ApprovalWorkflow(Workflow):
    @step
    async def generate_draft(self, ctx: Context, ev: StartEvent) -> InputRequiredEvent:
        # generate_with_llm is a placeholder for your own drafting call
        draft = await generate_with_llm(ev.topic)
        # Returning an InputRequiredEvent pauses the workflow and surfaces
        # the event on the handler's event stream
        return InputRequiredEvent(
            prefix='Please review this draft and reply APPROVE or REVISE:',
            payload=draft,
        )

    @step
    async def handle_review(self, ctx: Context, ev: HumanResponseEvent) -> StopEvent:
        if ev.response.strip().upper() == 'APPROVE':
            return StopEvent(result='Draft approved.')
        else:
            return StopEvent(result='Draft sent for revision.')


# Running with human input:
async def run_with_human():
    wf = ApprovalWorkflow(timeout=300)
    handler = wf.run(topic='AI regulation')
    async for ev in handler.stream_events():
        if isinstance(ev, InputRequiredEvent):
            human_input = input(f'{ev.prefix}\n{ev.payload}\n> ')
            handler.ctx.send_event(HumanResponseEvent(response=human_input))
    result = await handler
    print(result)
```
## Streaming workflow events

Every run returns a handler whose event stream you can iterate while the workflow is still in flight, which is useful for progress reporting and UIs:

```python
# Stream all events as they occur
async def stream_workflow(index):
    wf = RAGWorkflow()
    # index built as in the minimal RAG example above
    handler = wf.run(query='What is quantum computing?', index=index)
    async for event in handler.stream_events():
        print(f'Event: {type(event).__name__} — {event}')
    final = await handler
    print(f'Final: {final}')
```
## Workflows vs QueryEngine vs AgentRunner
| Use case | Best fit |
|---|---|
| Simple RAG Q&A | QueryEngine |
| Conversational RAG with memory | ChatEngine |
| Open-ended tool use | AgentRunner (ReAct) |
| Multi-step pipeline with branching | Workflow |
| Human approval gates | Workflow |
| Parallel data gathering + synthesis | Workflow |
| Long-running pipelines with checkpoints | Workflow |
Start with QueryEngine or AgentRunner. Move to Workflow when you find yourself fighting against the abstraction — needing custom branching, parallel steps, or pause/resume logic.