What Temporal solves, and how to structure AI pipelines as reliable Workflows and Activities

The problem Temporal solves

AI agent pipelines are long-running processes. A research pipeline might call an LLM 10 times, fetch 5 URLs, write to a database, and wait for a human approval — all over the course of minutes. If any step fails, or your server crashes, you lose everything and have to start over.

Temporal makes your workflows durable. Every step is persisted. If a step fails, it retries automatically. If the worker crashes, the workflow resumes from where it left off. You write normal Python code; Temporal handles the reliability.

Core concepts

Concept What it is Analogy
Workflow A durable, long-running function that orchestrates steps The recipe — defines the steps and their order
Activity A single step that does real work (API call, DB write, LLM call) A single recipe step — can be retried independently
Worker A process that runs your Workflow and Activity code The cook — picks up tasks and executes them
Temporal Server Tracks state, schedules tasks, handles retries The restaurant manager — keeps track of every order
Task Queue Named channel between server and workers The ticket rail — orders go here, workers pick them up

Why LLM calls should always be Activities

The key design rule: put every side-effectful or expensive operation inside an Activity, not directly in the Workflow function. This includes LLM API calls, web requests, database writes, and file operations.

  • Activities can be retried independently — if a rate-limit error hits, only that activity retries
  • Workflow code must be deterministic (same inputs → same outputs) — LLM calls are not deterministic
  • Activity results are persisted — if the worker crashes mid-pipeline, completed activities do not re-run
Never call an LLM API, make HTTP requests, or write to a database directly inside a @workflow.defn function. These side effects will be replayed on worker restart, causing duplicate API calls and unexpected behaviour.

Installing the Temporal Python SDK

pip install temporalio
# Start a local Temporal server for development:
# brew install temporal && temporal server start-dev
# Or: docker run -d temporalio/auto-setup:latest
 

A simple AI pipeline with Temporal

import asyncio
from dataclasses import dataclass
from datetime import timedelta
from temporalio import workflow, activity
from temporalio.client import Client
from temporalio.worker import Worker
from temporalio.common import RetryPolicy
 
# ── Activity definitions ───────────────────────────────────────
 
@activity.defn
async def call_llm(prompt: str) -> str:
    """Call the LLM API. Retried automatically on failure."""
    from openai import AsyncOpenAI
    client = AsyncOpenAI()
    response = await client.chat.completions.create(
        model='gpt-4o-mini',
        messages=[{'role': 'user', 'content': prompt}]
    )
    return response.choices[0].message.content
 
@activity.defn
async def fetch_url(url: str) -> str:
    """Fetch web content. Retried on network errors."""
    import httpx
    async with httpx.AsyncClient(timeout=30) as client:
        r = await client.get(url)
        return r.text[:2000]  # truncate for LLM context
 
@activity.defn
async def save_result(topic: str, result: str) -> None:
    """Persist result to database."""
    print(f'Saving result for {topic}: {result[:100]}...')
    # In production: write to PostgreSQL, S3, etc.
 
# ── Workflow definition ────────────────────────────────────────
 
@dataclass
class ResearchInput:
    topic: str
    source_url: str
 
@workflow.defn
class ResearchWorkflow:
 
    @workflow.run
    async def run(self, input: ResearchInput) -> str:
        # Step 1: Fetch source material
        content = await workflow.execute_activity(
            fetch_url,
            input.source_url,
            start_to_close_timeout=timedelta(seconds=30),
            retry_policy=RetryPolicy(maximum_attempts=3),
        )
 
        # Step 2: Summarise with LLM
        summary = await workflow.execute_activity(
            call_llm,
            f'Summarise this article about {input.topic} in 3 bullet points:\n{content}',
            start_to_close_timeout=timedelta(seconds=60),
            retry_policy=RetryPolicy(
                maximum_attempts=5,
                initial_interval=timedelta(seconds=2),
                backoff_coefficient=2.0,  # exponential backoff
            ),
        )
 
        # Step 3: Save result
        await workflow.execute_activity(
            save_result,
            args=[input.topic, summary],
            start_to_close_timeout=timedelta(seconds=10),
        )
 
        return summary
 

Running the worker and triggering a workflow

# ── Worker process (run this in a separate terminal / process) ──
 
async def run_worker():
    client = await Client.connect('localhost:7233')
    worker = Worker(
        client,
        task_queue='research-queue',
        workflows=[ResearchWorkflow],
        activities=[call_llm, fetch_url, save_result],
    )
    print('Worker started...')
    await worker.run()
 
# asyncio.run(run_worker())
 
# ── Trigger a workflow (can be from any service) ────────────────
 
async def trigger_research(topic: str, url: str) -> str:
    client = await Client.connect('localhost:7233')
    handle = await client.start_workflow(
        ResearchWorkflow.run,
        ResearchInput(topic=topic, source_url=url),
        id=f'research-{topic.lower().replace(" ", "-")}',
        task_queue='research-queue',
    )
    result = await handle.result()
    return result
 
# asyncio.run(trigger_research('quantum computing', 'https://example.com/article'))
 

Retry policies for LLM rate limits

LLM APIs return 429 errors when rate limited. Configure Activity retry policies to handle this gracefully.

from temporalio.common import RetryPolicy
from datetime import timedelta
 
LLM_RETRY_POLICY = RetryPolicy(
    initial_interval=timedelta(seconds=5),
    backoff_coefficient=2.0,
    maximum_interval=timedelta(minutes=2),
    maximum_attempts=10,
    non_retryable_error_types=['ValueError', 'PermissionError'],
    # 429 RateLimitError will retry; authentication errors will not
)
 

Human-in-the-loop with Signals

Temporal Signals let you pause a workflow and wait for external input — perfect for human approval gates.

@workflow.defn
class ApprovalWorkflow:
    def __init__(self):
        self._approved: bool | None = None
 
    @workflow.signal
    def approve(self, decision: bool) -> None:
        self._approved = decision
 
    @workflow.run
    async def run(self, draft_prompt: str) -> str:
        draft = await workflow.execute_activity(
            call_llm, draft_prompt,
            start_to_close_timeout=timedelta(seconds=60)
        )
 
        # Pause until Signal arrives (or 24 hours)
        await workflow.wait_condition(
            lambda: self._approved is not None,
            timeout=timedelta(hours=24)
        )
 
        if self._approved:
            return f'PUBLISHED: {draft}'
        return f'REJECTED: {draft}'
 
# Send approval from anywhere:
# handle = client.get_workflow_handle('workflow-id')
# await handle.signal(ApprovalWorkflow.approve, True)
 

Temporal Cloud vs self-hosted

Aspect Temporal Cloud Self-hosted
Setup Sign up + SDK Deploy server + database
Scaling Automatic Manual cluster management
Price Action-based billing Infrastructure costs
Visibility UI Included Self-host Temporal UI
Best for Teams wanting managed service Data-residency or cost control at scale