Error Handling

Specialized error classes for controlling job flow and retry behavior. Thrown from a processor, they tell the worker whether a failing job should fail permanently, be delayed, be rate limited, or wait for its child jobs, instead of following the normal retry path.

Capabilities

Error Classes

BullMQ provides specialized error classes that control job behavior when thrown from a processor. Except for UnrecoverableError, these errors act as signals: the processor first moves the job (job.moveToDelayed, job.moveToWaitingChildren, job.moveToWait, or worker.rateLimit), using the token passed as the second processor argument where required, and then throws the matching error so the worker does not treat the throw as an ordinary failure.

/**
 * Forces the job to the failed state regardless of remaining retry attempts
 */
class UnrecoverableError extends Error {
  constructor(message?: string);
}

/**
 * Signals that the processor has moved the job to the delayed state
 * (throw after calling job.moveToDelayed(timestamp, token))
 */
class DelayedError extends Error {
  constructor(message?: string);
}

/**
 * Signals that a rate limit was hit and the job should be retried later
 * (throw after calling worker.rateLimit(duration), typically via Worker.RateLimitError())
 */
class RateLimitError extends Error {
  constructor(message?: string);
}

/**
 * Signals that the processor has moved the job to the waiting-children state
 * (throw when job.moveToWaitingChildren(token) returns true)
 */
class WaitingChildrenError extends Error {
  constructor(message?: string);
}

/**
 * Signals that the processor has moved the job back to the waiting state
 * (throw after calling job.moveToWait(token); available in recent BullMQ versions)
 */
class WaitingError extends Error {
  constructor(message?: string);
}

Error Constants

/** Marker for unrecoverable errors */
const UNRECOVERABLE_ERROR = 'bullmq:unrecoverable';

Usage Examples:

import {
  Queue,
  Worker,
  UnrecoverableError,
  DelayedError,
  RateLimitError,
  WaitingChildrenError,
  WaitingError
} from "bullmq";

// Worker with comprehensive error handling.
// The token passed as the second processor argument is required by job.moveToDelayed().
const apiWorker = new Worker("api-calls", async (job, token) => {
  const { url, method, data } = job.data;

  try {
    const response = await makeApiCall(url, method, data);
    return response.data;

  } catch (error) {
    // Handle different error scenarios

    if (error.status === 401) {
      // Authentication failed - don't retry
      throw new UnrecoverableError("Authentication failed - invalid credentials");
    }

    if (error.status === 429) {
      // Rate limited - hold the queue for the retry-after interval, then signal it
      const retryAfter = (Number(error.headers?.["retry-after"]) || 60) * 1000;
      await apiWorker.rateLimit(retryAfter);
      throw Worker.RateLimitError();
    }

    if (error.status === 400) {
      // Bad request - don't retry
      throw new UnrecoverableError(`Bad request: ${error.message}`);
    }

    if (error.status >= 500) {
      // Server error - retry after 30 seconds
      await job.moveToDelayed(Date.now() + 30000, token);
      throw new DelayedError();
    }

    if (error.code === "ECONNREFUSED") {
      // Connection refused - retry after 1 minute
      await job.moveToDelayed(Date.now() + 60000, token);
      throw new DelayedError();
    }

    // Other errors - normal retry behavior
    throw error;
  }
});

// The worker starts processing automatically; worker.run() is only needed
// when the worker was created with { autorun: false }.

UnrecoverableError

Forces immediate job failure without retries, regardless of job options.

// Examples of unrecoverable errors
const dataWorker = new Worker("data-processing", async (job) => {
  const { data, schema } = job.data;
  
  // Validate required fields
  if (!data || !schema) {
    throw new UnrecoverableError("Missing required data or schema");
  }
  
  // Check data format
  if (!isValidFormat(data)) {
    throw new UnrecoverableError("Invalid data format - cannot be processed");
  }
  
  // Check permissions
  if (!userHasPermission(job.data.userId, "process-data")) {
    throw new UnrecoverableError("User lacks permission to process this data");
  }
  
  // Process data
  return await processData(data, schema);
});
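
UnrecoverableError overrides whatever retry budget the job was added with. A minimal sketch, assuming an existing connection (queue, job name, and data are illustrative):

const dataQueue = new Queue("data-processing");

// The job is allowed up to 5 attempts with exponential backoff...
await dataQueue.add("import", { data: null, schema: null }, {
  attempts: 5,
  backoff: { type: "exponential", delay: 1000 },
});

// ...but the processor above throws UnrecoverableError for missing data,
// so the job moves straight to the failed set on its first attempt.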

DelayedError

Signals that the job has been moved to the delayed state. Call job.moveToDelayed(timestamp, token) with the token passed to the processor, then throw DelayedError so the worker leaves the job delayed instead of treating the throw as a failure.

// Delayed retry scenarios
const fileWorker = new Worker("file-processing", async (job, token) => {
  const { filePath } = job.data;

  // File might still be uploading - retry in 30 seconds
  if (!await fileExists(filePath)) {
    await job.moveToDelayed(Date.now() + 30000, token);
    throw new DelayedError();
  }

  // File locked by another process - retry in 10 seconds
  if (await isFileLocked(filePath)) {
    await job.moveToDelayed(Date.now() + 10000, token);
    throw new DelayedError();
  }

  try {
    // Process file
    return await processFile(filePath);

  } catch (error) {
    if (error.code === 'EBUSY') {
      // File busy - retry in 15 seconds
      await job.moveToDelayed(Date.now() + 15000, token);
      throw new DelayedError();
    }

    throw error;
  }
});

// Exponential backoff with DelayedError
const exponentialDelayWorker = new Worker("flaky-service", async (job, token) => {
  const delay = Math.pow(2, job.attemptsMade) * 1000; // 1s, 2s, 4s, 8s, ...

  try {
    return await callFlakyService(job.data);
  } catch (error) {
    if (job.attemptsMade < 5) {
      await job.moveToDelayed(Date.now() + delay, token);
      throw new DelayedError();
    }
    throw error; // Final attempt - let it fail normally
  }
});
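
When the goal is simply spacing out retries, the built-in attempts and backoff job options express the same policy declaratively, without manual moveToDelayed bookkeeping. A minimal sketch (queue and job names are illustrative):

const flakyQueue = new Queue("flaky-service");

// BullMQ retries the job itself: up to 5 attempts with 1s, 2s, 4s, ... delays.
await flakyQueue.add("call-service", { endpoint: "/sync" }, {
  attempts: 5,
  backoff: { type: "exponential", delay: 1000 },
});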

RateLimitError

Signals that a rate limit was hit. Call worker.rateLimit(duration) to hold the queue for the given number of milliseconds, then throw the error (conveniently via the static Worker.RateLimitError() helper) so the job is moved back to waiting instead of being counted as a failure.

// Rate limit handling
const socialMediaWorker = new Worker("social-posts", async (job) => {
  const { platform, content } = job.data;

  try {
    return await postToSocialMedia(platform, content);

  } catch (error) {
    if (error.code === 'RATE_LIMITED') {
      // Use rate limit info from the API response
      const resetTime = error.rateLimit?.reset || Date.now() + 3600000;
      const delay = Math.max(resetTime - Date.now(), 1000);

      // Hold the queue until the limit resets, then signal the rate limit
      await socialMediaWorker.rateLimit(delay);
      throw Worker.RateLimitError();
    }

    throw error;
  }
});

// Global rate limiting - enforce a minimum interval between API calls
let lastApiCall = 0;
const minInterval = 1000; // 1 second between calls

const rateLimitedWorker = new Worker("api-worker", async (job) => {
  const now = Date.now();
  const timeSinceLastCall = now - lastApiCall;

  if (timeSinceLastCall < minInterval) {
    // Too soon - hold the queue for the remaining interval and requeue the job
    await rateLimitedWorker.rateLimit(minInterval - timeSinceLastCall);
    throw Worker.RateLimitError();
  }

  lastApiCall = now;
  return await makeApiCall(job.data.url);
});
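
When the rate is known up front, the worker's limiter option enforces the same constraint declaratively, and the limit applies queue-wide across all workers. A minimal sketch reusing the makeApiCall helper from above:

const limitedWorker = new Worker(
  "api-worker",
  async (job) => makeApiCall(job.data.url),
  {
    limiter: {
      max: 1,         // at most 1 job...
      duration: 1000, // ...per second
    },
  }
);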

WaitingChildrenError

Signals that the job has been moved to the waiting-children state. Call job.moveToWaitingChildren(token); if it returns true, throw WaitingChildrenError so the worker parks the parent job until all of its children have completed.

// Parent job waiting for children (children are added from inside the parent processor)
const childQueue = new Queue("child-jobs");

const parentWorker = new Worker("parent-jobs", async (job, token) => {
  const { childJobs, childrenAdded } = job.data;

  if (!childrenAdded) {
    // Create child jobs that point back to this job as their parent
    await Promise.all(
      childJobs.map(childData =>
        childQueue.add("process-child", childData, {
          parent: { id: job.id, queue: job.queueQualifiedName },
        })
      )
    );
    // Remember that children were created, in case this processor runs again
    await job.updateData({ ...job.data, childrenAdded: true });
  }

  // Park the parent until all children have completed
  const shouldWait = await job.moveToWaitingChildren(token);
  if (shouldWait) {
    throw new WaitingChildrenError();
  }

  // All children completed - collect their return values
  const childResults = await job.getChildrenValues();
  return { childResults, processed: true };
});
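
When the parent/child structure is known at the time the jobs are created, a FlowProducer expresses the same dependency without manual waiting: the parent is only processed after all of its children complete. A minimal sketch (job names and data are illustrative):

import { FlowProducer } from "bullmq";

const flow = new FlowProducer();

await flow.add({
  name: "aggregate",
  queueName: "parent-jobs",
  data: { reportId: 42 },
  children: [
    { name: "process-child", queueName: "child-jobs", data: { part: 1 } },
    { name: "process-child", queueName: "child-jobs", data: { part: 2 } },
  ],
});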

WaitingError

Signals that the job has been moved back to the waiting state (available in recent BullMQ versions). Call job.moveToWait(token), then throw WaitingError so the job re-enters the queue instead of the throw being counted as a failure.

// Resource availability check
const resourceWorker = new Worker("resource-jobs", async (job, token) => {
  const { resourceId } = job.data;

  // Check resource availability
  const resource = await getResource(resourceId);

  if (!resource.available) {
    // Resource not available - move the job back to the waiting list
    await job.moveToWait(token);
    throw new WaitingError();
  }

  // Resource available - process job
  return await processWithResource(resource, job.data);
});

// Queue capacity check
const capacityWorker = new Worker("capacity-jobs", async (job, token) => {
  const currentLoad = await getCurrentSystemLoad();

  if (currentLoad > 0.8) {
    // System overloaded - put the job back in the waiting list to defer it
    await job.moveToWait(token);
    throw new WaitingError();
  }

  return await processJob(job.data);
});

Error Handling Patterns

// Comprehensive error handling strategy
class ErrorHandlingWorker {
  readonly worker: Worker;

  constructor(queueName: string, processor: (job: any) => Promise<any>) {
    this.worker = new Worker(queueName, async (job, token) => {
      try {
        return await processor(job);
      } catch (error) {
        await this.handleError(error as Error, job, token);
      }
    });

    // Log all failures
    this.worker.on("failed", (job, error) => {
      console.error(`Job ${job?.id} failed:`, error.message);

      if (error instanceof UnrecoverableError) {
        console.error("Unrecoverable error - job will not be retried");
      }
    });
  }

  private async handleError(error: Error, job: any, token?: string): Promise<never> {
    // Network errors - retry after 30 seconds
    if (this.isNetworkError(error)) {
      await job.moveToDelayed(Date.now() + 30000, token);
      throw new DelayedError();
    }

    // Validation errors - unrecoverable
    if (this.isValidationError(error)) {
      throw new UnrecoverableError(error.message);
    }

    // Service unavailable - back off with the attempt count, capped at 5 minutes
    if (this.isServiceUnavailable(error)) {
      const delay = Math.min(60000 * (job.attemptsMade + 1), 300000);
      await job.moveToDelayed(Date.now() + delay, token);
      throw new DelayedError();
    }

    // Re-throw for normal retry behavior
    throw error;
  }

  private isNetworkError(error: Error): boolean {
    return ['ECONNRESET', 'ECONNREFUSED', 'ETIMEDOUT'].includes((error as any).code);
  }

  private isValidationError(error: Error): boolean {
    return /validation|invalid/i.test(error.message);
  }

  private isServiceUnavailable(error: Error): boolean {
    return (error as any).status === 503;
  }
}

// Usage
const smartWorker = new ErrorHandlingWorker("smart-processing", async (job) => {
  // Your job processing logic
  return await processJob(job.data);
});

Error Recovery Strategies

// Circuit breaker pattern with errors
class CircuitBreakerWorker {
  private failures = 0;
  private lastFailure = 0;
  private readonly threshold = 5;
  private readonly timeout = 60000; // 1 minute
  readonly worker: Worker;

  constructor(queueName: string) {
    this.worker = new Worker(queueName, async (job, token) => {
      // Circuit open - delay the job instead of hammering the failing service
      if (this.isCircuitOpen()) {
        await job.moveToDelayed(Date.now() + this.timeout, token);
        throw new DelayedError();
      }

      try {
        const result = await this.processJob(job.data);
        this.onSuccess();
        return result;
      } catch (error) {
        this.onFailure();
        throw error;
      }
    });
  }

  private isCircuitOpen(): boolean {
    return this.failures >= this.threshold &&
           (Date.now() - this.lastFailure) < this.timeout;
  }

  private onSuccess() {
    this.failures = 0;
  }

  private onFailure() {
    this.failures++;
    this.lastFailure = Date.now();
  }

  private async processJob(data: any) {
    // Actual job processing
    return await externalService.process(data);
  }
}
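
Usage mirrors the wrapper above (the queue name is illustrative):

const breaker = new CircuitBreakerWorker("external-service-calls");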

Error Monitoring and Alerting

// Error tracking and alerting
const monitoredWorker = new Worker("monitored-jobs", async (job) => {
  try {
    return await processJob(job.data);
  } catch (error) {
    // Track error types
    trackError(error, job);
    
    // Different handling based on error type
    if (error instanceof UnrecoverableError) {
      // Alert for unrecoverable errors
      await sendAlert("CRITICAL", `Unrecoverable error in job ${job.id}: ${error.message}`);
      throw error;
    }
    
    if (error instanceof RateLimitError) {
      // Track rate limiting
      trackRateLimit(job.name);
      throw error;
    }
    
    // Default handling
    throw error;
  }
});

function trackError(error: Error, job: any) {
  console.log(`Error in job ${job.id} (attempt ${job.attemptsMade}):`, {
    errorType: error.constructor.name,
    message: error.message,
    jobName: job.name,
    timestamp: new Date().toISOString(),
  });
}

async function sendAlert(level: string, message: string) {
  // Send to monitoring system
  console.log(`[${level}] ${message}`);
}

function trackRateLimit(jobName: string) {
  console.log(`Rate limit hit for ${jobName}`);
}
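
Failures can also be observed outside the worker process with QueueEvents, which is useful when alerting runs in a separate monitoring service. A minimal sketch:

import { QueueEvents } from "bullmq";

const queueEvents = new QueueEvents("monitored-jobs");

// Fired for every job that ends up in the failed set,
// including jobs that threw UnrecoverableError.
queueEvents.on("failed", ({ jobId, failedReason }) => {
  console.error(`Job ${jobId} failed: ${failedReason}`);
});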