What Changes in Production

A CrewAI script that works perfectly in a Jupyter notebook will often fail in production in three predictable ways: rate limit errors from concurrent requests, untracked LLM costs that quietly burn through your budget, and long-running crews that block web server threads. This guide addresses all three.

Deployment Option 1: FastAPI + Background Tasks

The most common pattern is to expose crew execution through an API endpoint that starts a background job and returns a job ID the client can poll:

from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
from crewai import Agent, Task, Crew
from langchain_anthropic import ChatAnthropic
import uuid
 
app = FastAPI()
job_store: dict = {}  # In production: use Redis
 
class CrewRequest(BaseModel):
    topic: str
 
def run_research_crew(job_id: str, topic: str):
    """Runs synchronously in a background thread."""
    llm = ChatAnthropic(model="claude-haiku-4-5-20251001")
    try:
        researcher = Agent(
            role="Researcher",
            goal=f"Research {topic} thoroughly",
            backstory="Expert researcher",
            llm=llm, verbose=False
        )
        task = Task(
            description=f"Research and summarise: {topic}",
            agent=researcher,
            expected_output="A concise research summary"
        )
        crew = Crew(agents=[researcher], tasks=[task])
        result = crew.kickoff()
        job_store[job_id] = {"status": "complete", "result": str(result)}
    except Exception as e:
        job_store[job_id] = {"status": "error", "error": str(e)}
 
@app.post("/research")
async def start_research(req: CrewRequest, background_tasks: BackgroundTasks):
    job_id = str(uuid.uuid4())
    job_store[job_id] = {"status": "running"}
    background_tasks.add_task(run_research_crew, job_id, req.topic)
    return {"job_id": job_id}
 
@app.get("/research/{job_id}")
async def get_result(job_id: str):
    return job_store.get(job_id, {"status": "not_found"})
 
CrewAI's kickoff() is synchronous, so calling it directly inside an async route handler blocks the event loop for the whole run. Always run it off the event loop: in a background thread (BackgroundTasks, ThreadPoolExecutor) or in a task queue such as Celery.
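
When a handler needs the result inline rather than through polling, one option is to hand the blocking call to a worker thread. The sketch below uses only the standard library; `blocking_kickoff` is a hypothetical stand-in for a function that builds a crew and calls kickoff(), and `heartbeat` exists only to show that the event loop stays responsive while the call runs.

```python
import asyncio
import time


def blocking_kickoff(topic: str) -> str:
    """Hypothetical stand-in for a function that builds a crew and calls kickoff()."""
    time.sleep(0.2)  # simulates a slow, synchronous LLM workflow
    return f"summary of {topic}"


async def heartbeat(ticks: list) -> None:
    """Records three ticks to show the event loop is not blocked."""
    for _ in range(3):
        await asyncio.sleep(0.05)
        ticks.append(time.monotonic())


async def handle_request(topic: str):
    ticks: list = []
    # asyncio.to_thread (Python 3.9+) runs the blocking call in a worker
    # thread, so other coroutines keep running while it executes.
    result, _ = await asyncio.gather(
        asyncio.to_thread(blocking_kickoff, topic),
        heartbeat(ticks),
    )
    return result, len(ticks)


if __name__ == "__main__":
    print(asyncio.run(handle_request("vector databases")))
```

This keeps the request open for the full duration of the crew run, so it suits short crews only. For anything longer, the job-ID pattern above is usually the safer choice.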

Deployment Option 2: CrewAI+ Cloud Platform

CrewAI+ (paid) provides managed crew deployment. You deploy your crew definition and call it via API without managing infrastructure. This is the fastest path to production for teams that do not want to run their own servers.

For self-hosted deployments, the FastAPI pattern above (or a Celery worker queue) is the recommended approach.

LLM Cost Tracking

CrewAI exposes usage metrics after kickoff() completes:

crew = Crew(agents=[researcher, writer], tasks=[research_task, write_task])
result = crew.kickoff()
 
# Access token usage after completion
usage = crew.usage_metrics
print(f"Usage metrics: {usage}")
# Example values (exact fields and formatting depend on your CrewAI version):
# total_tokens=4821, prompt_tokens=3200, completion_tokens=1621,
# successful_requests=6
 

For per-job cost estimation, map token counts to current pricing:

def estimate_cost(usage, model: str = "claude-haiku-4-5-20251001") -> float:
    """Rough cost estimate in USD. Update prices as providers change them."""
    pricing = {  # USD per 1M tokens
        "claude-haiku-4-5-20251001":  {"input": 1.00, "output": 5.00},
        "claude-sonnet-4-6": {"input": 3.00, "output": 15.00},
        "gpt-4o-mini":       {"input": 0.15, "output": 0.60},
        "gpt-4o":            {"input": 2.50, "output": 10.00},
    }
    p = pricing.get(model, {"input": 0, "output": 0})
    input_cost  = (usage.prompt_tokens     / 1_000_000) * p["input"]
    output_cost = (usage.completion_tokens / 1_000_000) * p["output"]
    return round(input_cost + output_cost, 6)
 
cost = estimate_cost(crew.usage_metrics)
print(f"Estimated cost: ${cost:.4f}")
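
To keep spend visible across many jobs, the per-job estimates can be accumulated and compared against a budget. The following is a minimal in-memory sketch; `CostLedger` and its methods are hypothetical names introduced for illustration, and the hard-coded costs stand in for values returned by estimate_cost().

```python
from collections import defaultdict


class CostLedger:
    """Accumulates estimated spend per job and flags budget overruns."""

    def __init__(self, budget_usd: float):
        self.budget_usd = budget_usd
        self.by_job = defaultdict(float)

    def record(self, job_id: str, cost_usd: float) -> None:
        """Add an estimated cost to the running total for one job."""
        self.by_job[job_id] += cost_usd

    @property
    def total(self) -> float:
        return sum(self.by_job.values())

    def over_budget(self) -> bool:
        return self.total > self.budget_usd


ledger = CostLedger(budget_usd=1.00)
ledger.record("job-a", 0.40)  # in practice: estimate_cost(crew.usage_metrics)
ledger.record("job-b", 0.75)
if ledger.over_budget():
    print(f"Budget exceeded: ${ledger.total:.2f} of ${ledger.budget_usd:.2f}")
```

In a multi-process deployment the totals would need to live in shared storage (for example the same Redis instance used for the job store) rather than in process memory.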
 

Handling Rate Limits

Rate limit errors (429 responses) are the most common production failure for crews with multiple agents making concurrent LLM calls. Three approaches in order of preference:

Option 1: Use a Model with Higher Limits

Smaller models such as Claude Haiku and GPT-4o-mini typically have higher rate limits than their premium counterparts. For background research tasks that do not require maximum reasoning quality, using the smaller, faster model makes rate limit errors much less likely.

Option 2: Configure Backoff in CrewAI

from crewai import Agent, LLM
 
# Retry and timeout settings are passed to CrewAI's LLM wrapper
# (check the parameter names against your installed version)
llm = LLM(
    model="claude-haiku-4-5-20251001",
    max_tokens=2048,
    max_retries=3,       # retry on 429/500
    timeout=60,
)
 
agent = Agent(
    role="Researcher",
    goal="Research the topic",
    backstory="Expert researcher",
    llm=llm
)
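
If library-level retries are not enough, the same idea can be applied at the application level. Below is a generic sketch of exponential backoff with full jitter, not CrewAI's own retry mechanism; `RateLimitError` is a hypothetical placeholder for whatever exception your provider SDK raises on a 429, and `with_backoff` is a name introduced here.

```python
import random
import time


class RateLimitError(Exception):
    """Hypothetical placeholder; substitute your provider's 429 exception."""


def with_backoff(fn, max_attempts=4, base_delay=1.0, max_delay=30.0, sleep=time.sleep):
    """Call fn(), retrying on RateLimitError with exponential backoff and full jitter."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except RateLimitError:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the error to the caller
            # The delay ceiling doubles each attempt (1s, 2s, 4s, ...) up to max_delay.
            ceiling = min(max_delay, base_delay * 2 ** attempt)
            # Full jitter: sleep a random time in [0, ceiling] so that
            # concurrent workers do not all retry at the same moment.
            sleep(random.uniform(0, ceiling))
```

Wrapping a single LLM or tool call is usually cheaper than wrapping a whole crew run, because retrying `with_backoff(crew.kickoff)` repeats every task from the start. The `sleep` parameter is injectable so the function can be tested without real delays.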
 

Option 3: LiteLLM Proxy for Rate Limit Management

Route all CrewAI calls through a LiteLLM proxy that handles load balancing across multiple API keys or falls back to secondary models when a rate limit is hit. Set OPENAI_API_BASE or ANTHROPIC_BASE_URL to point at your proxy.
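
As a rough illustration of the last step, the base-URL variables can be set in code before any crew objects are built. The proxy address below is an assumption (a proxy running locally on port 4000), and `point_clients_at_proxy` is a hypothetical helper name; in most deployments these variables would be set in the process environment instead.

```python
import os

# Assumption: a LiteLLM proxy is reachable at this address (hypothetical URL).
PROXY_URL = "http://localhost:4000"


def point_clients_at_proxy(proxy_url: str = PROXY_URL) -> None:
    """Set the base-URL variables named above so client calls route via the proxy."""
    os.environ["OPENAI_API_BASE"] = proxy_url
    os.environ["ANTHROPIC_BASE_URL"] = proxy_url


# Call this before any Agent, LLM, or Crew object is constructed, since
# clients typically read these variables when they are created.
point_clients_at_proxy()
```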

Limiting Agent Iterations

Without limits, a misbehaving agent can loop indefinitely and burn tokens. Set hard limits on every agent and crew:

agent = Agent(
    role="Researcher",
    goal="Research the topic",
    backstory="Expert researcher",
    llm=llm,
    max_iter=10,          # max reasoning iterations before forced stop
    max_execution_time=120,  # seconds — hard wall clock limit
)
 
crew = Crew(
    agents=[agent],
    tasks=[task],
    max_rpm=10,           # max LLM requests per minute across the whole crew
)
 
max_rpm throttles requests across all agents in the crew. Set it based on your API tier's RPM limit. If you have multiple concurrent crews running, divide the limit accordingly.
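
The division can be made explicit with a small helper. This is a sketch under stated assumptions: `per_crew_rpm` is a hypothetical function, the even split assumes all crews are equally busy, and the 80% headroom figure is an arbitrary default meant to leave room for retries.

```python
def per_crew_rpm(tier_rpm_limit: int, concurrent_crews: int, headroom_pct: int = 80) -> int:
    """Split an account-wide RPM limit evenly across crews, keeping some headroom."""
    if concurrent_crews < 1:
        raise ValueError("concurrent_crews must be at least 1")
    # Integer arithmetic: use headroom_pct percent of the limit, divided evenly.
    share = (tier_rpm_limit * headroom_pct) // (100 * concurrent_crews)
    return max(1, share)  # never return 0, which would stall the crew


# Example: a 50 RPM tier shared by 4 crews, using 80% of the limit.
print(per_crew_rpm(50, 4))  # 10
```

The returned value is what you would pass as max_rpm when constructing each crew.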

Silent Failures to Watch For

  • Agent returns generic output instead of completing the task: usually caused by max_iter being hit silently. Enable verbose=True during debugging to see the full reasoning chain.
  • Task output is None: often because the expected_output description is too vague. Make expected_output specific and verifiable, for example 'A JSON object with keys: summary, key_points, sources' rather than 'A summary'.
  • Crew hangs indefinitely: no timeout is set and the agent is waiting on a slow tool or LLM response. Always set max_execution_time on agents.
  • Memory growing across requests in a long-running API server: CrewAI agents accumulate conversation history in memory by default. Instantiate fresh Agent and Crew objects per request, not at module load time.
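
Because these failures do not raise exceptions, it can help to check each result before marking a job complete. The sketch below validates raw output against the JSON shape used as the expected_output example above; `check_task_output` and `REQUIRED_KEYS` are hypothetical names, and the check assumes the task was asked to return a JSON object.

```python
import json

REQUIRED_KEYS = {"summary", "key_points", "sources"}


def check_task_output(raw):
    """Return a list of problems found in a task's raw output (empty list means OK)."""
    if raw is None or not str(raw).strip():
        return ["output is empty or None"]
    try:
        data = json.loads(raw)
    except (TypeError, ValueError):  # json.JSONDecodeError subclasses ValueError
        return ["output is not valid JSON"]
    if not isinstance(data, dict):
        return ["output is not a JSON object"]
    missing = sorted(REQUIRED_KEYS - data.keys())
    return [f"missing key: {key}" for key in missing]


problems = check_task_output('{"summary": "Short overview"}')
print(problems)
```

A job whose output fails the check can be stored with an error status rather than "complete", which turns a silent failure into a visible one.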

Structured Output from Crews

For production pipelines, get typed output rather than raw strings by using output_pydantic on tasks:

from pydantic import BaseModel
from typing import List
 
class ResearchOutput(BaseModel):
    summary: str
    key_findings: List[str]
    confidence: float
 
task = Task(
    description="Research AI agent memory systems",
    agent=researcher,
    expected_output="A research report with summary, findings, and confidence score",
    output_pydantic=ResearchOutput  # returns a typed object
)
 
crew = Crew(agents=[researcher], tasks=[task])
result = crew.kickoff()
# result.pydantic is a ResearchOutput instance
print(result.pydantic.summary)
print(result.pydantic.key_findings)