Job Processing

Define job processors with concurrency control, named job handling, and error management.

Capabilities

Basic Job Processing

Define how jobs are processed in the queue.

/**
 * Define a job processor function for all jobs in the queue
 * @param callback - Function to process jobs (callback or promise-based), or a path to a processor file
 * @returns Promise that stays pending while the queue is processing; it is usually not awaited
 */
process(callback: ProcessCallbackFunction<T>): Promise<void>;
process(callback: ProcessPromiseFunction<T>): Promise<void>;
process(callback: string): Promise<void>; // Path to processor file

type ProcessCallbackFunction<T> = (job: Job<T>, done: DoneCallback) => void;
type ProcessPromiseFunction<T> = (job: Job<T>) => Promise<any>;
type DoneCallback = (error?: Error | null, value?: any) => void;

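Usage Examples:

const Queue = require('bull');

// A queue accepts only one processor per job name, so each variant below
// is registered on its own queue. sendEmail and sendEmailWithCallback are
// application-defined helpers.
const emailQueue = new Queue('email processing');
const callbackEmailQueue = new Queue('callback email processing');
const sandboxedEmailQueue = new Queue('sandboxed email processing');

// Promise-based processor
emailQueue.process(async (job) => {
  const { email, name } = job.data;
  console.log(`Processing email for ${name}`);

  // Send the email and wait for it to finish
  await sendEmail(email, name);

  return { sent: true, timestamp: Date.now() };
});

// Callback-based processor
callbackEmailQueue.process((job, done) => {
  const { email, name } = job.data;

  sendEmailWithCallback(email, name, (err, result) => {
    if (err) {
      return done(err);
    }
    done(null, { sent: true, result });
  });
});

// External processor file (see External Processor Files below)
sandboxedEmailQueue.process('./processors/emailProcessor.js');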
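
The callback and promise signatures carry the same information: `done(err)` corresponds to a rejected promise and `done(null, value)` to a resolved one. The sketch below illustrates that correspondence with a hypothetical adapter, `toPromiseProcessor`. It is not part of Bull and relies only on the `(job, done)` shape described above; the job object is a stand-in so the example runs without Redis.

```javascript
// Hypothetical helper (not a Bull API): wraps a callback-style processor so it
// can be used where a promise-returning processor is expected.
function toPromiseProcessor(callbackProcessor) {
  return (job) =>
    new Promise((resolve, reject) => {
      callbackProcessor(job, (error, value) => {
        if (error) {
          reject(error); // done(err): the job fails
        } else {
          resolve(value); // done(null, value): the job completes with value
        }
      });
    });
}

// Example with a stand-in job object that only carries `data`.
const legacyProcessor = (job, done) => done(null, { echoed: job.data.email });
const processor = toPromiseProcessor(legacyProcessor);

processor({ data: { email: 'user@example.com' } }).then((result) => {
  console.log(result);
});
```

Going the other way is rarely needed, since new processors can simply be written as `async` functions.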

Concurrent Processing

Process multiple jobs simultaneously with concurrency control.

/**
 * Define a job processor with concurrency limit
 * @param concurrency - Maximum number of jobs to process simultaneously
 * @param callback - Function to process jobs, or a path to a processor file
 * @returns Promise that stays pending while the queue is processing; it is usually not awaited
 */
process(concurrency: number, callback: ProcessCallbackFunction<T>): Promise<void>;
process(concurrency: number, callback: ProcessPromiseFunction<T>): Promise<void>;
process(concurrency: number, callback: string): Promise<void>;

Usage Examples:

// Process up to 5 jobs simultaneously
emailQueue.process(5, async (job) => {
  console.log(`Processing job ${job.id} with data:`, job.data);
  await processEmailJob(job.data);
  return { processed: true };
});

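// Concurrent processing with a callback-based processor
// (on a second queue, since a queue accepts only one default processor)
const importQueue = new Queue('import processing');
importQueue.process(3, (job, done) => {
  processJob(job.data, done);
});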
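
To make the meaning of the concurrency argument concrete, here is a small stand-alone simulation. It does not use Bull and is not how Bull is implemented; `runWithConcurrency` and its parameters are hypothetical names, used only to show that at most `limit` jobs are in flight at any moment.

```javascript
// Illustration only: run `worker` over `jobs` with at most `limit` in flight.
async function runWithConcurrency(jobs, limit, worker) {
  const results = new Array(jobs.length);
  let next = 0; // index of the next job to start
  let inFlight = 0; // jobs currently being processed
  let maxInFlight = 0; // highest value inFlight ever reached

  // Each lane picks up the next job as soon as it finishes the previous one.
  async function lane() {
    while (next < jobs.length) {
      const index = next++;
      inFlight++;
      maxInFlight = Math.max(maxInFlight, inFlight);
      try {
        results[index] = await worker(jobs[index]);
      } finally {
        inFlight--;
      }
    }
  }

  const laneCount = Math.min(limit, jobs.length);
  await Promise.all(Array.from({ length: laneCount }, () => lane()));
  return { results, maxInFlight };
}

// Usage: ten fake jobs, three at a time.
const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
const fakeJobs = Array.from({ length: 10 }, (_, i) => ({ id: i }));

runWithConcurrency(fakeJobs, 3, async (job) => {
  await delay(5);
  return job.id * 2;
}).then(({ results, maxInFlight }) => {
  console.log(`processed ${results.length} jobs, peak in flight: ${maxInFlight}`);
});
```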

Named Job Processing

Process specific types of jobs by name.

/**
 * Define a processor for jobs with a specific name
 * @param name - Job name to process
 * @param callback - Function to process named jobs, or a path to a processor file
 * @returns Promise that stays pending while the queue is processing; it is usually not awaited
 */
process(name: string, callback: ProcessCallbackFunction<T>): Promise<void>;
process(name: string, callback: ProcessPromiseFunction<T>): Promise<void>;
process(name: string, callback: string): Promise<void>;

Usage Examples:

const Queue = require('bull');
const taskQueue = new Queue('task processing');

// Process only 'welcome-email' jobs
taskQueue.process('welcome-email', async (job) => {
  const { email, name } = job.data;
  await sendWelcomeEmail(email, name);
  return { emailSent: true };
});

// Process only 'generate-report' jobs  
taskQueue.process('generate-report', async (job) => {
  const { reportType, userId } = job.data;
  const report = await generateReport(reportType, userId);
  return { reportId: report.id };
});

// You can have multiple processors for different job types
taskQueue.process('send-notification', async (job) => {
  await sendNotification(job.data);
  return { notificationSent: true };
});
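
When a queue handles many job names, the registrations above can be driven from a table. The sketch below assumes a hypothetical helper, `registerProcessors`, and uses only the `process(name, callback)` form shown above. The recording queue is a stand-in so the example runs without Redis; it is not a Bull object.

```javascript
// Hypothetical helper: register one processor per job name from a plain object.
function registerProcessors(queue, handlers) {
  const names = Object.keys(handlers);
  for (const name of names) {
    if (typeof handlers[name] !== 'function') {
      throw new TypeError(`Processor for "${name}" must be a function`);
    }
    queue.process(name, handlers[name]);
  }
  return names;
}

// Stand-in for a queue: it only records what was registered.
const recordingQueue = {
  registered: new Map(),
  process(name, handler) {
    if (this.registered.has(name)) {
      throw new Error(`A processor for "${name}" is already defined`);
    }
    this.registered.set(name, handler);
  },
};

const names = registerProcessors(recordingQueue, {
  'welcome-email': async (job) => ({ emailSent: true, to: job.data.email }),
  'generate-report': async (job) => ({ reportType: job.data.reportType }),
});

console.log(names);
```

With a real queue, `taskQueue` would be passed in place of `recordingQueue`.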

Named Job Processing with Concurrency

Combine named job processing with concurrency control.

/**
 * Define a processor for named jobs with concurrency limit
 * @param name - Job name to process
 * @param concurrency - Concurrency for this processor (values of named processors on one queue add up)
 * @param callback - Function to process jobs, or a path to a processor file
 * @returns Promise that stays pending while the queue is processing; it is usually not awaited
 */
process(name: string, concurrency: number, callback: ProcessCallbackFunction<T>): Promise<void>;
process(name: string, concurrency: number, callback: ProcessPromiseFunction<T>): Promise<void>;
process(name: string, concurrency: number, callback: string): Promise<void>;

Usage Examples:

// Register 'image-resize' with a concurrency of 2
taskQueue.process('image-resize', 2, async (job) => {
  const { imagePath, width, height } = job.data;
  const resizedPath = await resizeImage(imagePath, width, height);
  return { resizedPath };
});

// Register 'email-send' with a concurrency of 10
taskQueue.process('email-send', 10, async (job) => {
  const { email, name } = job.data;
  await sendEmail(email, name);
  return { sent: true };
});
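
Bull's documentation notes that the concurrency values of named processors on the same queue stack up rather than acting as separate per-name limits. Assuming that behavior, a worker's overall concurrency is the sum of the values passed to `process`. The helper below, `totalConcurrency`, and the configuration object are hypothetical and exist only to make that arithmetic explicit.

```javascript
// Hypothetical configuration: job name -> concurrency passed to process().
const processorConfig = {
  'image-resize': 2,
  'email-send': 10,
};

// Sum of all configured values. A missing or invalid entry counts as 1,
// on the assumption that 1 is the default when no concurrency is given.
function totalConcurrency(config) {
  return Object.values(config).reduce((sum, value) => {
    const concurrency = Number.isInteger(value) && value > 0 ? value : 1;
    return sum + concurrency;
  }, 0);
}

console.log(totalConcurrency(processorConfig)); // 12
```

If one job type needs a strict limit of its own, giving it a separate queue is one option to consider.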

Job Progress Tracking

Track and report job progress during processing.

// Progress tracking is done within job processors
// Access via job.progress() method in processors

Usage Examples:

taskQueue.process('large-task', async (job) => {
  const { items } = job.data;
  
  for (let i = 0; i < items.length; i++) {
    await processItem(items[i]);
    
    // Update progress (0-100 or any value)
    const progress = Math.round((i + 1) / items.length * 100);
    await job.progress(progress);
  }
  
  return { processed: items.length };
});

// Listen for progress updates
taskQueue.on('progress', (job, progress) => {
  console.log(`Job ${job.id} is ${progress}% complete`);
});
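
Each `job.progress(value)` call stores the new value and triggers a progress event, so for large item counts it can be worth reporting only when the whole-number percentage changes. The sketch below assumes a hypothetical helper, `createProgressReporter`; the only job method it relies on is `job.progress(value)` as used above, and the job object is a stand-in so the example runs without Redis.

```javascript
// Hypothetical helper: report progress only when the whole-number percentage changes.
function createProgressReporter(job, total) {
  let completed = 0;
  let lastReported = -1;

  return async function tick() {
    completed += 1;
    // Integer arithmetic avoids rounding surprises; an empty workload counts as done.
    const percent = total > 0
      ? Math.min(100, Math.floor((completed * 100) / total))
      : 100;
    if (percent !== lastReported) {
      lastReported = percent;
      await job.progress(percent);
    }
    return percent;
  };
}

// Stand-in job that records progress calls instead of writing to Redis.
const fakeJob = {
  reported: [],
  async progress(value) {
    this.reported.push(value);
  },
};

(async () => {
  const tick = createProgressReporter(fakeJob, 1000);
  for (let i = 0; i < 1000; i++) {
    await tick();
  }
  console.log(`progress calls: ${fakeJob.reported.length} for 1000 items`);
})();
```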

Error Handling in Processors

Handle errors and job failures in processors.

// Error handling patterns for different processor types

Usage Examples:

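// The three processors below are alternatives: a queue accepts only one
// default (unnamed) processor, so register just one of them.

// Promise-based error handling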
taskQueue.process(async (job) => {
  try {
    const result = await riskyOperation(job.data);
    return result;
  } catch (error) {
    // Error will be caught by Bull and job marked as failed
    throw new Error(`Processing failed: ${error.message}`);
  }
});

// Callback-based error handling
taskQueue.process((job, done) => {
  riskyOperation(job.data, (err, result) => {
    if (err) {
      // Pass error to done callback
      return done(new Error(`Processing failed: ${err.message}`));
    }
    done(null, result);
  });
});

// Custom error handling with job methods
taskQueue.process(async (job) => {
  try {
    await job.progress(10);
    const data = await fetchData(job.data.url);
    
    await job.progress(50);
    const processed = await processData(data);
    
    await job.progress(100);
    return processed;
  } catch (error) {
    // Log error details
    await job.log(`Error occurred: ${error.message}`);
    throw error;
  }
});
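
Wrapping an error in a new `Error`, as the first example does, drops the original stack trace from the rethrown error. One way to keep it, assuming a runtime that supports the `cause` option of the `Error` constructor (Node.js 16.9 or later), is sketched below. `wrapJobError` and `processWithContext` are hypothetical helpers, and `id`, `name`, and `data` are the only job properties they read.

```javascript
// Hypothetical helper: add job context to an error while keeping the original
// error (and its stack) reachable through `cause`.
function wrapJobError(job, error) {
  const original = error instanceof Error ? error : new Error(String(error));
  return new Error(
    `Job ${job.id} (${job.name || 'unnamed'}) failed: ${original.message}`,
    { cause: original }
  );
}

// Runs `work` with the job data and rethrows failures with job context attached.
async function processWithContext(job, work) {
  try {
    return await work(job.data);
  } catch (error) {
    throw wrapJobError(job, error);
  }
}

// Usage with a stand-in job object (no Redis needed).
processWithContext({ id: 7, name: 'fetch', data: {} }, async () => {
  throw new Error('connection refused');
}).catch((error) => {
  console.error(error.message);
  console.error('caused by:', error.cause.message);
});
```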

Processor Lifecycle and Events

Monitor processor lifecycle through events.

// Events are emitted during job processing lifecycle
// See Event System documentation for complete event handling

Usage Examples:

taskQueue.process(async (job) => {
  await processJob(job.data);
  return { success: true };
});

// Monitor processing events
taskQueue.on('active', (job) => {
  console.log(`Job ${job.id} started processing`);
});

taskQueue.on('completed', (job, result) => {
  console.log(`Job ${job.id} completed with result:`, result);
});

taskQueue.on('failed', (job, err) => {
  console.error(`Job ${job.id} failed:`, err.message);
});

taskQueue.on('stalled', (job) => {
  // Bull re-queues a stalled job unless it has exceeded maxStalledCount
  console.warn(`Job ${job.id} stalled`);
});
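
The same events can feed simple in-process counters. The sketch below uses Node's built-in `EventEmitter` as a stand-in for the queue so that it runs without Redis; `attachCounters` is a hypothetical helper that works with any object exposing `on(event, listener)` and listens only to the event names shown above.

```javascript
const { EventEmitter } = require('node:events');

// Hypothetical helper: count lifecycle events emitted by a queue-like object.
function attachCounters(queue) {
  const counters = { active: 0, completed: 0, failed: 0, stalled: 0 };
  for (const eventName of Object.keys(counters)) {
    queue.on(eventName, () => {
      counters[eventName] += 1;
    });
  }
  return counters;
}

// Stand-in queue: events are emitted by hand to mimic two jobs.
const standInQueue = new EventEmitter();
const counters = attachCounters(standInQueue);

standInQueue.emit('active', { id: 1 });
standInQueue.emit('completed', { id: 1 }, { success: true });
standInQueue.emit('active', { id: 2 });
standInQueue.emit('failed', { id: 2 }, new Error('boom'));

console.log(counters); // { active: 2, completed: 1, failed: 1, stalled: 0 }
```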

External Processor Files

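Pass a file path instead of a function to load the processor from an external file. Bull runs such a processor in a separate child process (a sandboxed processor).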

Usage Examples:

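// In your main file
const path = require('path');
const Queue = require('bull');

// An absolute path is the safer choice because the file is loaded in a
// separate child process (assumes a processors directory next to this file)
const processorDir = path.join(__dirname, 'processors');

// Use external processor file
const taskQueue = new Queue('tasks');
taskQueue.process(path.join(processorDir, 'taskProcessor.js'));

// Use external processor with concurrency (on its own queue, since a queue
// accepts only one default processor)
const batchQueue = new Queue('batch tasks');
batchQueue.process(5, path.join(processorDir, 'concurrentProcessor.js'));

// Use external processor for named jobs
taskQueue.process('specific-task', path.join(processorDir, 'specificTaskProcessor.js'));

// In processors/taskProcessor.js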
module.exports = async function(job) {
  const { taskType, data } = job.data;
  
  console.log(`Processing ${taskType} with data:`, data);
  
  // Update progress
  await job.progress(50);
  
  // Process the task
  const result = await performTask(taskType, data);
  
  await job.progress(100);
  
  return result;
};

Job Processing Best Practices

// Example of robust job processor with best practices
taskQueue.process('robust-task', 3, async (job) => {
  const startTime = Date.now();
  
  try {
    // Validate job data
    if (!job.data || !job.data.required_field) {
      throw new Error('Invalid job data: missing required_field');
    }
    
    // Log job start
    await job.log(`Started processing at ${new Date().toISOString()}`);
    
    // Update progress at key points
    await job.progress(10);
    
    // Perform work with proper error handling
    const step1 = await performStep1(job.data);
    await job.progress(30);
    
    const step2 = await performStep2(step1);
    await job.progress(60);
    
    const result = await performStep3(step2);
    await job.progress(100);
    
    // Log completion
    const duration = Date.now() - startTime;
    await job.log(`Completed in ${duration}ms`);
    
    return {
      success: true,
      result,
      duration
    };
    
  } catch (error) {
    // Log detailed error information
    await job.log(`Error: ${error.message}`);
    await job.log(`Stack: ${error.stack}`);
    
    // Re-throw to mark job as failed
    throw error;
  }
});
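
The validation step at the top of the processor can be factored into a small function so that every processor rejects malformed jobs in the same way. `assertJobData` and the field names below are hypothetical, chosen only for illustration.

```javascript
// Hypothetical helper: throw a descriptive error when required fields are missing.
function assertJobData(data, requiredFields) {
  if (data === null || typeof data !== 'object') {
    throw new Error('Invalid job data: expected an object');
  }
  const missing = requiredFields.filter(
    (field) => data[field] === undefined || data[field] === null
  );
  if (missing.length > 0) {
    throw new Error(`Invalid job data: missing ${missing.join(', ')}`);
  }
  return data;
}

// Usage: the second required field is absent, so validation fails.
try {
  assertJobData({ required_field: 'abc' }, ['required_field', 'user_id']);
} catch (error) {
  console.error(error.message); // Invalid job data: missing user_id
}
```

Inside a processor, the call would typically be the first statement, for example `assertJobData(job.data, ['required_field'])`, so that the thrown error marks the job as failed before any work starts.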