CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/npm-bull

Redis-based job queue library for Node.js that provides reliable job processing with features including job scheduling, delayed execution, repeatable jobs, priority handling, and automatic retries.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

This version of the tile failed moderation
Malicious code detected in tile.json: This tile claims to describe 'pkg:npm/bull@4.16.5' but the tile itself is named 'tessl/npm-bull' with version '4.16.2'. This is a potential supply chain attack pattern (typosquatting/dependency confusion). The legitimate 'bull' package is maintained by OptimalBits, not 'tessl'. The version mismatch (tile 4.16.2 claiming to describe 4.16.5) and the unofficial namespace suggest this could be an attempt to impersonate the legitimate bull package to intercept installations or inject malicious code.
Overview
Eval results
Files

event-system.mddocs/

Event System

Comprehensive event-driven architecture for real-time job lifecycle monitoring and queue state changes.

Capabilities

Job Lifecycle Events

Monitor individual job state transitions and processing events.

/**
 * Job becomes waiting to be processed
 * @param event - 'waiting'
 * @param callback - Function called with job ID
 */
on(event: 'waiting', callback: (jobId: string | number) => void): Queue;

/**
 * Job starts processing
 * @param event - 'active'  
 * @param callback - Function called with job and optional job promise
 */
on(event: 'active', callback: (job: Job, jobPromise?: JobPromise) => void): Queue;

/**
 * Job completes successfully
 * @param event - 'completed'
 * @param callback - Function called with job and result
 */
on(event: 'completed', callback: (job: Job, result: any) => void): Queue;

/**
 * Job fails after processing
 * @param event - 'failed'
 * @param callback - Function called with job and error
 */
on(event: 'failed', callback: (job: Job, error: Error) => void): Queue;

/**
 * Job progress is updated
 * @param event - 'progress'
 * @param callback - Function called with job and progress value
 */
on(event: 'progress', callback: (job: Job, progress: any) => void): Queue;

/**
 * Job becomes stalled (processing too long)
 * @param event - 'stalled'
 * @param callback - Function called with stalled job
 */
on(event: 'stalled', callback: (job: Job) => void): Queue;

/**
 * Job is removed from queue
 * @param event - 'removed'
 * @param callback - Function called with removed job
 */
on(event: 'removed', callback: (job: Job) => void): Queue;

/**
 * Job was duplicated/deduplicated (not processed due to existing duplicate)
 * @param event - 'duplicated'
 * @param callback - Function called with duplicated job ID
 */
on(event: 'duplicated', callback: (jobId: string | number) => void): Queue;

/**
 * Job was debounced (delayed due to debounce settings)
 * @param event - 'debounced'
 * @param callback - Function called with debounced job ID
 */
on(event: 'debounced', callback: (jobId: string | number) => void): Queue;

/**
 * Job lock extension failed during processing
 * @param event - 'lock-extension-failed'
 * @param callback - Function called with job and lock extension error
 */
on(event: 'lock-extension-failed', callback: (job: Job, error: Error) => void): Queue;

interface JobPromise {
  /** Cancel the running job */
  cancel(): void;
}

Usage Examples:

const Queue = require('bull');
const taskQueue = new Queue('event monitoring');

// Monitor job lifecycle
taskQueue.on('waiting', (jobId) => {
  console.log(`Job ${jobId} is waiting to be processed`);
});

taskQueue.on('active', (job, jobPromise) => {
  console.log(`Job ${job.id} (${job.name}) started processing`);
  console.log('Job data:', job.data);
  
  // Optionally cancel job after timeout
  setTimeout(() => {
    if (jobPromise) {
      jobPromise.cancel();
      console.log(`Cancelled job ${job.id} due to timeout`);
    }
  }, 60000); // Cancel after 1 minute
});

taskQueue.on('completed', (job, result) => {
  console.log(`Job ${job.id} completed successfully`);
  console.log('Result:', result);
  
  // Log completion metrics
  const processingTime = job.finishedOn - job.processedOn;
  console.log(`Processing time: ${processingTime}ms`);
});

taskQueue.on('failed', (job, err) => {
  console.error(`Job ${job.id} failed: ${err.message}`);
  console.error(`Attempts made: ${job.attemptsMade}/${job.opts.attempts}`);
  
  // Send alert for important job failures
  if (job.data.priority === 'critical') {
    sendAlert(`Critical job ${job.id} failed: ${err.message}`);
  }
});

