Redis-based job queue library for Node.js that provides reliable job processing with features including job scheduling, delayed execution, repeatable jobs, priority handling, and automatic retries.
npx @tessl/cli install tessl/npm-bull@4.16.0

Bull is a Redis-based job queue library for Node.js that provides reliable job processing with features including job scheduling, delayed execution, repeatable jobs, priority handling, and automatic retries. The library offers a comprehensive job management system with support for concurrent processing, job state tracking, and various queue patterns including LIFO and FIFO processing.
npm install bull

const Queue = require('bull');
const { Job } = Queue; // Access Job class
const { utils } = Queue; // Access utility functions

For ES modules:
import Queue from 'bull';
// or
import * as Bull from 'bull';
const { Job, utils } = Bull;

const Queue = require('bull');
const emailQueue = new Queue('email processing', 'redis://127.0.0.1:6379');
// Add jobs to the queue
emailQueue.add('welcome email', {
email: 'user@example.com',
name: 'John Doe'
});
// Process jobs
emailQueue.process('welcome email', async (job) => {
const { email, name } = job.data;
console.log(`Processing welcome email for ${name} at ${email}`);
// Send email logic here
return { sent: true, timestamp: Date.now() };
});
// Listen for completed jobs
emailQueue.on('completed', (job, result) => {
console.log(`Job ${job.id} completed with result:`, result);
});

Bull is built around several key components:
Core queue creation, configuration, and lifecycle management for Redis-based job queues.
/**
 * Creates a queue.
 *
 * Two call shapes are listed: a queue name followed by a Redis connection URL
 * and optional options, or a queue name followed by options alone.
 *
 * @param name - Queue name (a string, as in the usage example in this document).
 * @param url - Redis connection URL such as `redis://127.0.0.1:6379`.
 * @param opts - Queue configuration; see `QueueOptions`.
 */
function Queue(name: string, url?: string, opts?: QueueOptions): Queue;
function Queue(name: string, opts?: QueueOptions): Queue;
interface QueueOptions {
redis?: RedisOptions | string;
prefix?: string;
settings?: AdvancedSettings;
limiter?: RateLimiter;
defaultJobOptions?: JobOptions;
metrics?: MetricsOpts;
createClient?: (type: 'client' | 'subscriber' | 'bclient', redisOpts?: RedisOptions) => Redis.Redis;
}

Define job processors with concurrency control, named job handling, and error management.
process(callback: ProcessCallbackFunction): Promise<void>;
process(concurrency: number, callback: ProcessCallbackFunction): Promise<void>;
process(name: string, callback: ProcessCallbackFunction): Promise<void>;
process(name: string, concurrency: number, callback: ProcessCallbackFunction): Promise<void>;
/**
 * Handler passed to `process(...)`.
 *
 * The handler receives the job and an optional `done` callback, and may return
 * a promise.
 *
 * `T` defaults to `any` because the `process(...)` overloads reference
 * `ProcessCallbackFunction` without a type argument; without a default those
 * signatures would not type-check.
 */
type ProcessCallbackFunction<T = any> = (job: Job<T>, done?: DoneCallback) => void | Promise<void>;
type DoneCallback = (error?: Error | null, value?: any) => void;

Add jobs to queues with various options, bulk operations, and lifecycle management.
add(data: any, opts?: JobOptions): Promise<Job>;
add(name: string, data: any, opts?: JobOptions): Promise<Job>;
addBulk(jobs: Array<{name?: string, data: any, opts?: JobOptions}>): Promise<Job[]>;
interface JobOptions {
priority?: number;
delay?: number;
attempts?: number;
repeat?: RepeatOptions;
backoff?: number | BackoffOptions;
lifo?: boolean;
timeout?: number;
jobId?: string | number;
removeOnComplete?: boolean | number | KeepJobsOptions;
removeOnFail?: boolean | number | KeepJobsOptions;
}

Individual job state management, progress tracking, retry logic, and job manipulation.
class Job {
id: string | number;
name: string;
data: any;
opts: JobOptions;
progress(): any;
progress(value: any): Promise<void>;
remove(): Promise<void>;
retry(): Promise<void>;
isCompleted(): Promise<boolean>;
isFailed(): Promise<boolean>;
getState(): Promise<JobStatus>;
}

Comprehensive monitoring, metrics collection, and queue introspection capabilities.
getJobCounts(): Promise<JobCounts>;
getWaiting(start?: number, end?: number): Promise<Job[]>;
getActive(start?: number, end?: number): Promise<Job[]>;
getCompleted(start?: number, end?: number): Promise<Job[]>;
getFailed(start?: number, end?: number): Promise<Job[]>;
getMetrics(type: 'completed' | 'failed', start?: number, end?: number): Promise<MetricsData>;
interface JobCounts {
active: number;
completed: number;
failed: number;
delayed: number;
waiting: number;
}

Advanced job scheduling with cron syntax, repeatable patterns, and time-based execution control.
/**
 * Repeat configuration for a job. Used as the type of `JobOptions.repeat` and
 * as the argument of the `removeRepeatable` signatures.
 *
 * NOTE(review): the per-field notes below are inferred from the field names and
 * types only — confirm against the Bull reference before relying on them.
 */
