or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

configuration.md, error-handling.md, event-monitoring.md, flow-orchestration.md, index.md, job-lifecycle.md, job-processing.md, job-scheduling.md, queue-management.md
tile.json

docs/job-processing.md

Job Processing

Worker-based job processing with configurable concurrency, error handling, and retry mechanisms. The Worker class provides robust job processing capabilities with support for rate limiting, stalled job recovery, and graceful shutdown.

Capabilities

Worker Class

Main class for processing jobs from queues with comprehensive concurrency and error handling.

/**
 * Worker class for processing jobs from queues.
 *
 * A worker starts consuming jobs as soon as it is constructed unless it is
 * created with `autorun: false`, in which case `run()` must be called.
 *
 * @typeParam T - Type of the job payload (`job.data`).
 * @typeParam R - Type of the value the processor resolves with.
 * @typeParam N - Type of the job name.
 */
class Worker<T = any, R = any, N extends string = string> {
  /**
   * @param name - Name of the queue to consume jobs from.
   * @param processor - Function invoked for every job taken from the queue.
   * @param opts - Optional worker configuration.
   */
  constructor(name: string, processor: Processor<T, R, N>, opts?: WorkerOptions);

  /**
   * Start processing jobs. Only needed for workers created with
   * `autorun: false`; calling it on a worker that is already running rejects
   * with "Worker is already running.". The returned promise stays pending
   * while the worker is processing, so do not await it before other work.
   */
  run(): Promise<void>;

  /**
   * Pause job processing.
   *
   * @param doNotWaitActive - When true, resolve without waiting for the jobs
   *   that are currently active to finish.
   */
  pause(doNotWaitActive?: boolean): Promise<void>;

  /** Resume job processing after `pause()`. Synchronous: returns nothing to await. */
  resume(): void;

  /**
   * Get next job for processing (manual job fetching).
   *
   * @param token - Lock token that will own the fetched job.
   * @param opts - `block: false` returns immediately instead of waiting for a job.
   * @returns The next job, or `undefined` when none is available.
   */
  getNextJob(token: string, opts?: { block?: boolean }): Promise<Job<T, R, N> | undefined>;

  /**
   * Close worker gracefully.
   *
   * @param force - When true, do not wait for active jobs to finish.
   */
  close(force?: boolean): Promise<void>;

  /** Check if worker is running */
  isRunning(): boolean;

  /** Check if worker is paused */
  isPaused(): boolean;

  /** Wait until worker is ready (its Redis connection is established) */
  waitUntilReady(): Promise<void>;
}

Processor Function

/**
 * Shape of the function handed to a Worker as its `processor` argument.
 *
 * It receives the job being handled and, optionally, a token string, and
 * resolves with a result of type `R`.
 */
type Processor<T = any, R = any, N extends string = string> =
  (job: Job<T, R, N>, token?: string) => Promise<R>;

Usage Examples:

import { Worker, Job } from "bullmq";

// Basic worker: consumes the "email processing" queue with an inline processor
const emailWorker = new Worker("email processing", async (job) => {
  const message = job.data;

  // Announce which recipient is being handled
  console.log(`Sending email to ${message.to}`);

  // First progress checkpoint, reported before the send starts
  await job.updateProgress(25);

  // Hand the message over to the mail helper
  await sendEmail(message.to, message.subject, message.body);

  // Sending finished
  await job.updateProgress(100);

  // The resolved value becomes the job's result
  const receipt = { emailId: "email-123", sentAt: new Date() };
  return receipt;
});

// Worker with advanced options
const imageWorker = new Worker("image processing", 
  async (job: Job<{ imageUrl: string; transformations: string[] }>) => {
    const { imageUrl, transformations } = job.data;
    
    await job.updateProgress(10);
    
    // Download image
    const image = await downloadImage(imageUrl);
    await job.updateProgress(30);
    
    // Apply transformations in order; this phase covers the 30% -> 80% range
    let processedImage = image;
    for (const [index, transformation] of transformations.entries()) {
      processedImage = await applyTransformation(processedImage, transformation);
      await job.updateProgress(30 + (50 * (index + 1) / transformations.length));
    }
    
    // Upload processed image
    const newUrl = await uploadImage(processedImage);
    await job.updateProgress(100);
    
    return { originalUrl: imageUrl, processedUrl: newUrl };
  },
  {
    // Up to 5 jobs in flight at once
    concurrency: 5,
    // At most 10 jobs per 1000 ms window
    limiter: {
      max: 10,
      duration: 1000,
    },
    // Worker-level retention takes a KeepJobs object; a bare number is only
    // accepted by the per-job removeOnComplete / removeOnFail options
    removeOnComplete: { count: 100 },
    removeOnFail: { count: 50 },
  }
);

// Start processing
// Both workers began consuming jobs when they were constructed (`autorun`
// defaults to true), so run() must not be called here: it would reject with
// "Worker is already running.". run() is only for workers created with
// `autorun: false`. Waiting for readiness is enough:
await emailWorker.waitUntilReady();
await imageWorker.waitUntilReady();

