CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/npm-bull

Redis-based job queue library for Node.js that provides reliable job processing with features including job scheduling, delayed execution, repeatable jobs, priority handling, and automatic retries.

Pending
This version of the tile failed moderation
Malicious code detected in tile.json: This tile claims to describe 'pkg:npm/bull@4.16.5' but the tile itself is named 'tessl/npm-bull' with version '4.16.2'. This is a potential supply chain attack pattern (typosquatting/dependency confusion). The legitimate 'bull' package is maintained by OptimalBits, not 'tessl'. The version mismatch (tile 4.16.2 claiming to describe 4.16.5) and the unofficial namespace suggest this could be an attempt to impersonate the legitimate bull package to intercept installations or inject malicious code.
Overview
Eval results
Files

job-processing.mddocs/

Job Processing

Define job processors with concurrency control, named job handling, and error management.

Capabilities

Basic Job Processing

Define how jobs are processed in the queue.

/**
 * Define a job processor function for all jobs in the queue
 * @param callback - Function to process jobs (callback or promise-based)
 * @returns Promise that resolves when processor is registered
 */
process(callback: ProcessCallbackFunction): Promise<void>;
process(callback: ProcessPromiseFunction): Promise<void>;
process(callback: string): Promise<void>; // Path to processor file

type ProcessCallbackFunction<T> = (job: Job<T>, done: DoneCallback) => void;
type ProcessPromiseFunction<T> = (job: Job<T>) => Promise<any>;
type DoneCallback = (error?: Error | null, value?: any) => void;

Usage Examples:

const Queue = require('bull');
const emailQueue = new Queue('email processing');

// Promise-based processor
emailQueue.process(async (job) => {
  const { email, name } = job.data;
  console.log(`Processing email for ${name}`);
  
  // Simulate async work
  await sendEmail(email, name);
  
  return { sent: true, timestamp: Date.now() };
});

// Callback-based processor
emailQueue.process((job, done) => {
  const { email, name } = job.data;
  
  sendEmail(email, name, (err, result) => {
    if (err) {
      return done(err);
    }
    done(null, { sent: true, result });
  });
});

// External processor file
emailQueue.process('./processors/emailProcessor.js');

Concurrent Processing

Process multiple jobs simultaneously with concurrency control.

/**
 * Define a job processor with concurrency limit
 * @param concurrency - Maximum number of jobs to process simultaneously
 * @param callback - Function to process jobs
 * @returns Promise that resolves when processor is registered
 */
process(concurrency: number, callback: ProcessCallbackFunction): Promise<void>;
process(concurrency: number, callback: ProcessPromiseFunction): Promise<void>;
process(concurrency: number, callback: string): Promise<void>;

Usage Examples:

// Process up to 5 jobs simultaneously
emailQueue.process(5, async (job) => {
  console.log(`Processing job ${job.id} with data:`, job.data);
  await processEmailJob(job.data);
  return { processed: true };
});

// Concurrent processing with callback
emailQueue.process(3, (job, done) => {
  processJob(job.data, done);
});

Named Job Processing

Process specific types of jobs by name.

/**
 * Define a processor for jobs with a specific name
 * @param name - Job name to process
 * @param callback - Function to process named jobs
 * @returns Promise that resolves when processor is registered
 */
process(name: string, callback: ProcessCallbackFunction): Promise<void>;
process(name: string, callback: ProcessPromiseFunction): Promise<void>;
process(name: string, callback: string): Promise<void>;

Usage Examples:

const Queue = require('bull');
const taskQueue = new Queue('task processing');

// Process only 'welcome-email' jobs
taskQueue.process('welcome-email', async (job) => {
  const { email, name } = job.data;
  await sendWelcomeEmail(email, name);
  return { emailSent: true };
});

// Process only 'generate-report' jobs  
taskQueue.process('generate-report', async (job) => {
  const { reportType, userId } = job.data;
  const report = await generateReport(reportType, userId);
  return { reportId: report.id };
});

// You can have multiple processors for different job types
taskQueue.process('send-notification', async (job) => {
  await sendNotification(job.data);
  return { notificationSent: true };
});

Named Job Processing with Concurrency

Combine named job processing with concurrency control.

/**
 * Define a processor for named jobs with concurrency limit
 * @param name - Job name to process
 * @param concurrency - Maximum concurrent jobs of this type
 * @param callback - Function to process jobs
 * @returns Promise that resolves when processor is registered
 */
process(name: string, concurrency: number, callback: ProcessCallbackFunction): Promise<void>;
process(name: string, concurrency: number, callback: ProcessPromiseFunction): Promise<void>;
process(name: string, concurrency: number, callback: string): Promise<void>;

Usage Examples:

