After you've written your first Inngest function, the real power becomes available: orchestrating parallel work across many function instances (fan-out), running jobs on a schedule, and pausing a function mid-execution to wait for an external event — like a user approval or a webhook callback. These patterns are the building blocks of complex AI pipelines.
Pattern 1: Fan-Out with step.invoke()
Fan-out runs multiple function instances in parallel — each with full durable execution, retry logic, and checkpointing. Unlike `Promise.all()` inside a single step (which retries the whole batch), `step.invoke()` creates independent function runs.
// Orchestrator function export const processDocumentBatch = inngest.createFunction( { id: "process-document-batch" }, { event: "documents/batch-uploaded" }, async ({ event, step }) => { const { documentIds } = event.data; // Fan out: one processDocument run per document ID const results = await Promise.all( documentIds.map((docId) => step.invoke(`process-doc-${docId}`, { function: processDocument, // reference to another Inngest function data: { documentId: docId }, }) ) ); const successCount = results.filter((r) => r.success).length; return { total: documentIds.length, succeeded: successCount }; } ); // Worker function (runs independently for each document) export const processDocument = inngest.createFunction( { id: "process-document", retries: 5, concurrency: { limit: 10 }, // max 10 parallel document jobs globally }, { event: "documents/process-single" }, async ({ event, step }) => { const { documentId } = event.data; const text = await step.run("extract-text", () => extractText(documentId)); const chunks = await step.run("chunk-text", () => chunkText(text)); const embeddings = await step.run("embed", () => embedChunks(chunks)); await step.run("store", () => storeVectors(documentId, embeddings)); return { success: true, chunkCount: chunks.length }; } );
Pattern 2: Scheduled Functions
Cron-triggered functions run on a schedule without any external cron service. The schedule is defined in the function itself.
export const dailyReportDigest = inngest.createFunction( { id: "daily-report-digest" }, { cron: "0 9 * * MON-FRI" }, // 9am UTC, weekdays async ({ step }) => { // Step 1: Gather yesterday's activity const activity = await step.run("gather-activity", async () => { const yesterday = new Date(); yesterday.setDate(yesterday.getDate() - 1); return await getActivitySummary(yesterday); }); // Step 2: Generate AI digest const digest = await step.run("generate-digest", async () => { const response = await openai.chat.completions.create({ model: "gpt-4o-mini", messages: [ { role: "system", content: "Write a concise daily digest email." }, { role: "user", content: JSON.stringify(activity) }, ], }); return response.choices[0].message.content!; }); // Step 3: Send to all subscribers const subscribers = await step.run("get-subscribers", () => getActiveSubscribers()); await Promise.all( subscribers.map((sub) => step.invoke(`send-digest-${sub.id}`, { function: sendEmail, data: { to: sub.email, subject: "Daily Digest", body: digest }, }) ) ); return { sentTo: subscribers.length }; } );
Pattern 3: waitForEvent — Human-in-the-Loop
`step.waitForEvent()` pauses a function and resumes it when a matching event arrives. This is ideal for approval workflows, payment confirmations, or any flow that requires an external signal.
export const contentPublishWorkflow = inngest.createFunction( { id: "content-publish-workflow" }, { event: "content/submitted" }, async ({ event, step }) => { const { contentId, authorId } = event.data; // Step 1: AI moderation check const moderationResult = await step.run("moderate-content", async () => { const content = await getContent(contentId); return await moderateWithAI(content); }); if (moderationResult.requiresHumanReview) { // Notify reviewer await step.run("notify-reviewer", () => sendSlackMessage(`Content ${contentId} needs review: ${moderationResult.reason}`) ); // Pause and wait for a human approval event (timeout after 72 hours) const approval = await step.waitForEvent("wait-for-approval", { event: "content/reviewed", match: "data.contentId", // only match events with the same contentId timeout: "72h", }); if (!approval || approval.data.decision !== "approved") { await step.run("reject-content", () => updateContentStatus(contentId, "rejected") ); return { status: "rejected" }; } } // Step 2: Publish await step.run("publish-content", () => publishContent(contentId)); // Step 3: Notify author await step.run("notify-author", () => sendEmail(authorId, "Your content has been published!") ); return { status: "published" }; } ); // Trigger the approval event from your review UI await inngest.send({ name: "content/reviewed", data: { contentId: "abc123", decision: "approved", reviewerId: "user_xyz" }, });
The `match` field in `waitForEvent()` accepts dot-notation paths into the event payload. Use this to ensure each waiting function only resumes when its specific matching event arrives — not any event with the same name.Pattern 4: Rate Limiting and Throttling OpenAI Calls
export const generateContent = inngest.createFunction( { id: "generate-content", // Throttle: max 60 executions per minute across all runs of this function throttle: { count: 60, period: "1m", key: "event.data.tier", // separate limits per user tier }, // Debounce: if the same userId triggers multiple times within 5s, // only run once with the latest event debounce: { key: "event.data.userId", period: "5s", }, }, { event: "content/generate-requested" }, async ({ event, step }) => { // ... your generation logic } );
Concurrency Control
// Limit concurrent runs per user — prevent one user from monopolizing capacity export const userAIJob = inngest.createFunction( { id: "user-ai-job", concurrency: { limit: 2, // max 2 concurrent runs of this function per key key: "event.data.userId", // scoped per user }, }, { event: "ai/job-requested" }, async ({ event, step }) => { /* ... */ } );