taskQueue.on('progress', (job, progress) => {
  console.log(`Job ${job.id} progress: ${progress}%`);
});

taskQueue.on('stalled', (job) => {
  console.warn(`Job ${job.id} stalled and will be retried`);
});

Queue State Events

Monitor overall queue state changes and management operations.

/**
 * Queue is paused
 * @param event - 'paused'
 * @param callback - Function called when queue is paused
 */
on(event: 'paused', callback: () => void): Queue;

/**
 * Queue is resumed
 * @param event - 'resumed'
 * @param callback - Function called when queue is resumed
 */
on(event: 'resumed', callback: () => void): Queue;

/**
 * Queue has processed all waiting jobs
 * @param event - 'drained'
 * @param callback - Function called when queue is drained
 */
on(event: 'drained', callback: () => void): Queue;

/**
 * Old jobs have been cleaned from queue
 * @param event - 'cleaned'
 * @param callback - Function called with cleaned jobs and their status
 */
on(event: 'cleaned', callback: (jobs: Job[], status: JobStatusClean) => void): Queue;

type JobStatusClean = 'completed' | 'wait' | 'active' | 'delayed' | 'failed' | 'paused';

Usage Examples:

// Monitor queue state changes
taskQueue.on('paused', () => {
  console.log('Queue has been paused');
  sendNotification('Task queue paused - no new jobs will be processed');
});

taskQueue.on('resumed', () => {
  console.log('Queue has been resumed');
  sendNotification('Task queue resumed - processing continuing');
});

taskQueue.on('drained', () => {
  console.log('Queue is drained - all waiting jobs have been processed');
  
  // Opportunity to perform maintenance or shutdown
  performMaintenanceTasks();
});

taskQueue.on('cleaned', (jobs, status) => {
  console.log(`Cleaned ${jobs.length} ${status} jobs from queue`);
});

Global Events

Monitor job and queue events across all workers and queue instances using global event variants.

/**
 * Global variants of job lifecycle events (fired across all workers)
 * These events are published to all queue instances, not just the local one
 */
on(event: 'global:waiting', callback: (jobId: string | number) => void): Queue;
on(event: 'global:active', callback: (job: Job, jobPromise?: JobPromise) => void): Queue;
on(event: 'global:completed', callback: (job: Job, result: any) => void): Queue;
on(event: 'global:failed', callback: (job: Job, error: Error) => void): Queue;
on(event: 'global:progress', callback: (job: Job, progress: any) => void): Queue;
on(event: 'global:stalled', callback: (job: Job) => void): Queue;
on(event: 'global:duplicated', callback: (jobId: string | number) => void): Queue;
on(event: 'global:debounced', callback: (jobId: string | number) => void): Queue;

/**
 * Global queue state events
 */
on(event: 'global:paused', callback: () => void): Queue;
on(event: 'global:resumed', callback: () => void): Queue;
on(event: 'global:drained', callback: () => void): Queue;

Usage Examples:

// Monitor events across all workers
taskQueue.on('global:completed', (job, result) => {
  console.log(`Job ${job.id} completed on any worker`);
  
  // Update shared metrics or database
  updateGlobalMetrics('completed', job, result);
});

taskQueue.on('global:failed', (job, error) => {
  console.log(`Job ${job.id} failed on any worker: ${error.message}`);
  
  // Send notifications that work across instances
  notifyFailure(job, error);
});

// Monitor queue state across all instances
taskQueue.on('global:paused', () => {
  console.log('Queue paused globally across all workers');
});

taskQueue.on('global:drained', () => {
  console.log('All queues are drained across all workers');
  // Safe to perform system maintenance
  scheduleMaintenanceTask();
});

// Track duplicated jobs globally
taskQueue.on('global:duplicated', (jobId) => {
  console.log(`Job ${jobId} was duplicated and skipped globally`);
});

Error Handling Events

Handle queue-level errors and system issues.

/**
 * Error occurred in queue operations
 * @param event - 'error'
 * @param callback - Function called with error details
 */
on(event: 'error', callback: (error: Error) => void): Queue;

Usage Examples:

// Handle queue errors
taskQueue.on('error', (error) => {
  console.error('Queue error occurred:', error.message);
  console.error('Stack trace:', error.stack);
  
  // Send critical alert
  sendCriticalAlert(`Queue error: ${error.message}`);
  
  // Log error for analysis
  logError('queue_error', {
    queue: taskQueue.name,
    error: error.message,
    stack: error.stack,
    timestamp: new Date().toISOString()
  });
});

