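BullMQ ships specialized error classes for controlling job flow and retry behavior, enabling fine-grained failure management and recovery patterns.
When one of these errors is thrown from a processor, the worker changes how it handles the job instead of treating the throw as an ordinary failure. Except for UnrecoverableError, the classes are signals only: the processor must first move the job itself (for example with job.moveToDelayed) and then throw the matching error.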
/**
 * Fails the job immediately, even if it still has retry attempts left.
 *
 * @param message - Optional failure reason. When omitted, BullMQ falls back
 *   to its built-in marker text (see UNRECOVERABLE_ERROR).
 */
class UnrecoverableError extends Error {
  constructor(message?: string);
}
/**
 * Signals that the processor has already moved the job to the delayed state.
 *
 * The error carries no delay of its own. Call
 * `await job.moveToDelayed(Date.now() + delayMs, token)` first, then throw
 * this error so the worker neither completes nor fails the job.
 *
 * @param message - Optional error message (NOT a delay in milliseconds).
 */
class DelayedError extends Error {
  constructor(message?: string);
}
/**
 * Signals that the job hit a manually applied rate limit and should be put
 * back in the queue rather than failed.
 *
 * The error carries no duration. Set the limit with
 * `await worker.rateLimit(durationMs)` and then throw the error, typically
 * through the `Worker.RateLimitError()` factory.
 *
 * @param message - Optional message.
 *   NOTE(review): prefer the default; BullMQ appears to recognise this error
 *   by its default message, so a custom one may be handled as a normal
 *   failure. Confirm against the BullMQ version in use.
 */
class RateLimitError extends Error {
  constructor(message?: string);
}
/**
 * Signal for a parent job that still has unfinished child jobs.
 *
 * Throwing this error does not move the job by itself. Per the BullMQ
 * step-job pattern the processor first calls
 * `job.moveToWaitingChildren(token)`; when that resolves to `true` it throws
 * this error so the worker leaves the job parked instead of failing it.
 * `message` is optional.
 */
class WaitingChildrenError extends Error {
constructor(message?: string);
}
/**
 * Signal that the processor has put the job back into the waiting state.
 *
 * Throwing this error does not move the job by itself. Per the BullMQ
 * step-job pattern the processor first calls `job.moveToWait(token)` and then
 * throws this error so the worker does not fail or complete the job.
 * `message` is optional.
 */
class WaitingError extends Error {
constructor(message?: string);
}
/** Marker for unrecoverable errors */
const UNRECOVERABLE_ERROR = 'bullmq:unrecoverable';

Usage Examples:
import {
  Queue,
  Worker,
  UnrecoverableError,
  DelayedError,
  RateLimitError,
  WaitingChildrenError,
  WaitingError
} from "bullmq";
// Worker with comprehensive error handling.
// The processor receives the job's lock token as its second argument; the
// token is required when the processor moves the job to another state itself.
const apiWorker = new Worker("api-calls", async (job, token) => {
  const { url, method, data } = job.data;
  try {
    const response = await makeApiCall(url, method, data);
    return response.data;
  } catch (error) {
    // Fields this example reads from a failed call; all are optional because
    // the thrown value is untyped.
    const failure = (error ?? {}) as {
      status?: number;
      code?: string;
      message?: string;
      headers?: Record<string, string | undefined>;
    };

    if (failure.status === 401) {
      // Authentication failed - don't retry
      throw new UnrecoverableError("Authentication failed - invalid credentials");
    }

    if (failure.status === 429) {
      // Rate limited - pause the worker, then hand the job back to the queue.
      // A missing or non-numeric Retry-After header yields NaN, which falls
      // back to one minute (as does a value of 0).
      const retryAfterMs = Number(failure.headers?.["retry-after"]) * 1000;
      await apiWorker.rateLimit(retryAfterMs > 0 ? retryAfterMs : 60000);
      // RateLimitError carries no delay; the duration was set by rateLimit().
      // NOTE(review): the BullMQ manual rate-limit example also passes a
      // `limiter` worker option - confirm whether your version requires it.
      throw Worker.RateLimitError();
    }

    if (failure.status === 400) {
      // Bad request - don't retry
      throw new UnrecoverableError(`Bad request: ${failure.message}`);
    }

    if (failure.status !== undefined && failure.status >= 500) {
      // Server error - retry in 30 seconds. DelayedError is only a signal:
      // the job must be moved to the delayed state before throwing it.
      await job.moveToDelayed(Date.now() + 30000, token);
      throw new DelayedError();
    }

    if (failure.code === "ECONNREFUSED") {
      // Connection refused - retry in 1 minute
      await job.moveToDelayed(Date.now() + 60000, token);
      throw new DelayedError();
    }

    // Other errors - normal retry behavior
    throw error;
  }
});
// No explicit run() call: a Worker starts processing as soon as it is
// constructed (autorun defaults to true), and calling run() on a worker that
// is already running throws. Pass { autorun: false } to start it manually.

Forces immediate job failure without retries, regardless of job options.
// Examples of unrecoverable errors: every precondition below describes input
// that a retry cannot repair, so the job is failed permanently.
const dataWorker = new Worker("data-processing", async (job) => {
  const { data, schema } = job.data;

  // Evaluated lazily and in order; the first failing check aborts the job.
  const preconditions = [
    {
      // Validate required fields
      violated: () => !data || !schema,
      reason: "Missing required data or schema",
    },
    {
      // Check data format
      violated: () => !isValidFormat(data),
      reason: "Invalid data format - cannot be processed",
    },
    {
      // Check permissions
      violated: () => !userHasPermission(job.data.userId, "process-data"),
      reason: "User lacks permission to process this data",
    },
  ];

  for (const { violated, reason } of preconditions) {
    if (violated()) {
      throw new UnrecoverableError(reason);
    }
  }

  // Process data
  const result = await processData(data, schema);
  return result;
});

Moves job to delayed state for retry after specified delay.
// Delayed retry scenarios
const fileWorker = new Worker("file-processing", async (job, token) => {
  const { filePath } = job.data;

  // Moves the job to the delayed state and then raises the DelayedError
  // signal. DelayedError carries no delay itself, so both steps are needed.
  const retryIn = async (delayMs: number): Promise<never> => {
    await job.moveToDelayed(Date.now() + delayMs, token);
    throw new DelayedError();
  };

  try {
    // Check if file exists
    if (!(await fileExists(filePath))) {
      // File might be still uploading - delay retry
      await retryIn(30000); // 30 seconds
    }
    // Check if file is locked by another process
    if (await isFileLocked(filePath)) {
      await retryIn(10000); // 10 seconds
    }
    // Process file
    return await processFile(filePath);
  } catch (error) {
    // A DelayedError raised above also lands here; it has no EBUSY code and
    // is re-thrown unchanged by the final statement.
    if ((error as { code?: string } | null | undefined)?.code === "EBUSY") {
      // File busy - delay retry
      await retryIn(15000);
    }
    throw error;
  }
});
// Exponential backoff with DelayedError
// Exponential backoff with DelayedError
const exponentialDelayWorker = new Worker("flaky-service", async (job, token) => {
  try {
    return await callFlakyService(job.data);
  } catch (error) {
    // NOTE(review): confirm that `attemptsMade` advances when a job is
    // re-queued through moveToDelayed in the BullMQ version in use; if it
    // does not, keep a retry counter in job data (job.updateData) instead,
    // otherwise neither the delay nor the 5-attempt cap will ever change.
    if (job.attemptsMade < 5) {
      const delay = 2 ** job.attemptsMade * 1000; // 1s, 2s, 4s, 8s, ...
      // DelayedError is only a signal; the job has to be moved first.
      await job.moveToDelayed(Date.now() + delay, token);
      throw new DelayedError();
    }
    throw error; // Final attempt - let it fail normally
  }
});

Handles rate limiting with automatic delay calculation.
// Rate limit handling
const socialMediaWorker = new Worker("social-posts", async (job) => {
  const { platform, content } = job.data;
  try {
    return await postToSocialMedia(platform, content);
  } catch (error) {
    const failure = (error ?? {}) as {
      code?: string;
      rateLimit?: { reset?: number };
    };

    if (failure.code === "RATE_LIMITED") {
      // Use rate limit info from API response, falling back to one hour.
      // assumes `reset` is an epoch timestamp in milliseconds - TODO confirm
      const resetTime = failure.rateLimit?.reset || Date.now() + 3600000;
      // Keep the duration strictly positive: it is used as an expiry time.
      const delay = Math.max(resetTime - Date.now(), 1);
      console.warn(`Rate limited until ${new Date(resetTime).toISOString()}`);

      // RateLimitError has no delay parameter: set the duration on the
      // worker, then throw the signal with its default message.
      await socialMediaWorker.rateLimit(delay);
      throw Worker.RateLimitError();
    }
    throw error;
  }
});
// Global rate limiting
// Global rate limiting
// `lastApiCall` is a plain module variable, so the spacing below is enforced
// per process only; it does not coordinate workers running elsewhere.
let lastApiCall = 0;
const minInterval = 1000; // 1 second between calls
const rateLimitedWorker = new Worker("api-worker", async (job) => {
  const now = Date.now();
  const timeSinceLastCall = now - lastApiCall;
  if (timeSinceLastCall < minInterval) {
    // Too soon after the previous call: pause the worker for the remainder
    // of the interval, then hand the job back to the queue.
    // RateLimitError carries no delay; worker.rateLimit() sets the duration.
    await rateLimitedWorker.rateLimit(minInterval - timeSinceLastCall);
    throw Worker.RateLimitError();
  }
  lastApiCall = now;
  return await makeApiCall(job.data.url);
});

Used in flow contexts to wait for child jobs to complete.
// Parent job waiting for children
// One shared Queue handle: constructing a Queue inside the processor would
// open a new connection for every processed job and never close it.
const childQueue = new Queue("child-jobs");
const parentWorker = new Worker("parent-jobs", async (job, token) => {
  const { childJobs, childrenSpawned } = job.data;

  if (!job.id) {
    // Children are linked to their parent by id; without one nothing can wait.
    throw new UnrecoverableError("Parent job has no id");
  }

  // Create child jobs only once. The processor runs again after the children
  // finish, so the flag stored in job data prevents spawning duplicates.
  if (!childrenSpawned) {
    await Promise.all(
      childJobs.map((childData) =>
        childQueue.add("process-child", childData, {
          // The parent link is what lets BullMQ track these as dependencies.
          parent: { id: job.id, queue: job.queueQualifiedName },
        })
      )
    );
    await job.updateData({ ...job.data, childrenSpawned: true });
  }

  // Ask BullMQ to park this job until its children are done. It resolves to
  // true when the job was moved (children still pending); only then is
  // WaitingChildrenError thrown, purely as a signal to the worker.
  const shouldWait = await job.moveToWaitingChildren(token);
  if (shouldWait) {
    throw new WaitingChildrenError();
  }

  // All children completed - collect their return values. They come back
  // keyed by child job, so the array order need not match `childJobs`.
  const childResults = Object.values(await job.getChildrenValues());
  return { childResults, processed: true };
});

Returns job to waiting state for immediate retry.
// Resource availability check
const resourceWorker = new Worker("resource-jobs", async (job, token) => {
  const { resourceId } = job.data;
  // Check resource availability
  const resource = await getResource(resourceId);
  if (!resource.available) {
    // Resource not available - return to waiting.
    // WaitingError is only a signal: the job has to be moved back to the
    // wait state first. The job can be picked up again right away, so prefer
    // a delayed retry if the resource is likely to stay busy for a while.
    await job.moveToWait(token);
    throw new WaitingError();
  }
  // Resource available - process job
  return await processWithResource(resource, job.data);
});
// Queue capacity check
// Queue capacity check
const capacityWorker = new Worker("capacity-jobs", async (job, token) => {
  const currentLoad = await getCurrentSystemLoad();
  if (currentLoad > 0.8) {
    // System overloaded - return to waiting.
    // The job must be moved back to the wait state before WaitingError is
    // thrown; the error itself only tells the worker not to fail the job.
    await job.moveToWait(token);
    throw new WaitingError();
  }
  return await processJob(job.data);
});
// Comprehensive error handling strategy
/**
 * Worker that wraps a processor and translates common failure types into
 * BullMQ's flow-control errors.
 *
 * It extends Worker, so an instance can be used wherever a Worker is
 * expected (listeners, close(), ...). The classification helpers are static
 * because the wrapped processor is created before `super()` returns and
 * therefore must not touch `this`.
 */
class ErrorHandlingWorker extends Worker {
  /**
   * @param queueName - Name of the queue to consume.
   * @param processor - Job handler; receives the job and its lock token.
   */
  constructor(
    queueName: string,
    // `any` keeps the example independent of the job's data shape.
    processor: (job: any, token?: string) => unknown
  ) {
    super(queueName, async (job, token) => {
      try {
        return await processor(job, token);
      } catch (error) {
        return ErrorHandlingWorker.handleError(error, job, token);
      }
    });

    // Log all failures
    this.on("failed", (job, error) => {
      // `job` can be undefined when the failure is not tied to a loaded job.
      console.error(`Job ${job?.id} failed:`, error.message);
      if (error instanceof UnrecoverableError) {
        console.error("Unrecoverable error - job will not be retried");
      }
    });
  }

  /**
   * Maps a processor failure to the matching BullMQ behavior. Never returns:
   * every path throws.
   *
   * @throws DelayedError after moving the job to the delayed state.
   * @throws UnrecoverableError for validation failures.
   * @throws the original error for everything else (normal retry behavior).
   */
  private static async handleError(
    error: unknown,
    job: any,
    token?: string
  ): Promise<never> {
    // Network errors - delay retry
    if (ErrorHandlingWorker.isNetworkError(error)) {
      // DelayedError carries no delay; the job has to be moved first.
      await job.moveToDelayed(Date.now() + 30000, token);
      throw new DelayedError();
    }
    // Validation errors - unrecoverable
    if (ErrorHandlingWorker.isValidationError(error)) {
      throw new UnrecoverableError(error.message);
    }
    // Service unavailable - delay retry
    if (ErrorHandlingWorker.isServiceUnavailable(error)) {
      // Grow by one minute per attempt, capped at 5 minutes. The +1 keeps
      // the first failure from being retried with no delay at all.
      const delay = Math.min(60000 * (job.attemptsMade + 1), 300000);
      await job.moveToDelayed(Date.now() + delay, token);
      throw new DelayedError();
    }
    // Re-throw for normal retry behavior
    throw error;
  }

  /** True when the error's `code` is one of the listed connection failures. */
  private static isNetworkError(error: unknown): boolean {
    const code = (error as { code?: unknown } | null | undefined)?.code;
    return (
      typeof code === "string" &&
      ["ECONNRESET", "ECONNREFUSED", "ETIMEDOUT"].includes(code)
    );
  }

  /** True for Errors whose message mentions "validation" or "invalid". */
  private static isValidationError(error: unknown): error is Error {
    return (
      error instanceof Error &&
      (error.message.includes("validation") || error.message.includes("invalid"))
    );
  }

  /** True when the error carries status 503. */
  private static isServiceUnavailable(error: unknown): boolean {
    return (error as { status?: unknown } | null | undefined)?.status === 503;
  }
}
// Usage: only the business logic is supplied here; failure classification
// is left to ErrorHandlingWorker.
const smartWorker = new ErrorHandlingWorker("smart-processing", async (job) => {
  // Your job processing logic
  const { data } = job;
  const outcome = await processJob(data);
  return outcome;
});
// Circuit breaker pattern with errors
/**
 * Circuit breaker around a BullMQ worker: after `threshold` consecutive
 * failures, jobs are postponed instead of processed until `timeout`
 * milliseconds have passed since the most recent failure.
 *
 * The counters live on this instance, so the breaker state is per process.
 */
class CircuitBreakerWorker {
  /** Underlying worker, exposed so callers can attach listeners or close it. */
  readonly worker: Worker;
  private failures = 0;
  private lastFailure = 0;
  private readonly threshold = 5;
  private readonly timeout = 60000; // 1 minute

  /** @param queueName - Name of the queue to consume. */
  constructor(queueName: string) {
    this.worker = new Worker(queueName, async (job, token) => {
      // Check circuit breaker state
      if (this.isCircuitOpen()) {
        // DelayedError carries no delay: move the job to the delayed state
        // first, then throw the signal so the worker does not fail the job.
        await job.moveToDelayed(Date.now() + this.timeout, token);
        throw new DelayedError();
      }
      try {
        const result = await this.processJob(job.data);
        this.onSuccess();
        return result;
      } catch (error) {
        this.onFailure();
        throw error;
      }
    });
  }

  /** Open while the failure count is at the threshold and the cool-down has not elapsed. */
  private isCircuitOpen(): boolean {
    return (
      this.failures >= this.threshold &&
      Date.now() - this.lastFailure < this.timeout
    );
  }

  /** Any success resets the failure streak. */
  private onSuccess() {
    this.failures = 0;
  }

  /** Records a failure and restarts the cool-down window. */
  private onFailure() {
    this.failures++;
    this.lastFailure = Date.now();
  }

  private async processJob(data: any) {
    // Actual job processing
    return await externalService.process(data);
  }
}
// Error tracking and alerting
// Worker that records every processor failure before re-throwing it, so
// BullMQ's normal handling of the error is left untouched.
const monitoredWorker = new Worker("monitored-jobs", async (job) => {
  try {
    return await processJob(job.data);
  } catch (error) {
    // A thrown value is not guaranteed to be an Error; normalise it for
    // logging only. The original value is what gets re-thrown.
    const err = error instanceof Error ? error : new Error(String(error));

    // Track error types
    trackError(err, job);

    if (err instanceof UnrecoverableError) {
      // Alert for unrecoverable errors. An alerting failure must not replace
      // the original error, or the job would be retried after all.
      try {
        await sendAlert("CRITICAL", `Unrecoverable error in job ${job.id}: ${err.message}`);
      } catch (alertError) {
        console.error("Failed to send alert:", alertError);
      }
    } else if (err instanceof RateLimitError) {
      // Track rate limiting. BullMQ's RateLimitError has no `delay` field;
      // a delay is reported only if a custom subclass attached one.
      const delay: unknown = Reflect.get(err, "delay");
      if (typeof delay === "number") {
        trackRateLimit(job.name, delay);
      } else {
        console.log(`Rate limit hit for ${job.name}`);
      }
    }

    // Default handling
    throw error;
  }
});
/**
 * Logs one structured record describing a job failure.
 *
 * @param error - The failure; its constructor name is reported as the type.
 * @param job - The failed job; `id`, `attemptsMade` and `name` are read.
 */
function trackError(error: Error, job: any) {
  const { id, attemptsMade, name } = job;
  const details = {
    errorType: error.constructor.name,
    message: error.message,
    jobName: name,
    timestamp: new Date().toISOString(),
  };
  console.log(`Error in job ${id} (attempt ${attemptsMade}):`, details);
}
/**
 * Emits an alert line. This example only writes to the console; a real
 * implementation would forward the alert to a monitoring system.
 *
 * @param level - Severity label shown in brackets.
 * @param message - Alert text.
 */
async function sendAlert(level: string, message: string): Promise<void> {
  // Send to monitoring system
  const line = `[${level}] ${message}`;
  console.log(line);
}
/**
 * Logs that a job was rate limited.
 *
 * @param jobName - Name of the affected job.
 * @param delay - Reported delay in milliseconds.
 */
function trackRateLimit(jobName: string, delay: number) {
  const line = `Rate limit hit for ${jobName}, delayed by ${delay}ms`;
  console.log(line);
}