CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/npm-bullmq

Redis-based distributed queue system for Node.js providing robust message and job processing capabilities with features like job scheduling, retries, and flow orchestration.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/configuration.md

Configuration

Comprehensive configuration options for queues, workers, jobs, and Redis connections. BullMQ supports advanced features like rate limiting, metrics collection, telemetry, and fine-tuned performance settings.

Capabilities

Queue Configuration

/**
 * Options object passed as the second argument when constructing a Queue.
 * Every field is optional.
 */
interface QueueOptions {
  /** Redis connection options */
  connection?: ConnectionOptions;
  
  /** Default options applied to all jobs */
  defaultJobOptions?: BaseJobOptions;
  
  /** Key prefix for Redis keys (default: 'bull') */
  prefix?: string;
  
  /** Stream configuration options */
  streams?: StreamsOptions;
  
  /** Skip updating queue metadata */
  skipMetasUpdate?: boolean;
  
  /** Skip the job scheduler when true; false keeps scheduling enabled */
  skipJobScheduler?: boolean;
  
  /** Advanced settings (stalled-job checks and retry processing) */
  settings?: AdvancedSettings;
}

/** Stream-related settings for a queue; currently only the events stream is configurable here. */
interface StreamsOptions {
  /** Events stream configuration */
  events?: {
    /** Maximum stream length (number of events retained) */
    maxLen?: number;
  };
}

/** Fine-tuning settings supplied through the `settings` field of the queue options. */
interface AdvancedSettings {
  /** Interval, in milliseconds, between checks for stalled jobs */
  stalledInterval?: number;
  
  /** Maximum number of times a job may stall before it is failed */
  maxStalledCount?: number;
  
  /** Delay, in milliseconds, applied to retry processing of failed jobs */
  retryProcessDelay?: number;
}

Usage Examples:

import { Queue } from "bullmq";

// Basic queue configuration
const basicQueue = new Queue("basic-queue", {
  connection: {
    host: "localhost",
    port: 6379,
    db: 0,
  },
  defaultJobOptions: {
    removeOnComplete: 100,
    removeOnFail: 50,
    attempts: 3,
    // Backoff is given as an options object (strategy type + base delay in ms),
    // matching the BackoffOptions shape; a bare strategy-name string is not valid.
    backoff: {
      type: "exponential",
      delay: 1000,
    },
  },
});

// Advanced queue configuration
const advancedQueue = new Queue("advanced-queue", {
  connection: {
    host: "redis-cluster.example.com",
    port: 6379,
    password: "secure-password",
    tls: {
      // Keep certificate verification on: disabling it on a remote,
      // password-protected host exposes the credentials to interception.
      rejectUnauthorized: true,
    },
    retryDelayOnFailedAttempt: 100,
    maxRetriesPerRequest: 3,
  },
  prefix: "myapp", // Keys will be prefixed with "myapp:"
  streams: {
    events: {
      maxLen: 10000, // Keep max 10k events
    },
  },
  settings: {
    stalledInterval: 30000,    // Check for stalled jobs every 30s
    maxStalledCount: 1,        // Max 1 stall before failing
    retryProcessDelay: 5000,   // 5s delay for retry processing
  },
  skipMetasUpdate: false,      // Do not skip queue metadata updates
  skipJobScheduler: false,     // Do not skip the job scheduler (scheduling stays enabled)
});

Worker Configuration

/**
 * Options object passed as the third argument when constructing a Worker.
 * Every field is optional.
 */
interface WorkerOptions {
  /** Maximum number of jobs to process concurrently (default: 1) */
  concurrency?: number;
  
  /** Rate limiting configuration */
  limiter?: RateLimiterOptions;
  
  /** Maximum number of times a job can be stalled before failed (default: 1) */
  maxStalledCount?: number;
  
  /** Interval for checking stalled jobs in ms (default: 30000) */
  stalledInterval?: number;
  
  /** Retention of completed jobs: a count, a boolean, or a count/age pair (KeepJobs) */
  removeOnComplete?: number | boolean | KeepJobs;
  
  /** Retention of failed jobs: a count, a boolean, or a count/age pair (KeepJobs) */
  removeOnFail?: number | boolean | KeepJobs;
  
  /** Job lock duration in ms (default: 30000) */
  lockDuration?: number;
  
