CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/npm-bull

Redis-based job queue library for Node.js that provides reliable job processing with features including job scheduling, delayed execution, repeatable jobs, priority handling, and automatic retries.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

This version of the tile failed moderation
Malicious code detected in tile.json: This tile claims to describe 'pkg:npm/bull@4.16.5' but the tile itself is named 'tessl/npm-bull' with version '4.16.2'. This is a potential supply chain attack pattern (typosquatting/dependency confusion). The legitimate 'bull' package is maintained by OptimalBits, not 'tessl'. The version mismatch (tile 4.16.2 claiming to describe 4.16.5) and the unofficial namespace suggest this could be an attempt to impersonate the legitimate bull package to intercept installations or inject malicious code.
Overview
Eval results
Files

docs/utils.md

Utility Functions

Bull provides utility functions for safe error handling, Redis client management, and metrics constants through the utils module.

Core Imports

const Queue = require('bull');
const { utils } = Queue;

// Or access directly
const { utils } = require('bull');

Capabilities

Safe Function Execution

Utilities for safe function execution with error handling.

/**
 * Invokes `fn` with `ctx` as its `this` value and `args` as its argument
 * list, reporting a thrown exception through the return value instead of
 * letting it propagate to the caller.
 * @param fn - Function to execute safely
 * @param ctx - Value used as `this` while `fn` runs (pass `null` for plain functions)
 * @param args - Arguments to pass to the function, supplied as an array
 * @returns The return value of `fn`, or `errorObject` when `fn` throws;
 *          detect failure with `result === utils.errorObject`
 */
tryCatch(fn: Function, ctx?: any, args?: any[]): any;

/**
 * Sentinel returned by tryCatch when the wrapped function throws.
 * `value` holds the caught error; read it right after the failed call.
 * NOTE(review): this appears to be one shared object, so a later failing
 * tryCatch call would presumably overwrite `value` — confirm against the
 * implementation.
 */
errorObject: {
  value: Error | null;
};

Usage Examples:

const { utils } = require('bull');

// Safe function execution
/**
 * Demo operation that can fail: returns the upper-cased input, or throws
 * when no (truthy) data is supplied.
 */
function riskyOperation(data) {
  if (data) {
    return data.toUpperCase();
  }
  throw new Error('Data is required');
}

// Run riskyOperation through tryCatch: a throw is reported via the returned
// sentinel (utils.errorObject) instead of propagating
const result = utils.tryCatch(riskyOperation, null, ['hello world']);

if (result !== utils.errorObject) {
  console.log('Result:', result); // 'HELLO WORLD'
} else {
  // The caught error is stored on the sentinel's `value` property
  console.error('Function failed:', utils.errorObject.value.message);
}

// Safe execution with context
/**
 * Small helper whose method depends on `this`, used to show why a context
 * object has to be handed to tryCatch.
 */
class DataProcessor {
  /** @param prefix - Value placed in front of every processed item */
  constructor(prefix) {
    this.prefix = prefix;
  }

  /** Returns `prefix + data`; requires `this` to be the DataProcessor instance. */
  process(data) {
    const label = this.prefix;
    return label + data;
  }
}

// Pass the instance as the context argument so `this.prefix` resolves inside
// process() when tryCatch invokes it
const processor = new DataProcessor('Processed: ');
const contextResult = utils.tryCatch(
  processor.process,
  processor,
  ['test data']
);

if (contextResult === utils.errorObject) {
  // process() threw; nothing to print in this example
} else {
  console.log(contextResult); // 'Processed: test data'
}

Redis Client Management

Utilities for checking Redis client state and readiness.

/**
 * Wait until a Redis client is ready for operations
 * @param client - Redis client instance to wait on
 * @returns Promise that resolves once the client is in the 'ready' state and
 *          rejects if the connection ends before it becomes ready. The
 *          returned promise is always truthy, so it must be awaited and must
 *          not be used directly as a boolean condition.
 */
isRedisReady(client: Redis.Redis): Promise<void>;

Usage Examples:

const Queue = require('bull');
const { utils } = require('bull');

const queue = new Queue('test queue');

// Wait for the queue to finish initializing
await queue.isReady();

// isRedisReady() returns a promise (resolves when the client is ready,
// rejects if the connection ends first). A promise is always truthy, so it
// has to be awaited and mapped to a boolean before it can drive an if/else.
const clientReady = await utils
  .isRedisReady(queue.client)
  .then(() => true, () => false);

