How to build complex, stateful AI pipelines using the Workflow API

Why Workflows exist

LlamaIndex's QueryEngine and AgentRunner are great for straight-line tasks — ask a question, get an answer. But real applications need branching, loops, human checkpoints, and parallel execution. Workflows were introduced to handle exactly these cases.

A Workflow is a graph of Steps. Each Step is an async function that consumes one type of Event and emits another. You define the event types and the connections between steps; LlamaIndex handles routing, concurrency, and state.

Core concepts

Workflow: The top-level container — a class inheriting from Workflow
Step: An async method decorated with @step — runs when its input event arrives
Event: A Pydantic model that carries data between steps
StartEvent: The built-in event that kicks off a workflow (sent by workflow.run())
StopEvent: The built-in event that ends the workflow and returns a result
Context: A per-run object for sharing state between steps without global variables

A minimal RAG workflow

from llama_index.core.workflow import (
    Workflow, StartEvent, StopEvent, step, Event, Context
)
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
from llama_index.llms.openai import OpenAI
 
# 1. Define custom events
class RetrieveEvent(Event):
    query: str
    nodes: list
 
# 2. Define the workflow
class RAGWorkflow(Workflow):
 
    @step
    async def retrieve(self, ctx: Context, ev: StartEvent) -> RetrieveEvent:
        # The index arrives on the StartEvent, passed as a kwarg to workflow.run()
        index = ev.index
        retriever = index.as_retriever(similarity_top_k=5)
        nodes = await retriever.aretrieve(ev.query)
        return RetrieveEvent(query=ev.query, nodes=nodes)

    @step
    async def synthesise(self, ctx: Context, ev: RetrieveEvent) -> StopEvent:
        llm = OpenAI(model='gpt-4o-mini')
        context_str = '\n'.join(n.get_content() for n in ev.nodes)
        prompt = f'Context:\n{context_str}\n\nQuestion: {ev.query}'
        response = await llm.acomplete(prompt)
        return StopEvent(result=str(response))

# 3. Run
async def main():
    docs = SimpleDirectoryReader('data').load_data()
    index = VectorStoreIndex.from_documents(docs)

    wf = RAGWorkflow(timeout=30)
    # Extra kwargs to run() become attributes of the StartEvent
    result = await wf.run(query='What are the key findings?', index=index)
    print(result)

if __name__ == '__main__':
    import asyncio
    asyncio.run(main())
 

Using Context to share state between steps

Context is the per-run scratchpad. Use ctx.set() and ctx.get() to pass data between steps without needing to thread it through event payloads.

# In any step:
await ctx.set('user_id', 'user-123')
 
# In a later step:
user_id = await ctx.get('user_id', default=None)
 
Use Context for data that needs to be read by multiple steps (e.g., config, loaded index, user metadata). Use Event payloads for data that flows directly from one step to the next.

Parallel steps with collect_events

When you need multiple steps to run in parallel and then merge their results, emit extra events with ctx.send_event() (a step can also emit one more via its return value), then buffer them in the merging step with ctx.collect_events() until all of them have arrived.

class SearchRequestEvent(Event):
    source: str

class SearchResultEvent(Event):
    source: str
    results: list

class ResearchWorkflow(Workflow):

    @step
    async def fan_out(self, ctx: Context, ev: StartEvent) -> SearchRequestEvent:
        # Emit two events — one via ctx.send_event(), one via the return value.
        # The search step runs once per event.
        ctx.send_event(SearchRequestEvent(source='web'))
        return SearchRequestEvent(source='database')

    @step(num_workers=2)  # let both searches run concurrently
    async def search(self, ctx: Context, ev: SearchRequestEvent) -> SearchResultEvent:
        results = await do_search(ev.source)  # your own search function
        return SearchResultEvent(source=ev.source, results=results)

    @step
    async def merge(self, ctx: Context, ev: SearchResultEvent) -> StopEvent:
        # collect_events buffers results until both parallel events have arrived
        events = ctx.collect_events(ev, [SearchResultEvent, SearchResultEvent])
        if events is None:
            return None  # not all events arrived yet
        combined = [r for e in events for r in e.results]
        return StopEvent(result=combined)
 

Human-in-the-loop with InputRequiredEvent

Workflows can pause for human input using the built-in InputRequiredEvent / HumanResponseEvent pair: a step returns an InputRequiredEvent, the caller sees it in the event stream, and resumes the run by sending a HumanResponseEvent back in.

from llama_index.core.workflow import (
    InputRequiredEvent, HumanResponseEvent
)
 
class ApprovalWorkflow(Workflow):
 
    @step
    async def generate_draft(self, ctx: Context, ev: StartEvent) -> InputRequiredEvent:
        draft = await generate_with_llm(ev.topic)  # your own LLM call
        # Pause and ask the human
        return InputRequiredEvent(
            prefix='Please review this draft and reply APPROVE or REVISE:',
            payload=draft
        )
 
    @step
    async def handle_review(self, ctx: Context, ev: HumanResponseEvent) -> StopEvent:
        if ev.response.strip().upper() == 'APPROVE':
            return StopEvent(result='Draft approved.')
        else:
            return StopEvent(result='Draft sent for revision.')
 
# Running with human input:
async def run_with_human():
    wf = ApprovalWorkflow(timeout=300)
    handler = wf.run(topic='AI regulation')
 
    async for ev in handler.stream_events():
        if isinstance(ev, InputRequiredEvent):
            human_input = input(f'{ev.prefix}\n{ev.payload}\n> ')
            handler.ctx.send_event(HumanResponseEvent(response=human_input))
 
    result = await handler
    print(result)
 

Streaming workflow events

# Stream all events as they occur
async def stream_workflow():
    wf = RAGWorkflow()
    # index built as in the earlier example
    handler = wf.run(query='What is quantum computing?', index=index)
 
    async for event in handler.stream_events():
        print(f'Event: {type(event).__name__} — {event}')
 
    final = await handler
    print(f'Final: {final}')
 

Workflows vs QueryEngine vs AgentRunner

Simple RAG Q&A: QueryEngine
Conversational RAG with memory: ChatEngine
Open-ended tool use: AgentRunner (ReAct)
Multi-step pipeline with branching: Workflow
Human approval gates: Workflow
Parallel data gathering + synthesis: Workflow
Long-running pipelines with checkpoints: Workflow

Start with QueryEngine or AgentRunner. Move to Workflow when you find yourself fighting against the abstraction — needing custom branching, parallel steps, or pause/resume logic.