
Step-Based State Machines in BullMQ Jobs for Resilient Workflows
Build production-ready resumable workflows in BullMQ using step-based state machines. Learn how to handle long-running jobs that gracefully recover from failures.
Your BullMQ job is halfway through a complex, multi-stage data processing pipeline—generating reports, calling APIs, transforming data—when suddenly, an API timeout kills it. Now you have to start from scratch. Sound familiar?
Long-running jobs that fail mid-execution are one of the most frustrating challenges in distributed systems. Retrying from the beginning wastes resources, time, and money. But what if your jobs could remember where they left off and resume from the exact point of failure?
In this guide, we'll explore how to implement step-based state machines in BullMQ jobs to build resumable, resilient workflows that gracefully handle failures and retries without repeating work.
The Problem: All-or-Nothing Job Execution
Traditional BullMQ jobs follow an all-or-nothing pattern:
async function processJob(job: Job) {
// Step 1: Fetch data from external API
const data = await fetchDataFromAPI(job.data.id);
// Step 2: Transform data
const transformed = await transformData(data);
// Step 3: Save to database
await saveToDatabase(transformed);
// Step 4: Send notifications
await sendNotifications(job.data.id);
return { success: true };
}
If Step 3 fails after Step 1 and Step 2 succeed, BullMQ retries the entire job — repeating the expensive API call and transformation work unnecessarily.
The core issues:
- No state persistence: The job doesn't remember which steps completed
- Wasted resources: Successful steps repeat on retry
- Inconsistent state: Partial work might leave systems in an inconsistent state
- Poor observability: Hard to know exactly where failures occurred
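To make the retry cost concrete, here's how such a job is typically enqueued. This is a sketch (the orders queue name and values are illustrative), but attempts and backoff are standard BullMQ options, and every attempt re-runs the whole processor:
import { Queue } from 'bullmq';
const orderQueue = new Queue('orders', {
connection: { host: 'localhost', port: 6379 },
});
// Each retry re-executes the ENTIRE processor: the API call and the
// transformation run again even when only the database save failed
async function enqueueOrder(id: string) {
await orderQueue.add('process-order', { id }, {
attempts: 3,
backoff: { type: 'exponential', delay: 5000 },
});
}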
The Solution: Step-Based State Machines
A step-based state machine uses a step field in job data to track progress through a multi-stage workflow. Each step:
- Checks current state: "Where did we leave off?"
- Performs its work: Executes only the necessary logic
- Updates state: Marks completion and advances to the next step
- Handles failures gracefully: Retries resume from the failed step
This pattern leverages BullMQ's built-in features:
- Job data persistence: job.data survives retries
- WaitingChildrenError: Pause parent jobs while children execute
- Child job dependencies: Coordinate multi-stage workflows
- Retry mechanisms: Built-in exponential backoff
Reference the BullMQ job data documentation for details on job persistence and the WaitingChildrenError pattern for parent-child coordination.
Implementing the Step Pattern
Basic Step Structure
Here's a minimal step-based job processor:
import { Worker, Job } from 'bullmq';
interface JobData {
userId: string;
orderId: string;
step?: 'validate' | 'process' | 'notify' | 'complete';
}
interface JobResult {
success: boolean;
step: string;
data?: any;
}
async function processOrderJob(job: Job<JobData>): Promise<JobResult> {
// Get current step (defaults to first step)
const step = job.data.step || 'validate';
try {
// Execute logic based on current step
switch (step) {
case 'validate':
await validateOrder(job);
// Advance to next step
await job.updateData({ ...job.data, step: 'process' });
return processOrderJob(job); // Continue immediately
case 'process':
await processPayment(job);
await job.updateData({ ...job.data, step: 'notify' });
return processOrderJob(job);
case 'notify':
await sendConfirmationEmail(job);
await job.updateData({ ...job.data, step: 'complete' });
return processOrderJob(job);
case 'complete':
return {
success: true,
step: 'complete',
data: { orderId: job.data.orderId },
};
default:
throw new Error(`Unknown step: ${step}`);
}
} catch (error) {
// Error bubbles up, BullMQ retries from current step
throw error;
}
}
// Helper functions
async function validateOrder(job: Job<JobData>) {
await job.updateProgress(10);
// Validate order data, check inventory, etc.
const isValid = await checkInventory(job.data.orderId);
if (!isValid) {
throw new Error('Order validation failed');
}
}
async function processPayment(job: Job<JobData>) {
await job.updateProgress(50);
// Process payment with external API
await callPaymentAPI(job.data.orderId);
}
async function sendConfirmationEmail(job: Job<JobData>) {
await job.updateProgress(90);
// Send email notification
await emailService.send(job.data.userId, 'Order confirmed');
}
Key benefits of this pattern:
- Automatic resume: If processPayment fails, the retry starts at the process step
- Progress tracking: Each step reports progress independently
- State visibility: Job data shows exactly which step failed
- Idempotency support: Steps can check completion state before executing
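Wiring this processor into a worker is standard BullMQ. A minimal sketch, assuming a queue named orders and the JobData interface above: with attempts configured, a failure in processPayment re-invokes the processor, which reads job.data.step ('process') and skips the already-completed validate step.
import { Queue, Worker } from 'bullmq';
const connection = { host: 'localhost', port: 6379 };
const orderQueue = new Queue<JobData>('orders', { connection });
// The worker runs the step-based processor defined above
const orderWorker = new Worker<JobData>('orders', processOrderJob, { connection });
async function addOrderJob(userId: string, orderId: string) {
// attempts/backoff control how often BullMQ re-invokes the processor;
// the step field controls where each attempt resumes
await orderQueue.add('order', { userId, orderId }, {
attempts: 5,
backoff: { type: 'exponential', delay: 2000 },
});
}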
Managing Child Jobs with WaitingChildrenError
For complex workflows with parallel operations, use child jobs and WaitingChildrenError to pause execution:
import { Worker, Job, WaitingChildrenError } from 'bullmq';
import { reportQueue, analysisQueue, notificationQueue } from './queues';
interface ReportJobData {
reportId: string;
dataSourceIds: string[];
step?: 'fetch' | 'wait-analysis' | 'complete';
fetchedData?: any[];
}
async function processReportJob(job: Job<ReportJobData>, token?: string): Promise<any> {
const step = job.data.step || 'fetch';
try {
// ================================================
// STEP 1: Fetch Data Sources
// ================================================
if (step === 'fetch') {
await job.updateProgress(10);
// Fetch data from multiple sources
const dataResults = await Promise.all(job.data.dataSourceIds.map((id) => fetchDataSource(id)));
// Store fetched data in job data for next step
await job.updateData({
...job.data,
fetchedData: dataResults,
step: 'wait-analysis',
});
// Spawn child jobs for parallel analysis
const childJobPromises = dataResults.map((data, index) =>
analysisQueue.add(
`analysis-${index}`,
{ data, reportId: job.data.reportId },
{
parent: {
id: job.id!,
queue: job.queueQualifiedName,
},
},
),
);
await Promise.all(childJobPromises);
// Pause this job until children complete; the worker supplies the lock token
const shouldWait = await job.moveToWaitingChildren(token!);
if (shouldWait) {
throw new WaitingChildrenError();
}
// Children already finished (rare): continue straight to aggregation
return processReportJob(job, token);
}
// ================================================
// STEP 2: Aggregate Analysis Results
// ================================================
if (step === 'wait-analysis') {
await job.updateProgress(60);
// Collect results from child jobs
const childrenResults = await job.getChildrenValues();
const analysisResults = Object.values(childrenResults);
// Aggregate and transform
const aggregatedReport = await aggregateResults(analysisResults, job.data.reportId);
// Save aggregated report
await saveReport(job.data.reportId, aggregatedReport);
await job.updateData({
...job.data,
step: 'complete',
});
return {
success: true,
reportId: job.data.reportId,
analysisCount: analysisResults.length,
};
}
return { success: false, error: 'Unknown step' };
} catch (error) {
// WaitingChildrenError is not a real error—it's a signal to pause
if (error instanceof WaitingChildrenError) {
throw error;
}
// All other errors trigger retry from current step
throw error;
}
}
Critical pattern notes:
- Update step BEFORE waiting: Call job.updateData() before moveToWaitingChildren()
- WaitingChildrenError is special: Don't catch it; let it bubble up to BullMQ
- Check existing children: Use job.getChildrenValues() to avoid re-spawning
- Token parameter: Pass token to moveToWaitingChildren() for proper locking
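The child side of this flow isn't shown above. Here's a minimal sketch of an analysis worker, assuming the analysis queue is named 'analysis' and using a hypothetical analyzeData helper; whatever a child processor returns is what the parent later receives from job.getChildrenValues():
import { Worker, Job } from 'bullmq';
interface AnalysisJobData {
data: any;
reportId: string;
}
// Hypothetical analysis logic, for illustration only
async function analyzeData(data: any): Promise<any> {
return { summary: 'ok', input: data };
}
const analysisWorker = new Worker<AnalysisJobData>(
'analysis',
async (job: Job<AnalysisJobData>) => {
const result = await analyzeData(job.data.data);
// The return value becomes this child's entry in the parent's
// getChildrenValues() map
return result;
},
{ connection: { host: 'localhost', port: 6379 } },
);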
Production-Ready Example: Multi-Stage Data Pipeline
Let's build a realistic data processing pipeline that:
- Validates input data
- Spawns parallel transformation jobs
- Waits for transformations to complete
- Aggregates results
- Publishes to downstream systems
import { Worker, Job, Queue, WaitingChildrenError } from 'bullmq';
// ================================================
// Job Data Interfaces
// ================================================
interface PipelineJobData {
pipelineId: string;
inputFiles: string[];
outputDestination: string;
step?: 'validate' | 'transform' | 'wait-transform' | 'aggregate' | 'publish' | 'complete';
validatedData?: any[];
aggregatedResult?: any;
}
interface TransformJobData {
pipelineId: string;
fileData: any;
transformationType: string;
}
interface PipelineResult {
success: boolean;
pipelineId: string;
processedFiles: number;
outputLocation?: string;
error?: string;
}
// ================================================
// Queue Setup
// ================================================
const redisConfig = {
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
};
const pipelineQueue = new Queue('data-pipeline', {
connection: redisConfig,
});
const transformQueue = new Queue('data-transform', {
connection: redisConfig,
});
// ================================================
// Main Pipeline Processor
// ================================================
async function processPipelineJob(job: Job<PipelineJobData>, token?: string): Promise<PipelineResult> {
const step = job.data.step || 'validate';
const startTime = Date.now();
try {
// ================================================
// STEP 1: Validate Input Data
// ================================================
if (step === 'validate') {
await job.updateProgress(10);
await job.log(`Starting validation for ${job.data.inputFiles.length} files`);
// Load and validate all input files
const validationResults = await Promise.all(
job.data.inputFiles.map(async (filePath) => {
const data = await loadFile(filePath);
const isValid = await validateDataSchema(data);
if (!isValid) {
throw new Error(`Invalid data schema in file: ${filePath}`);
}
return { filePath, data };
}),
);
await job.log(`Validated ${validationResults.length} files successfully`);
// Save validated data and advance to transform step
await job.updateData({
...job.data,
validatedData: validationResults,
step: 'transform',
});
// Continue to next step immediately (no waiting)
return processPipelineJob(job, token);
}
// ================================================
// STEP 2: Spawn Transformation Child Jobs
// ================================================
if (step === 'transform') {
await job.updateProgress(30);
const validatedData = job.data.validatedData;
if (!validatedData || validatedData.length === 0) {
throw new Error('No validated data found');
}
await job.log(`Spawning ${validatedData.length} transformation jobs`);
// Check which transform jobs already exist (from a previous retry).
// getChildrenValues() is keyed by the child's job key (e.g.
// "bull:data-transform:<jobId>"), not by job name, so use deterministic
// job IDs and match on the key suffix
const existingChildren = await job.getChildrenValues();
const existingChildKeys = Object.keys(existingChildren);
// Pair each item with its original index before filtering so the
// surviving jobs keep their original numbering
const newTransformJobs = validatedData
.map((item, index) => ({ item, index }))
.filter(({ index }) => {
const childId = `transform-${job.data.pipelineId}-${index}`;
return !existingChildKeys.some((key) => key.endsWith(`:${childId}`));
})
.map(({ item, index }) =>
transformQueue.add(
`transform-${job.data.pipelineId}-${index}`,
{
pipelineId: job.data.pipelineId,
fileData: item.data,
transformationType: 'standard',
} as TransformJobData,
{
jobId: `transform-${job.data.pipelineId}-${index}`, // deterministic: re-adding is a no-op
parent: {
id: job.id!,
queue: job.queueQualifiedName,
},
priority: 1,
},
),
);
if (newTransformJobs.length > 0) {
await Promise.all(newTransformJobs);
await job.log(`Spawned ${newTransformJobs.length} new transformation jobs`);
} else {
await job.log('All transformation jobs already exist');
}
// Update to next step BEFORE waiting
await job.updateData({
...job.data,
step: 'wait-transform',
});
// Pause until all transform children complete; the worker supplies the lock token
const shouldWait = await job.moveToWaitingChildren(token!);
if (shouldWait) {
throw new WaitingChildrenError();
}
// Children already finished (rare): continue straight to aggregation
await job.log('Transform jobs already completed');
return processPipelineJob(job, token);
}
// ================================================
// STEP 3: Aggregate Transformation Results
// ================================================
if (step === 'wait-transform') {
await job.updateProgress(70);
await job.log('Collecting transformation results');
// Check if any child jobs failed
await checkChildJobFailures(job);
// Collect results from all transform jobs
const childrenValues = await job.getChildrenValues();
const transformResults = Object.values(childrenValues);
await job.log(`Aggregating ${transformResults.length} transformation results`);
// Aggregate and combine results
const aggregatedResult = await aggregateTransformations(transformResults, job.data.pipelineId);
// Save aggregated result to job data
await job.updateData({
...job.data,
aggregatedResult,
step: 'publish',
});
// Continue to publish step
return processPipelineJob(job, token);
}
// ================================================
// STEP 4: Publish Results
// ================================================
if (step === 'publish') {
await job.updateProgress(90);
await job.log('Publishing results to destination');
const { aggregatedResult, outputDestination, pipelineId } = job.data;
if (!aggregatedResult) {
throw new Error('No aggregated result found');
}
// Publish to external system (S3, database, API, etc.)
const outputLocation = await publishResults(aggregatedResult, outputDestination, pipelineId);
await job.log(`Results published to ${outputLocation}`);
// Update to final step
await job.updateData({
...job.data,
step: 'complete',
});
return processPipelineJob(job, token);
}
// ================================================
// STEP 5: Complete
// ================================================
if (step === 'complete') {
await job.updateProgress(100);
const duration = Date.now() - startTime;
await job.log(`Pipeline completed in ${duration}ms`);
return {
success: true,
pipelineId: job.data.pipelineId,
processedFiles: job.data.validatedData?.length || 0,
outputLocation: job.data.outputDestination,
};
}
// Fallback for unknown step
throw new Error(`Unknown step: ${step}`);
} catch (error) {
// Don't treat WaitingChildrenError as a real error
if (error instanceof WaitingChildrenError) {
await job.log('Job paused, waiting for child jobs to complete');
throw error;
}
// Log actual errors
const duration = Date.now() - startTime;
await job.log(`Job failed at step '${step}' after ${duration}ms: ${error.message}`);
// BullMQ will retry from current step
throw error;
}
}
// ================================================
// Transform Job Processor (Child Job)
// ================================================
async function processTransformJob(job: Job<TransformJobData>): Promise<any> {
await job.updateProgress(0);
const { fileData, transformationType } = job.data;
// Simulate transformation work; errors propagate to BullMQ's retry handling
const transformed = await applyTransformation(fileData, transformationType);
await job.updateProgress(100);
return {
success: true,
data: transformed,
transformationType,
};
}
// ================================================
// Helper Functions
// ================================================
async function loadFile(filePath: string): Promise<any> {
// Simulate file loading
return { path: filePath, content: 'file data' };
}
async function validateDataSchema(data: any): Promise<boolean> {
// Simulate schema validation
return data && data.content;
}
async function aggregateTransformations(results: any[], pipelineId: string): Promise<any> {
// Combine transformation results
return {
pipelineId,
totalRecords: results.length,
data: results.map((r) => r.data),
};
}
async function publishResults(result: any, destination: string, pipelineId: string): Promise<string> {
// Simulate publishing to external system
const outputPath = `${destination}/${pipelineId}/output.json`;
// await uploadToS3(outputPath, result);
return outputPath;
}
async function applyTransformation(data: any, type: string): Promise<any> {
// Simulate data transformation
return { ...data, transformed: true, type };
}
async function checkChildJobFailures(job: Job): Promise<void> {
// By default a parent only resumes once all children succeed; this check
// matters when children are added with ignoreDependencyOnFailure, which
// lets the parent proceed and records failures separately.
// getFailedChildrenValues() maps each failed child's job key to its reason.
const failures = await job.getFailedChildrenValues();
const failedKeys = Object.keys(failures);
if (failedKeys.length > 0) {
const details = failedKeys.map((key) => `${key}: ${failures[key]}`).join('; ');
throw new Error(`${failedKeys.length} child job(s) failed: ${details}`);
}
}
// ================================================
// Worker Initialization
// ================================================
const pipelineWorker = new Worker('data-pipeline', processPipelineJob, {
connection: redisConfig,
concurrency: 2, // Process 2 pipelines concurrently
});
const transformWorker = new Worker('data-transform', processTransformJob, {
connection: redisConfig,
concurrency: 10, // Process 10 transforms concurrently
});
// Event handlers
pipelineWorker.on('completed', (job) => {
console.log(`Pipeline ${job.id} completed`);
});
pipelineWorker.on('failed', (job, error) => {
console.error(`Pipeline ${job?.id} failed at step ${job?.data.step}:`, error.message);
});
transformWorker.on('completed', (job) => {
console.log(`Transform ${job.id} completed`);
});
transformWorker.on('failed', (job, error) => {
console.error(`Transform ${job?.id} failed:`, error.message);
});
// ================================================
// Usage Example
// ================================================
async function startPipeline() {
const job = await pipelineQueue.add('process-pipeline', {
pipelineId: 'pipeline-123',
inputFiles: ['/data/input1.json', '/data/input2.json', '/data/input3.json'],
outputDestination: 's3://output-bucket',
});
console.log(`Started pipeline job ${job.id}`);
}
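In production you would usually give the pipeline job explicit retry and cleanup options as well. A sketch using standard BullMQ job options (the values are illustrative):
async function startPipelineWithRetries() {
const job = await pipelineQueue.add('process-pipeline', {
pipelineId: 'pipeline-123',
inputFiles: ['/data/input1.json', '/data/input2.json'],
outputDestination: 's3://output-bucket',
}, {
attempts: 5, // each attempt resumes from job.data.step
backoff: { type: 'exponential', delay: 10000 },
removeOnComplete: { age: 24 * 3600 }, // keep completed jobs for a day
removeOnFail: false, // keep failures around for inspection
});
console.log(`Started pipeline job ${job.id} with retries enabled`);
}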
Best Practices for Step-Based State Machines
1. Always Update Step Before Pausing
// ✅ CORRECT: Update step first
await job.updateData({ ...job.data, step: 'next-step' });
const shouldWait = await job.moveToWaitingChildren(token!);
if (shouldWait) {
throw new WaitingChildrenError();
}
// ❌ WRONG: Update step after moving to waiting-children. If the process
// crashes between the two calls, the retry re-enters with the stale step.
const shouldWait = await job.moveToWaitingChildren(token!);
if (shouldWait) {
await job.updateData({ ...job.data, step: 'next-step' }); // too late to be safe
throw new WaitingChildrenError();
}
2. Check for Existing Children Before Spawning
Avoid duplicate child jobs on retry:
// getChildrenValues() is keyed by the child's job key (e.g.
// "bull:my-queue:<jobId>"), not by job name, so use deterministic
// job IDs and match on the key suffix
const existingChildren = await job.getChildrenValues();
const existingChildKeys = Object.keys(existingChildren);
const newJobs = items
.filter((item) => !existingChildKeys.some((key) => key.endsWith(`:job-${item.id}`)))
.map((item) =>
queue.add(`job-${item.id}`, item, {
jobId: `job-${item.id}`, // re-adding an existing job ID is a no-op
parent: { id: job.id!, queue: job.queueQualifiedName },
}),
);
if (newJobs.length > 0) {
await Promise.all(newJobs);
}
3. Store Intermediate Results in Job Data
Preserve work between steps:
// Save results after expensive operation
const processedData = await expensiveOperation(job.data.input);
await job.updateData({
...job.data,
processedData, // Available on retry
step: 'next-step',
});
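Job data lives in Redis, so avoid stuffing large intermediate payloads into it. One option is to persist the payload elsewhere and keep only a reference in job data; a sketch, where saveIntermediate and its storage backend are hypothetical:
// Hypothetical store (S3 object, Redis key, database row, etc.)
async function saveIntermediate(key: string, payload: any): Promise<string> {
// ...persist payload under key...
return key;
}
const processedData = await expensiveOperation(job.data.input);
// Store a small reference instead of the full payload
const resultKey = await saveIntermediate(`pipeline:${job.id}:processed`, processedData);
await job.updateData({
...job.data,
processedDataKey: resultKey, // the next step loads the payload by key
step: 'next-step',
});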
4. Use Granular Progress Reporting
Update progress per step for better observability:
const progressByStep = {
validate: 10,
process: 50,
notify: 90,
complete: 100,
};
await job.updateProgress(progressByStep[step]);
5. Implement Idempotent Steps
Steps should be safe to retry:
async function saveToDatabase(job: Job) {
// Check if already saved
const exists = await checkIfExists(job.data.id);
if (exists) {
// Skip save, already done
return;
}
// Perform save
await db.insert(job.data);
}
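For external side effects that can't be checked cheaply, such as payments, an idempotency key derived from stable job data is a common complement. A sketch, assuming the payment provider (and the earlier callPaymentAPI placeholder) accepts such a key:
async function processPaymentIdempotently(job: Job<JobData>) {
// The same order always produces the same key, so a retried charge
// is deduplicated on the provider's side
const idempotencyKey = `payment-${job.data.orderId}`;
await callPaymentAPI(job.data.orderId, { idempotencyKey });
}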
6. Don't Catch WaitingChildrenError
Let BullMQ handle the special error:
try {
// ... step logic ...
if (shouldWait) {
throw new WaitingChildrenError();
}
} catch (error) {
// ✅ CORRECT: Re-throw WaitingChildrenError
if (error instanceof WaitingChildrenError) {
throw error;
}
// Handle other errors
throw error;
}
7. Validate Child Job Success
Check child results before proceeding:
async function checkChildJobFailures(job: Job): Promise<void> {
// getFailedChildrenValues() maps each failed child's job key to its
// failure reason (populated when children use ignoreDependencyOnFailure)
const failures = await job.getFailedChildrenValues();
const failedKeys = Object.keys(failures);
if (failedKeys.length > 0) {
throw new Error(`Child jobs failed: ${failedKeys.join(', ')}`);
}
}
Monitoring and Observability
Track step transitions for debugging:
async function processWithLogging(job: Job) {
const step = job.data.step || 'start';
// Log step entry
await job.log(`Entering step: ${step}`);
try {
// ... step logic ...
await job.log(`Completed step: ${step}`);
} catch (error) {
await job.log(`Failed step: ${step} - ${error.message}`);
throw error;
}
}
Use structured logging for production:
import { createLogger, format, transports } from 'winston';
// Build a logger scoped to the current job and step
function stepLogger(job: Job) {
return createLogger({
format: format.json(),
transports: [new transports.Console()],
defaultMeta: {
service: 'pipeline-worker',
jobId: job.id,
step: job.data.step,
},
});
}
// Inside a step, after the work finishes:
const logger = stepLogger(job);
logger.info('Step completed', {
durationMs: Date.now() - stepStartTime,
recordsProcessed: results.length,
});
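For queue-level visibility without touching worker code, BullMQ's QueueEvents stream surfaces progress updates and failures. A minimal sketch:
import { QueueEvents } from 'bullmq';
const pipelineEvents = new QueueEvents('data-pipeline', {
connection: { host: 'localhost', port: 6379 },
});
pipelineEvents.on('progress', ({ jobId, data }) => {
// data is whatever updateProgress() was called with
console.log(`Pipeline ${jobId} progress:`, data);
});
pipelineEvents.on('failed', ({ jobId, failedReason }) => {
console.error(`Pipeline ${jobId} failed: ${failedReason}`);
});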
Key Takeaways
- Step pattern enables resumable workflows: Jobs remember progress and resume from failure points
- WaitingChildrenError pauses execution: Use for coordinating parent-child job flows
- Update step BEFORE waiting: Critical for correct retry behavior
- Check existing children: Avoid duplicate work on retry
- Store intermediate results: Preserve expensive computations in job data
- Separate concerns by step: Each step has a single responsibility
- Progress tracking per step: Better observability and user experience
- Idempotent steps are safer: Design steps to handle repeated execution
Step-based state machines transform fragile, all-or-nothing jobs into resilient workflows that gracefully handle failures. This pattern is essential for production systems processing high-value, long-running operations where losing progress is unacceptable.
Ready to build fault-tolerant distributed systems? Start by refactoring your longest-running BullMQ jobs into discrete steps. Your future self (and your infrastructure costs) will thank you.