Bull provides utility functions for safe error handling, Redis client management, and metrics constants through the utils module.
const Queue = require('bull');
const { utils } = Queue;
// Or access directly
const { utils } = require('bull');

Utilities for safe function execution with error handling.
/**
* Safe function execution with error handling
* @param fn - Function to execute safely
* @param ctx - Context object for function execution
* @param args - Arguments to pass to the function
* @returns Result of function execution or error object
*/
tryCatch(fn: Function, ctx?: any, args?: any[]): any;
/**
* Error object used by tryCatch for error states
*/
errorObject: {
value: Error | null;
};

Usage Examples:
const { utils } = require('bull');
// Safe function execution
/**
 * Demo operation that can fail: upper-cases its input.
 * @param data - Value to convert; must be truthy
 * @returns The result of data.toUpperCase()
 * @throws Error('Data is required') when data is falsy
 */
function riskyOperation(data) {
  if (data) {
    return data.toUpperCase();
  }
  // Falsy input (undefined, null, '', ...) is rejected outright.
  throw new Error('Data is required');
}
// Execute safely
const result = utils.tryCatch(riskyOperation, null, ['hello world']);
if (result === utils.errorObject) {
console.error('Function failed:', utils.errorObject.value.message);
} else {
console.log('Result:', result); // 'HELLO WORLD'
}
// Safe execution with context
class DataProcessor {
constructor(prefix) {
this.prefix = prefix;
}
process(data) {
return this.prefix + data;
}
}
const processor = new DataProcessor('Processed: ');
const contextResult = utils.tryCatch(processor.process, processor, ['test data']);
if (contextResult !== utils.errorObject) {
console.log(contextResult); // 'Processed: test data'
}

Utilities for checking Redis client state and readiness.
/**
* Check if Redis client is ready for operations
* @param client - Redis client instance to check
* @returns Boolean indicating if client is ready
*/
isRedisReady(client: Redis.Redis): boolean;

Usage Examples:
const Queue = require('bull');
const { utils } = require('bull');
const queue = new Queue('test queue');
// Wait for queue to be ready and check Redis client
await queue.isReady();
if (utils.isRedisReady(queue.client)) {
console.log('Redis client is ready for operations');
// Safe to perform Redis operations
const info = await queue.client.info();
console.log('Redis info:', info);
} else {
console.log('Redis client is not ready yet');
}
// Check multiple clients
const clients = queue.clients;
const readyClients = clients.filter(utils.isRedisReady);
console.log(`${readyClients.length}/${clients.length} clients are ready`);

Utility for safe event emission with error handling.
/**
* Safely emit events with error handling
* @param emitter - EventEmitter instance
* @param event - Event name to emit
* @param args - Arguments to pass to event listeners
*/
emitSafe(emitter: EventEmitter, event: string, ...args: any[]): void;

Usage Examples:
const { EventEmitter } = require('events');
const { utils } = require('bull');
const eventEmitter = new EventEmitter();
// Add listener that might throw
eventEmitter.on('risky-event', (data) => {
if (!data.valid) {
throw new Error('Invalid data received');
}
console.log('Processing:', data);
});
// Emit safely - won't crash if listener throws
utils.emitSafe(eventEmitter, 'risky-event', { valid: false });
console.log('Application continues running despite listener error');
// Normal emit for comparison (would crash)
// eventEmitter.emit('risky-event', { valid: false }); // This would throw

Predefined time constants for metrics collection and job scheduling.
/**
* Time constants for metrics and scheduling (in minutes)
*/
MetricsTime: {
/** 1 minute */
ONE_MINUTE: 1;
/** 5 minutes */
FIVE_MINUTES: 5;
/** 15 minutes */
FIFTEEN_MINUTES: 15;
/** 30 minutes */
THIRTY_MINUTES: 30;
/** 1 hour (60 minutes) */
ONE_HOUR: 60;
/** 1 week (10080 minutes) */
ONE_WEEK: 10080;
/** 2 weeks (20160 minutes) */
TWO_WEEKS: 20160;
/** 1 month (40320 minutes) */
ONE_MONTH: 40320;
};