Worker Options

/**
 * Options accepted as the third argument of the Worker constructor.
 */
interface WorkerOptions {
  /**
   * Start processing as soon as the worker is constructed (default: true).
   * Set to false to start it later with `worker.run()`.
   */
  autorun?: boolean;

  /** Maximum number of jobs to process concurrently (default: 1) */
  concurrency?: number;
  
  /** Rate limiting configuration */
  limiter?: RateLimiterOptions;
  
  /** Maximum number of times a job can be stalled before failed (default: 1) */
  maxStalledCount?: number;
  
  /** Interval for checking stalled jobs in ms (default: 30000) */
  stalledInterval?: number;
  
  /**
   * Retention policy for completed jobs (max age and/or count to keep).
   * Unlike the per-job option, the worker-level option takes a KeepJobs
   * object, not a bare number.
   */
  removeOnComplete?: KeepJobs;
  
  /**
   * Retention policy for failed jobs (max age and/or count to keep).
   * Takes a KeepJobs object, not a bare number.
   */
  removeOnFail?: KeepJobs;
  
  /** Job lock duration in ms (default: 30000) */
  lockDuration?: number;
  
  /**
   * Lock renewal interval in ms (default: half of lockDuration, i.e. 15000
   * with the default lock duration)
   */
  lockRenewTime?: number;
  
  /** Skip stalled job detection */
  skipStalledCheck?: boolean;
  
  /** Skip automatic lock renewal */
  skipLockRenewal?: boolean;
  
  /** Redis connection options */
  connection?: ConnectionOptions;
  
  /** Key prefix for Redis keys */
  prefix?: string;
  
  /** Metrics collection options */
  metrics?: MetricsOptions;
  
  /** Telemetry configuration */
  telemetry?: TelemetryOptions;
}

Rate Limiter Options

/**
 * Rate limiting settings, supplied through `WorkerOptions.limiter`.
 */
interface RateLimiterOptions {
  /** Maximum number of jobs to process within one `duration` window */
  max: number;
  
  /** Duration window in milliseconds */
  duration: number;
  
  /**
   * Bucket size for token bucket algorithm
   * NOTE(review): could not confirm this field against the published BullMQ
   * RateLimiterOptions interface — verify before relying on it.
   */
  bucketSize?: number;
  
  /**
   * Group jobs by a function for separate rate limiting
   * NOTE(review): group-based rate limiting appears to have been removed from
   * open-source BullMQ in v3 — confirm this field applies to the documented version.
   */
  groupKey?: string | ((job: Job) => string);
}

Keep Jobs Options

/**
 * Retention policy for finished jobs, used by `removeOnComplete` and
 * `removeOnFail`. Either limit may be given on its own, or both together.
 */
interface KeepJobs {
  /** Maximum age, in seconds (not milliseconds), for a job to be kept */
  age?: number;
  
  /** Maximum number of jobs to keep */
  count?: number;
}

Worker Events

The Worker class emits events for monitoring job processing:

/**
 * Events emitted by a Worker and the listener signature for each.
 */
interface WorkerEvents {
  /** A job started processing; `prev` is the state it came from */
  'active': (job: Job, prev?: string) => void;
  /** A job finished successfully with `result` */
  'completed': (job: Job, result: any, prev?: string) => void;
  /**
   * A job failed with `error`. `job` can be undefined when the failed job is
   * no longer available (for example, removed after exceeding the stalled
   * limit), so listeners must guard before dereferencing it.
   */
  'failed': (job: Job | undefined, error: Error, prev?: string) => void;
  /** A processor called `job.updateProgress`; progress is a number or an object */
  'progress': (job: Job, progress: number | object) => void;
  /** A job was detected as stalled; only its id is provided */
  'stalled': (jobId: string, prev?: string) => void;
  /** An error occurred inside the worker itself (not inside a job processor) */
  'error': (error: Error) => void;
  /** The queue has no more jobs waiting to be processed */
  'drained': () => void;
  /** The worker was paused */
  'paused': () => void;
  /** The worker was resumed */
  'resumed': () => void;
  /** The worker has started closing; `msg` describes the shutdown */
  'closing': (msg: string) => void;
  /** The worker has finished closing */
  'closed': () => void;
  /** The worker is ready to process jobs */
  'ready': () => void;
  /** The underlying ioredis connection was closed */
  'ioredis:close': () => void;
  /** NOTE(review): could not confirm this event against BullMQ's WorkerListener — verify */
  'ioredis:select': () => void;
}

Event Usage:

// Worker event listeners
emailWorker.on("active", (job) => {
  console.log(`Started processing job ${job.id}`);
});

emailWorker.on("completed", (job, result) => {
  console.log(`Job ${job.id} completed:`, result);
});