  /** Lock renewal interval in ms (default: 15000) */
  lockRenewTime?: number;
  
  /** Skip stalled job detection */
  skipStalledCheck?: boolean;
  
  /** Skip automatic lock renewal */
  skipLockRenewal?: boolean;
  
  /** Use versioned locks */
  useWorkerVersioning?: boolean;
  
  /** Redis connection options */
  connection?: ConnectionOptions;
  
  /** Key prefix for Redis keys */
  prefix?: string;
  
  /** Metrics collection options */
  metrics?: MetricsOptions;
  
  /** Telemetry configuration */
  telemetry?: TelemetryOptions;
  
  /** Sandbox options for isolated job processing */
  sandbox?: SandboxOptions;
}

Usage Examples:

import { Worker } from "bullmq";

// High-throughput worker configuration
// NOTE: `processor` is not defined in this snippet; it is assumed to be the
// job-handling function declared elsewhere.
const highThroughputWorker = new Worker("image-processing", processor, {
  concurrency: 10,
  maxStalledCount: 2,
  stalledInterval: 30000,
  lockDuration: 60000,      // Job lock lasts 1 minute (lock duration, not a job timeout)
  lockRenewTime: 30000,     // Renew every 30 seconds
  
  // Cleanup settings
  removeOnComplete: {
    count: 1000,            // Keep 1000 completed jobs
    age: 24 * 60 * 60,      // or 24 hours (age is in seconds)
  },
  removeOnFail: {
    count: 100,             // Keep 100 failed jobs
    age: 7 * 24 * 60 * 60,  // or 7 days (age is in seconds)
  },
  
  // Rate limiting
  limiter: {
    max: 50,                // Max 50 jobs
    duration: 60 * 1000,    // per minute
    bucketSize: 10,         // with burst capacity of 10
  },
  
  // Metrics collection
  metrics: {
    maxDataPoints: 100,
  },
});

// Reliable worker configuration
const reliableWorker = new Worker("critical-tasks", processor, {
  concurrency: 1,           // Sequential processing
  maxStalledCount: 0,       // No stalling allowed
  stalledInterval: 10000,   // Check every 10 seconds
  lockDuration: 300000,     // 5 minute lock duration
  skipStalledCheck: false,  // Enable stalled check
  skipLockRenewal: false,   // Enable lock renewal
  useWorkerVersioning: true, // Use versioned locks
});

Job Configuration

/**
 * Per-job options, passed as the third argument of `queue.add` or set
 * queue-wide through `defaultJobOptions`. Every field is optional.
 */
interface BaseJobOptions {
  /**
   * Job priority (0-2097152).
   * NOTE(review): BullMQ's own documentation describes a LOWER number as a
   * HIGHER priority; confirm before relying on "higher = more priority".
   */
  priority?: number;
  
  /** Delay before processing (milliseconds) */
  delay?: number;
  
  /** Number of retry attempts (default: 0) */
  attempts?: number;
  
  /** Retry backoff strategy */
  backoff?: BackoffStrategy | BackoffOptions;
  
  /** Lifo (last in, first out) processing */
  lifo?: boolean;
  
  /** Job timeout in milliseconds */
  jobTimeout?: number;
  
  /** Retention of the completed job: a count, a boolean, or a count/age pair (KeepJobs) */
  removeOnComplete?: number | boolean | KeepJobs;
  
  /** Retention of the failed job: a count, a boolean, or a count/age pair (KeepJobs) */
  removeOnFail?: number | boolean | KeepJobs;
  
  /** Parent job relationship */
  parent?: ParentOptions;
  
  /** Repeatable job configuration */
  repeat?: RepeatOptions;
  
  /** Custom job ID */
  jobId?: string;
  
  /** Job deduplication settings */
  deduplication?: DeduplicationOptions;
}

/** Object form of the `backoff` job option: a strategy type plus its tuning values. */
interface BackoffOptions {
  /** Backoff type */
  type: 'fixed' | 'exponential';
  
  /** Base delay in milliseconds */
  delay?: number;
  
  /** Add randomization to delay */
  jitter?: boolean;
}

/** Retention rule used by `removeOnComplete` / `removeOnFail`; both limits are optional. */
interface KeepJobs {
  /** Keep jobs by age (seconds) */
  age?: number;
  