Event-Driven Job Processing

Use events for advanced job processing patterns.

// Example: Rate limiting based on events
let activeJobs = 0;
const MAX_CONCURRENT_JOBS = 5;

taskQueue.on('active', (job) => {
  activeJobs++;
  console.log(`Active jobs: ${activeJobs}/${MAX_CONCURRENT_JOBS}`);
  
  if (activeJobs >= MAX_CONCURRENT_JOBS) {
    console.log('Maximum concurrent jobs reached');
  }
});

taskQueue.on('completed', (job) => {
  activeJobs--;
  console.log(`Job completed. Active jobs: ${activeJobs}`);
});

taskQueue.on('failed', (job) => {
  activeJobs--;
  console.log(`Job failed. Active jobs: ${activeJobs}`);
});

// Example: Job dependency tracking
const jobDependencies = new Map();

taskQueue.on('completed', (job, result) => {
  // Check if other jobs were waiting for this one
  const dependentJobs = jobDependencies.get(job.id);
  if (dependentJobs) {
    console.log(`Job ${job.id} completed, triggering ${dependentJobs.length} dependent jobs`);
    
    dependentJobs.forEach(async (dependentJobData) => {
      await taskQueue.add(dependentJobData.name, {
        ...dependentJobData.data,
        parentResult: result
      });
    });
    
    jobDependencies.delete(job.id);
  }
});

Event-Based Metrics Collection

Collect detailed metrics using events.

// Metrics collection using events
class QueueMetrics {
  constructor(queue) {
    this.queue = queue;
    this.metrics = {
      completed: 0,
      failed: 0,
      totalProcessingTime: 0,
      averageProcessingTime: 0,
      errorsByType: new Map(),
      jobsByName: new Map()
    };
    
    this.setupEventListeners();
  }
  
  setupEventListeners() {
    this.queue.on('completed', (job, result) => {
      this.metrics.completed++;
      
      const processingTime = job.finishedOn - job.processedOn;
      this.metrics.totalProcessingTime += processingTime;
      this.metrics.averageProcessingTime = 
        this.metrics.totalProcessingTime / this.metrics.completed;
      
      // Track by job name
      const nameCount = this.metrics.jobsByName.get(job.name) || 0;
      this.metrics.jobsByName.set(job.name, nameCount + 1);
      
      console.log(`Avg processing time: ${this.metrics.averageProcessingTime.toFixed(2)}ms`);
    });
    
    this.queue.on('failed', (job, error) => {
      this.metrics.failed++;
      
      // Track error types
      const errorType = error.name || 'Unknown';
      const errorCount = this.metrics.errorsByType.get(errorType) || 0;
      this.metrics.errorsByType.set(errorType, errorCount + 1);
      
      const successRate = this.metrics.completed / 
        (this.metrics.completed + this.metrics.failed) * 100;
      console.log(`Success rate: ${successRate.toFixed(2)}%`);
    });
    
    this.queue.on('stalled', (job) => {
      console.warn(`Job ${job.id} stalled - possible worker issue`);
    });
  }
  
  getReport() {
    return {
      summary: {
        completed: this.metrics.completed,
        failed: this.metrics.failed,
        successRate: this.metrics.completed / 
          (this.metrics.completed + this.metrics.failed) * 100,
        averageProcessingTime: this.metrics.averageProcessingTime
      },
      errorsByType: Object.fromEntries(this.metrics.errorsByType),
      jobsByName: Object.fromEntries(this.metrics.jobsByName)
    };
  }
}

// Use metrics collector
const metrics = new QueueMetrics(taskQueue);

// Generate report periodically
setInterval(() => {
  const report = metrics.getReport();
  console.log('Queue Metrics Report:', JSON.stringify(report, null, 2));
}, 60000); // Every minute

Event-Based Job Monitoring Dashboard

Create real-time monitoring using events.

class QueueDashboard {
  constructor(queue) {
    this.queue = queue;
    this.state = {
      activeJobs: new Map(),
      recentCompleted: [],
      recentFailed: [],
      queueState: 'active'
    };
    
    this.setupEventListeners();
    this.startPeriodicReport();
  }
  
