CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/npm-bullmq

Redis-based distributed queue system for Node.js providing robust message and job processing capabilities with features like job scheduling, retries, and flow orchestration.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

error-handling.mddocs/

Error Handling

Specialized error classes for controlling job flow and retry behavior. BullMQ provides fine-grained error handling strategies that enable sophisticated job failure management and recovery patterns.

Capabilities

Error Classes

BullMQ provides specialized error classes that control job behavior when thrown from processors.

/**
 * Forces job to failed state regardless of retry attempts
 */
class UnrecoverableError extends Error {
  constructor(message: string);
}

/**
 * Moves job to delayed state for later retry
 */
class DelayedError extends Error {
  constructor(delay?: number);
}

/**
 * Indicates rate limit exceeded, delays job processing
 */
class RateLimitError extends Error {
  constructor(message: string, delay?: number);
}

/**
 * Moves job to waiting-children state until dependencies complete
 */
class WaitingChildrenError extends Error {
  constructor(message?: string);
}

/**
 * Returns job to waiting state for retry
 */
class WaitingError extends Error {
  constructor(message?: string);
}

Error Constants

/** Marker for unrecoverable errors */
const UNRECOVERABLE_ERROR = 'bullmq:unrecoverable';

Usage Examples:

import { 
  Worker, 
  UnrecoverableError, 
  DelayedError, 
  RateLimitError,
  WaitingChildrenError,
  WaitingError 
} from "bullmq";

// Worker with comprehensive error handling
const apiWorker = new Worker("api-calls", async (job) => {
  const { url, method, data } = job.data;
  
  try {
    const response = await makeApiCall(url, method, data);
    return response.data;
    
  } catch (error) {
    // Handle different error scenarios
    
    if (error.status === 401) {
      // Authentication failed - don't retry
      throw new UnrecoverableError("Authentication failed - invalid credentials");
    }
    
    if (error.status === 429) {
      // Rate limited - delay retry
      const retryAfter = error.headers['retry-after'] * 1000 || 60000;
      throw new RateLimitError("API rate limit exceeded", retryAfter);
    }
    
    if (error.status === 400) {
      // Bad request - don't retry
      throw new UnrecoverableError(`Bad request: ${error.message}`);
    }
    
    if (error.status >= 500) {
      // Server error - delay retry
      throw new DelayedError(30000); // 30 second delay
    }
    
    if (error.code === 'ECONNREFUSED') {
      // Connection refused - delay retry
      throw new DelayedError(60000); // 1 minute delay
    }
    
    // Other errors - normal retry behavior
    throw error;
  }
});

// Start worker
await apiWorker.run();

UnrecoverableError

Forces immediate job failure without retries, regardless of job options.

// Examples of unrecoverable errors
const dataWorker = new Worker("data-processing", async (job) => {
  const { data, schema } = job.data;
  
  // Validate required fields
  if (!data || !schema) {
    throw new UnrecoverableError("Missing required data or schema");
  }
  
  // Check data format
  if (!isValidFormat(data)) {
    throw new UnrecoverableError("Invalid data format - cannot be processed");
  }
  
  // Check permissions
  if (!userHasPermission(job.data.userId, "process-data")) {
    throw new UnrecoverableError("User lacks permission to process this data");
  }
  
  // Process data
  return await processData(data, schema);
});

DelayedError

Moves job to delayed state for retry after specified delay.

// Delayed retry scenarios
const fileWorker = new Worker("file-processing", async (job) => {
  const { filePath } = job.data;
  
  try {
    // Check if file exists
    if (!await fileExists(filePath)) {
      // File might be still uploading - delay retry
      throw new DelayedError(30000); // 30 seconds
    }
    
    // Check if file is locked by another process
    if (await isFileLocked(filePath)) {
      throw new DelayedError(10000); // 10 seconds
    }
    
    // Process file
    return await processFile(filePath);
    
  } catch (error) {
    if (error.code === 'EBUSY') {
      // File busy - delay retry
      throw new DelayedError(15000);
    }
    
    throw error;
  }
});

// Exponential backoff with DelayedError
const exponentialDelayWorker = new Worker("flaky-service", async (job) => {
  const delay = Math.pow(2, job.attemptsMade) * 1000; // 1s, 2s, 4s, 8s, ...
  
  try {
    return await callFlakyService(job.data);
  } catch (error) {
    if (job.attemptsMade < 5) {
      throw new DelayedError(delay);
    }
    throw error; // Final attempt - let it fail normally
  }
});

RateLimitError

Handles rate limiting with automatic delay calculation.

// Rate limit handling
const socialMediaWorker = new Worker("social-posts", async (job) => {
  const { platform, content } = job.data;
  
  try {
    return await postToSocialMedia(platform, content);
    
  } catch (error) {
    if (error.code === 'RATE_LIMITED') {
      // Use rate limit info from API response
      const resetTime = error.rateLimit?.reset || Date.now() + 3600000;
      const delay = resetTime - Date.now();
      
      throw new RateLimitError(
        `Rate limited until ${new Date(resetTime).toISOString()}`,
        Math.max(delay, 0)
      );
    }
    
    throw error;
  }
});

// Global rate limiting
let lastApiCall = 0;
const minInterval = 1000; // 1 second between calls

const rateLimitedWorker = new Worker("api-worker", async (job) => {
  const now = Date.now();
  const timeSinceLastCall = now - lastApiCall;
  
  if (timeSinceLastCall < minInterval) {
    const delay = minInterval - timeSinceLastCall;
    throw new RateLimitError("Global rate limit", delay);
  }
  
  lastApiCall = now;
  return await makeApiCall(job.data.url);
});

WaitingChildrenError

Used in flow contexts to wait for child jobs to complete.

