or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

event-system.mdindex.mdjob-creation.mdjob-processing.mdjob-state.mdqueue-management.mdqueue-monitoring.mdrepeatable-jobs.mdutils.md
tile.json

tessl/npm-bull

Redis-based job queue library for Node.js that provides reliable job processing with features including job scheduling, delayed execution, repeatable jobs, priority handling, and automatic retries.

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
npmpkg:npm/bull@4.16.x

To install, run

npx @tessl/cli install tessl/npm-bull@4.16.0

index.mddocs/

Bull Job Queue

Bull is a Redis-based job queue library for Node.js that provides reliable job processing with features including job scheduling, delayed execution, repeatable jobs, priority handling, and automatic retries. The library offers a comprehensive job management system with support for concurrent processing, job state tracking, and various queue patterns including LIFO and FIFO processing.

Package Information

  • Package Name: bull
  • Package Type: npm
  • Language: JavaScript (with TypeScript definitions)
  • Installation: npm install bull

Core Imports

const Queue = require('bull');
const { Job } = Queue; // Access Job class
const { utils } = Queue; // Access utility functions

For ES modules:

import Queue from 'bull';
// or
import * as Bull from 'bull';
const { Job, utils } = Bull;

Basic Usage

const Queue = require('bull');
const emailQueue = new Queue('email processing', 'redis://127.0.0.1:6379');

// Add jobs to the queue
emailQueue.add('welcome email', {
  email: 'user@example.com',
  name: 'John Doe'
});

// Process jobs
emailQueue.process('welcome email', async (job) => {
  const { email, name } = job.data;
  console.log(`Processing welcome email for ${name} at ${email}`);
  // Send email logic here
  return { sent: true, timestamp: Date.now() };
});

// Listen for completed jobs
emailQueue.on('completed', (job, result) => {
  console.log(`Job ${job.id} completed with result:`, result);
});

Architecture

Bull is built around several key components:

  • Queue Class: Main interface for creating queues, adding jobs, and managing processing
  • Job Class: Represents individual jobs with state management, progress tracking, and lifecycle methods
  • Redis Backend: Uses Redis for reliable job storage, state persistence, and coordination between workers
  • Event System: Comprehensive event-driven architecture for monitoring job lifecycle
  • Process Management: Support for child processes, sandboxing, and concurrent processing
  • Scheduling Engine: Advanced scheduling with cron-like syntax and delayed execution

Capabilities

Queue Management

Core queue creation, configuration, and lifecycle management for Redis-based job queues.

function Queue(name, url?, opts?): Queue;
function Queue(name, opts?): Queue;

interface QueueOptions {
  redis?: RedisOptions | string;
  prefix?: string;
  settings?: AdvancedSettings;
  limiter?: RateLimiter;
  defaultJobOptions?: JobOptions;
  metrics?: MetricsOpts;
  createClient?: (type: 'client' | 'subscriber' | 'bclient', redisOpts?: RedisOptions) => Redis.Redis;
}

Queue Management

Job Processing

Define job processors with concurrency control, named job handling, and error management.

process(callback: ProcessCallbackFunction): Promise<void>;
process(concurrency: number, callback: ProcessCallbackFunction): Promise<void>;
process(name: string, callback: ProcessCallbackFunction): Promise<void>;
process(name: string, concurrency: number, callback: ProcessCallbackFunction): Promise<void>;

type ProcessCallbackFunction<T> = (job: Job<T>, done?: DoneCallback) => void | Promise<void>;
type DoneCallback = (error?: Error | null, value?: any) => void;

Job Processing

Job Creation and Management

Add jobs to queues with various options, bulk operations, and lifecycle management.

add(data: any, opts?: JobOptions): Promise<Job>;
add(name: string, data: any, opts?: JobOptions): Promise<Job>;
addBulk(jobs: Array<{name?: string, data: any, opts?: JobOptions}>): Promise<Job[]>;

interface JobOptions {
  priority?: number;
  delay?: number;
  attempts?: number;
  repeat?: RepeatOptions;
  backoff?: number | BackoffOptions;
  lifo?: boolean;
  timeout?: number;
  jobId?: string | number;
  removeOnComplete?: boolean | number | KeepJobsOptions;
  removeOnFail?: boolean | number | KeepJobsOptions;
}