  /** Keep jobs by count */
  count?: number;
}

Usage Examples:

// NOTE: `queue` and `data` are not defined in these snippets; they are assumed
// to be an existing Queue instance and the job payload.

// Priority job with retry configuration
await queue.add("urgent-task", data, {
  // NOTE(review): BullMQ's documentation treats lower numbers as higher
  // priority; confirm that 100 expresses the intended urgency.
  priority: 100,
  attempts: 5,
  backoff: {
    type: "exponential",
    delay: 2000,     // Base delay of 2 seconds
    jitter: true,
  },
  removeOnComplete: 10,
  removeOnFail: false, // Keep all failed jobs
});

// Delayed job with cleanup
await queue.add("scheduled-task", data, {
  delay: 60000, // 1 minute delay
  jobTimeout: 30000, // 30 second timeout
  removeOnComplete: {
    count: 50,
    age: 3600, // 1 hour
  },
});

// Job with deduplication
await queue.add("unique-task", data, {
  jobId: "unique-task-123",
  deduplication: {
    id: "user-action-123",
  },
});

Rate Limiting Configuration

/** Rate limiter settings supplied through the worker's `limiter` option. */
interface RateLimiterOptions {
  /** Maximum number of jobs to process within one `duration` window */
  max: number;
  
  /** Duration window in milliseconds */
  duration: number;
  
  /** Bucket size for token bucket algorithm */
  bucketSize?: number;
  
  /** Key, or function deriving a key from the job, used to rate-limit groups separately */
  groupKey?: string | ((job: Job) => string);
}

Usage Examples:

// NOTE: `processor` is not defined in these snippets; it is assumed to be the
// job-handling function declared elsewhere.

// Simple rate limiting
const rateLimitedWorker = new Worker("api-calls", processor, {
  limiter: {
    max: 100,               // 100 requests
    duration: 60 * 1000,    // per minute
  },
});

// Per-user rate limiting
const perUserWorker = new Worker("user-actions", processor, {
  limiter: {
    max: 10,                // 10 actions
    duration: 60 * 1000,    // per minute
    groupKey: (job) => job.data.userId, // per user
  },
});

// Burst-capable rate limiting
const burstWorker = new Worker("batch-processing", processor, {
  limiter: {
    max: 1000,              // 1000 jobs per hour
    duration: 60 * 60 * 1000, // one hour, in milliseconds
    bucketSize: 100,        // but allow bursts of 100
  },
});

Connection Configuration

/** Redis connection settings shared by queues and workers. Every field is optional. */
interface ConnectionOptions {
  /** Redis host */
  host?: string;
  
  /** Redis port */
  port?: number;
  
  /** Database number */
  db?: number;
  
  /** Authentication password */
  password?: string;
  
  /** Username for ACL authentication */
  username?: string;
  
  /** Connection timeout (milliseconds) */
  connectTimeout?: number;
  
  /** Command timeout (presumably milliseconds, like connectTimeout; confirm) */
  commandTimeout?: number;
  
  /** Retry delay on failed attempts */
  retryDelayOnFailedAttempt?: number;
  
  /** Maximum retries per request */
  maxRetriesPerRequest?: number;
  
  /** TLS configuration */
  tls?: TLSOptions;
  
  /** Keep alive settings (milliseconds) */
  keepAlive?: number;
  
  /** Family preference (4 or 6), i.e. IPv4 or IPv6 */
  family?: number;
}

/** TLS settings supplied through the connection's `tls` option. */
interface TLSOptions {
  /** Reject unauthorized certificates */
  rejectUnauthorized?: boolean;
  
  /** Certificate authority (one or several, as text or binary) */
  ca?: string | Buffer | Array<string | Buffer>;
  
  /** Client certificate */
  cert?: string | Buffer;
  
  /** Client private key */
  key?: string | Buffer;
}

Usage Examples:

// Secure Redis connection
// NOTE: `fs` is not imported in this snippet; it is assumed to be Node's
// file-system module (e.g. imported from "node:fs").
const secureConnection = {
  host: "redis.example.com",
  port: 6380,
  password: "secure-password",
  username: "bullmq-user",
  db: 1,
  tls: {
    rejectUnauthorized: true,
    // Certificate material is read synchronously when this object is built
    ca: fs.readFileSync("ca-cert.pem"),
    cert: fs.readFileSync("client-cert.pem"),
    key: fs.readFileSync("client-key.pem"),
  },
  connectTimeout: 10000,
  commandTimeout: 5000,
  retryDelayOnFailedAttempt: 100,
  maxRetriesPerRequest: 3,
};

// Connection with keep-alive
const keepAliveConnection = {
  host: "localhost",
  port: 6379,
  keepAlive: 30000,        // 30 second keep-alive
  family: 4,               // IPv4
  connectTimeout: 60000,   // 1 minute timeout
};

Metrics Configuration

/** Metrics settings supplied through the worker's `metrics` option. */
interface MetricsOptions {
  /** Maximum data points to collect */
  maxDataPoints?: number;
}

Telemetry Configuration

/**
 * Telemetry settings supplied through the worker's `telemetry` option.
 * `Tracer` and `ContextManager` are declared outside this page.
 */
interface TelemetryOptions {
  /** Telemetry tracer */
  tracer?: Tracer;
  
  /** Context manager for distributed tracing */
  contextManager?: ContextManager;
}

Sandbox Configuration

/** Settings for sandboxed (isolated) job processing, supplied through the worker's `sandbox` option. */
interface SandboxOptions {
  /** Timeout for sandboxed processes (unit not stated here; TODO confirm) */
  timeout?: number;
  
  /** Memory limit for sandboxed processes (unit not stated here; TODO confirm) */
  memoryLimit?: number;
  
  /** Environment variables, as name/value string pairs */
  env?: Record<string, string>;
}

Repeat/Schedule Configuration

/**
 * Repeat/schedule settings supplied through a job's `repeat` option.
 * The usage examples schedule either by `cron` expression or by a fixed `every` interval.
 */
interface RepeatOptions {
  /** Cron expression */
  cron?: string;
  
  /** Timezone for cron (e.g. "America/New_York") */
  tz?: string;
  
  /** Start date */
  startDate?: Date | string | number;
  
  /** End date */
  endDate?: Date | string | number;
  
  /** Maximum number of iterations */
  limit?: number;
  
  /** Interval in milliseconds */
  every?: number;
  
  /** Start immediately */
  immediately?: boolean;
  
  /** Current iteration count */
  count?: number;
  
  /** Previous run time */
  prevMillis?: number;
  
  /** Job key for deduplication */
  jobId?: string;
}

Usage Examples:

// NOTE: `queue` and `data` are not defined in these snippets; they are assumed
// to be an existing Queue instance and the job payload.

// Cron-based scheduling
await queue.add("daily-report", {}, {
  repeat: {
    cron: "0 9 * * *",      // 9 AM daily
    tz: "America/New_York", // evaluated in this timezone
    limit: 365,             // Max 365 iterations
  },
});

// Interval-based scheduling
await queue.add("health-check", {}, {
  repeat: {
    every: 30000,           // Every 30 seconds
    immediately: true,      // Start immediately
  },
});

// Time-bounded scheduling
await queue.add("promotion", data, {
  repeat: {
    every: 60000,           // Every minute
    startDate: new Date("2024-01-01"),
    endDate: new Date("2024-01-31"),
  },
});

Environment-Specific Configurations

// Development configuration: local Redis, small job retention, a single attempt
const devConfig = {
  connection: {
    host: "localhost",
    port: 6379,
    db: 0,
  },
  defaultJobOptions: {
    removeOnComplete: 10,
    removeOnFail: 10,
    attempts: 1,
  },
};

// Production configuration: connection details come from the environment,
// TLS certificate verification stays on, and failed jobs are retried with
// exponential backoff.
const prodConfig = {
  connection: {
    host: process.env.REDIS_HOST,
    // Explicit radix; falls back to 6379 when REDIS_PORT is unset or not numeric
    // (a NaN port would otherwise be passed through silently).
    port: Number.parseInt(process.env.REDIS_PORT || "6379", 10) || 6379,
    password: process.env.REDIS_PASSWORD,
    tls: {
      rejectUnauthorized: true,
    },
    retryDelayOnFailedAttempt: 100,
    maxRetriesPerRequest: 3,
  },
  defaultJobOptions: {
    removeOnComplete: 1000,
    removeOnFail: 100,
    attempts: 3,
    // Backoff is given as an options object (strategy type + base delay in ms),
    // matching the BackoffOptions shape; a bare strategy-name string is not valid.
    backoff: {
      type: "exponential",
      delay: 1000,
    },
  },
  settings: {
    stalledInterval: 30000, // Check for stalled jobs every 30s
    maxStalledCount: 1,     // Max 1 stall before failing
  },
};

