or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

event-system.mdindex.mdjob-creation.mdjob-processing.mdjob-state.mdqueue-management.mdqueue-monitoring.mdrepeatable-jobs.mdutils.md
tile.json

utils.mddocs/

Utility Functions

Bull provides utility functions for safe error handling, Redis client management, and metrics constants through the utils module.

Core Imports

const Queue = require('bull');
const { utils } = Queue;

// Or access directly
const { utils } = require('bull');

Capabilities

Safe Function Execution

Utilities for safe function execution with error handling.

/**
 * Safe function execution with error handling
 * @param fn - Function to execute safely
 * @param ctx - Context object for function execution
 * @param args - Arguments to pass to the function
 * @returns Result of function execution or error object
 */
tryCatch(fn: Function, ctx?: any, args?: any[]): any;

/**
 * Error object used by tryCatch for error states
 */
errorObject: {
  value: Error | null;
};

Usage Examples:

const { utils } = require('bull');

// Safe function execution
function riskyOperation(data) {
  if (!data) {
    throw new Error('Data is required');
  }
  return data.toUpperCase();
}

// Execute safely
const result = utils.tryCatch(riskyOperation, null, ['hello world']);

if (result === utils.errorObject) {
  console.error('Function failed:', utils.errorObject.value.message);
} else {
  console.log('Result:', result); // 'HELLO WORLD'
}

// Safe execution with context
class DataProcessor {
  constructor(prefix) {
    this.prefix = prefix;
  }
  
  process(data) {
    return this.prefix + data;
  }
}

const processor = new DataProcessor('Processed: ');
const contextResult = utils.tryCatch(processor.process, processor, ['test data']);

if (contextResult !== utils.errorObject) {
  console.log(contextResult); // 'Processed: test data'
}

Redis Client Management

Utilities for checking Redis client state and readiness.

/**
 * Check if Redis client is ready for operations
 * @param client - Redis client instance to check
 * @returns Boolean indicating if client is ready
 */
isRedisReady(client: Redis.Redis): boolean;

Usage Examples:

const Queue = require('bull');
const { utils } = require('bull');

const queue = new Queue('test queue');

// Wait for queue to be ready and check Redis client
await queue.isReady();

if (utils.isRedisReady(queue.client)) {
  console.log('Redis client is ready for operations');
  
  // Safe to perform Redis operations
  const info = await queue.client.info();
  console.log('Redis info:', info);
} else {
  console.log('Redis client is not ready yet');
}

// Check multiple clients
const clients = queue.clients;
const readyClients = clients.filter(utils.isRedisReady);
console.log(`${readyClients.length}/${clients.length} clients are ready`);

Safe Event Emission

Utility for safe event emission with error handling.

/**
 * Safely emit events with error handling
 * @param emitter - EventEmitter instance
 * @param event - Event name to emit
 * @param args - Arguments to pass to event listeners
 */
emitSafe(emitter: EventEmitter, event: string, ...args: any[]): void;

Usage Examples:

const { EventEmitter } = require('events');
const { utils } = require('bull');

const eventEmitter = new EventEmitter();

// Add listener that might throw
eventEmitter.on('risky-event', (data) => {
  if (!data.valid) {
    throw new Error('Invalid data received');
  }
  console.log('Processing:', data);
});

// Emit safely - won't crash if listener throws
utils.emitSafe(eventEmitter, 'risky-event', { valid: false });
console.log('Application continues running despite listener error');

// Normal emit for comparison (would crash)
// eventEmitter.emit('risky-event', { valid: false }); // This would throw

Metrics Time Constants

Predefined time constants for metrics collection and job scheduling.

/**
 * Time constants for metrics and scheduling (in minutes)
 */
MetricsTime: {
  /** 1 minute */
  ONE_MINUTE: 1;
  /** 5 minutes */
  FIVE_MINUTES: 5;
  /** 15 minutes */
  FIFTEEN_MINUTES: 15;
  /** 30 minutes */
  THIRTY_MINUTES: 30;
  /** 1 hour (60 minutes) */
  ONE_HOUR: 60;
  /** 1 week (10080 minutes) */
  ONE_WEEK: 10080;
  /** 2 weeks (20160 minutes) */
  TWO_WEEKS: 20160;
  /** 1 month (40320 minutes) */
  ONE_MONTH: 40320;
};

