Worker-based job processing with configurable concurrency, error handling, and retry mechanisms. The Worker class provides robust job processing with support for rate limiting, stalled-job recovery, and graceful shutdown.

Worker is the main class for consuming jobs from queues, with fine-grained control over concurrency and error handling.
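A minimal end-to-end sketch of the ideas above (the queue name, payload shape, and local Redis connection are illustrative assumptions):

import { Worker } from "bullmq";

// A worker begins processing as soon as it is constructed.
const greetWorker = new Worker(
  "greetings", // hypothetical queue name
  async (job) => {
    // job.data is whatever the producer enqueued
    console.log(`Hello, ${job.data.name}!`);
    return { greeted: true };
  },
  { connection: { host: "localhost", port: 6379 } }
);

greetWorker.on("failed", (job, err) => console.error(job?.id, err.message));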
/**
* Worker class for processing jobs from queues
*/
class Worker<T = any, R = any, N extends string = string> {
constructor(name: string, processor: Processor<T, R, N>, opts?: WorkerOptions);
/** Start processing jobs */
run(): Promise<void>;
/** Pause job processing */
pause(): Promise<void>;
/** Resume job processing */
resume(): Promise<void>;
/** Get next job for processing */
getNextJob(token: string): Promise<Job<T, R, N> | undefined>;
/** Close worker gracefully */
close(force?: boolean): Promise<void>;
/** Check if worker is running */
isRunning(): boolean;
/** Check if worker is paused */
isPaused(): boolean;
/** Wait until worker is ready */
waitUntilReady(): Promise<void>;
}
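Beyond the automatic processing loop, getNextJob supports manually fetched jobs. A sketch of that pattern, assuming an existing connection object and a placeholder doWork function:

import { Worker } from "bullmq";

// Pass null as the processor to fetch and settle jobs manually.
const manualWorker = new Worker("manual-queue", null, { connection });

const token = "worker-token-1"; // any unique string identifying this lock holder
const job = await manualWorker.getNextJob(token);
if (job) {
  try {
    const result = await doWork(job.data); // placeholder for real work
    await job.moveToCompleted(result, token, false); // false: don't fetch the next job
  } catch (err) {
    await job.moveToFailed(err as Error, token, false);
  }
}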
/**
 * Job processor function signature
*/
type Processor<T = any, R = any, N extends string = string> = (
job: Job<T, R, N>,
token?: string
) => Promise<R>;
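Processor can also be used to declare a processor separately from the Worker that runs it; a sketch with illustrative payload and result types:

import { Processor } from "bullmq";

// Standalone, explicitly typed processor. The optional token identifies the
// worker's lock on the job and is required by APIs such as job.moveToDelayed().
const reportProcessor: Processor<{ reportId: string }, { pages: number }> =
  async (job, token) => {
    console.log(`Rendering report ${job.data.reportId} (lock token: ${token})`);
    return { pages: 42 };
  };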
Usage Examples:

import { Worker, Job } from "bullmq";
// Basic worker
const emailWorker = new Worker("email processing", async (job) => {
const { to, subject, body } = job.data;
// Process the job
console.log(`Sending email to ${to}`);
// Update progress
await job.updateProgress(25);
// Simulate email sending
await sendEmail(to, subject, body);
await job.updateProgress(100);
return { emailId: "email-123", sentAt: new Date() };
});
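Workers only consume jobs; producing them is done with the Queue class. A minimal producer sketch for the queue above (the job name and payload values are illustrative):

import { Queue } from "bullmq";

const emailQueue = new Queue("email processing", {
  connection: { host: "localhost", port: 6379 },
});

// Each added job will be picked up by the emailWorker defined above.
await emailQueue.add("send-welcome", {
  to: "user@example.com",
  subject: "Welcome!",
  body: "Thanks for signing up.",
});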
// Worker with advanced options
const imageWorker = new Worker("image processing",
async (job: Job<{ imageUrl: string; transformations: string[] }>) => {
const { imageUrl, transformations } = job.data;
await job.updateProgress(10);
// Download image
const image = await downloadImage(imageUrl);
await job.updateProgress(30);
// Apply transformations
let processedImage = image;
for (let i = 0; i < transformations.length; i++) {
processedImage = await applyTransformation(processedImage, transformations[i]);
await job.updateProgress(30 + (50 * (i + 1) / transformations.length));
}
// Upload processed image
const newUrl = await uploadImage(processedImage);
await job.updateProgress(100);
return { originalUrl: imageUrl, processedUrl: newUrl };
},
{
concurrency: 5,
limiter: {
max: 10,
duration: 1000,
},
    removeOnComplete: { count: 100 },
    removeOnFail: { count: 50 },
}
);
// Both workers start processing automatically on instantiation;
// run() is only needed when a worker is created with { autorun: false }.

interface WorkerOptions {
  /** Maximum number of jobs to process concurrently (default: 1) */
  concurrency?: number;
  /** Start processing automatically on instantiation (default: true) */
  autorun?: boolean;
/** Rate limiting configuration */
limiter?: RateLimiterOptions;
/** Maximum number of times a job can be stalled before failed (default: 1) */
maxStalledCount?: number;
/** Interval for checking stalled jobs in ms (default: 30000) */
stalledInterval?: number;
  /** Default policy for removing completed jobs (by count and/or age) */
  removeOnComplete?: KeepJobs;
  /** Default policy for removing failed jobs (by count and/or age) */
  removeOnFail?: KeepJobs;
/** Job lock duration in ms (default: 30000) */
lockDuration?: number;
/** Lock renewal interval in ms (default: 15000) */
lockRenewTime?: number;
/** Skip stalled job detection */
skipStalledCheck?: boolean;
/** Skip automatic lock renewal */
skipLockRenewal?: boolean;
/** Redis connection options */
connection?: ConnectionOptions;
/** Key prefix for Redis keys */
prefix?: string;
/** Metrics collection options */
metrics?: MetricsOptions;
/** Telemetry configuration */
telemetry?: TelemetryOptions;
}
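The connection and prefix options in practice; a sketch in which the host, the prefix, and the processInvoice processor are illustrative assumptions:

import { Worker } from "bullmq";

const invoiceWorker = new Worker("invoices", processInvoice, {
  // Explicit Redis connection
  connection: { host: "redis.internal", port: 6379 },
  // Store keys under "myapp:invoices:*" instead of the default "bull:invoices:*"
  prefix: "myapp",
  concurrency: 2,
  lockDuration: 60000,
});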

interface RateLimiterOptions {
  /** Maximum number of jobs processed within each duration window */
  max: number;
  /** Length of the window in milliseconds */
  duration: number;
}

interface KeepJobs {
  /** Maximum age, in seconds, of jobs to keep */
  age?: number;
  /** Maximum number of jobs to keep */
  count?: number;
}

The Worker class emits events for monitoring job processing:
interface WorkerEvents {
'active': (job: Job, prev?: string) => void;
'completed': (job: Job, result: any, prev?: string) => void;
  'failed': (job: Job | undefined, error: Error, prev?: string) => void;
'progress': (job: Job, progress: number | object) => void;
'stalled': (jobId: string, prev?: string) => void;
'error': (error: Error) => void;
'drained': () => void;
'paused': () => void;
'resumed': () => void;
'closing': () => void;
'closed': () => void;
'ioredis:close': () => void;
}

Event Usage:
// Worker event listeners
emailWorker.on("active", (job) => {
console.log(`Started processing job ${job.id}`);
});
emailWorker.on("completed", (job, result) => {
console.log(`Job ${job.id} completed:`, result);
});
emailWorker.on("failed", (job, error) => {
console.log(`Job ${job.id} failed:`, error.message);
// Log additional error details
if (job.stacktrace) {
console.log("Stack trace:", job.stacktrace);
}
});
emailWorker.on("progress", (job, progress) => {
console.log(`Job ${job.id} progress: ${progress}%`);
});
emailWorker.on("stalled", (jobId) => {
console.log(`Job ${jobId} stalled`);
});
emailWorker.on("drained", () => {
console.log("Queue is drained - no more jobs to process");
});
emailWorker.on("error", (error) => {
console.error("Worker error:", error);
});

// Worker with comprehensive configuration
const advancedWorker = new Worker(
"data processing",
async (job: Job<{ dataset: string; operations: string[] }>) => {
// Job processing logic
const { dataset, operations } = job.data;
// Process with progress updates
for (let i = 0; i < operations.length; i++) {
await processOperation(dataset, operations[i]);
await job.updateProgress((i + 1) / operations.length * 100);
}
return { processed: true, timestamp: Date.now() };
},
{
// Process 3 jobs concurrently
concurrency: 3,
// Rate limiting: max 20 jobs per minute
limiter: {
max: 20,
duration: 60 * 1000,
},
    // Lock and stalled-job settings
    lockDuration: 60 * 1000, // Hold the job lock for 1 minute
    maxStalledCount: 2, // Allow 2 stalls before failing the job
    stalledInterval: 30 * 1000, // Check for stalled jobs every 30 seconds
// Cleanup settings
    removeOnComplete: { count: 1000 }, // Keep the last 1000 completed jobs
    removeOnFail: {
      age: 24 * 60 * 60, // Keep failed jobs for 24 hours (age is in seconds)
      count: 100, // But at most 100 failed jobs
},
// Metrics collection
metrics: {
maxDataPoints: 100,
},
}
);
// Graceful shutdown
process.on("SIGINT", async () => {
console.log("Shutting down worker...");
await advancedWorker.close();
process.exit(0);
});

// Lifecycle control. The worker starts automatically on instantiation;
// run() applies only to workers created with { autorun: false }
// (see the sketch after this block).
// Pause processing (current jobs continue, no new jobs started)
await emailWorker.pause();
// Resume processing
await emailWorker.resume();
// Check worker status
console.log("Is running:", emailWorker.isRunning());
console.log("Is paused:", emailWorker.isPaused());
// Wait for worker to be ready
await emailWorker.waitUntilReady();
// Graceful shutdown (wait for current jobs to complete)
await emailWorker.close();
// Force shutdown (terminate immediately)
await emailWorker.close(true);
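A sketch of deferred startup with autorun: false (the queue name and connection are illustrative); this is the one case where run() should be called:

import { Worker } from "bullmq";

const deferredWorker = new Worker(
  "reports",
  async (job) => ({ rendered: true }),
  { connection: { host: "localhost", port: 6379 }, autorun: false }
);

// Attach listeners before any job can be picked up, then start.
deferredWorker.on("completed", (job) => console.log(`Done: ${job.id}`));
deferredWorker.run(); // resolves only when the worker stops, so don't await it inline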

// Error handling with BullMQ's special error types
import { Worker, UnrecoverableError, DelayedError } from "bullmq";

const robustWorker = new Worker("api calls", async (job, token) => {
try {
const response = await makeApiCall(job.data.url);
return response.data;
} catch (error) {
if (error.status === 401) {
// Don't retry authentication errors
throw new UnrecoverableError("Authentication failed");
    } else if (error.status === 429) {
      // Rate limited - pause this worker's job fetching for 60s
      // and move the job back to the waiting state
      await robustWorker.rateLimit(60000);
      throw Worker.RateLimitError();
    } else if (error.status >= 500) {
      // Server error - move the job to the delayed set and retry in 30 seconds
      await job.moveToDelayed(Date.now() + 30000, token);
      throw new DelayedError();
} else {
      // Other errors - fail normally; retried per the job's attempts/backoff settings
throw error;
}
}
});
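The retries these errors interact with are configured when jobs are added, not on the worker. A producer-side sketch (queue name, URL, and values are illustrative):

import { Queue } from "bullmq";

const apiQueue = new Queue("api calls", {
  connection: { host: "localhost", port: 6379 },
});

// Retry failed jobs up to 5 times with exponential backoff (1s, 2s, 4s, ...).
// UnrecoverableError skips any remaining attempts and fails the job immediately.
await apiQueue.add(
  "fetch",
  { url: "https://example.com/resource" },
  { attempts: 5, backoff: { type: "exponential", delay: 1000 } }
);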