or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

configuration.md error-handling.md event-monitoring.md flow-orchestration.md index.md job-lifecycle.md job-processing.md job-scheduling.md queue-management.md
tile.json

docs/queue-management.md

Queue Management

Core queue operations for adding jobs, managing state, and controlling flow in BullMQ. The Queue class provides the primary interface for job management with support for prioritization, delayed execution, and bulk operations.

Capabilities

Queue Class

Primary interface for queue operations with comprehensive job management capabilities.

/**
 * Main queue class for managing jobs and queue operations.
 *
 * @typeParam T - Type of the data payload passed to `add` / `addBulk`.
 * @typeParam R - Second type argument forwarded to `Job` (presumably the job's
 *   return value type; confirm against the `Job` documentation).
 * @typeParam N - String type that job names must satisfy.
 */
class Queue<T = any, R = any, N extends string = string> {
  /**
   * @param name - Name of the queue.
   * @param opts - Optional queue configuration (see `QueueOptions`).
   */
  constructor(name: string, opts?: QueueOptions);

  /** Add a single job to the queue. Resolves with the created job. */
  add(name: N, data: T, opts?: JobsOptions): Promise<Job<T, R, N>>;

  /** Add multiple jobs to the queue in a single operation. */
  addBulk(jobs: Array<{ name: N; data: T; opts?: JobsOptions }>): Promise<Job<T, R, N>[]>;

  /** Get a job by its ID. Resolves with `undefined` when no such job exists. */
  getJob(jobId: string): Promise<Job<T, R, N> | undefined>;

  /** Get multiple jobs by state and index range. */
  getJobs(
    types?: JobState[],
    start?: number,
    end?: number,
    asc?: boolean
  ): Promise<Job<T, R, N>[]>;

  /** Get counts of jobs keyed by state name. */
  getJobCounts(...types: JobType[]): Promise<{[index: string]: number}>;

  /** Get the combined count of jobs across the specified types. */
  getJobCountByTypes(...types: JobType[]): Promise<number>;

  /** Get the state of a job by ID; `'unknown'` is a possible result. */
  getJobState(jobId: string): Promise<JobState | 'unknown'>;

  /** Get the log lines recorded for a specific job, plus the total log count. */
  getJobLogs(jobId: string, start?: number, end?: number, asc?: boolean): Promise<{logs: string[], count: number}>;

  /** Get the number of jobs in one specific state. */
  getCompletedCount(): Promise<number>;
  getFailedCount(): Promise<number>;
  getDelayedCount(): Promise<number>;
  getActiveCount(): Promise<number>;
  getPrioritizedCount(): Promise<number>;
  getWaitingCount(): Promise<number>;
  getWaitingChildrenCount(): Promise<number>;

  /**
   * Get the jobs in one specific state, optionally limited to an index range.
   * These return the same `Job<T, R, N>` type as `getJobs` so the queue's
   * type parameters are not lost.
   */
  getWaiting(start?: number, end?: number): Promise<Job<T, R, N>[]>;
  getActive(start?: number, end?: number): Promise<Job<T, R, N>[]>;
  getCompleted(start?: number, end?: number): Promise<Job<T, R, N>[]>;
  getFailed(start?: number, end?: number): Promise<Job<T, R, N>[]>;
  getDelayed(start?: number, end?: number): Promise<Job<T, R, N>[]>;
  getPrioritized(start?: number, end?: number): Promise<Job<T, R, N>[]>;
  getWaitingChildren(start?: number, end?: number): Promise<Job<T, R, N>[]>;

  /** Get worker information. */
  getWorkers(): Promise<{[index: string]: string}[]>;
  getWorkersCount(): Promise<number>;

  /** Get rate limit TTL. */
  getRateLimitTtl(maxJobs?: number): Promise<number>;

  /** Get the job ID associated with a deduplication ID, or `null` if none. */
  getDeduplicationJobId(id: string): Promise<string | null>;

  /** Get job counts per priority level. */
  getCountsPerPriority(priorities: number[]): Promise<{[index: string]: number}>;

  /** Get the processed or pending dependencies of a parent job. */
  getDependencies(parentId: string, type: 'processed' | 'pending', start: number, end: number): Promise<{
    items: {id: string, v?: any, err?: string}[],
    jobs: JobJsonRaw[],
    total: number
  }>;

  /** Get queue metrics for completed or failed jobs. */
  getMetrics(type: 'completed' | 'failed', start?: number, end?: number): Promise<Metrics>;

  /** Export metrics as Prometheus-formatted text. */
  exportPrometheusMetrics(globalVariables?: Record<string, string>): Promise<string>;

  /**
   * Clean old jobs from the queue.
   *
   * @param grace - Grace period in milliseconds; jobs newer than this are kept.
   * @param limit - Maximum number of jobs to remove.
   * @param type - State of the jobs to clean (e.g. "completed", "failed").
   * @returns The IDs of the removed jobs (BullMQ resolves with job IDs, not
   *   `Job` instances, because the jobs no longer exist after cleaning).
   */
  clean(grace: number, limit?: number, type?: string): Promise<string[]>;

