Parallel Activities, Signals for approval, and monitoring via the Temporal UI
From single LLM calls to production pipelines
The previous article covered Temporal basics. This article goes deeper: parallel execution, child workflows for modular design, Signals for human review, and cost safety patterns. These are the patterns you need when your AI pipeline grows beyond a few sequential steps.
Parallel Activities with gather
Temporal natively supports parallel Activities — just start multiple activities without awaiting each one, then gather the results.
import asyncio
from datetime import timedelta

from temporalio import workflow, activity


@activity.defn
async def research_topic(topic: str) -> str:
    from langchain_openai import ChatOpenAI
    llm = ChatOpenAI(model='gpt-4o-mini')
    return (await llm.ainvoke(f'Research key facts about: {topic}')).content


@activity.defn
async def find_examples(topic: str) -> list[str]:
    from langchain_openai import ChatOpenAI
    llm = ChatOpenAI(model='gpt-4o-mini')
    result = (await llm.ainvoke(f'List 3 real-world examples of: {topic}')).content
    return result.split('\n')[:3]


@activity.defn
async def get_statistics(topic: str) -> str:
    from langchain_openai import ChatOpenAI
    llm = ChatOpenAI(model='gpt-4o-mini')
    return (await llm.ainvoke(f'Provide 3 key statistics about: {topic}')).content


@workflow.defn
class ParallelResearchWorkflow:
    @workflow.run
    async def run(self, topic: str) -> dict:
        timeout = timedelta(seconds=60)
        # Start all three activities without awaiting each one
        research_task = workflow.execute_activity(
            research_topic, topic, start_to_close_timeout=timeout
        )
        examples_task = workflow.execute_activity(
            find_examples, topic, start_to_close_timeout=timeout
        )
        stats_task = workflow.execute_activity(
            get_statistics, topic, start_to_close_timeout=timeout
        )
        # Gather the results: the activities run in parallel and the
        # workflow resumes once all three complete
        research, examples, stats = await asyncio.gather(
            research_task, examples_task, stats_task
        )
        return {'research': research, 'examples': examples, 'statistics': stats}
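Stripped of the Temporal machinery, this fan-out/fan-in shape is ordinary asyncio. A minimal sketch, with sleeps standing in for the LLM calls:

```python
import asyncio

async def fetch(name: str, delay: float) -> str:
    # Stand-in for an LLM Activity; sleeps instead of calling a model
    await asyncio.sleep(delay)
    return f'{name} done'

async def main() -> list[str]:
    # Create all three coroutines first, then gather: total wall time
    # is that of the slowest task, not the sum of all three
    return await asyncio.gather(
        fetch('research', 0.02),
        fetch('examples', 0.01),
        fetch('stats', 0.03),
    )

results = asyncio.run(main())  # order matches the gather arguments
```

The difference in Temporal is that each task is a durable Activity with its own timeout and retry history, not just an in-memory coroutine.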
Parallel Activities are one of Temporal's biggest advantages over plain async code. If your worker crashes mid-execution, already-completed Activities do not re-run: Temporal replays the workflow from its recorded event history and reuses each completed Activity's result.

Child workflows for modular pipelines
For complex pipelines, split into Child Workflows — each handles one domain and can be retried or scaled independently.
@workflow.defn
class ContentPipeline:
    @workflow.run
    async def run(self, topic: str) -> str:
        # Step 1: Run research as a child workflow
        research_data = await workflow.execute_child_workflow(
            ParallelResearchWorkflow.run,
            topic,
            id=f'research-{topic}',
            task_queue='research-queue',
        )
        # Step 2: Synthesise all research into an article
        context = f"""
        Research: {research_data['research']}
        Examples: {'; '.join(research_data['examples'])}
        Statistics: {research_data['statistics']}
        """
        # call_llm: a generic single-prompt LLM Activity, assumed defined elsewhere
        article = await workflow.execute_activity(
            call_llm,
            f'Write a 500-word article about {topic} using:\n{context}',
            start_to_close_timeout=timedelta(seconds=90),
        )
        return article
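Keeping prompt assembly in a plain helper function keeps the workflow body small and lets you unit-test the formatting without a Temporal server. A sketch (`build_context` is a hypothetical helper, not part of the pipeline above):

```python
def build_context(research_data: dict) -> str:
    """Format gathered research into a prompt context block (sketch)."""
    return (
        f"Research: {research_data['research']}\n"
        f"Examples: {'; '.join(research_data['examples'])}\n"
        f"Statistics: {research_data['statistics']}"
    )

ctx = build_context({'research': 'R', 'examples': ['a', 'b'], 'statistics': 'S'})
```

Pure functions like this are also safe to call from workflow code, since they are deterministic.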
LangChain integration inside Activities
Any LangChain chain, agent, or retriever can run inside a Temporal Activity. The key is that the Activity is the durability boundary — not the LangChain call itself.
@activity.defn
async def rag_query(question: str) -> str:
    """Run a LangChain RAG query as a durable Activity."""
    from langchain_openai import ChatOpenAI, OpenAIEmbeddings
    from langchain_community.vectorstores import Chroma
    from langchain.chains import RetrievalQA

    llm = ChatOpenAI(model='gpt-4o')
    embeddings = OpenAIEmbeddings()
    vectorstore = Chroma(persist_directory='./chroma_db', embedding_function=embeddings)
    qa_chain = RetrievalQA.from_chain_type(
        llm=llm,
        retriever=vectorstore.as_retriever(search_kwargs={'k': 5}),
    )
    result = await qa_chain.ainvoke({'query': question})
    return result['result']
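One practical consequence of the Activity being the durability boundary: when an Activity fails and retries, the entire chain inside it runs again from scratch. For expensive RAG calls it can be worth memoising results by question. A minimal in-process sketch; a production version would use an external store such as Redis, since retries may land on a different worker process:

```python
from collections.abc import Callable

class ResultCache:
    """Memoises expensive call results by key so a retry can reuse the
    first successful answer. Illustrative only: in-process state does
    not survive a worker restart."""

    def __init__(self) -> None:
        self._store: dict[str, str] = {}

    def get_or_compute(self, key: str, compute: Callable[[], str]) -> str:
        # Only invoke the expensive function on a cache miss
        if key not in self._store:
            self._store[key] = compute()
        return self._store[key]

cache = ResultCache()
calls = 0

def expensive() -> str:
    global calls
    calls += 1
    return 'answer'

first = cache.get_or_compute('q1', expensive)
second = cache.get_or_compute('q1', expensive)  # cache hit, no recompute
```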
Cost safety: activity timeouts and budget guards
LLM pipelines can accumulate unexpected costs if activities loop or run longer than expected. Temporal's timeout system is your primary safety net.
from datetime import timedelta

from temporalio.common import RetryPolicy

# Two timeouts you should always set, plus a retry policy:
await workflow.execute_activity(
    call_llm,
    prompt,
    # How long a single attempt can take
    start_to_close_timeout=timedelta(seconds=60),
    # How long all retry attempts combined can take
    schedule_to_close_timeout=timedelta(minutes=10),
    retry_policy=RetryPolicy(
        maximum_attempts=3,
        # Fail fast on auth errors (not retryable)
        non_retryable_error_types=['AuthenticationError'],
    ),
)

# For the whole workflow, set execution_timeout when starting it:
# handle = await client.start_workflow(
#     ContentPipeline.run, topic,
#     id='pipeline-1',
#     task_queue='main',
#     execution_timeout=timedelta(hours=1),  # hard cap
# )
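Timeouts cap wall-clock time, not spend. A complementary guard is to track estimated cost inside your Activities and abort once a cap is hit. A minimal sketch; the per-token price below is a made-up placeholder, not a real rate:

```python
class BudgetExceeded(Exception):
    """Raised when the pipeline's cost cap is hit."""

class CostTracker:
    """Tracks estimated LLM spend across calls (illustrative sketch)."""

    PRICE_PER_1K_TOKENS = 0.002  # hypothetical rate in USD; check your model's real pricing

    def __init__(self, budget_usd: float) -> None:
        self.budget_usd = budget_usd
        self.spent_usd = 0.0

    def charge(self, tokens: int) -> None:
        # Accumulate estimated cost and fail loudly when over budget
        self.spent_usd += tokens / 1000 * self.PRICE_PER_1K_TOKENS
        if self.spent_usd > self.budget_usd:
            raise BudgetExceeded(
                f'spent ${self.spent_usd:.4f} of ${self.budget_usd:.2f} budget'
            )

tracker = CostTracker(budget_usd=0.01)
tracker.charge(2000)  # within budget at this hypothetical rate
```

Raising an exception from inside an Activity plays well with Temporal: mark it non-retryable so a blown budget fails the workflow instead of being retried.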
Monitoring with the Temporal UI
Temporal's built-in UI (http://localhost:8233 for local dev) shows every workflow execution, its status, activity history, and any errors. Key things to monitor:
- Workflow status: Running, Completed, Failed, Timed Out
- Activity retry count — high retries indicate rate limit issues or flaky APIs
- Workflow duration — longer than expected usually means an activity is stuck
- Failed workflows — click through to see the exact activity that failed and its error
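Most of these checks start with locating the right execution in the UI's search box, which readable IDs make trivial. A tiny slug-builder sketch (a hypothetical helper, not part of Temporal):

```python
from datetime import date

def build_workflow_id(prefix: str, topic: str) -> str:
    """Build a readable, searchable workflow ID,
    e.g. 'research-vector-databases-<today>'."""
    slug = '-'.join(topic.lower().split())  # spaces -> hyphens
    return f'{prefix}-{slug}-{date.today().isoformat()}'

wid = build_workflow_id('research', 'Vector Databases')
```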
Give workflows meaningful IDs (e.g. 'research-{topic}-{date}') so you can find them easily in the UI; random UUIDs make debugging much harder.

When to use Temporal vs simpler alternatives
| Situation | Use Temporal | Use simpler alternative |
|---|---|---|
| Pipeline takes > 30 seconds | Yes | — |
| Pipeline must survive server restarts | Yes | — |
| Steps need independent retry policies | Yes | — |
| Human approval required mid-pipeline | Yes | — |
| Simple 3-step sequential pipeline | Overkill | asyncio or LangGraph |
| Single LLM call per request | Overkill | Direct API call |
| Workflow runs in under 5 seconds | Overkill | FastAPI endpoint |
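Signals for human approval

The human-approval row above is where Temporal Signals come in: the workflow blocks until an external actor (a reviewer, an API endpoint) sends a Signal. Running a signalled workflow needs a live worker and server, so this sketch keeps the decision state as plain Python and notes the Temporal wiring in comments; `ApprovalState` is an illustrative name, not Temporal API.

```python
class ApprovalState:
    """Decision state for a human-in-the-loop review step.

    In a Temporal workflow this state lives on the workflow class, with
    approve/reject exposed as @workflow.signal methods; external code
    sends the Signal via the workflow handle, and the run method blocks
    on:  await workflow.wait_condition(lambda: self.decided)
    """

    def __init__(self) -> None:
        self.decision: str | None = None  # None until a reviewer acts
        self.reason: str = ''

    def approve(self) -> None:
        self.decision = 'approved'

    def reject(self, reason: str) -> None:
        self.decision = 'rejected'
        self.reason = reason

    @property
    def decided(self) -> bool:
        return self.decision is not None
```

Waiting on `workflow.wait_condition` is durable: the workflow can sit paused for days awaiting a human decision and survive worker restarts in the meantime.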