  setupEventListeners() {
    this.queue.on('active', (job) => {
      this.state.activeJobs.set(job.id, {
        id: job.id,
        name: job.name,
        startTime: Date.now(),
        progress: 0
      });
      this.updateDisplay();
    });
    
    this.queue.on('progress', (job, progress) => {
      const activeJob = this.state.activeJobs.get(job.id);
      if (activeJob) {
        activeJob.progress = progress;
        this.updateDisplay();
      }
    });
    
    this.queue.on('completed', (job, result) => {
      this.state.activeJobs.delete(job.id);
      this.state.recentCompleted.unshift({
        id: job.id,
        name: job.name,
        completedAt: Date.now(),
        result: result
      });
      
      // Keep only last 10 completed jobs
      this.state.recentCompleted = this.state.recentCompleted.slice(0, 10);
      this.updateDisplay();
    });
    
    this.queue.on('failed', (job, error) => {
      this.state.activeJobs.delete(job.id);
      this.state.recentFailed.unshift({
        id: job.id,
        name: job.name,
        failedAt: Date.now(),
        error: error.message
      });
      
      // Keep only last 10 failed jobs
      this.state.recentFailed = this.state.recentFailed.slice(0, 10);
      this.updateDisplay();
    });
    
    this.queue.on('paused', () => {
      this.state.queueState = 'paused';
      this.updateDisplay();
    });
    
    this.queue.on('resumed', () => {
      this.state.queueState = 'active';
      this.updateDisplay();
    });
    
    this.queue.on('drained', () => {
      console.log('\n🎉 Queue is drained - all jobs processed!');
    });
  }
  
  updateDisplay() {
    console.clear();
    console.log(`\n=== Queue Dashboard: ${this.queue.name} ===`);
    console.log(`Status: ${this.state.queueState.toUpperCase()}`);
    
    console.log(`\nActive Jobs (${this.state.activeJobs.size}):`);
    this.state.activeJobs.forEach(job => {
      const runningTime = Date.now() - job.startTime;
      console.log(`  ${job.id}: ${job.name} (${job.progress}%) - ${Math.round(runningTime/1000)}s`);
    });
    
    if (this.state.recentCompleted.length > 0) {
      console.log(`\nRecent Completed Jobs:`);
      this.state.recentCompleted.slice(0, 5).forEach(job => {
        const timeAgo = Date.now() - job.completedAt;
        console.log(`  ✅ ${job.id}: ${job.name} (${Math.round(timeAgo/1000)}s ago)`);
      });
    }
    
    if (this.state.recentFailed.length > 0) {
      console.log(`\nRecent Failed Jobs:`);
      this.state.recentFailed.slice(0, 3).forEach(job => {
        const timeAgo = Date.now() - job.failedAt;
        console.log(`  ❌ ${job.id}: ${job.name} - ${job.error} (${Math.round(timeAgo/1000)}s ago)`);
      });
    }
    
    console.log(`\n${'='.repeat(50)}`);
  }
  
  startPeriodicReport() {
    // Update display every 2 seconds if there are active jobs
    setInterval(() => {
      if (this.state.activeJobs.size > 0) {
        this.updateDisplay();
      }
    }, 2000);
  }
}

// Use dashboard
const dashboard = new QueueDashboard(taskQueue);

Event Handler Best Practices

// Example of robust event handling with error recovery
function setupRobustEventHandlers(queue) {
  // Wrap event handlers in try-catch to prevent crashes
  queue.on('completed', (job, result) => {
    try {
      logJobCompletion(job, result);
      updateMetrics('completed', job);
      notifyJobCompletion(job, result);
    } catch (error) {
      console.error('Error in completed handler:', error);
    }
  });
  
  queue.on('failed', (job, error) => {
    try {
      logJobFailure(job, error);
      updateMetrics('failed', job);
      
      // Conditional alerting
      if (job.opts.attempts && job.attemptsMade >= job.opts.attempts) {
        sendFailureAlert(job, error);
      }
    } catch (handlerError) {
      console.error('Error in failed handler:', handlerError);
    }
  });
  
  // Remove event listeners when shutting down
  process.on('SIGTERM', () => {
    console.log('Removing event listeners...');
    queue.removeAllListeners();
    queue.close();
  });
}

// Use robust handlers
setupRobustEventHandlers(taskQueue);

Install with Tessl CLI

npx tessl i tessl/npm-bull

docs

event-system.md

index.md

job-creation.md

job-processing.md

job-state.md

queue-management.md

queue-monitoring.md

repeatable-jobs.md

utils.md

tile.json