Job Creation and Management

Job State and Control

Individual job state management, progress tracking, retry logic, and job manipulation.

class Job {
  id: string | number;
  name: string;
  data: any;
  opts: JobOptions;
  progress(): any;
  progress(value: any): Promise<void>;
  remove(): Promise<void>;
  retry(): Promise<void>;
  isCompleted(): Promise<boolean>;
  isFailed(): Promise<boolean>;
  getState(): Promise<JobStatus>;
}

Job State and Control

Queue Monitoring and Statistics

Comprehensive monitoring, metrics collection, and queue introspection capabilities.

getJobCounts(): Promise<JobCounts>;
getWaiting(start?: number, end?: number): Promise<Job[]>;
getActive(start?: number, end?: number): Promise<Job[]>;
getCompleted(start?: number, end?: number): Promise<Job[]>;
getFailed(start?: number, end?: number): Promise<Job[]>;
getMetrics(type: 'completed' | 'failed', start?: number, end?: number): Promise<MetricsData>;

interface JobCounts {
  active: number;
  completed: number;
  failed: number;
  delayed: number;
  waiting: number;
}

Queue Monitoring

Repeatable Jobs and Scheduling

Advanced job scheduling with cron syntax, repeatable patterns, and time-based execution control.

interface RepeatOptions {
  cron?: string;
  every?: number;
  tz?: string;
  endDate?: Date | string | number;
  limit?: number;
  count?: number;
}

getRepeatableJobs(start?: number, end?: number, asc?: boolean): Promise<JobInformation[]>;
removeRepeatable(repeat: RepeatOptions): Promise<void>;
removeRepeatable(name: string, repeat: RepeatOptions): Promise<void>;

Repeatable Jobs

Event System and Monitoring

Comprehensive event-driven architecture for real-time job lifecycle monitoring and queue state changes.

// Queue Events
on(event: 'waiting', callback: (jobId: string | number) => void): Queue;
on(event: 'active', callback: (job: Job, jobPromise?: JobPromise) => void): Queue;
on(event: 'completed', callback: (job: Job, result: any) => void): Queue;
on(event: 'failed', callback: (job: Job, error: Error) => void): Queue;
on(event: 'progress', callback: (job: Job, progress: any) => void): Queue;
on(event: 'stalled', callback: (job: Job) => void): Queue;
on(event: 'paused' | 'resumed' | 'drained', callback: () => void): Queue;

Event System

Types

type JobId = string | number;
type JobStatus = 'completed' | 'waiting' | 'active' | 'delayed' | 'failed' | 'paused';

interface AdvancedSettings {
  lockDuration?: number;
  lockRenewTime?: number; 
  stalledInterval?: number;
  maxStalledCount?: number;
  guardInterval?: number;
  retryProcessDelay?: number;
  drainDelay?: number;
  backoffStrategies?: { [key: string]: (attemptsMade: number, err: Error, options?: any) => number };
}

interface RateLimiter {
  max: number;
  duration: number;
  bounceBack?: boolean;
  groupKey?: string;
}

interface BackoffOptions {
  type: string;
  delay?: number;
  options?: any;
}

interface KeepJobsOptions {
  age?: number;
  count?: number;
}

interface MetricsOpts {
  maxDataPoints?: number;
}

interface JobInformation {
  key: string;
  name: string;
  id?: string;
  endDate?: number;
  tz?: string;
  cron: string;
  every: number;
  next: number;
}

interface MetricsData {
  meta: {
    count: number;
    prevTS: number;
    prevCount: number;
  };
  data: number[];
  count: number;
}

// Redis-related types (from ioredis)
interface RedisOptions {
  host?: string;
  port?: number;
  db?: number;
  password?: string;
  connectTimeout?: number;
  retryStrategy?: (times: number) => number | null;
  // Additional ioredis options...
}

namespace Redis {
  interface Redis {
    // Redis client interface (ioredis)
    status: string;
    options: RedisOptions;
    // Additional Redis client methods...
  }
  
  interface Pipeline {
    // Redis pipeline interface
    exec(): Promise<any>;
    lpush(key: string, value: string): Pipeline;
    // Additional pipeline methods...
  }
}