// Parent job waiting for children
const parentWorker = new Worker("parent-jobs", async (job) => {
  const { childJobs } = job.data;
  
  // Create child jobs
  const childQueue = new Queue("child-jobs");
  const children = await Promise.all(
    childJobs.map(childData => childQueue.add("process-child", childData))
  );
  
  // Check if all children are completed
  const allCompleted = await Promise.all(
    children.map(child => child.isCompleted())
  );
  
  if (!allCompleted.every(Boolean)) {
    // Some children not completed - wait
    throw new WaitingChildrenError("Waiting for child jobs to complete");
  }
  
  // All children completed - proceed with parent processing
  const childResults = await Promise.all(
    children.map(child => child.returnvalue)
  );
  
  return { childResults, processed: true };
});

WaitingError

Returns job to waiting state for immediate retry.

// Resource availability check
const resourceWorker = new Worker("resource-jobs", async (job) => {
  const { resourceId } = job.data;
  
  // Check resource availability
  const resource = await getResource(resourceId);
  
  if (!resource.available) {
    // Resource not available - return to waiting
    throw new WaitingError("Resource not available, returning to queue");
  }
  
  // Resource available - process job
  return await processWithResource(resource, job.data);
});

// Queue capacity check
const capacityWorker = new Worker("capacity-jobs", async (job) => {
  const currentLoad = await getCurrentSystemLoad();
  
  if (currentLoad > 0.8) {
    // System overloaded - return to waiting
    throw new WaitingError("System overloaded, deferring job");
  }
  
  return await processJob(job.data);
});

Error Handling Patterns

// Comprehensive error handling strategy
class ErrorHandlingWorker {
  constructor(queueName: string, processor: Function) {
    const worker = new Worker(queueName, async (job) => {
      try {
        return await processor(job);
      } catch (error) {
        return this.handleError(error, job);
      }
    });
    
    // Log all failures
    worker.on("failed", (job, error) => {
      console.error(`Job ${job.id} failed:`, error.message);
      
      if (error instanceof UnrecoverableError) {
        console.error("Unrecoverable error - job will not be retried");
      }
    });
    
    return worker;
  }
  
  private async handleError(error: Error, job: any) {
    // Network errors - delay retry
    if (this.isNetworkError(error)) {
      throw new DelayedError(30000);
    }
    
    // Validation errors - unrecoverable
    if (this.isValidationError(error)) {
      throw new UnrecoverableError(error.message);
    }
    
    // Service unavailable - delay retry
    if (this.isServiceUnavailable(error)) {
      const delay = Math.min(60000 * job.attemptsMade, 300000); // Max 5 minutes
      throw new DelayedError(delay);
    }
    
    // Re-throw for normal retry behavior
    throw error;
  }
  
  private isNetworkError(error: Error): boolean {
    return ['ECONNRESET', 'ECONNREFUSED', 'ETIMEDOUT'].includes((error as any).code);
  }
  
  private isValidationError(error: Error): boolean {
    return error.message.includes('validation') || error.message.includes('invalid');
  }
  
  private isServiceUnavailable(error: Error): boolean {
    return (error as any).status === 503;
  }
}

// Usage
const smartWorker = new ErrorHandlingWorker("smart-processing", async (job) => {
  // Your job processing logic
  return await processJob(job.data);
});

Error Recovery Strategies

// Circuit breaker pattern with errors
class CircuitBreakerWorker {
  private failures = 0;
  private lastFailure = 0;
  private readonly threshold = 5;
  private readonly timeout = 60000; // 1 minute

  constructor(queueName: string) {
    const worker = new Worker(queueName, async (job) => {
      // Check circuit breaker state
      if (this.isCircuitOpen()) {
        throw new DelayedError(this.timeout);
      }
      
      try {
        const result = await this.processJob(job.data);
        this.onSuccess();
        return result;
      } catch (error) {
        this.onFailure();
        throw error;
      }
    });
  }
  
  private isCircuitOpen(): boolean {
    return this.failures >= this.threshold && 
           (Date.now() - this.lastFailure) < this.timeout;
  }
  
  private onSuccess() {
    this.failures = 0;
  }
  
  private onFailure() {
    this.failures++;
    this.lastFailure = Date.now();
  }
  
  private async processJob(data: any) {
    // Actual job processing
    return await externalService.process(data);
  }
}

Error Monitoring and Alerting

// Error tracking and alerting
const monitoredWorker = new Worker("monitored-jobs", async (job) => {
  try {
    return await processJob(job.data);
  } catch (error) {
    // Track error types
    trackError(error, job);
    
    // Different handling based on error type
    if (error instanceof UnrecoverableError) {
      // Alert for unrecoverable errors
      await sendAlert("CRITICAL", `Unrecoverable error in job ${job.id}: ${error.message}`);
      throw error;
    }
    
    if (error instanceof RateLimitError) {
      // Track rate limiting
      trackRateLimit(job.name, error.delay);
      throw error;
    }
    
    // Default handling
    throw error;
  }
});

function trackError(error: Error, job: any) {
  console.log(`Error in job ${job.id} (attempt ${job.attemptsMade}):`, {
    errorType: error.constructor.name,
    message: error.message,
    jobName: job.name,
    timestamp: new Date().toISOString(),
  });
}

async function sendAlert(level: string, message: string) {
  // Send to monitoring system
  console.log(`[${level}] ${message}`);
}

function trackRateLimit(jobName: string, delay: number) {
  console.log(`Rate limit hit for ${jobName}, delayed by ${delay}ms`);
}

Install with Tessl CLI

npx tessl i tessl/npm-bullmq

docs

configuration.md

error-handling.md

event-monitoring.md

flow-orchestration.md

index.md

job-lifecycle.md

job-processing.md

job-scheduling.md

queue-management.md

tile.json