Usage Examples:
const Queue = require('bull');
const { utils } = require('bull');
const taskQueue = new Queue('scheduled tasks');
// Use time constants for job scheduling
await taskQueue.add('hourly-report',
{ reportType: 'hourly' },
{
repeat: {
every: utils.MetricsTime.ONE_HOUR * 60 * 1000 // Convert to milliseconds
}
}
);
await taskQueue.add('daily-cleanup',
{ task: 'cleanup' },
{
repeat: {
every: utils.MetricsTime.ONE_HOUR * 24 * 60 * 1000 // 24 hours in milliseconds
}
}
);
// Use with metrics configuration
const metricsQueue = new Queue('metrics queue', {
metrics: {
maxDataPoints: utils.MetricsTime.ONE_WEEK // Store 1 week of data points
}
});
// Use for retention policies
const retentionTime = utils.MetricsTime.TWO_WEEKS * 60 * 1000; // Convert to milliseconds
await taskQueue.add('data-retention',
{ retentionPolicy: 'two-weeks' },
{
removeOnComplete: 10,
removeOnFail: 5,
delay: retentionTime
}
);
// Available constants reference
console.log('Available time constants:');
Object.entries(utils.MetricsTime).forEach(([name, minutes]) => {
const hours = minutes / 60;
const days = hours / 24;
console.log(`${name}: ${minutes} minutes (${hours} hours, ${days.toFixed(1)} days)`);
});

Combine multiple utilities for robust queue management.
const { EventEmitter } = require('events');
const Queue = require('bull');
const { utils } = require('bull');
class RobustQueueManager {
constructor(queueName, redisOptions = {}) {
this.queue = new Queue(queueName, redisOptions);
this.setupSafeEventHandlers();
}
async initialize() {
await this.queue.isReady();
if (!utils.isRedisReady(this.queue.client)) {
throw new Error('Redis client is not ready');
}
console.log('Queue manager initialized successfully');
}
setupSafeEventHandlers() {
// Use safe event emission for all handlers
this.queue.on('completed', (job, result) => {
const processResult = utils.tryCatch(this.handleJobCompletion, this, [job, result]);
if (processResult === utils.errorObject) {
console.error('Error handling job completion:', utils.errorObject.value);
}
});
this.queue.on('failed', (job, error) => {
const processResult = utils.tryCatch(this.handleJobFailure, this, [job, error]);
if (processResult === utils.errorObject) {
console.error('Error handling job failure:', utils.errorObject.value);
}
});
}
handleJobCompletion(job, result) {
// This method might throw, but it's called safely
console.log(`Job ${job.id} completed:`, result);
// Emit custom events safely
utils.emitSafe(this, 'job-processed', {
jobId: job.id,
success: true,
result: result,
processingTime: job.finishedOn - job.processedOn
});
}
handleJobFailure(job, error) {
console.error(`Job ${job.id} failed:`, error.message);
// Emit failure event safely
utils.emitSafe(this, 'job-processed', {
jobId: job.id,
success: false,
error: error.message,
attemptsMade: job.attemptsMade
});
}
async scheduleMaintenanceJob() {
// Use time constants for scheduling
const maintenanceInterval = utils.MetricsTime.ONE_HOUR * 60 * 1000; // 1 hour in ms
return this.queue.add('maintenance',
{ task: 'cleanup' },
{
repeat: { every: maintenanceInterval },
removeOnComplete: 5,
removeOnFail: 2
}
);
}
}
// Usage
const queueManager = new RobustQueueManager('robust-queue');
queueManager.on('job-processed', (data) => {
if (data.success) {
console.log(`✅ Job ${data.jobId} processed successfully in ${data.processingTime}ms`);
} else {
console.log(`❌ Job ${data.jobId} failed: ${data.error} (attempt ${data.attemptsMade})`);
}
});
await queueManager.initialize();
await queueManager.scheduleMaintenanceJob();