if (clientReady) {
  console.log('Redis client is ready for operations');

  // Safe to perform Redis operations
  const info = await queue.client.info();
  console.log('Redis info:', info);
} else {
  console.log('Redis client is not ready yet');
}

// Check multiple clients
// Array.prototype.filter needs a synchronous boolean predicate, so inspect the
// ioredis connection status instead of passing the promise-returning helper.
const clients = queue.clients;
const readyClients = clients.filter((client) => client.status === 'ready');
console.log(`${readyClients.length}/${clients.length} clients are ready`);

Safe Event Emission

Utility for safe event emission with error handling.

/**
 * Emit an event without letting an exception thrown by a listener propagate
 * to the code that triggered the emit.
 * NOTE(review): how the caught error is surfaced (for example re-emitted as
 * an 'error' event or logged) is not shown here — confirm against the
 * implementation.
 * @param emitter - EventEmitter instance to emit on
 * @param event - Event name to emit
 * @param args - Arguments forwarded to the event listeners
 */
emitSafe(emitter: EventEmitter, event: string, ...args: any[]): void;

Usage Examples:

const { EventEmitter } = require('events');
const { utils } = require('bull');

// Listener that rejects any payload not flagged as valid
function handleRiskyEvent(data) {
  if (data.valid) {
    console.log('Processing:', data);
    return;
  }
  throw new Error('Invalid data received');
}

const eventEmitter = new EventEmitter();
eventEmitter.on('risky-event', handleRiskyEvent);

// emitSafe shields the caller from the listener's exception
utils.emitSafe(eventEmitter, 'risky-event', { valid: false });
console.log('Application continues running despite listener error');

// For comparison, a plain emit lets the listener's exception reach the caller:
// eventEmitter.emit('risky-event', { valid: false }); // This would throw

Metrics Time Constants

Predefined time constants for metrics collection and job scheduling.

/**
 * Time constants for metrics and scheduling (in minutes).
 * Multiply by 60 * 1000 when an API expects milliseconds.
 */
MetricsTime: {
  /** 1 minute */
  ONE_MINUTE: 1;
  /** 5 minutes */
  FIVE_MINUTES: 5;
  /** 15 minutes */
  FIFTEEN_MINUTES: 15;
  /** 30 minutes */
  THIRTY_MINUTES: 30;
  /** 1 hour (60 minutes) */
  ONE_HOUR: 60;
  /** 1 week (60 * 24 * 7 = 10080 minutes) */
  ONE_WEEK: 10080;
  /** 2 weeks (60 * 24 * 7 * 2 = 20160 minutes) */
  TWO_WEEKS: 20160;
  /**
   * Nominal "1 month". The library defines it as 60 * 24 * 7 * 2 * 4 = 80640
   * minutes, which is 8 weeks rather than 4 — do not assume ~30 days.
   */
  ONE_MONTH: 80640;
};

Usage Examples:

const Queue = require('bull');
const { utils } = require('bull');

// MetricsTime values are expressed in minutes; Bull's `every` and `delay`
// options take milliseconds
const MS_PER_MINUTE = 60 * 1000;

const taskQueue = new Queue('scheduled tasks');

// Repeat once per hour
const hourlyOptions = {
  repeat: { every: utils.MetricsTime.ONE_HOUR * MS_PER_MINUTE }
};
await taskQueue.add('hourly-report', { reportType: 'hourly' }, hourlyOptions);

// Repeat once per day (24 hours in milliseconds)
const dailyOptions = {
  repeat: { every: utils.MetricsTime.ONE_HOUR * 24 * MS_PER_MINUTE }
};
await taskQueue.add('daily-cleanup', { task: 'cleanup' }, dailyOptions);

// Metrics configuration: keep one week's worth of data points
const metricsQueue = new Queue('metrics queue', {
  metrics: { maxDataPoints: utils.MetricsTime.ONE_WEEK }
});

// Two weeks, converted to milliseconds, used as the job's delay
const retentionTime = utils.MetricsTime.TWO_WEEKS * MS_PER_MINUTE;

const retentionOptions = {
  removeOnComplete: 10,
  removeOnFail: 5,
  delay: retentionTime
};
await taskQueue.add(
  'data-retention',
  { retentionPolicy: 'two-weeks' },
  retentionOptions
);

// Print every constant together with its equivalent in hours and days
console.log('Available time constants:');
for (const [name, minutes] of Object.entries(utils.MetricsTime)) {
  const hours = minutes / 60;
  const days = (hours / 24).toFixed(1);
  console.log(`${name}: ${minutes} minutes (${hours} hours, ${days} days)`);
}

