Redis-based job queue library for Node.js that provides reliable job processing with features including job scheduling, delayed execution, repeatable jobs, priority handling, and automatic retries.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Bull provides utility functions for safe error handling, Redis client management, and metrics constants through the utils module.
const Queue = require('bull');
const { utils } = Queue;
// Or access directly
const { utils } = require('bull');Utilities for safe function execution with error handling.
/**
* Safe function execution with error handling
* @param fn - Function to execute safely
* @param ctx - Context object for function execution
* @param args - Arguments to pass to the function
* @returns Result of function execution or error object
*/
tryCatch(fn: Function, ctx?: any, args?: any[]): any;
/**
* Error object used by tryCatch for error states
*/
errorObject: {
value: Error | null;
};Usage Examples:
const { utils } = require('bull');
// Safe function execution
function riskyOperation(data) {
if (!data) {
throw new Error('Data is required');
}
return data.toUpperCase();
}
// Execute safely
const result = utils.tryCatch(riskyOperation, null, ['hello world']);
if (result === utils.errorObject) {
console.error('Function failed:', utils.errorObject.value.message);
} else {
console.log('Result:', result); // 'HELLO WORLD'
}
// Safe execution with context
class DataProcessor {
constructor(prefix) {
this.prefix = prefix;
}
process(data) {
return this.prefix + data;
}
}
const processor = new DataProcessor('Processed: ');
const contextResult = utils.tryCatch(processor.process, processor, ['test data']);
if (contextResult !== utils.errorObject) {
console.log(contextResult); // 'Processed: test data'
}Utilities for checking Redis client state and readiness.
/**
* Check if Redis client is ready for operations
* @param client - Redis client instance to check
* @returns Boolean indicating if client is ready
*/
isRedisReady(client: Redis.Redis): boolean;Usage Examples:
const Queue = require('bull');
const { utils } = require('bull');
const queue = new Queue('test queue');
// Wait for queue to be ready and check Redis client
await queue.isReady();
if (utils.isRedisReady(queue.client)) {
console.log('Redis client is ready for operations');
// Safe to perform Redis operations
const info = await queue.client.info();
console.log('Redis info:', info);
} else {
console.log('Redis client is not ready yet');
}
// Check multiple clients
const clients = queue.clients;
const readyClients = clients.filter(utils.isRedisReady);
console.log(`${readyClients.length}/${clients.length} clients are ready`);Utility for safe event emission with error handling.
/**
* Safely emit events with error handling
* @param emitter - EventEmitter instance
* @param event - Event name to emit
* @param args - Arguments to pass to event listeners
*/
emitSafe(emitter: EventEmitter, event: string, ...args: any[]): void;Usage Examples:
const { EventEmitter } = require('events');
const { utils } = require('bull');
const eventEmitter = new EventEmitter();
// Add listener that might throw
eventEmitter.on('risky-event', (data) => {
if (!data.valid) {
throw new Error('Invalid data received');
}
console.log('Processing:', data);
});
// Emit safely - won't crash if listener throws
utils.emitSafe(eventEmitter, 'risky-event', { valid: false });
console.log('Application continues running despite listener error');
// Normal emit for comparison (would crash)
// eventEmitter.emit('risky-event', { valid: false }); // This would throwPredefined time constants for metrics collection and job scheduling.
/**
* Time constants for metrics and scheduling (in minutes)
*/
MetricsTime: {
/** 1 minute */
ONE_MINUTE: 1;
/** 5 minutes */
FIVE_MINUTES: 5;
/** 15 minutes */
FIFTEEN_MINUTES: 15;
/** 30 minutes */
THIRTY_MINUTES: 30;
/** 1 hour (60 minutes) */
ONE_HOUR: 60;
/** 1 week (10080 minutes) */
ONE_WEEK: 10080;
/** 2 weeks (20160 minutes) */
TWO_WEEKS: 20160;
/** 1 month (40320 minutes) */
ONE_MONTH: 40320;
};Usage Examples:
const Queue = require('bull');
const { utils } = require('bull');
const taskQueue = new Queue('scheduled tasks');
// Use time constants for job scheduling
await taskQueue.add('hourly-report',
{ reportType: 'hourly' },
{
repeat: {
every: utils.MetricsTime.ONE_HOUR * 60 * 1000 // Convert to milliseconds
}
}
);
await taskQueue.add('daily-cleanup',
{ task: 'cleanup' },
{
repeat: {
every: utils.MetricsTime.ONE_HOUR * 24 * 60 * 1000 // 24 hours in milliseconds
}
}
);
// Use with metrics configuration
const metricsQueue = new Queue('metrics queue', {
metrics: {
maxDataPoints: utils.MetricsTime.ONE_WEEK // Store 1 week of data points
}
});
// Use for retention policies
const retentionTime = utils.MetricsTime.TWO_WEEKS * 60 * 1000; // Convert to milliseconds
await taskQueue.add('data-retention',
{ retentionPolicy: 'two-weeks' },
{
removeOnComplete: 10,
removeOnFail: 5,
delay: retentionTime
}
);
// Available constants reference
console.log('Available time constants:');
Object.entries(utils.MetricsTime).forEach(([name, minutes]) => {
const hours = minutes / 60;
const days = hours / 24;
console.log(`${name}: ${minutes} minutes (${hours} hours, ${days.toFixed(1)} days)`);
});Combine multiple utilities for robust queue management.
const Queue = require('bull');
const { utils } = require('bull');
class RobustQueueManager {
constructor(queueName, redisOptions = {}) {
this.queue = new Queue(queueName, redisOptions);
this.setupSafeEventHandlers();
}
async initialize() {
await this.queue.isReady();
if (!utils.isRedisReady(this.queue.client)) {
throw new Error('Redis client is not ready');
}
console.log('Queue manager initialized successfully');
}
setupSafeEventHandlers() {
// Use safe event emission for all handlers
this.queue.on('completed', (job, result) => {
const processResult = utils.tryCatch(this.handleJobCompletion, this, [job, result]);
if (processResult === utils.errorObject) {
console.error('Error handling job completion:', utils.errorObject.value);
}
});
this.queue.on('failed', (job, error) => {
const processResult = utils.tryCatch(this.handleJobFailure, this, [job, error]);
if (processResult === utils.errorObject) {
console.error('Error handling job failure:', utils.errorObject.value);
}
});
}
handleJobCompletion(job, result) {
// This method might throw, but it's called safely
console.log(`Job ${job.id} completed:`, result);
// Emit custom events safely
utils.emitSafe(this, 'job-processed', {
jobId: job.id,
success: true,
result: result,
processingTime: job.finishedOn - job.processedOn
});
}
handleJobFailure(job, error) {
console.error(`Job ${job.id} failed:`, error.message);
// Emit failure event safely
utils.emitSafe(this, 'job-processed', {
jobId: job.id,
success: false,
error: error.message,
attemptsMade: job.attemptsMade
});
}
async scheduleMaintenanceJob() {
// Use time constants for scheduling
const maintenanceInterval = utils.MetricsTime.ONE_HOUR * 60 * 1000; // 1 hour in ms
return this.queue.add('maintenance',
{ task: 'cleanup' },
{
repeat: { every: maintenanceInterval },
removeOnComplete: 5,
removeOnFail: 2
}
);
}
}
// Usage
const queueManager = new RobustQueueManager('robust-queue');
queueManager.on('job-processed', (data) => {
if (data.success) {
console.log(`✅ Job ${data.jobId} processed successfully in ${data.processingTime}ms`);
} else {
console.log(`❌ Job ${data.jobId} failed: ${data.error} (attempt ${data.attemptsMade})`);
}
});
await queueManager.initialize();
await queueManager.scheduleMaintenanceJob();Install with Tessl CLI
npx tessl i tessl/npm-bull