or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

configuration.mderror-handling.mdevent-monitoring.mdflow-orchestration.mdindex.mdjob-lifecycle.mdjob-processing.mdjob-scheduling.mdqueue-management.md
tile.json

tessl/npm-bullmq

Redis-based distributed queue system for Node.js providing robust message and job processing capabilities with features like job scheduling, retries, and flow orchestration.

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
npmpkg:npm/bullmq@5.58.x

To install, run

npx @tessl/cli install tessl/npm-bullmq@5.58.0

index.mddocs/

BullMQ

BullMQ is a high-performance, Redis-based distributed queue system for Node.js applications. It provides robust message and job processing capabilities with features like job scheduling, retries, prioritization, delayed jobs, rate limiting, and flow orchestration. Designed for mission-critical applications requiring reliable background job processing with built-in atomicity guarantees, comprehensive error handling, and horizontal scaling across multiple worker processes.

Package Information

  • Package Name: bullmq
  • Package Type: npm
  • Language: TypeScript
  • Installation: npm install bullmq or yarn add bullmq

Core Imports

import { Queue, Worker, Job, FlowProducer, QueueEvents, JobScheduler } from "bullmq";

For CommonJS:

const { Queue, Worker, Job, FlowProducer, QueueEvents, JobScheduler } = require("bullmq");

Basic Usage

import { Queue, Worker } from "bullmq";

// Create a queue
const myQueue = new Queue("my queue");

// Add a job to the queue
await myQueue.add("my job", { data: "some data" });

// Create a worker to process jobs
const worker = new Worker("my queue", async (job) => {
  console.log(`Processing job ${job.id} with data:`, job.data);
  
  // Update progress
  await job.updateProgress(50);
  
  // Return result
  return { result: "job completed" };
});

// Listen to worker events
worker.on("completed", (job) => {
  console.log(`Job ${job.id} completed with result:`, job.returnvalue);
});

worker.on("failed", (job, err) => {
  console.log(`Job ${job.id} failed:`, err.message);
});

Architecture

BullMQ is built around several key components:

  • Queue: Main interface for adding and managing jobs
  • Worker: Processes jobs with configurable concurrency and error handling
  • Job: Represents individual work units with state management and lifecycle
  • FlowProducer: Manages complex job flows with dependencies across queues
  • JobScheduler: Advanced job scheduling with cron patterns and repeatable jobs
  • QueueEvents: Real-time event monitoring via Redis streams
  • Redis Integration: Atomic operations using Lua scripts for reliability

Capabilities

Queue Management

Core queue operations for adding jobs, managing state, and controlling flow. Supports job prioritization, delayed execution, and bulk operations.

class Queue<T = any, R = any, N extends string = string> {
  constructor(name: string, opts?: QueueOptions);
  
  add(name: N, data: T, opts?: JobsOptions): Promise<Job<T, R, N>>;
  addBulk(jobs: Array<{ name: N; data: T; opts?: JobsOptions }>): Promise<Job<T, R, N>[]>;
  pause(): Promise<void>;
  resume(): Promise<void>;
  close(): Promise<void>;
}

Queue Management

Job Processing

Worker-based job processing with configurable concurrency, error handling, and retry mechanisms. Supports rate limiting and stalled job recovery.

class Worker<T = any, R = any, N extends string = string> {
  constructor(name: string, processor: Processor<T, R, N>, opts?: WorkerOptions);
  
  run(): Promise<void>;
  pause(): Promise<void>;
  resume(): Promise<void>;
  close(): Promise<void>;
}

type Processor<T = any, R = any, N extends string = string> = (
  job: Job<T, R, N>,
  token?: string
) => Promise<R>;

Job Processing

Job Lifecycle

Individual job management including state transitions, progress tracking, retry logic, and metadata handling.

class Job<T = any, R = any, N extends string = string> {
  readonly id: string;
  readonly name: N;
  readonly data: T;
  readonly opts: JobsOptions;
  
  updateProgress(progress: number | object): Promise<void>;
  log(row: string): Promise<number>;
  remove(): Promise<void>;
}

Job Lifecycle

Flow Orchestration

Complex workflow management with job dependencies across multiple queues. Enables sophisticated pipeline architectures with conditional execution.

class FlowProducer {
  constructor(opts?: FlowProducerOptions);
  
  add(flow: FlowJob, opts?: FlowOpts): Promise<JobNode>;
  addBulk(flows: FlowJob[], opts?: FlowOpts): Promise<JobNode[]>;
  close(): Promise<void>;
}

interface FlowJob {
  name: string;
  queueName: string;
  data?: any;
  opts?: JobsOptions;
  children?: FlowJob[];
}

Flow Orchestration

Job Scheduling

Advanced job scheduling and repeatable job management. Enables cron-based scheduling, interval-based jobs, and complex recurring patterns.

class JobScheduler {
  constructor(name: string, opts: RepeatBaseOptions);
  
  upsertJobScheduler<T, R, N extends string>(
    jobSchedulerId: string,
    repeatOpts: RepeatOptions,
    jobName: N,
    jobData: T,
    opts: JobSchedulerTemplateOptions,
    options: { override: boolean; producerId?: string }
  ): Promise<Job<T, R, N> | undefined>;
  
