Redis-based distributed queue system for Node.js providing robust message and job processing capabilities with features like job scheduling, retries, and flow orchestration.
npx @tessl/cli install tessl/npm-bullmq@5.58.0

BullMQ is a high-performance, Redis-based distributed queue system for Node.js applications. It provides robust message and job processing with job scheduling, retries, prioritization, delayed jobs, rate limiting, and flow orchestration. It is designed for mission-critical applications that need reliable background job processing, with built-in atomicity guarantees, comprehensive error handling, and horizontal scaling across multiple worker processes.
npm install bullmq or yarn add bullmq

import { Queue, Worker, Job, FlowProducer, QueueEvents, JobScheduler } from "bullmq";

For CommonJS:
const { Queue, Worker, Job, FlowProducer, QueueEvents, JobScheduler } = require("bullmq");

import { Queue, Worker } from "bullmq";
// Create a queue
const myQueue = new Queue("my queue");
// Add a job to the queue
await myQueue.add("my job", { data: "some data" });
// Create a worker to process jobs
const worker = new Worker("my queue", async (job) => {
  console.log(`Processing job ${job.id} with data:`, job.data);
  // Update progress
  await job.updateProgress(50);
  // Return result
  return { result: "job completed" };
});
// Listen to worker events
worker.on("completed", (job) => {
  console.log(`Job ${job.id} completed with result:`, job.returnvalue);
});
// The job argument may be undefined in the failed handler, so guard the access
worker.on("failed", (job, err) => {
  console.log(`Job ${job?.id} failed:`, err.message);
});

BullMQ is built around several key components:
Core queue operations for adding jobs, managing state, and controlling flow. Supports job prioritization, delayed execution, and bulk operations.
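As a rough illustration of prioritization, delayed execution, and bulk insertion working together, the sketch below builds the array that `addBulk` accepts. The `BulkJob` interface and the `buildEmailJobs` helper are hypothetical names introduced here, and the queue call itself appears only as a comment because it needs a running Redis.

```typescript
// Hypothetical local mirror of the element shape accepted by Queue.addBulk.
interface BulkJob<T> {
  name: string;
  data: T;
  opts?: { priority?: number; delay?: number };
}

interface EmailData {
  to: string;
}

// Build one job per recipient. The first recipient gets priority 1 and the
// rest priority 10 (a lower number is processed sooner). Every job waits
// delayMs milliseconds before it becomes eligible for processing.
function buildEmailJobs(recipients: string[], delayMs: number): BulkJob<EmailData>[] {
  return recipients.map((to, index) => ({
    name: "send-email",
    data: { to },
    opts: { priority: index === 0 ? 1 : 10, delay: delayMs },
  }));
}

const emailJobs = buildEmailJobs(["a@example.com", "b@example.com"], 5000);
console.log(emailJobs.length);
// With a connected queue: await myQueue.addBulk(emailJobs);
```

Building the payload in a pure function keeps the option logic testable without a Redis connection.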
class Queue<T = any, R = any, N extends string = string> {
  constructor(name: string, opts?: QueueOptions);
  add(name: N, data: T, opts?: JobsOptions): Promise<Job<T, R, N>>;
  addBulk(jobs: Array<{ name: N; data: T; opts?: JobsOptions }>): Promise<Job<T, R, N>[]>;
  pause(): Promise<void>;
  resume(): Promise<void>;
  close(): Promise<void>;
}

Worker-based job processing with configurable concurrency, error handling, and retry mechanisms. Supports rate limiting and stalled job recovery.
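To give a feel for what the concurrency setting changes, here is a toy model that runs without BullMQ or Redis. The function name `simulateFinishTimes` and the fixed job durations are assumptions made for the sketch. A real worker processes asynchronous jobs pulled from Redis, so this only approximates the scheduling effect.

```typescript
// Toy model: each job takes a fixed number of milliseconds, and a worker
// with `concurrency` slots starts the next job as soon as a slot frees up.
function simulateFinishTimes(durationsMs: number[], concurrency: number): number[] {
  if (concurrency < 1) throw new RangeError("concurrency must be at least 1");
  const slotFreeAt: number[] = [];
  for (let i = 0; i < concurrency; i++) slotFreeAt.push(0);

  return durationsMs.map((duration) => {
    // Pick the slot that becomes free first (ties go to the lowest index).
    let slot = 0;
    for (let i = 1; i < slotFreeAt.length; i++) {
      if (slotFreeAt[i] < slotFreeAt[slot]) slot = i;
    }
    slotFreeAt[slot] += duration;
    return slotFreeAt[slot];
  });
}

const sampleDurations = [300, 100, 200, 100];
console.log(simulateFinishTimes(sampleDurations, 1)); // [300, 400, 600, 700]
console.log(simulateFinishTimes(sampleDurations, 2)); // [300, 100, 300, 400]
```

With two slots the last job finishes at 400 ms instead of 700 ms, which is the kind of gain a higher concurrency gives for I/O-bound processors.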
class Worker<T = any, R = any, N extends string = string> {
  constructor(name: string, processor: Processor<T, R, N>, opts?: WorkerOptions);
  run(): Promise<void>;
  pause(): Promise<void>;
  resume(): Promise<void>;
  close(): Promise<void>;
}
type Processor<T = any, R = any, N extends string = string> = (
  job: Job<T, R, N>,
  token?: string
) => Promise<R>;

Individual job management including state transitions, progress tracking, retry logic, and metadata handling.
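Progress tracking usually comes down to computing a value and handing it to `updateProgress`. The helper below is a minimal sketch of that computation. `progressPercent` is a hypothetical name, and the processor loop is shown only in comments because it needs a live job.

```typescript
// Hypothetical helper: turn "items done out of total" into a 0-100 integer
// suitable for passing to job.updateProgress(...).
function progressPercent(done: number, total: number): number {
  if (total <= 0) return 100; // nothing to do counts as finished
  const clamped = Math.min(Math.max(done, 0), total);
  return Math.round((clamped / total) * 100);
}

// Inside a processor one might write:
//   for (let i = 0; i < items.length; i++) {
//     await handle(items[i]);
//     await job.updateProgress(progressPercent(i + 1, items.length));
//   }
console.log(progressPercent(1, 4)); // 25
console.log(progressPercent(7, 4)); // 100 (clamped)
```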
class Job<T = any, R = any, N extends string = string> {
  readonly id: string;
  readonly name: N;
  readonly data: T;
  readonly opts: JobsOptions;
  updateProgress(progress: number | object): Promise<void>;
  log(row: string): Promise<number>;
  remove(): Promise<void>;
}

Complex workflow management with job dependencies across multiple queues. Enables sophisticated pipeline architectures with conditional execution.
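A flow is a tree in which a parent job waits for all of its children. The sketch below uses a local copy of the `FlowJob` shape from this section and lists one valid completion order with a post-order walk. The queue and job names are invented for the example, and siblings may in practice finish in any order relative to each other.

```typescript
// Local copy of the FlowJob shape documented in this section.
interface FlowJob {
  name: string;
  queueName: string;
  data?: unknown;
  children?: FlowJob[];
}

// Post-order walk: every child is listed before its parent, matching the
// rule that a parent job waits until all of its children have completed.
function completionOrder(flow: FlowJob): string[] {
  const order: string[] = [];
  for (const child of flow.children ?? []) {
    order.push(...completionOrder(child));
  }
  order.push(`${flow.queueName}:${flow.name}`);
  return order;
}

const publishFlow: FlowJob = {
  name: "publish-video",
  queueName: "publish",
  children: [
    { name: "transcode", queueName: "media", data: { codec: "h264" } },
    { name: "thumbnail", queueName: "media" },
  ],
};

console.log(completionOrder(publishFlow));
// With a connected producer: await new FlowProducer().add(publishFlow);
```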
class FlowProducer {
  constructor(opts?: FlowProducerOptions);
  add(flow: FlowJob, opts?: FlowOpts): Promise<JobNode>;
  addBulk(flows: FlowJob[], opts?: FlowOpts): Promise<JobNode[]>;
  close(): Promise<void>;
}
interface FlowJob {
  name: string;
  queueName: string;
  data?: any;
  opts?: JobsOptions;
  children?: FlowJob[];
}

Advanced job scheduling and repeatable job management. Enables cron-based scheduling, interval-based jobs, and complex recurring patterns.
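For interval-based jobs, it can help to see which timestamps a schedule would produce. The following sketch is a simplified model. The parameter names echo the `every` and `limit` repeat options, but the arithmetic is an assumption for illustration and is not taken from BullMQ's scheduler.

```typescript
// Simplified model of an interval schedule: runs happen every `everyMs`
// milliseconds after `startMs`, and stop after `limit` runs.
function nextRuns(startMs: number, everyMs: number, limit: number): number[] {
  if (everyMs <= 0) throw new RangeError("everyMs must be positive");
  const runs: number[] = [];
  for (let i = 1; i <= limit; i++) {
    runs.push(startMs + i * everyMs);
  }
  return runs;
}

console.log(nextRuns(0, 60_000, 3)); // [60000, 120000, 180000]
```

Cron patterns follow the same idea, but the next timestamp comes from parsing the pattern rather than adding a fixed interval.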
class JobScheduler {
  constructor(name: string, opts: RepeatBaseOptions);
  upsertJobScheduler<T, R, N extends string>(
    jobSchedulerId: string,
    repeatOpts: RepeatOptions,
    jobName: N,
    jobData: T,
    opts: JobSchedulerTemplateOptions,
    options: { override: boolean; producerId?: string }
  ): Promise<Job<T, R, N> | undefined>;
  removeJobScheduler(jobSchedulerId: string): Promise<number>;
  getSchedulersCount(): Promise<number>;
}

Real-time queue event monitoring via Redis streams for observability and debugging. Provides comprehensive event tracking across all queue operations.
class QueueEvents {
  constructor(queueName: string, opts?: QueueEventsOptions);
  run(): Promise<void>;
  close(): Promise<void>;
}

Specialized error classes for controlling job flow and retry behavior. Enables fine-grained error handling strategies.
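The effect of an unrecoverable error on retries can be sketched without the library. The class below is a local stand-in for BullMQ's `UnrecoverableError`, declared here so the example runs on its own, and `shouldRetry` is a hypothetical helper that models the decision rather than reproducing BullMQ's internals.

```typescript
// Local stand-in for the UnrecoverableError class exported by BullMQ.
class UnrecoverableError extends Error {
  constructor(message = "unrecoverable") {
    super(message);
    this.name = "UnrecoverableError";
    // Keep instanceof working when compiled to older JavaScript targets.
    Object.setPrototypeOf(this, new.target.prototype);
  }
}

// Simplified retry decision: retry while attempts remain, unless the
// processor threw an UnrecoverableError.
function shouldRetry(err: Error, attemptsMade: number, maxAttempts: number): boolean {
  if (err instanceof UnrecoverableError) return false;
  return attemptsMade < maxAttempts;
}

console.log(shouldRetry(new Error("timeout"), 1, 3)); // true
console.log(shouldRetry(new UnrecoverableError("bad input"), 1, 3)); // false
```

Throwing the unrecoverable variant suits failures that another attempt cannot fix, such as invalid input data.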
class UnrecoverableError extends Error {
  constructor(message?: string);
}
class DelayedError extends Error {
  constructor(message?: string);
}
class RateLimitError extends Error {
  constructor(message?: string);
}

Comprehensive configuration options for queues, workers, jobs, and Redis connections. Supports advanced features like rate limiting, metrics, and telemetry.
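A configuration along these lines might look like the plain objects below. The host, port, prefix, queue name, and every numeric value are placeholders chosen for the sketch, not recommendations. The constructor calls are left as comments because they need the library and a reachable Redis.

```typescript
// Placeholder Redis connection settings.
const redisConnection = { host: "localhost", port: 6379 };

// Shaped like QueueOptions: defaults applied to every job added to the queue.
const queueOptions = {
  connection: redisConnection,
  prefix: "myapp",
  defaultJobOptions: {
    attempts: 3,
    backoff: { type: "exponential", delay: 1000 },
  },
};

// Shaped like WorkerOptions.
const workerOptions = {
  connection: redisConnection,
  concurrency: 5,
  limiter: { max: 100, duration: 60_000 }, // at most 100 jobs per minute
  maxStalledCount: 1,
  stalledInterval: 30_000,
  removeOnComplete: { count: 1000 }, // keep the latest 1000 completed jobs
  removeOnFail: { age: 24 * 3600 }, // keep failed jobs for 24 hours (age is in seconds)
};

console.log(queueOptions.prefix, workerOptions.concurrency);
// With the library installed:
//   new Queue("emails", queueOptions);
//   new Worker("emails", processor, workerOptions);
```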
interface QueueOptions {
  connection?: ConnectionOptions;
  defaultJobOptions?: BaseJobOptions;
  prefix?: string;
  streams?: StreamsOptions;
}
interface WorkerOptions {
  concurrency?: number;
  limiter?: RateLimiterOptions;
  maxStalledCount?: number;
  stalledInterval?: number;
  removeOnComplete?: number | KeepJobs;
  removeOnFail?: number | KeepJobs;
}

Core utility functions for Redis operations, data manipulation, and queue management:
/** Promise-based delay with optional abort controller */
function delay(ms: number, abortController?: AbortController): Promise<void>;
/** Check if object is empty */
function isEmpty(obj: object): boolean;
/** Convert string array to key-value object */
function array2obj(arr: string[]): Record<string, string>;
/** Remove all queue data from Redis */
function removeAllQueueData(client: RedisClient, queueName: string, prefix?: string): Promise<void | boolean>;
/** Check if object is Redis instance */
function isRedisInstance(obj: any): obj is Redis | Cluster;
/** Check if Redis is cluster instance */
function isRedisCluster(obj: unknown): obj is Cluster;
/** Get UTF-8 byte length of string */
function lengthInUtf8Bytes(str: string): number;
/** Generate parent job key */
function getParentKey(opts: ParentOptions): string | undefined;
/** Telemetry tracing wrapper */
function trace<T>(telemetry: any, spanKind: SpanKind, queueName: string, operation: string, destination: string, callback: Function, srcPropagationMetadata?: string): Promise<T>;

Built-in retry backoff strategy implementations:
class Backoffs {
  /** Fixed delay backoff strategy */
  static fixed(delay: number, jitter?: boolean): number;
  /** Exponential backoff strategy */
  static exponential(delay: number, jitter?: boolean): number;
}

Important constants used throughout BullMQ:
/** Short delay duration (100ms) */
const DELAY_TIME_1 = 100;
/** Long delay duration (5000ms) */
const DELAY_TIME_5 = 5000;
/** Queue events key suffix */
const QUEUE_EVENT_SUFFIX = ':qe';
/** Unrecoverable error marker */
const UNRECOVERABLE_ERROR = 'bullmq:unrecoverable';

Core enums for commands, errors, and telemetry:
enum ChildCommand {
  Init = 'init',
  Start = 'start',
  Stop = 'stop',
  GetChildrenValuesResponse = 'get-children-values:response',
  GetIgnoredChildrenFailuresResponse = 'get-ignored-children-failures:response',
  MoveToWaitingChildrenResponse = 'move-to-waiting-children:response'
}
enum ParentCommand {
  GetChildrenValues = 'get-children-values',
  GetIgnoredChildrenFailures = 'get-ignored-children-failures',
  MoveToWaitingChildren = 'move-to-waiting-children'
}
enum ErrorCode {
  JobNotExist = -1,
  JobLockNotExist = -2,
  JobNotInState = -3,
  JobPendingChildren = -4,
  ParentJobNotExist = -5,
  JobLockMismatch = -6,
  ParentJobCannotBeReplaced = -7,
  JobBelongsToJobScheduler = -8,
  JobHasFailedChildren = -9
}
enum SpanKind {
  INTERNAL = 0,
  SERVER = 1,
  CLIENT = 2,
  PRODUCER = 3,
  CONSUMER = 4
}
enum TelemetryAttributes {
  QueueName = 'bullmq.queue.name',
  QueueOperation = 'bullmq.queue.operation'
}
enum MetricsTime {
  ONE_MINUTE = 60000,
  FIVE_MINUTES = 300000,
  FIFTEEN_MINUTES = 900000,
  THIRTY_MINUTES = 1800000,
  ONE_HOUR = 3600000,
  ONE_DAY = 86400000,
  ONE_WEEK = 604800000
}

Core TypeScript types and interfaces used throughout the API:
type JobState = 'completed' | 'failed' | 'active' | 'delayed' | 'prioritized' | 'waiting' | 'waiting-children';
type FinishedStatus = 'completed' | 'failed';
type JobProgress = number | object;
type BackoffStrategy = 'fixed' | 'exponential' | ((attemptsMade: number, type: string, err: Error, job: Job) => number | Promise<number>);
type RepeatStrategy = (millis: number, opts?: RepeatOptions) => number | undefined;
type DeduplicationOptions = { id?: string };
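
The function form of `BackoffStrategy` above lets a project define its own retry delays. The sketch below shows one possible strategy, an exponential delay with a cap. The name `cappedExponential` and both constants are hypothetical, and only the first documented parameter (`attemptsMade`) is used.

```typescript
// Hypothetical base delay and cap, chosen only for the sketch.
const BASE_DELAY_MS = 1000;
const MAX_DELAY_MS = 30_000;

// attemptsMade is 1 after the first failure, so the delays grow as
// 1000, 2000, 4000, ... until they reach the cap.
function cappedExponential(attemptsMade: number): number {
  const delay = BASE_DELAY_MS * Math.pow(2, attemptsMade - 1);
  return Math.min(delay, MAX_DELAY_MS);
}

console.log([1, 2, 3, 6].map((n) => cappedExponential(n))); // [1000, 2000, 4000, 30000]
// A function like this is typically registered on the worker, for example
// via settings: { backoffStrategy: cappedExponential }, and selected per
// job with backoff: { type: "custom" }.
```

Capping the delay keeps a job with many attempts from waiting an unbounded time between retries.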