interface RepeatOptions {
// presumably a cron expression describing the schedule — confirm
cron?: string;
// presumably a fixed repeat interval; unit not stated here — confirm
every?: number;
// presumably a time-zone name applied to the schedule — confirm
tz?: string;
// accepts a Date, a string, or a number; presumably when repetition stops — confirm
endDate?: Date | string | number;
// presumably the maximum number of repetitions — confirm
limit?: number;
// presumably a repetition counter — confirm
count?: number;
}
getRepeatableJobs(start?: number, end?: number, asc?: boolean): Promise<JobInformation[]>;
removeRepeatable(repeat: RepeatOptions): Promise<void>;
removeRepeatable(name: string, repeat: RepeatOptions): Promise<void>;

Comprehensive event-driven architecture for real-time job lifecycle monitoring and queue state changes.
// Queue Events
on(event: 'waiting', callback: (jobId: string | number) => void): Queue;
on(event: 'active', callback: (job: Job, jobPromise?: JobPromise) => void): Queue;
on(event: 'completed', callback: (job: Job, result: any) => void): Queue;
on(event: 'failed', callback: (job: Job, error: Error) => void): Queue;
on(event: 'progress', callback: (job: Job, progress: any) => void): Queue;
on(event: 'stalled', callback: (job: Job) => void): Queue;
on(event: 'paused' | 'resumed' | 'drained', callback: () => void): Queue;

type JobId = string | number;
/** Job state names; this is the type that `Job.getState()` resolves to. */
type JobStatus = 'completed' | 'waiting' | 'active' | 'delayed' | 'failed' | 'paused';
/**
 * Advanced queue tuning options; the type of `QueueOptions.settings`.
 * Every field is optional.
 *
 * NOTE(review): units and defaults for the numeric fields are not stated in
 * this document — confirm against the Bull reference.
 */
interface AdvancedSettings {
lockDuration?: number;
lockRenewTime?: number;
stalledInterval?: number;
maxStalledCount?: number;
guardInterval?: number;
retryProcessDelay?: number;
drainDelay?: number;
// Map from a strategy name to a function of (attemptsMade, err, options) that
// returns a number — presumably the delay before the next attempt; confirm.
backoffStrategies?: { [key: string]: (attemptsMade: number, err: Error, options?: any) => number };
}
/**
 * Rate-limiting configuration; the type of `QueueOptions.limiter`.
 * `max` and `duration` are required, the remaining fields are optional.
 *
 * NOTE(review): presumably at most `max` jobs are processed per `duration`
 * window; the unit of `duration` is not stated here — confirm.
 */
interface RateLimiter {
max: number;
duration: number;
bounceBack?: boolean;
groupKey?: string;
}
/**
 * Object form of `JobOptions.backoff`, which also accepts a plain number.
 *
 * NOTE(review): `type` presumably names a backoff strategy (possibly a key of
 * `AdvancedSettings.backoffStrategies`) — confirm against the Bull reference.
 */
interface BackoffOptions {
type: string;
delay?: number;
// Untyped (`any`) in this listing.
options?: any;
}
/**
 * Object form accepted by `JobOptions.removeOnComplete` and
 * `JobOptions.removeOnFail`, alongside `boolean` and `number`.
 *
 * NOTE(review): presumably `age` bounds how old and `count` bounds how many
 * finished jobs are kept; units are not stated here — confirm.
 */
interface KeepJobsOptions {
age?: number;
count?: number;
}
/** Metrics configuration; the type of `QueueOptions.metrics`. */
interface MetricsOpts {
// presumably caps how many metric data points are retained — confirm
maxDataPoints?: number;
}
/**
 * Description of one repeatable job; `getRepeatableJobs()` resolves to an
 * array of these.
 *
 * `cron`, `every` and `tz` share their names with fields of `RepeatOptions`.
 * NOTE(review): `next` is presumably the timestamp of the next run — confirm.
 */
interface JobInformation {
key: string;
name: string;
id?: string;
endDate?: number;
tz?: string;
cron: string;
every: number;
next: number;
}
/**
 * Result type that `getMetrics(type, start?, end?)` resolves to.
 *
 * NOTE(review): the meaning of the individual counters is not described in
 * this document — confirm against the Bull reference.
 */
interface MetricsData {
meta: {
count: number;
prevTS: number;
prevCount: number;
};
// Series of numeric data points.
data: number[];
count: number;
}
// Redis-related types (from ioredis)
/**
 * Partial listing of Redis connection options. Used by `QueueOptions.redis`
 * (which also accepts a string) and passed to `QueueOptions.createClient`.
 * Only a subset of the ioredis options is shown here.
 */
interface RedisOptions {
host?: string;
port?: number;
db?: number;
password?: string;
connectTimeout?: number;
// Receives the attempt count and returns a number or null.
// NOTE(review): presumably a retry delay, with null meaning stop retrying — confirm with ioredis docs.
retryStrategy?: (times: number) => number | null;
// Additional ioredis options...
}
/**
 * Abbreviated shapes of the Redis client types. `Redis.Redis` is the return
 * type of `QueueOptions.createClient`. Both interfaces list only a few members;
 * the rest are elided.
 */
namespace Redis {
interface Redis {
// Redis client interface (ioredis)
status: string;
options: RedisOptions;
// Additional Redis client methods...
}
interface Pipeline {
// Redis pipeline interface
exec(): Promise<any>;
// Returns a Pipeline, so calls can be chained.
lpush(key: string, value: string): Pipeline;
// Additional pipeline methods...
}
}