Usage Examples:

const Queue = require('bull');
const { utils } = require('bull');

const taskQueue = new Queue('scheduled tasks');

// Use time constants for job scheduling
await taskQueue.add('hourly-report', 
  { reportType: 'hourly' },
  { 
    repeat: { 
      every: utils.MetricsTime.ONE_HOUR * 60 * 1000 // Convert to milliseconds
    }
  }
);

await taskQueue.add('daily-cleanup',
  { task: 'cleanup' },
  {
    repeat: {
      every: utils.MetricsTime.ONE_HOUR * 24 * 60 * 1000 // 24 hours in milliseconds
    }
  }
);

// Use with metrics configuration
const metricsQueue = new Queue('metrics queue', {
  metrics: {
    maxDataPoints: utils.MetricsTime.ONE_WEEK // Store 1 week of data points
  }
});

// Use for retention policies
const retentionTime = utils.MetricsTime.TWO_WEEKS * 60 * 1000; // Convert to milliseconds

await taskQueue.add('data-retention',
  { retentionPolicy: 'two-weeks' },
  {
    removeOnComplete: 10,
    removeOnFail: 5,
    delay: retentionTime
  }
);

// Available constants reference
console.log('Available time constants:');
Object.entries(utils.MetricsTime).forEach(([name, minutes]) => {
  const hours = minutes / 60;
  const days = hours / 24;
  console.log(`${name}: ${minutes} minutes (${hours} hours, ${days.toFixed(1)} days)`);
});

Utility Integration Examples

Combine multiple utilities for robust queue management.

const Queue = require('bull');
const { utils } = require('bull');

class RobustQueueManager {
  constructor(queueName, redisOptions = {}) {
    this.queue = new Queue(queueName, redisOptions);
    this.setupSafeEventHandlers();
  }
  
  async initialize() {
    await this.queue.isReady();
    
    if (!utils.isRedisReady(this.queue.client)) {
      throw new Error('Redis client is not ready');
    }
    
    console.log('Queue manager initialized successfully');
  }
  
  setupSafeEventHandlers() {
    // Use safe event emission for all handlers
    this.queue.on('completed', (job, result) => {
      const processResult = utils.tryCatch(this.handleJobCompletion, this, [job, result]);
      
      if (processResult === utils.errorObject) {
        console.error('Error handling job completion:', utils.errorObject.value);
      }
    });
    
    this.queue.on('failed', (job, error) => {
      const processResult = utils.tryCatch(this.handleJobFailure, this, [job, error]);
      
      if (processResult === utils.errorObject) {
        console.error('Error handling job failure:', utils.errorObject.value);
      }
    });
  }
  
  handleJobCompletion(job, result) {
    // This method might throw, but it's called safely
    console.log(`Job ${job.id} completed:`, result);
    
    // Emit custom events safely
    utils.emitSafe(this, 'job-processed', {
      jobId: job.id,
      success: true,
      result: result,
      processingTime: job.finishedOn - job.processedOn
    });
  }
  
  handleJobFailure(job, error) {
    console.error(`Job ${job.id} failed:`, error.message);
    
    // Emit failure event safely
    utils.emitSafe(this, 'job-processed', {
      jobId: job.id,
      success: false,
      error: error.message,
      attemptsMade: job.attemptsMade
    });
  }
  
  async scheduleMaintenanceJob() {
    // Use time constants for scheduling
    const maintenanceInterval = utils.MetricsTime.ONE_HOUR * 60 * 1000; // 1 hour in ms
    
    return this.queue.add('maintenance',
      { task: 'cleanup' },
      {
        repeat: { every: maintenanceInterval },
        removeOnComplete: 5,
        removeOnFail: 2
      }
    );
  }
}

// Usage
const queueManager = new RobustQueueManager('robust-queue');

queueManager.on('job-processed', (data) => {
  if (data.success) {
    console.log(`✅ Job ${data.jobId} processed successfully in ${data.processingTime}ms`);
  } else {
    console.log(`❌ Job ${data.jobId} failed: ${data.error} (attempt ${data.attemptsMade})`);
  }
});

await queueManager.initialize();
await queueManager.scheduleMaintenanceJob();