emailWorker.on("failed", (job, error) => {
  // The job argument can be undefined (e.g. the job was already removed),
  // so never dereference it unguarded
  const label = job ? `Job ${job.id}` : "A job that is no longer available";
  console.log(`${label} failed:`, error.message);
  
  // Log additional error details; stacktrace is an array, so check it is non-empty
  if (job?.stacktrace?.length) {
    console.log("Stack trace:", job.stacktrace);
  }
});

emailWorker.on("progress", (job, progress) => {
  // Progress is whatever the processor passed to updateProgress: a number or
  // an object. Only a number is meaningful as a percentage.
  if (typeof progress === "number") {
    console.log(`Job ${job.id} progress: ${progress}%`);
  } else {
    console.log(`Job ${job.id} progress:`, progress);
  }
});

emailWorker.on("stalled", (jobId) => {
  console.log(`Job ${jobId} stalled`);
});

emailWorker.on("drained", () => {
  console.log("Queue is drained - no more jobs to process");
});

// Always attach an "error" listener: an EventEmitter with no "error" listener
// throws when an error is emitted
emailWorker.on("error", (error) => {
  console.error("Worker error:", error);
});

Advanced Worker Configuration

// Worker with comprehensive configuration
const advancedWorker = new Worker(
  "data processing",
  async (job: Job<{ dataset: string; operations: string[] }>) => {
    // Job processing logic
    const { dataset, operations } = job.data;
    
    // Run the operations in order, reporting percentage progress after each
    for (const [index, operation] of operations.entries()) {
      await processOperation(dataset, operation);
      await job.updateProgress((index + 1) / operations.length * 100);
    }
    
    return { processed: true, timestamp: Date.now() };
  },
  {
    // Process 3 jobs concurrently
    concurrency: 3,
    
    // Rate limiting: max 20 jobs per minute
    limiter: {
      max: 20,
      duration: 60 * 1000,
    },
    
    // Lock and stalled-job settings.
    // lockDuration is NOT a job timeout: it is how long the worker's lock on a
    // job lasts between renewals. A job whose lock expires is treated as stalled.
    lockDuration: 60 * 1000, // 1 minute lock
    maxStalledCount: 2, // Allow 2 stalls before failing
    stalledInterval: 30 * 1000, // Check every 30 seconds
    
    // Cleanup settings (worker-level retention takes KeepJobs objects)
    removeOnComplete: { count: 1000 }, // Keep 1000 completed jobs
    removeOnFail: {
      age: 24 * 60 * 60, // Keep failed jobs for 24 hours (age is in SECONDS)
      count: 100, // But max 100 failed jobs
    },
    
    // Metrics collection
    metrics: {
      maxDataPoints: 100,
    },
  }
);

// Graceful shutdown: when SIGINT arrives, close the worker, then exit with code 0
const shutDownOnInterrupt = async () => {
  console.log("Shutting down worker...");
  await advancedWorker.close();
  process.exit(0);
};

process.on("SIGINT", shutDownOnInterrupt);

Worker Lifecycle Management

// Start worker
// A worker starts processing when it is constructed (`autorun` defaults to
// true), so there is nothing to call here. Only a worker created with
// `autorun: false` needs `worker.run()`. Do not `await` that call before
// other work: its promise stays pending for as long as the worker is running.

// Wait for worker to be ready
await emailWorker.waitUntilReady();

// Pause processing (current jobs continue, no new jobs started)
await emailWorker.pause();

// Resume processing (synchronous; there is nothing to await)
emailWorker.resume();

// Check worker status
console.log("Is running:", emailWorker.isRunning());
console.log("Is paused:", emailWorker.isPaused());

// Shut down: use ONE of the two forms below.

// Graceful shutdown (wait for current jobs to complete)
await emailWorker.close();

// Force shutdown (do not wait for active jobs), as an alternative to the
// graceful form above:
//   await emailWorker.close(true);

Error Handling in Processors

import { UnrecoverableError, DelayedError, RateLimitError } from "bullmq";

// The explicit `Worker` annotation lets the processor refer to `robustWorker`
// without making its inferred type circular.
const robustWorker: Worker = new Worker("api calls", async (job, token) => {
  try {
    const response = await makeApiCall(job.data.url);
    return response.data;
  } catch (error) {
    // A caught value is `unknown`; read an HTTP-style status only if one exists
    const status =
      typeof error === "object" && error !== null && "status" in error
        ? (error as { status?: unknown }).status
        : undefined;

    if (status === 401) {
      // Don't retry authentication errors
      throw new UnrecoverableError("Authentication failed");
    }

    if (status === 429) {
      // Rate limited - the delay is set on the worker, not on the error:
      // register the limit first, then throw RateLimitError with no arguments
      await robustWorker.rateLimit(60000);
      throw new RateLimitError();
    }

    if (typeof status === "number" && status >= 500) {
      // Server error - delay retry. The job must be moved to the delayed set
      // explicitly (using the lock token); DelayedError only signals that the
      // move already happened and carries no delay itself
      await job.moveToDelayed(Date.now() + 30000, token);
      throw new DelayedError();
    }

    // Other errors - normal retry
    throw error;
  }
});