Define job processors with concurrency control, named job handling, and error management.
Define how jobs are processed in the queue.
/**
* Define a job processor function for all jobs in the queue
* @param callback - Function to process jobs (callback or promise-based)
* @returns Promise that resolves when processor is registered
*/
process(callback: ProcessCallbackFunction): Promise<void>;
process(callback: ProcessPromiseFunction): Promise<void>;
process(callback: string): Promise<void>; // Path to processor file
type ProcessCallbackFunction<T> = (job: Job<T>, done: DoneCallback) => void;
type ProcessPromiseFunction<T> = (job: Job<T>) => Promise<any>;
type DoneCallback = (error?: Error | null, value?: any) => void;Usage Examples:
const Queue = require('bull');
const emailQueue = new Queue('email processing');
// Promise-based processor
emailQueue.process(async (job) => {
const { email, name } = job.data;
console.log(`Processing email for ${name}`);
// Simulate async work
await sendEmail(email, name);
return { sent: true, timestamp: Date.now() };
});
// Callback-based processor
emailQueue.process((job, done) => {
const { email, name } = job.data;
sendEmail(email, name, (err, result) => {
if (err) {
return done(err);
}
done(null, { sent: true, result });
});
});
// External processor file
emailQueue.process('./processors/emailProcessor.js');Process multiple jobs simultaneously with concurrency control.
/**
* Define a job processor with concurrency limit
* @param concurrency - Maximum number of jobs to process simultaneously
* @param callback - Function to process jobs
* @returns Promise that resolves when processor is registered
*/
process(concurrency: number, callback: ProcessCallbackFunction): Promise<void>;
process(concurrency: number, callback: ProcessPromiseFunction): Promise<void>;
process(concurrency: number, callback: string): Promise<void>;Usage Examples:
// Process up to 5 jobs simultaneously
emailQueue.process(5, async (job) => {
console.log(`Processing job ${job.id} with data:`, job.data);
await processEmailJob(job.data);
return { processed: true };
});
// Concurrent processing with callback
emailQueue.process(3, (job, done) => {
processJob(job.data, done);
});Process specific types of jobs by name.
/**
* Define a processor for jobs with a specific name
* @param name - Job name to process
* @param callback - Function to process named jobs
* @returns Promise that resolves when processor is registered
*/
process(name: string, callback: ProcessCallbackFunction): Promise<void>;
process(name: string, callback: ProcessPromiseFunction): Promise<void>;
process(name: string, callback: string): Promise<void>;Usage Examples:
const Queue = require('bull');
const taskQueue = new Queue('task processing');
// Process only 'welcome-email' jobs
taskQueue.process('welcome-email', async (job) => {
const { email, name } = job.data;
await sendWelcomeEmail(email, name);
return { emailSent: true };
});
// Process only 'generate-report' jobs
taskQueue.process('generate-report', async (job) => {
const { reportType, userId } = job.data;
const report = await generateReport(reportType, userId);
return { reportId: report.id };
});
// You can have multiple processors for different job types
taskQueue.process('send-notification', async (job) => {
await sendNotification(job.data);
return { notificationSent: true };
});Combine named job processing with concurrency control.
/**
* Define a processor for named jobs with concurrency limit
* @param name - Job name to process
* @param concurrency - Maximum concurrent jobs of this type
* @param callback - Function to process jobs
* @returns Promise that resolves when processor is registered
*/
process(name: string, concurrency: number, callback: ProcessCallbackFunction): Promise<void>;
process(name: string, concurrency: number, callback: ProcessPromiseFunction): Promise<void>;
process(name: string, concurrency: number, callback: string): Promise<void>;Usage Examples:
// Process up to 2 'image-resize' jobs concurrently
taskQueue.process('image-resize', 2, async (job) => {
const { imagePath, width, height } = job.data;
const resizedPath = await resizeImage(imagePath, width, height);
return { resizedPath };
});
// Process up to 10 'email-send' jobs concurrently
taskQueue.process('email-send', 10, async (job) => {
await sendEmail(job.data);
return { sent: true };
});Track and report job progress during processing.
// Progress tracking is done within job processors
// Access via job.progress() method in processorsUsage Examples:
taskQueue.process('large-task', async (job) => {
const { items } = job.data;
for (let i = 0; i < items.length; i++) {
await processItem(items[i]);
// Update progress (0-100 or any value)
const progress = Math.round((i + 1) / items.length * 100);
await job.progress(progress);
}
return { processed: items.length };
});
// Listen for progress updates
taskQueue.on('progress', (job, progress) => {
console.log(`Job ${job.id} is ${progress}% complete`);
});Handle errors and job failures in processors.
// Error handling patterns for different processor typesUsage Examples:
// Promise-based error handling
taskQueue.process(async (job) => {
try {
const result = await riskyOperation(job.data);
return result;
} catch (error) {
// Error will be caught by Bull and job marked as failed
throw new Error(`Processing failed: ${error.message}`);
}
});
// Callback-based error handling
taskQueue.process((job, done) => {
riskyOperation(job.data, (err, result) => {
if (err) {
// Pass error to done callback
return done(new Error(`Processing failed: ${err.message}`));
}
done(null, result);
});
});
// Custom error handling with job methods
taskQueue.process(async (job) => {
try {
await job.progress(10);
const data = await fetchData(job.data.url);
await job.progress(50);
const processed = await processData(data);
await job.progress(100);
return processed;
} catch (error) {
// Log error details
await job.log(`Error occurred: ${error.message}`);
throw error;
}
});Monitor processor lifecycle through events.
// Events are emitted during job processing lifecycle
// See Event System documentation for complete event handlingUsage Examples:
taskQueue.process(async (job) => {
await processJob(job.data);
return { success: true };
});
// Monitor processing events
taskQueue.on('active', (job) => {
console.log(`Job ${job.id} started processing`);
});
taskQueue.on('completed', (job, result) => {
console.log(`Job ${job.id} completed with result:`, result);
});
taskQueue.on('failed', (job, err) => {
console.error(`Job ${job.id} failed:`, err.message);
});
taskQueue.on('stalled', (job) => {
console.warn(`Job ${job.id} stalled and will be retried`);
});Use external files for job processors.
Usage Examples:
// In your main file
const taskQueue = new Queue('tasks');
// Use external processor file
taskQueue.process('./processors/taskProcessor.js');
// Use external processor with concurrency
taskQueue.process(5, './processors/concurrentProcessor.js');
// Use external processor for named jobs
taskQueue.process('specific-task', './processors/specificTaskProcessor.js');// In processors/taskProcessor.js
module.exports = async function(job) {
const { taskType, data } = job.data;
console.log(`Processing ${taskType} with data:`, data);
// Update progress
await job.progress(50);
// Process the task
const result = await performTask(taskType, data);
await job.progress(100);
return result;
};// Example of robust job processor with best practices
taskQueue.process('robust-task', 3, async (job) => {
const startTime = Date.now();
try {
// Validate job data
if (!job.data || !job.data.required_field) {
throw new Error('Invalid job data: missing required_field');
}
// Log job start
await job.log(`Started processing at ${new Date().toISOString()}`);
// Update progress at key points
await job.progress(10);
// Perform work with proper error handling
const step1 = await performStep1(job.data);
await job.progress(30);
const step2 = await performStep2(step1);
await job.progress(60);
const result = await performStep3(step2);
await job.progress(100);
// Log completion
const duration = Date.now() - startTime;
await job.log(`Completed in ${duration}ms`);
return {
success: true,
result,
duration
};
} catch (error) {
// Log detailed error information
await job.log(`Error: ${error.message}`);
await job.log(`Stack: ${error.stack}`);
// Re-throw to mark job as failed
throw error;
}
});