or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

event-system.mdindex.mdjob-creation.mdjob-processing.mdjob-state.mdqueue-management.mdqueue-monitoring.mdrepeatable-jobs.mdutils.md
tile.json

index.mddocs/

Bull Job Queue

Bull is a Redis-based job queue library for Node.js that provides reliable job processing with features including job scheduling, delayed execution, repeatable jobs, priority handling, and automatic retries. The library offers a comprehensive job management system with support for concurrent processing, job state tracking, and various queue patterns including LIFO and FIFO processing.

Package Information

  • Package Name: bull
  • Package Type: npm
  • Language: JavaScript (with TypeScript definitions)
  • Installation: npm install bull

Core Imports

const Queue = require('bull');
const { Job } = Queue; // Access Job class
const { utils } = Queue; // Access utility functions

For ES modules:

import Queue from 'bull';
// or
import * as Bull from 'bull';
const { Job, utils } = Bull;

Basic Usage

const Queue = require('bull');
const emailQueue = new Queue('email processing', 'redis://127.0.0.1:6379');

// Add jobs to the queue
emailQueue.add('welcome email', {
  email: 'user@example.com',
  name: 'John Doe'
});

// Process jobs
emailQueue.process('welcome email', async (job) => {
  const { email, name } = job.data;
  console.log(`Processing welcome email for ${name} at ${email}`);
  // Send email logic here
  return { sent: true, timestamp: Date.now() };
});

// Listen for completed jobs
emailQueue.on('completed', (job, result) => {
  console.log(`Job ${job.id} completed with result:`, result);
});

Architecture

Bull is built around several key components:

  • Queue Class: Main interface for creating queues, adding jobs, and managing processing
  • Job Class: Represents individual jobs with state management, progress tracking, and lifecycle methods
  • Redis Backend: Uses Redis for reliable job storage, state persistence, and coordination between workers
  • Event System: Comprehensive event-driven architecture for monitoring job lifecycle
  • Process Management: Support for child processes, sandboxing, and concurrent processing
  • Scheduling Engine: Advanced scheduling with cron-like syntax and delayed execution

Capabilities

Queue Management

Core queue creation, configuration, and lifecycle management for Redis-based job queues.

function Queue(name, url?, opts?): Queue;
function Queue(name, opts?): Queue;

interface QueueOptions {
  redis?: RedisOptions | string;
  prefix?: string;
  settings?: AdvancedSettings;
  limiter?: RateLimiter;
  defaultJobOptions?: JobOptions;
  metrics?: MetricsOpts;
  createClient?: (type: 'client' | 'subscriber' | 'bclient', redisOpts?: RedisOptions) => Redis.Redis;
}

Queue Management

Job Processing

Define job processors with concurrency control, named job handling, and error management.

process(callback: ProcessCallbackFunction): Promise<void>;
process(concurrency: number, callback: ProcessCallbackFunction): Promise<void>;
process(name: string, callback: ProcessCallbackFunction): Promise<void>;
process(name: string, concurrency: number, callback: ProcessCallbackFunction): Promise<void>;

type ProcessCallbackFunction<T> = (job: Job<T>, done?: DoneCallback) => void | Promise<void>;
type DoneCallback = (error?: Error | null, value?: any) => void;

Job Processing

Job Creation and Management

Add jobs to queues with various options, bulk operations, and lifecycle management.

add(data: any, opts?: JobOptions): Promise<Job>;
add(name: string, data: any, opts?: JobOptions): Promise<Job>;
addBulk(jobs: Array<{name?: string, data: any, opts?: JobOptions}>): Promise<Job[]>;

interface JobOptions {
  priority?: number;
  delay?: number;
  attempts?: number;
  repeat?: RepeatOptions;
  backoff?: number | BackoffOptions;
  lifo?: boolean;
  timeout?: number;
  jobId?: string | number;
  removeOnComplete?: boolean | number | KeepJobsOptions;
  removeOnFail?: boolean | number | KeepJobsOptions;
}

Job Creation and Management

Job State and Control

Individual job state management, progress tracking, retry logic, and job manipulation.