// Use environment-specific config: production settings only when NODE_ENV is
// exactly "production"; every other value (or none) selects the development settings.
const config = process.env.NODE_ENV === "production" ? prodConfig : devConfig;
const queue = new Queue("my-queue", config);

Configuration Best Practices

// Centralized configuration management
/**
 * Centralized, environment-driven configuration for queues and workers.
 *
 * Numeric settings are read from environment variables; a variable that is
 * unset, empty, or not a valid integer falls back to the documented default
 * instead of leaking NaN into the configuration.
 */
class BullMQConfig {
  /**
   * Builds the options for a queue.
   *
   * @param queueName Name of the queue. Not read by the current implementation;
   *   every queue receives the same options.
   * @returns Queue options with connection, key prefix and default job options.
   */
  static getQueueConfig(queueName: string): QueueOptions {
    return {
      connection: BullMQConfig.getConnectionConfig(),
      prefix: process.env.BULLMQ_PREFIX || "bull",
      defaultJobOptions: {
        removeOnComplete: BullMQConfig.envInt("KEEP_COMPLETED", 100),
        removeOnFail: BullMQConfig.envInt("KEEP_FAILED", 50),
        attempts: BullMQConfig.envInt("DEFAULT_ATTEMPTS", 3),
        // Backoff is given as an options object (strategy type + base delay in ms),
        // matching the BackoffOptions shape; a bare strategy-name string is not valid.
        backoff: {
          type: "exponential",
          delay: 1000,
        },
      },
    };
  }
  
  /**
   * Builds the options for a worker.
   *
   * @param concurrency Maximum number of jobs processed concurrently (default: 1).
   * @returns Worker options with connection, concurrency and stall/lock timings.
   */
  static getWorkerConfig(concurrency: number = 1): WorkerOptions {
    return {
      connection: BullMQConfig.getConnectionConfig(),
      concurrency,
      maxStalledCount: BullMQConfig.envInt("MAX_STALLED", 1),
      stalledInterval: BullMQConfig.envInt("STALLED_INTERVAL", 30000),
      lockDuration: BullMQConfig.envInt("LOCK_DURATION", 30000),
    };
  }
  
  /** Builds the Redis connection options shared by queues and workers. */
  private static getConnectionConfig(): ConnectionOptions {
    return {
      host: process.env.REDIS_HOST || "localhost",
      port: BullMQConfig.envInt("REDIS_PORT", 6379),
      password: process.env.REDIS_PASSWORD,
      db: BullMQConfig.envInt("REDIS_DB", 0),
      retryDelayOnFailedAttempt: 100,
      maxRetriesPerRequest: 3,
    };
  }
  
  /**
   * Reads an integer from the environment.
   *
   * @param name Environment variable name.
   * @param fallback Value returned when the variable is unset, blank, or not an integer.
   */
  private static envInt(name: string, fallback: number): number {
    const raw = process.env[name];
    if (raw === undefined || raw.trim() === "") {
      return fallback;
    }
    // Explicit radix so the value is always read as decimal.
    const parsed = Number.parseInt(raw, 10);
    return Number.isNaN(parsed) ? fallback : parsed;
  }
}

// Use centralized config: the queue and its worker share the "orders" name;
// the worker processes up to 5 jobs concurrently.
// NOTE: `processor` is not defined in this snippet; it is assumed to be the
// job-handling function declared elsewhere.
const queue = new Queue("orders", BullMQConfig.getQueueConfig("orders"));
const worker = new Worker("orders", processor, BullMQConfig.getWorkerConfig(5));

Install with Tessl CLI

npx tessl i tessl/npm-bullmq

docs

configuration.md

error-handling.md

event-monitoring.md

flow-orchestration.md

index.md

job-lifecycle.md

job-processing.md

job-scheduling.md

queue-management.md

tile.json