  /**
   * Completely remove the queue and all of its data.
   *
   * @param opts - `force: true` obliterates the queue even if it has active jobs.
   */
  obliterate(opts?: { force?: boolean }): Promise<void>;

  /** Pause the queue - no new jobs will be processed until it is resumed. */
  pause(): Promise<void>;

  /** Resume a paused queue. */
  resume(): Promise<void>;

  /** Check if queue is paused. */
  isPaused(): Promise<boolean>;

  /**
   * Drain the queue: remove the jobs that are waiting to be processed.
   * Active, completed and failed jobs are NOT removed.
   *
   * @param delayed - Pass `true` to also remove delayed jobs.
   */
  drain(delayed?: boolean): Promise<void>;

  /** Get repeatable/scheduled jobs. */
  getRepeatableJobs(start?: number, end?: number, asc?: boolean): Promise<RepeatableJob[]>;

  /** Remove a repeatable job by its key (see `RepeatableJob.key`). */
  removeRepeatableByKey(key: string): Promise<void>;

  /** Close the queue and its connections. */
  close(): Promise<void>;

  /** Wait until the queue is ready. */
  waitUntilReady(): Promise<void>;
}

Usage Examples:

import { Queue } from "bullmq";

// Create a queue backed by a local Redis instance
const emailQueue = new Queue("email processing", {
  connection: {
    host: "localhost",
    port: 6379,
  },
  // Applied to every job unless overridden per job
  defaultJobOptions: {
    removeOnComplete: 100,
    removeOnFail: 50,
  },
});

// Add a single job
const job = await emailQueue.add("send email", {
  to: "user@example.com",
  subject: "Welcome!",
  body: "Thank you for signing up",
});

// Payload for the prioritized job below
const emailData = {
  to: "user@example.com",
  subject: "Action required",
  body: "Please confirm your email address",
};

// Add job with options
const priorityJob = await emailQueue.add("urgent email", emailData, {
  priority: 10,
  delay: 5000, // 5 second delay
  attempts: 3,
  // backoff must be a number (fixed delay in ms) or a { type, delay } object;
  // a bare strategy name such as "exponential" is not accepted.
  backoff: { type: "exponential", delay: 1000 },
});

// Add multiple jobs
const jobs = await emailQueue.addBulk([
  { name: "welcome email", data: { userId: 1 } },
  { name: "newsletter", data: { userId: 2 } },
  { name: "reminder", data: { userId: 3 }, opts: { delay: 3600000 } }, // 1 hour delay
]);

// Get job counts
const counts = await emailQueue.getJobCounts();
console.log(`Active: ${counts.active}, Waiting: ${counts.waiting}, Completed: ${counts.completed}`);

// Clean up to 100 completed jobs older than 1 hour
const cleanedJobs = await emailQueue.clean(3600000, 100, "completed");

// Pause and resume queue
await emailQueue.pause();
await emailQueue.resume();

Job Counts Interface

/**
 * Number of jobs in each state, keyed by state name.
 *
 * NOTE(review): the Queue class also declares getWaitingChildrenCount(), but
 * there is no matching key here — confirm whether one should be listed.
 */
interface JobCounts {
  active: number;
  completed: number;
  failed: number;
  delayed: number;
  waiting: number;
  paused: number;
  prioritized: number;
}

Additional Interfaces

/**
 * Raw job record, as returned in the `jobs` array of Queue.getDependencies().
 *
 * NOTE(review): field meanings below are inferred from their names; confirm
 * units and encodings against the BullMQ reference before relying on them.
 */
interface JobJsonRaw {
  id: string;
  name: string;
  data: any;
  opts: any;
  progress: number | object;
  returnvalue?: any;
  stacktrace?: string[];
  timestamp: number; // presumably creation time — TODO confirm unit
  attemptsMade: number;
  processedOn?: number; // presumably set once processing starts — TODO confirm
  finishedOn?: number; // presumably set once the job finishes — TODO confirm
  delay?: number;
}

/**
 * Result of Queue.getMetrics() for either completed or failed jobs.
 *
 * NOTE(review): the element type of `data` and the meaning of the `meta`
 * fields are not shown here — confirm against the BullMQ metrics documentation.
 */
interface Metrics {
  meta: {
    count: number;
    prevTS: number; // presumably a timestamp — TODO confirm
    prevCount: number;
  };
  data: any[];
  count: number;
}

Queue Options

/** Options accepted by the Queue constructor. */
interface QueueOptions {
  /** Redis connection options */
  connection?: ConnectionOptions;
  
  /** Default options applied to all jobs */
  defaultJobOptions?: BaseJobOptions;
  
  /** Key prefix for Redis keys (default: 'bull') */
  prefix?: string;
  
  /** Stream configuration options */
  streams?: StreamsOptions;
  
  /** Skip updating queue metadata */
  skipMetasUpdate?: boolean;
  