  removeJobScheduler(jobSchedulerId: string): Promise<number>;
  getSchedulersCount(): Promise<number>;
}

Job Scheduling

Event Monitoring

Real-time queue event monitoring via Redis streams for observability and debugging. Provides comprehensive event tracking across all queue operations.

class QueueEvents {
  constructor(queueName: string, opts?: QueueEventsOptions);
  
  run(): Promise<void>;
  close(): Promise<void>;
}

Event Monitoring

Error Handling

Specialized error classes for controlling job flow and retry behavior. Enables fine-grained error handling strategies.

class UnrecoverableError extends Error {
  constructor(message: string);
}

class DelayedError extends Error {
  constructor(delay?: number);
}

class RateLimitError extends Error {
  constructor(message: string, delay?: number);
}

Error Handling

Configuration

Comprehensive configuration options for queues, workers, jobs, and Redis connections. Supports advanced features like rate limiting, metrics, and telemetry.

interface QueueOptions {
  connection?: ConnectionOptions;
  defaultJobOptions?: BaseJobOptions;
  prefix?: string;
  streams?: StreamsOptions;
}

interface WorkerOptions {
  concurrency?: number;
  limiter?: RateLimiterOptions;
  maxStalledCount?: number;
  stalledInterval?: number;
  removeOnComplete?: number | KeepJobs;
  removeOnFail?: number | KeepJobs;
}

Configuration

Utility Functions

Core utility functions for Redis operations, data manipulation, and queue management:

/** Promise-based delay with optional abort controller */
function delay(ms: number, abortController?: AbortController): Promise<void>;

/** Check if object is empty */
function isEmpty(obj: object): boolean;

/** Convert string array to key-value object */
function array2obj(arr: string[]): Record<string, string>;

/** Remove all queue data from Redis */
function removeAllQueueData(client: RedisClient, queueName: string, prefix?: string): Promise<void | boolean>;

/** Check if object is Redis instance */
function isRedisInstance(obj: any): obj is Redis | Cluster;

/** Check if Redis is cluster instance */
function isRedisCluster(obj: unknown): obj is Cluster;

/** Get UTF-8 byte length of string */
function lengthInUtf8Bytes(str: string): number;

/** Generate parent job key */
function getParentKey(opts: ParentOptions): string | undefined;

/** Telemetry tracing wrapper */
function trace<T>(telemetry: any, spanKind: SpanKind, queueName: string, operation: string, destination: string, callback: Function, srcPropagationMetadata?: string): Promise<T>;

Backoff Strategies

Built-in retry backoff strategy implementations:

class Backoffs {
  /** Fixed delay backoff strategy */
  static fixed(delay: number, jitter?: boolean): number;
  
  /** Exponential backoff strategy */
  static exponential(delay: number, jitter?: boolean): number;
}

Constants

Important constants used throughout BullMQ:

/** Short delay duration (100ms) */
const DELAY_TIME_1 = 100;

/** Long delay duration (5000ms) */
const DELAY_TIME_5 = 5000;

/** Queue events key suffix */
const QUEUE_EVENT_SUFFIX = ':qe';

/** Unrecoverable error marker */
const UNRECOVERABLE_ERROR = 'bullmq:unrecoverable';

Enumerations

Core enums for commands, errors, and telemetry:

enum ChildCommand {
  Init = 'init',
  Start = 'start',
  Stop = 'stop',
  GetChildrenValuesResponse = 'get-children-values:response',
  GetIgnoredChildrenFailuresResponse = 'get-ignored-children-failures:response',
  MoveToWaitingChildrenResponse = 'move-to-waiting-children:response'
}

enum ParentCommand {
  GetChildrenValues = 'get-children-values',
  GetIgnoredChildrenFailures = 'get-ignored-children-failures',
  MoveToWaitingChildren = 'move-to-waiting-children'
}

enum ErrorCode {
  JobNotExist = -1,
  JobLockNotExist = -2,
  JobNotInState = -3,
  JobPendingChildren = -4,
  ParentJobNotExist = -5,
  JobLockMismatch = -6,
  ParentJobCannotBeReplaced = -7,
  JobBelongsToJobScheduler = -8,
  JobHasFailedChildren = -9
}

enum SpanKind {
  PRODUCER = 0,
  CONSUMER = 1,
  INTERNAL = 2
}

enum TelemetryAttributes {
  QueueName = 'bullmq.queue.name',
  QueueOperation = 'bullmq.queue.operation'
}

enum MetricsTime {
  ONE_MINUTE = 60000,
  FIVE_MINUTES = 300000,
  FIFTEEN_MINUTES = 900000,
  THIRTY_MINUTES = 1800000,
  ONE_HOUR = 3600000,
  ONE_DAY = 86400000,
  ONE_WEEK = 604800000
}

Types

Core TypeScript types and interfaces used throughout the API:

type JobState = 'completed' | 'failed' | 'active' | 'delayed' | 'prioritized' | 'waiting' | 'waiting-children';

type FinishedStatus = 'completed' | 'failed';

type JobProgress = number | object;

type BackoffStrategy = 'fixed' | 'exponential' | ((attemptsMade: number, type: string, err: Error, job: Job) => number | Promise<number>);

type RepeatStrategy = (millis: number, opts?: RepeatOptions) => number | undefined;

type DeduplicationOptions = { id?: string };