// Process up to 2 'image-resize' jobs concurrently
taskQueue.process('image-resize', 2, async (job) => {
  const { imagePath, width, height } = job.data;
  const resizedPath = await resizeImage(imagePath, width, height);
  return { resizedPath };
});

// Process up to 10 'email-send' jobs concurrently
taskQueue.process('email-send', 10, async (job) => {
  await sendEmail(job.data);
  return { sent: true };
});

Job Progress Tracking

Track and report job progress during processing.

// Progress tracking is done within job processors
// Access via job.progress() method in processors

Usage Examples:

taskQueue.process('large-task', async (job) => {
  const { items } = job.data;
  
  for (let i = 0; i < items.length; i++) {
    await processItem(items[i]);
    
    // Update progress (0-100 or any value)
    const progress = Math.round((i + 1) / items.length * 100);
    await job.progress(progress);
  }
  
  return { processed: items.length };
});

// Listen for progress updates
taskQueue.on('progress', (job, progress) => {
  console.log(`Job ${job.id} is ${progress}% complete`);
});

Error Handling in Processors

Handle errors and job failures in processors.

// Error handling patterns for different processor types

Usage Examples:

// Promise-based error handling
taskQueue.process(async (job) => {
  try {
    const result = await riskyOperation(job.data);
    return result;
  } catch (error) {
    // Error will be caught by Bull and job marked as failed
    throw new Error(`Processing failed: ${error.message}`);
  }
});

// Callback-based error handling
taskQueue.process((job, done) => {
  riskyOperation(job.data, (err, result) => {
    if (err) {
      // Pass error to done callback
      return done(new Error(`Processing failed: ${err.message}`));
    }
    done(null, result);
  });
});

// Custom error handling with job methods
taskQueue.process(async (job) => {
  try {
    await job.progress(10);
    const data = await fetchData(job.data.url);
    
    await job.progress(50);
    const processed = await processData(data);
    
    await job.progress(100);
    return processed;
  } catch (error) {
    // Log error details
    await job.log(`Error occurred: ${error.message}`);
    throw error;
  }
});

Processor Lifecycle and Events

Monitor processor lifecycle through events.

// Events are emitted during job processing lifecycle
// See Event System documentation for complete event handling

Usage Examples:

taskQueue.process(async (job) => {
  await processJob(job.data);
  return { success: true };
});

// Monitor processing events
taskQueue.on('active', (job) => {
  console.log(`Job ${job.id} started processing`);
});

taskQueue.on('completed', (job, result) => {
  console.log(`Job ${job.id} completed with result:`, result);
});

taskQueue.on('failed', (job, err) => {
  console.error(`Job ${job.id} failed:`, err.message);
});

taskQueue.on('stalled', (job) => {
  console.warn(`Job ${job.id} stalled and will be retried`);
});

External Processor Files

Use external files for job processors.

Usage Examples:

// In your main file
const taskQueue = new Queue('tasks');

// Use external processor file
taskQueue.process('./processors/taskProcessor.js');

// Use external processor with concurrency
taskQueue.process(5, './processors/concurrentProcessor.js');

// Use external processor for named jobs
taskQueue.process('specific-task', './processors/specificTaskProcessor.js');
// In processors/taskProcessor.js
module.exports = async function(job) {
  const { taskType, data } = job.data;
  
  console.log(`Processing ${taskType} with data:`, data);
  
  // Update progress
  await job.progress(50);
  
  // Process the task
  const result = await performTask(taskType, data);
  
  await job.progress(100);
  
  return result;
};

Job Processing Best Practices

// Example of robust job processor with best practices
taskQueue.process('robust-task', 3, async (job) => {
  const startTime = Date.now();
  
  try {
    // Validate job data
    if (!job.data || !job.data.required_field) {
      throw new Error('Invalid job data: missing required_field');
    }
    
    // Log job start
    await job.log(`Started processing at ${new Date().toISOString()}`);
    
    // Update progress at key points
    await job.progress(10);
    
    // Perform work with proper error handling
    const step1 = await performStep1(job.data);
    await job.progress(30);
    
    const step2 = await performStep2(step1);
    await job.progress(60);
    
    const result = await performStep3(step2);
    await job.progress(100);
    
    // Log completion
    const duration = Date.now() - startTime;
    await job.log(`Completed in ${duration}ms`);
    
    return {
      success: true,
      result,
      duration
    };
    
  } catch (error) {
    // Log detailed error information
    await job.log(`Error: ${error.message}`);
    await job.log(`Stack: ${error.stack}`);
    
    // Re-throw to mark job as failed
    throw error;
  }
});

Install with Tessl CLI

npx tessl i tessl/npm-bull@4.16.2

docs

event-system.md

index.md

job-creation.md

job-processing.md

job-state.md

queue-management.md

queue-monitoring.md

repeatable-jobs.md

utils.md

tile.json