  /**
   * Skip (disable) the job scheduler when true.
   * NOTE(review): this was previously described as "Enable job scheduler",
   * which contradicts the flag name — confirm the exact semantics.
   */
  skipJobScheduler?: boolean;
}

Repeatable Job Interface

/**
 * Description of a repeatable/scheduled job, as returned by
 * Queue.getRepeatableJobs().
 */
interface RepeatableJob {
  key: string; // identifier accepted by Queue.removeRepeatableByKey()
  name: string;
  id?: string;
  endDate?: number;
  tz?: string; // presumably a time zone name — TODO confirm
  pattern?: string; // presumably a cron-style expression — TODO confirm
  every?: number; // presumably a repeat interval — TODO confirm unit
  next: number; // presumably the time of the next run — TODO confirm unit
}

Queue Events

The Queue class emits several events for monitoring queue state:

// Queue event types: maps each event name to the signature of its listener.
// NOTE(review): this interface shares its name with BullMQ's `QueueEvents`
// class, which is a different thing — consider renaming to avoid confusion.
// NOTE(review): confirm which of these events a Queue instance emits itself
// (as opposed to a Worker or the QueueEvents class), and whether 'cleaned',
// 'progress' and 'removed' receive Job objects or job IDs.
interface QueueEvents {
  'waiting': (job: Job) => void;
  'active': (job: Job) => void;
  'completed': (job: Job, result: any) => void;
  'failed': (job: Job, error: Error) => void;
  'paused': () => void;
  'resumed': () => void;
  'cleaned': (jobs: Job[], type: string) => void;
  'progress': (job: Job, progress: number | object) => void;
  'removed': (job: Job) => void;
  'error': (error: Error) => void;
}

Event Usage:

import { QueueEvents } from "bullmq";

// Job completion and failure happen inside workers, so a Queue instance does
// not emit "completed" or "failed". Subscribe to them through a QueueEvents
// instance created for the same queue name.
const emailQueueEvents = new QueueEvents("email processing", {
  connection: {
    host: "localhost",
    port: 6379,
  },
});

// Emitted by the Queue instance when a job is added through it
emailQueue.on("waiting", (job) => {
  console.log(`Job ${job.id} added to queue`);
});

// QueueEvents listeners receive the job ID and event details, not a Job object
emailQueueEvents.on("completed", ({ jobId, returnvalue }) => {
  console.log(`Job ${jobId} completed with result:`, returnvalue);
});

emailQueueEvents.on("failed", ({ jobId, failedReason }) => {
  console.log(`Job ${jobId} failed:`, failedReason);
});

// Emitted by the Queue instance after clean() removes jobs
emailQueue.on("cleaned", (jobs, type) => {
  console.log(`Cleaned ${jobs.length} ${type} jobs`);
});

Advanced Queue Operations

// Drain the queue (careful - this removes every waiting job!).
// Active, completed and failed jobs are kept; pass `true` to also remove delayed jobs.
await emailQueue.drain();

// Obliterate queue completely (removes all data); force: true proceeds even with active jobs
await emailQueue.obliterate({ force: true });

// Get specific job
const job = await emailQueue.getJob("job-id-123");
if (job) {
  console.log("Job data:", job.data);
  console.log("Job state:", await job.getState());
}

// Get jobs by state (several states can be combined in one call)
const activeJobs = await emailQueue.getJobs(["active"], 0, 10);
const pendingJobs = await emailQueue.getJobs(["waiting", "delayed"]);

// Get specific state job counts
const completedCount = await emailQueue.getCompletedCount();
const failedCount = await emailQueue.getFailedCount();
const activeCount = await emailQueue.getActiveCount();

// Get jobs by specific states
const waitingJobs = await emailQueue.getWaiting(0, 10);
const completedJobs = await emailQueue.getCompleted(0, 20);

// Get job logs
const jobLogs = await emailQueue.getJobLogs("job-123", 0, -1, true);
console.log(`Job has ${jobLogs.count} log entries:`, jobLogs.logs);

// Get job state
const jobState = await emailQueue.getJobState("job-123");
console.log(`Job is in state: ${jobState}`);

// Get worker information
const workers = await emailQueue.getWorkers();
const workerCount = await emailQueue.getWorkersCount();
console.log(`${workerCount} workers are available`);

// Get metrics
const metrics = await emailQueue.getMetrics("completed", 0, 10);
console.log("Completed job metrics:", metrics);

// Export Prometheus metrics
const prometheusMetrics = await emailQueue.exportPrometheusMetrics({
  service: "email-service",
  environment: "production"
});

// Clean failed jobs older than 24 hours, removing at most 10 per call
const cleanedFailed = await emailQueue.clean(24 * 60 * 60 * 1000, 10, "failed");

Connection Management

// Wait for queue to be ready before issuing further commands
await emailQueue.waitUntilReady();

// Close queue and cleanup connections once the queue is no longer needed
await emailQueue.close();