Parallel Activities, Signals for approval, and monitoring via the Temporal UI

From single LLM calls to production pipelines

The previous article covered Temporal basics. This article goes deeper: parallel execution, child workflows for modular design, Signals for human review, and cost safety patterns. These are the patterns you need when your AI pipeline grows beyond a few sequential steps.

Parallel Activities with gather

Temporal natively supports parallel Activities — just start multiple activities without awaiting each one, then gather the results.

import asyncio
from temporalio import workflow, activity
from datetime import timedelta
 
@activity.defn
async def research_topic(topic: str) -> str:
    from langchain_openai import ChatOpenAI
    llm = ChatOpenAI(model='gpt-4o-mini')
    return (await llm.ainvoke(f'Research key facts about: {topic}')).content
 
@activity.defn
async def find_examples(topic: str) -> list[str]:
    from langchain_openai import ChatOpenAI
    llm = ChatOpenAI(model='gpt-4o-mini')
    result = (await llm.ainvoke(f'List 3 real-world examples of: {topic}')).content
    return result.split('\n')[:3]
 
@activity.defn
async def get_statistics(topic: str) -> str:
    from langchain_openai import ChatOpenAI
    llm = ChatOpenAI(model='gpt-4o-mini')
    return (await llm.ainvoke(f'Provide 3 key statistics about: {topic}')).content
 
@workflow.defn
class ParallelResearchWorkflow:
 
    @workflow.run
    async def run(self, topic: str) -> dict:
        timeout = timedelta(seconds=60)
 
        # Start all three activities in parallel
        research_handle = workflow.execute_activity(
            research_topic, topic, start_to_close_timeout=timeout
        )
        examples_handle = workflow.execute_activity(
            find_examples, topic, start_to_close_timeout=timeout
        )
        stats_handle = workflow.execute_activity(
            get_statistics, topic, start_to_close_timeout=timeout
        )
 
        # Gather results — runs in parallel, merges when all complete
        research, examples, stats = await asyncio.gather(
            research_handle, examples_handle, stats_handle
        )
 
        return {'research': research, 'examples': examples, 'statistics': stats}
 
Parallel Activities are one of Temporal's biggest advantages over simple async code. If your worker crashes mid-execution, already-completed activities do not re-run — Temporal replays from the last checkpoint.

Child workflows for modular pipelines

For complex pipelines, split into Child Workflows — each handles one domain and can be retried or scaled independently.

@workflow.defn
class ContentPipeline:
 
    @workflow.run
    async def run(self, topic: str) -> str:
        # Step 1: Run research as a child workflow
        research_data = await workflow.execute_child_workflow(
            ParallelResearchWorkflow.run,
            topic,
            id=f'research-{topic}',
            task_queue='research-queue',
        )
 
        # Step 2: Synthesise all research into an article
        context = f"""
Research: {research_data['research']}
Examples: {'; '.join(research_data['examples'])}
Statistics: {research_data['statistics']}
        """
        article = await workflow.execute_activity(
            call_llm,
            f'Write a 500-word article about {topic} using:\n{context}',
            start_to_close_timeout=timedelta(seconds=90),
        )
 
        return article
 

LangChain integration inside Activities

Any LangChain chain, agent, or retriever can run inside a Temporal Activity. The key is that the Activity is the durability boundary — not the LangChain call itself.

@activity.defn
async def rag_query(question: str) -> str:
    """Run a LangChain RAG query as a durable Activity."""
    from langchain_openai import ChatOpenAI, OpenAIEmbeddings
    from langchain_community.vectorstores import Chroma
    from langchain.chains import RetrievalQA
 
    llm = ChatOpenAI(model='gpt-4o')
    embeddings = OpenAIEmbeddings()
    vectorstore = Chroma(persist_directory='./chroma_db', embedding_function=embeddings)
 
    qa_chain = RetrievalQA.from_chain_type(
        llm=llm,
        retriever=vectorstore.as_retriever(search_kwargs={'k': 5}),
    )
    result = await qa_chain.ainvoke({'query': question})
    return result['result']
 

Cost safety: activity timeouts and budget guards

LLM pipelines can accumulate unexpected costs if activities loop or run longer than expected. Temporal's timeout system is your primary safety net.

from temporalio.common import RetryPolicy
from datetime import timedelta
 
# Three timeout types you should always set:
await workflow.execute_activity(
    call_llm,
    prompt,
    # How long a single attempt can take
    start_to_close_timeout=timedelta(seconds=60),
    # How long all retry attempts combined can take
    schedule_to_close_timeout=timedelta(minutes=10),
    retry_policy=RetryPolicy(
        maximum_attempts=3,
        # Fail fast on auth errors (not retryable)
        non_retryable_error_types=['AuthenticationError'],
    ),
)
 
# For the whole workflow:
# Set execution_timeout when starting the workflow
# handle = await client.start_workflow(
#     ContentPipeline.run, topic,
#     id='pipeline-1',
#     task_queue='main',
#     execution_timeout=timedelta(hours=1),  # hard cap
# )
 

Monitoring with the Temporal UI

Temporal's built-in UI (http://localhost:8233 for local dev) shows every workflow execution, its status, activity history, and any errors. Key things to monitor:

  • Workflow status: Running, Completed, Failed, Timed Out
  • Activity retry count — high retries indicate rate limit issues or flaky APIs
  • Workflow duration — longer than expected usually means an activity is stuck
  • Failed workflows — click through to see the exact activity that failed and its error
Give workflows meaningful IDs (e.g. 'research-{topic}-{date}') so you can find them easily in the UI. Random UUIDs make debugging much harder.

When to use Temporal vs simpler alternatives

Situation Use Temporal Use simpler alternative
Pipeline takes > 30 seconds Yes
Pipeline must survive server restarts Yes
Steps need independent retry policies Yes
Human approval required mid-pipeline Yes
Simple 3-step sequential pipeline Overkill asyncio or LangGraph
Single LLM call per request Overkill Direct API call
Workflow runs in under 5 seconds Overkill FastAPI endpoint