class Job {
  id: string | number;
  name: string;
  data: any;
  opts: JobOptions;
  progress(): any;
  progress(value: any): Promise<void>;
  remove(): Promise<void>;
  retry(): Promise<void>;
  isCompleted(): Promise<boolean>;
  isFailed(): Promise<boolean>;
  getState(): Promise<JobStatus>;
}

Job State and Control

Queue Monitoring and Statistics

Comprehensive monitoring, metrics collection, and queue introspection capabilities.

getJobCounts(): Promise<JobCounts>;
getWaiting(start?: number, end?: number): Promise<Job[]>;
getActive(start?: number, end?: number): Promise<Job[]>;
getCompleted(start?: number, end?: number): Promise<Job[]>;
getFailed(start?: number, end?: number): Promise<Job[]>;
getMetrics(type: 'completed' | 'failed', start?: number, end?: number): Promise<MetricsData>;

interface JobCounts {
  active: number;
  completed: number;
  failed: number;
  delayed: number;
  waiting: number;
}

Queue Monitoring

Repeatable Jobs and Scheduling

Advanced job scheduling with cron syntax, repeatable patterns, and time-based execution control.

interface RepeatOptions {
  cron?: string;
  every?: number;
  tz?: string;
  endDate?: Date | string | number;
  limit?: number;
  count?: number;
}

getRepeatableJobs(start?: number, end?: number, asc?: boolean): Promise<JobInformation[]>;
removeRepeatable(repeat: RepeatOptions): Promise<void>;
removeRepeatable(name: string, repeat: RepeatOptions): Promise<void>;

Repeatable Jobs

Event System and Monitoring

Comprehensive event-driven architecture for real-time job lifecycle monitoring and queue state changes.

// Queue Events
on(event: 'waiting', callback: (jobId: string | number) => void): Queue;
on(event: 'active', callback: (job: Job, jobPromise?: JobPromise) => void): Queue;
on(event: 'completed', callback: (job: Job, result: any) => void): Queue;
on(event: 'failed', callback: (job: Job, error: Error) => void): Queue;
on(event: 'progress', callback: (job: Job, progress: any) => void): Queue;
on(event: 'stalled', callback: (job: Job) => void): Queue;
on(event: 'paused' | 'resumed' | 'drained', callback: () => void): Queue;

Event System

Types

type JobId = string | number;
type JobStatus = 'completed' | 'waiting' | 'active' | 'delayed' | 'failed' | 'paused';

interface AdvancedSettings {
  lockDuration?: number;
  lockRenewTime?: number; 
  stalledInterval?: number;
  maxStalledCount?: number;
  guardInterval?: number;
  retryProcessDelay?: number;
  drainDelay?: number;
  backoffStrategies?: { [key: string]: (attemptsMade: number, err: Error, options?: any) => number };
}

interface RateLimiter {
  max: number;
  duration: number;
  bounceBack?: boolean;
  groupKey?: string;
}

interface BackoffOptions {
  type: string;
  delay?: number;
  options?: any;
}

interface KeepJobsOptions {
  age?: number;
  count?: number;
}

interface MetricsOpts {
  maxDataPoints?: number;
}

interface JobInformation {
  key: string;
  name: string;
  id?: string;
  endDate?: number;
  tz?: string;
  cron: string;
  every: number;
  next: number;
}

interface MetricsData {
  meta: {
    count: number;
    prevTS: number;
    prevCount: number;
  };
  data: number[];
  count: number;
}

// Redis-related types (from ioredis)
interface RedisOptions {
  host?: string;
  port?: number;
  db?: number;
  password?: string;
  connectTimeout?: number;
  retryStrategy?: (times: number) => number | null;
  // Additional ioredis options...
}

namespace Redis {
  interface Redis {
    // Redis client interface (ioredis)
    status: string;
    options: RedisOptions;
    // Additional Redis client methods...
  }
  
  interface Pipeline {
    // Redis pipeline interface
    exec(): Promise<any>;
    lpush(key: string, value: string): Pipeline;
    // Additional pipeline methods...
  }
}