# What Temporal solves, and how to structure AI pipelines as reliable Workflows and Activities

## The problem Temporal solves
AI agent pipelines are long-running processes. A research pipeline might call an LLM 10 times, fetch 5 URLs, write to a database, and wait for human approval — all over the course of minutes. If any step fails, or your server crashes, you lose everything and have to start over.
Temporal makes your workflows durable. Every step is persisted. If a step fails, it retries automatically. If the worker crashes, the workflow resumes from where it left off. You write normal Python code; Temporal handles the reliability.
## Core concepts
| Concept | What it is | Analogy |
|---|---|---|
| Workflow | A durable, long-running function that orchestrates steps | The recipe — defines the steps and their order |
| Activity | A single step that does real work (API call, DB write, LLM call) | A single recipe step — can be retried independently |
| Worker | A process that runs your Workflow and Activity code | The cook — picks up tasks and executes them |
| Temporal Server | Tracks state, schedules tasks, handles retries | The restaurant manager — keeps track of every order |
| Task Queue | Named channel between server and workers | The ticket rail — orders go here, workers pick them up |
## Why LLM calls should always be Activities
The key design rule: put every side-effectful or expensive operation inside an Activity, not directly in the Workflow function. This includes LLM API calls, web requests, database writes, and file operations.
- Activities can be retried independently — if a rate-limit error hits, only that activity retries
- Workflow code must be deterministic (same inputs → same outputs) — LLM calls are not deterministic
- Activity results are persisted — if the worker crashes mid-pipeline, completed activities do not re-run
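The replay mechanics behind the third point can be illustrated without Temporal at all. In the sketch below (plain Python, hypothetical names — not the Temporal SDK), a deterministic orchestration function is re-executed after a "crash", and completed step results are served from a recorded history instead of re-running:

```python
def run_pipeline(step, state):
    """Deterministic orchestration: same inputs -> same sequence of steps."""
    content = step('fetch', state['url'])   # side effects live in steps
    summary = step('summarise', content)
    step('save', summary)
    return summary

def make_replayer(history):
    """Serve recorded results first; execute (and record) only new steps."""
    cursor = iter(list(history))
    def step(name, arg):
        for recorded_name, recorded_result in cursor:
            # Replay: the workflow must ask for the same step in the same
            # order, or the history no longer matches the code.
            assert recorded_name == name, 'non-deterministic workflow!'
            return recorded_result
        result = f'{name}({arg})'           # stand-in for real work
        history.append((name, result))
        return result
    return step

history = []
first = run_pipeline(make_replayer(history), {'url': 'u'})   # records 3 steps
replayed = run_pipeline(make_replayer(history), {'url': 'u'})  # re-runs nothing
```

Because the orchestration function is deterministic, the replay asks for the same steps in the same order, so every completed step is answered from history. If the function called an LLM directly, the second execution would diverge and the history would no longer line up — which is exactly why side effects belong in Activities.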
Never call an LLM API, make HTTP requests, or write to a database directly inside a `@workflow.defn` function. Workflow code is re-executed during replay, so side effects placed there run again on every worker restart, causing duplicate API calls and unexpected behaviour.

## Installing the Temporal Python SDK

```bash
pip install temporalio

# Start a local Temporal server for development:
# brew install temporal && temporal server start-dev
# Or: docker run -d temporalio/auto-setup:latest
```
## A simple AI pipeline with Temporal

```python
import asyncio
from dataclasses import dataclass
from datetime import timedelta

from temporalio import workflow, activity
from temporalio.client import Client
from temporalio.worker import Worker
from temporalio.common import RetryPolicy

# ── Activity definitions ───────────────────────────────────────

@activity.defn
async def call_llm(prompt: str) -> str:
    """Call the LLM API. Retried automatically on failure."""
    from openai import AsyncOpenAI
    client = AsyncOpenAI()
    response = await client.chat.completions.create(
        model='gpt-4o-mini',
        messages=[{'role': 'user', 'content': prompt}],
    )
    return response.choices[0].message.content

@activity.defn
async def fetch_url(url: str) -> str:
    """Fetch web content. Retried on network errors."""
    import httpx
    async with httpx.AsyncClient(timeout=30) as client:
        r = await client.get(url)
        return r.text[:2000]  # truncate for LLM context

@activity.defn
async def save_result(topic: str, result: str) -> None:
    """Persist result to database."""
    print(f'Saving result for {topic}: {result[:100]}...')
    # In production: write to PostgreSQL, S3, etc.

# ── Workflow definition ────────────────────────────────────────

@dataclass
class ResearchInput:
    topic: str
    source_url: str

@workflow.defn
class ResearchWorkflow:
    @workflow.run
    async def run(self, input: ResearchInput) -> str:
        # Step 1: Fetch source material
        content = await workflow.execute_activity(
            fetch_url,
            input.source_url,
            start_to_close_timeout=timedelta(seconds=30),
            retry_policy=RetryPolicy(maximum_attempts=3),
        )
        # Step 2: Summarise with LLM
        summary = await workflow.execute_activity(
            call_llm,
            f'Summarise this article about {input.topic} in 3 bullet points:\n{content}',
            start_to_close_timeout=timedelta(seconds=60),
            retry_policy=RetryPolicy(
                maximum_attempts=5,
                initial_interval=timedelta(seconds=2),
                backoff_coefficient=2.0,  # exponential backoff
            ),
        )
        # Step 3: Save result
        await workflow.execute_activity(
            save_result,
            args=[input.topic, summary],
            start_to_close_timeout=timedelta(seconds=10),
        )
        return summary
```
## Running the worker and triggering a workflow

```python
# ── Worker process (run this in a separate terminal / process) ──
async def run_worker():
    client = await Client.connect('localhost:7233')
    worker = Worker(
        client,
        task_queue='research-queue',
        workflows=[ResearchWorkflow],
        activities=[call_llm, fetch_url, save_result],
    )
    print('Worker started...')
    await worker.run()

# asyncio.run(run_worker())
```

```python
# ── Trigger a workflow (can be from any service) ────────────────
async def trigger_research(topic: str, url: str) -> str:
    client = await Client.connect('localhost:7233')
    handle = await client.start_workflow(
        ResearchWorkflow.run,
        ResearchInput(topic=topic, source_url=url),
        id=f'research-{topic.lower().replace(" ", "-")}',
        task_queue='research-queue',
    )
    return await handle.result()

# asyncio.run(trigger_research('quantum computing', 'https://example.com/article'))
```
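The workflow `id` doubles as a deduplication key: by default, Temporal rejects a start request whose id matches a workflow that is already running, so deriving the id from the input makes the trigger idempotent. The slug logic used above can be factored into a small helper (a hypothetical name, not part of the SDK):

```python
def research_workflow_id(topic: str) -> str:
    """Derive a stable workflow id from the topic (same input -> same id)."""
    return f'research-{topic.lower().replace(" ", "-")}'

print(research_workflow_id('Quantum Computing'))  # → research-quantum-computing
```

Calling `trigger_research` twice with the same topic then targets the same workflow execution instead of starting a duplicate pipeline.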
## Retry policies for LLM rate limits
LLM APIs return 429 errors when rate limited. Configure Activity retry policies to handle this gracefully.
```python
from datetime import timedelta
from temporalio.common import RetryPolicy

LLM_RETRY_POLICY = RetryPolicy(
    initial_interval=timedelta(seconds=5),
    backoff_coefficient=2.0,
    maximum_interval=timedelta(minutes=2),
    maximum_attempts=10,
    # Transient failures such as 429 rate limits retry with backoff;
    # errors of the types below (e.g. bad input, auth) fail immediately:
    non_retryable_error_types=['ValueError', 'PermissionError'],
)
```
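Under this policy the wait before each retry grows geometrically from `initial_interval` and is capped at `maximum_interval`. The resulting schedule can be sketched in plain Python (no Temporal needed — this just reproduces the arithmetic):

```python
from datetime import timedelta

def retry_schedule(initial, coefficient, maximum, attempts):
    """Delays before retries 2..N under exponential backoff with a cap."""
    delays = []
    for n in range(attempts - 1):  # the first attempt has no delay
        delay = initial * (coefficient ** n)
        delays.append(min(delay, maximum))
    return delays

schedule = retry_schedule(
    timedelta(seconds=5), 2.0, timedelta(minutes=2), attempts=10
)
print([int(d.total_seconds()) for d in schedule])
# → [5, 10, 20, 40, 80, 120, 120, 120, 120]
```

So a persistently rate-limited call is retried for roughly ten minutes in total before the activity finally fails — usually long enough to ride out a 429 window.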
## Human-in-the-loop with Signals
Temporal Signals let you pause a workflow and wait for external input — perfect for human approval gates.
```python
@workflow.defn
class ApprovalWorkflow:
    def __init__(self) -> None:
        self._approved: bool | None = None

    @workflow.signal
    def approve(self, decision: bool) -> None:
        self._approved = decision

    @workflow.run
    async def run(self, draft_prompt: str) -> str:
        draft = await workflow.execute_activity(
            call_llm,
            draft_prompt,
            start_to_close_timeout=timedelta(seconds=60),
        )
        # Pause until the Signal arrives (or 24 hours pass)
        await workflow.wait_condition(
            lambda: self._approved is not None,
            timeout=timedelta(hours=24),
        )
        if self._approved:
            return f'PUBLISHED: {draft}'
        return f'REJECTED: {draft}'

# Send approval from anywhere:
# handle = client.get_workflow_handle('workflow-id')
# await handle.signal(ApprovalWorkflow.approve, True)
```
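The gate itself is an ordinary asyncio pattern: a mutable flag set by the signal, plus a wait on a predicate. A simplified in-memory analogue (plain asyncio, hypothetical names — unlike Temporal, nothing here survives a process crash, and I believe an elapsed `wait_condition` timeout similarly surfaces as `asyncio.TimeoutError`):

```python
import asyncio

class ApprovalGate:
    """In-memory analogue of the signal + wait_condition pattern."""
    def __init__(self) -> None:
        self._approved: bool | None = None
        self._event = asyncio.Event()

    def approve(self, decision: bool) -> None:  # the 'signal' handler
        self._approved = decision
        self._event.set()

    async def wait(self, timeout: float) -> bool:
        # Raises asyncio.TimeoutError if no decision arrives in time
        await asyncio.wait_for(self._event.wait(), timeout)
        return bool(self._approved)

async def demo() -> str:
    gate = ApprovalGate()
    # Simulate a human approving 10 ms later, from "outside" the workflow
    asyncio.get_running_loop().call_later(0.01, gate.approve, True)
    approved = await gate.wait(timeout=1.0)
    return 'PUBLISHED' if approved else 'REJECTED'

print(asyncio.run(demo()))  # → PUBLISHED
```

What Temporal adds on top of this pattern is durability: the signal is written to the workflow history, so the approval is not lost even if the worker restarts while waiting.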
## Temporal Cloud vs self-hosted
| Aspect | Temporal Cloud | Self-hosted |
|---|---|---|
| Setup | Sign up + SDK | Deploy server + database |
| Scaling | Automatic | Manual cluster management |
| Price | Action-based billing | Infrastructure costs |
| Visibility UI | Included | Self-host Temporal UI |
| Best for | Teams wanting managed service | Data-residency or cost control at scale |