Redis-based job queue library for Node.js that provides reliable job processing with features including job scheduling, delayed execution, repeatable jobs, priority handling, and automatic retries.
Define job processors with concurrency control, named job handling, and error management.
Define how jobs are processed in the queue.
/**
 * Define a job processor function for all jobs in the queue
 * @param callback - Function to process jobs (callback or promise-based)
 * @returns Promise that resolves when processor is registered
 */
process(callback: ProcessCallbackFunction): Promise<void>;
process(callback: ProcessPromiseFunction): Promise<void>;
process(callback: string): Promise<void>; // Path to processor file

type ProcessCallbackFunction<T> = (job: Job<T>, done: DoneCallback) => void;
type ProcessPromiseFunction<T> = (job: Job<T>) => Promise<any>;
type DoneCallback = (error?: Error | null, value?: any) => void;

Usage Examples:
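The two function styles differ only in how completion is signalled: calling `done(error, value)` corresponds to rejecting or resolving a promise. A minimal sketch of that correspondence follows. `promisifyProcessor` and `sendEmailWithCallback` are hypothetical names used for illustration and are not part of Bull; the job is a plain object so the sketch runs without Redis.

```javascript
// Hypothetical helper (not a Bull API): wraps a callback-style processor
// so it can be used where a promise-returning processor is expected.
function promisifyProcessor(callbackProcessor) {
  return (job) =>
    new Promise((resolve, reject) => {
      callbackProcessor(job, (error, value) => {
        if (error) {
          reject(error); // done(err) maps to a rejected promise
        } else {
          resolve(value); // done(null, value) maps to a resolved promise
        }
      });
    });
}

// Hypothetical callback-style processor with a stand-in for real work.
function sendEmailWithCallback(job, done) {
  if (!job.data.email) {
    return done(new Error('missing email'));
  }
  done(null, { sent: true, to: job.data.email });
}

const sendEmailAsync = promisifyProcessor(sendEmailWithCallback);
```

Either form could then be handed to `process()`; which one to use is mostly a matter of what the underlying work already exposes.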
const Queue = require('bull');
const emailQueue = new Queue('email processing');

// The three forms below are alternatives. Bull throws if a second default
// (unnamed) processor is defined on the same queue, so register only one.

// Promise-based processor
emailQueue.process(async (job) => {
  const { email, name } = job.data;
  console.log(`Processing email for ${name}`);
  // Simulate async work
  await sendEmail(email, name);
  return { sent: true, timestamp: Date.now() };
});

// Callback-based processor
emailQueue.process((job, done) => {
  const { email, name } = job.data;
  sendEmail(email, name, (err, result) => {
    if (err) {
      return done(err);
    }
    done(null, { sent: true, result });
  });
});

// External processor file
emailQueue.process('./processors/emailProcessor.js');

Process multiple jobs simultaneously with concurrency control.
/**
 * Define a job processor with concurrency limit
 * @param concurrency - Maximum number of jobs to process simultaneously
 * @param callback - Function to process jobs
 * @returns Promise that resolves when processor is registered
 */
process(concurrency: number, callback: ProcessCallbackFunction): Promise<void>;
process(concurrency: number, callback: ProcessPromiseFunction): Promise<void>;
process(concurrency: number, callback: string): Promise<void>;

Usage Examples:
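What a concurrency limit means can be modelled without Redis. The sketch below is a toy in-memory runner, not Bull's implementation, and `runWithConcurrency` is a hypothetical name. It keeps at most `concurrency` handler calls in flight and records the peak it observed.

```javascript
// Toy model of a concurrency limit (an illustration, not Bull's internals).
async function runWithConcurrency(jobs, concurrency, handler) {
  if (!Number.isInteger(concurrency) || concurrency < 1) {
    throw new RangeError('concurrency must be a positive integer');
  }
  const results = new Array(jobs.length);
  let next = 0; // index of the next job to hand out
  let active = 0; // handlers currently in flight
  let peak = 0; // highest value `active` reached

  // Each worker pulls jobs one at a time until none are left.
  async function worker() {
    while (next < jobs.length) {
      const index = next++;
      active++;
      peak = Math.max(peak, active);
      try {
        results[index] = await handler(jobs[index]);
      } finally {
        active--;
      }
    }
  }

  const workerCount = Math.min(concurrency, jobs.length);
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return { results, peak };
}
```

With seven jobs and a limit of three, no more than three handlers overlap, which is the behaviour the `concurrency` argument asks for.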
// Process up to 5 jobs simultaneously
emailQueue.process(5, async (job) => {
  console.log(`Processing job ${job.id} with data:`, job.data);
  await processEmailJob(job.data);
  return { processed: true };
});

// Concurrent processing with callback
// (an alternative to the processor above, not an addition to it)
emailQueue.process(3, (job, done) => {
  processJob(job.data, done);
});

Process specific types of jobs by name.
/**
 * Define a processor for jobs with a specific name
 * @param name - Job name to process
 * @param callback - Function to process named jobs
 * @returns Promise that resolves when processor is registered
 */
process(name: string, callback: ProcessCallbackFunction): Promise<void>;
process(name: string, callback: ProcessPromiseFunction): Promise<void>;
process(name: string, callback: string): Promise<void>;

Usage Examples:
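Name matching can be pictured as a lookup from job name to handler. The sketch below models that lookup in plain JavaScript. It is not Bull's internal code; `handlersByName` and `dispatchByName` are hypothetical, and the handler bodies are stand-ins.

```javascript
// Hypothetical handlers keyed by job name; the bodies stand in for real work.
const handlersByName = new Map([
  ['welcome-email', async (data) => ({ emailSent: true, to: data.email })],
  ['generate-report', async (data) => ({ reportId: `report-${data.userId}` })],
]);

// Routes a job-like object ({ name, data }) to the handler registered for its name.
async function dispatchByName(job) {
  const handler = handlersByName.get(job.name);
  if (!handler) {
    throw new Error(`No handler registered for job name "${job.name}"`);
  }
  return handler(job.data);
}
```

With Bull, the name is the first argument when the job is added, for example `taskQueue.add('welcome-email', { email, name })`.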
const Queue = require('bull');
const taskQueue = new Queue('task processing');

// Process only 'welcome-email' jobs
taskQueue.process('welcome-email', async (job) => {
  const { email, name } = job.data;
  await sendWelcomeEmail(email, name);
  return { emailSent: true };
});

// Process only 'generate-report' jobs
taskQueue.process('generate-report', async (job) => {
  const { reportType, userId } = job.data;
  const report = await generateReport(reportType, userId);
  return { reportId: report.id };
});

// You can have multiple processors for different job types
taskQueue.process('send-notification', async (job) => {
  await sendNotification(job.data);
  return { notificationSent: true };
});

Combine named job processing with concurrency control.
/**
 * Define a processor for named jobs with concurrency limit
 * @param name - Job name to process
 * @param concurrency - Maximum concurrent jobs for this processor (Bull's reference
 *   notes that the concurrency values of named processors on one queue add up)
 * @param callback - Function to process jobs
 * @returns Promise that resolves when processor is registered
 */
process(name: string, concurrency: number, callback: ProcessCallbackFunction): Promise<void>;
process(name: string, concurrency: number, callback: ProcessPromiseFunction): Promise<void>;
process(name: string, concurrency: number, callback: string): Promise<void>;

Usage Examples:
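Bull's reference documentation notes that the concurrency values of named processors registered on the same queue stack up rather than acting as independent per-name limits. Assuming that behaviour, the worst-case number of jobs one worker process may run at once is the sum of the values, as this small sketch computes. `totalConcurrency` is a hypothetical helper, not a Bull API.

```javascript
// Hypothetical helper: sums the concurrency values planned for named processors.
function totalConcurrency(concurrencyByName) {
  return Object.values(concurrencyByName).reduce((sum, n) => sum + n, 0);
}

// Values taken from the two named processors shown in this section.
const planned = { 'image-resize': 2, 'email-send': 10 };
const total = totalConcurrency(planned);
```

If strict per-type limits matter, giving each job type its own queue is one way to keep the limits separate.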
// Process up to 2 'image-resize' jobs concurrently
taskQueue.process('image-resize', 2, async (job) => {
  const { imagePath, width, height } = job.data;
  const resizedPath = await resizeImage(imagePath, width, height);
  return { resizedPath };
});

// Process up to 10 'email-send' jobs concurrently
taskQueue.process('email-send', 10, async (job) => {
  await sendEmail(job.data);
  return { sent: true };
});

Track and report job progress during processing.
// Progress tracking is done within job processors
// Access via job.progress() method in processors

Usage Examples:
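When a job covers many items, reporting progress after every item means one awaited update per item. One option is to report only when the percentage has advanced by a chosen step. A sketch follows, in which `createProgressReporter` and its parameters are hypothetical and `report` stands in for a function such as `(p) => job.progress(p)`.

```javascript
// Hypothetical helper: calls `report(percent)` only when progress has moved
// forward by at least `stepPercent`, and always once on completion.
function createProgressReporter(total, report, stepPercent = 10) {
  if (!Number.isInteger(total) || total <= 0) {
    throw new RangeError('total must be a positive integer');
  }
  let lastReported = 0;
  return async function itemDone(completed) {
    // Integer arithmetic first, to avoid floating-point surprises.
    const percent = Math.floor((completed * 100) / total);
    const steppedForward = percent >= lastReported + stepPercent;
    const justFinished = percent === 100 && lastReported < 100;
    if (steppedForward || justFinished) {
      lastReported = percent;
      await report(percent);
    }
  };
}
```

Inside a processor this might be created once with `items.length` and called as `await itemDone(i + 1)` after each item.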
taskQueue.process('large-task', async (job) => {
  const { items } = job.data;
  for (let i = 0; i < items.length; i++) {
    await processItem(items[i]);
    // Update progress (0-100 or any value)
    const progress = Math.round((i + 1) / items.length * 100);
    await job.progress(progress);
  }
  return { processed: items.length };
});

// Listen for progress updates
taskQueue.on('progress', (job, progress) => {
  console.log(`Job ${job.id} is ${progress}% complete`);
});

Handle errors and job failures in processors.
// Error handling patterns for different processor types

Usage Examples:
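Re-throwing with a new message is convenient, but it can discard the original error and its stack trace. One way to add context while keeping the original reachable is to attach it as `cause`. The wrapper below is a sketch under that assumption. `withErrorContext` is a hypothetical helper, not part of Bull, and it only relies on the job having an `id`.

```javascript
// Hypothetical wrapper: adds job context to any error thrown by `processor`
// and keeps the original error available on `cause`.
function withErrorContext(processor) {
  return async (job) => {
    try {
      return await processor(job);
    } catch (error) {
      const wrapped = new Error(`Job ${job.id} failed: ${error.message}`);
      wrapped.cause = error; // preserve the original error and its stack
      throw wrapped; // rethrow so the job is still marked as failed
    }
  };
}
```

A promise-based processor could be passed through it before registration, for example `withErrorContext(async (job) => riskyOperation(job.data))`.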
// The three patterns below are alternatives; register only one default
// (unnamed) processor per queue.

// Promise-based error handling
taskQueue.process(async (job) => {
  try {
    const result = await riskyOperation(job.data);
    return result;
  } catch (error) {
    // Error will be caught by Bull and job marked as failed
    throw new Error(`Processing failed: ${error.message}`);
  }
});

// Callback-based error handling
taskQueue.process((job, done) => {
  riskyOperation(job.data, (err, result) => {
    if (err) {
      // Pass error to done callback
      return done(new Error(`Processing failed: ${err.message}`));
    }
    done(null, result);
  });
});

// Custom error handling with job methods
taskQueue.process(async (job) => {
  try {
    await job.progress(10);
    const data = await fetchData(job.data.url);
    await job.progress(50);
    const processed = await processData(data);
    await job.progress(100);
    return processed;
  } catch (error) {
    // Log error details
    await job.log(`Error occurred: ${error.message}`);
    throw error;
  }
});

Monitor processor lifecycle through events.
// Events are emitted during job processing lifecycle
// See Event System documentation for complete event handling

Usage Examples:
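Because lifecycle events are delivered through `on(eventName, listener)`, listeners can be attached generically. The sketch below counts events per type. `attachCounters` is a hypothetical helper, and a plain Node.js `EventEmitter` stands in for the queue so the example runs without Redis.

```javascript
const { EventEmitter } = require('events');

// Hypothetical helper: counts lifecycle events on any object exposing `.on()`.
function attachCounters(queue) {
  const counts = { active: 0, completed: 0, failed: 0, stalled: 0 };
  for (const eventName of Object.keys(counts)) {
    queue.on(eventName, () => {
      counts[eventName] += 1;
    });
  }
  return counts;
}

// Stand-in for a queue; the emitted arguments mirror the listeners in this section.
const fakeQueue = new EventEmitter();
const counts = attachCounters(fakeQueue);
fakeQueue.emit('active', { id: 1 });
fakeQueue.emit('completed', { id: 1 }, { success: true });
fakeQueue.emit('active', { id: 2 });
fakeQueue.emit('failed', { id: 2 }, new Error('boom'));
```

The same helper could be pointed at a real queue object, with the counts exported to whatever metrics system is in use.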
taskQueue.process(async (job) => {
  await processJob(job.data);
  return { success: true };
});

// Monitor processing events
taskQueue.on('active', (job) => {
  console.log(`Job ${job.id} started processing`);
});

taskQueue.on('completed', (job, result) => {
  console.log(`Job ${job.id} completed with result:`, result);
});

taskQueue.on('failed', (job, err) => {
  console.error(`Job ${job.id} failed:`, err.message);
});

taskQueue.on('stalled', (job) => {
  console.warn(`Job ${job.id} stalled and will be retried`);
});

Use external files for job processors.
Usage Examples:
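A relative path such as `./processors/taskProcessor.js` depends on what it is resolved against, which may be the working directory of the process rather than the file that contains the call. Building an absolute path removes that ambiguity. A minimal sketch using Node's `path` module follows. `resolveProcessorPath` is a hypothetical helper, and the `processors` directory layout is assumed from the examples here.

```javascript
const path = require('path');

// Hypothetical helper: builds an absolute path to a processor module that
// lives in a `processors` directory under `baseDir`.
function resolveProcessorPath(baseDir, fileName) {
  return path.resolve(baseDir, 'processors', fileName);
}

// Using the current working directory as the base for this demonstration.
const taskProcessorPath = resolveProcessorPath(process.cwd(), 'taskProcessor.js');
```

In a CommonJS main file, `__dirname` is a common choice for `baseDir`, and the result can be passed wherever a processor file path is expected.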
// In your main file
const Queue = require('bull');
const taskQueue = new Queue('tasks');

// Use external processor file
taskQueue.process('./processors/taskProcessor.js');

// Or use an external processor with concurrency (instead of the call above;
// only one default processor can be registered per queue)
taskQueue.process(5, './processors/concurrentProcessor.js');

// Use external processor for named jobs
taskQueue.process('specific-task', './processors/specificTaskProcessor.js');

// In processors/taskProcessor.js
module.exports = async function(job) {
  const { taskType, data } = job.data;
  console.log(`Processing ${taskType} with data:`, data);
  // Update progress
  await job.progress(50);
  // Process the task
  const result = await performTask(taskType, data);
  await job.progress(100);
  return result;
};

// Example of robust job processor with best practices
taskQueue.process('robust-task', 3, async (job) => {
  const startTime = Date.now();
  try {
    // Validate job data
    if (!job.data || !job.data.required_field) {
      throw new Error('Invalid job data: missing required_field');
    }
    // Log job start
    await job.log(`Started processing at ${new Date().toISOString()}`);
    // Update progress at key points
    await job.progress(10);
    // Perform work with proper error handling
    const step1 = await performStep1(job.data);
    await job.progress(30);
    const step2 = await performStep2(step1);
    await job.progress(60);
    const result = await performStep3(step2);
    await job.progress(100);
    // Log completion
    const duration = Date.now() - startTime;
    await job.log(`Completed in ${duration}ms`);
    return {
      success: true,
      result,
      duration
    };
  } catch (error) {
    // Log detailed error information
    await job.log(`Error: ${error.message}`);
    await job.log(`Stack: ${error.stack}`);
    // Re-throw to mark job as failed
    throw error;
  }
});

Install with Tessl CLI
npx tessl i tessl/npm-bull