Utility Integration Examples

Combine multiple utilities for robust queue management.

const { EventEmitter } = require('events');
const Queue = require('bull');
const { utils } = require('bull');

/**
 * Wraps a Bull queue and republishes job outcomes as a single
 * 'job-processed' event on the manager itself.
 *
 * The class extends EventEmitter so that `utils.emitSafe(this, ...)` has a
 * real emitter to emit on and consumers can subscribe with
 * `manager.on('job-processed', listener)`.
 */
class RobustQueueManager extends EventEmitter {
  /**
   * @param queueName - Name of the Bull queue to create
   * @param redisOptions - Passed unchanged as the second argument of `new Queue(...)`
   */
  constructor(queueName, redisOptions = {}) {
    super();
    this.queue = new Queue(queueName, redisOptions);
    this.setupSafeEventHandlers();
  }

  /**
   * Waits until the queue and its Redis client are usable.
   * @throws Error when the Redis client fails to become ready
   */
  async initialize() {
    await this.queue.isReady();

    // isRedisReady() returns a promise that rejects when the connection ends
    // before becoming ready; negating the promise itself would never fail.
    try {
      await utils.isRedisReady(this.queue.client);
    } catch (err) {
      throw new Error('Redis client is not ready');
    }

    console.log('Queue manager initialized successfully');
  }

  /**
   * Subscribes to the queue's 'completed' and 'failed' events. Each handler
   * runs through tryCatch so an exception inside it is logged instead of
   * escaping into the queue's event dispatch.
   */
  setupSafeEventHandlers() {
    this.queue.on('completed', (job, result) => {
      const processResult = utils.tryCatch(this.handleJobCompletion, this, [job, result]);

      if (processResult === utils.errorObject) {
        console.error('Error handling job completion:', utils.errorObject.value);
      }
    });

    this.queue.on('failed', (job, error) => {
      const processResult = utils.tryCatch(this.handleJobFailure, this, [job, error]);

      if (processResult === utils.errorObject) {
        console.error('Error handling job failure:', utils.errorObject.value);
      }
    });
  }

  /**
   * Logs a completed job and emits 'job-processed' with `success: true`.
   * May throw; it is only invoked through tryCatch.
   */
  handleJobCompletion(job, result) {
    console.log(`Job ${job.id} completed:`, result);

    // Emit custom events safely
    utils.emitSafe(this, 'job-processed', {
      jobId: job.id,
      success: true,
      result: result,
      processingTime: job.finishedOn - job.processedOn
    });
  }

  /**
   * Logs a failed job and emits 'job-processed' with `success: false`.
   */
  handleJobFailure(job, error) {
    console.error(`Job ${job.id} failed:`, error.message);

    // Emit failure event safely
    utils.emitSafe(this, 'job-processed', {
      jobId: job.id,
      success: false,
      error: error.message,
      attemptsMade: job.attemptsMade
    });
  }

  /**
   * Adds a repeatable 'maintenance' job that runs every hour.
   * @returns The promise returned by `queue.add`
   */
  async scheduleMaintenanceJob() {
    // MetricsTime is in minutes; `repeat.every` takes milliseconds
    const maintenanceInterval = utils.MetricsTime.ONE_HOUR * 60 * 1000; // 1 hour in ms

    return this.queue.add('maintenance',
      { task: 'cleanup' },
      {
        repeat: { every: maintenanceInterval },
        removeOnComplete: 5,
        removeOnFail: 2
      }
    );
  }
}

// Usage
const queueManager = new RobustQueueManager('robust-queue');

// Report each job outcome published by the manager on one line
function reportProcessedJob(data) {
  const message = data.success
    ? `✅ Job ${data.jobId} processed successfully in ${data.processingTime}ms`
    : `❌ Job ${data.jobId} failed: ${data.error} (attempt ${data.attemptsMade})`;
  console.log(message);
}

queueManager.on('job-processed', reportProcessedJob);

// Bring the queue up first, then register the recurring maintenance job
await queueManager.initialize();
await queueManager.scheduleMaintenanceJob();

Install with Tessl CLI

npx tessl i tessl/npm-bull

docs

event-system.md

index.md

job-creation.md

job-processing.md

job-state.md

queue-management.md

queue-monitoring.md

repeatable-jobs.md

utils.md

tile.json