or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

event-system.mdindex.mdjob-creation.mdjob-processing.mdjob-state.mdqueue-management.mdqueue-monitoring.mdrepeatable-jobs.mdutils.md
tile.json

queue-management.mddocs/

Queue Management

Core queue creation, configuration, and lifecycle management for Redis-based job queues.

Capabilities

Queue Constructor

Creates a new Queue instance with Redis backend for reliable job processing.

/**
 * Creates a new Queue instance with Redis backend
 * @param name - Unique name for the queue
 * @param url - Redis connection URL (optional)
 * @param opts - Queue configuration options
 * @returns Queue instance
 */
function Queue(name: string, url?: string, opts?: QueueOptions): Queue;
function Queue(name: string, opts?: QueueOptions): Queue;

// Can also be called with 'new' keyword
new Queue(name: string, url?: string, opts?: QueueOptions): Queue;
new Queue(name: string, opts?: QueueOptions): Queue;

interface QueueOptions {
  /** Redis connection options or connection string */
  redis?: RedisOptions | string;
  /** Key prefix for Redis keys (default: 'bull') */
  prefix?: string;
  /** Advanced queue settings */
  settings?: AdvancedSettings;
  /** Rate limiting configuration */
  limiter?: RateLimiter;
  /** Default options for all jobs in this queue */
  defaultJobOptions?: JobOptions;
  /** Metrics collection configuration */
  metrics?: MetricsOpts;
  /** Custom Redis client factory function */
  createClient?: (type: 'client' | 'subscriber' | 'bclient', redisOpts?: RedisOptions) => Redis.Redis;
  /** Skip Redis version validation check */
  skipVersionCheck?: boolean;
}

Usage Examples:

const Queue = require('bull');

// Basic queue with default Redis connection
const basicQueue = new Queue('my queue');

// Queue with Redis URL
const remoteQueue = new Queue('remote queue', 'redis://user:pass@redis-server:6379');

// Queue with detailed configuration
const configuredQueue = new Queue('configured queue', {
  redis: {
    host: 'localhost',
    port: 6379,
    db: 0,
    password: 'secret'
  },
  prefix: 'myapp',
  settings: {
    stalledInterval: 30000,
    maxStalledCount: 1
  },
  defaultJobOptions: {
    removeOnComplete: 10,
    removeOnFail: 5
  }
});

Queue Properties

Access queue properties and Redis clients.

interface Queue {
  /** Queue name */
  readonly name: string;
  /** Main Redis client for queue operations */
  readonly client: Redis.Redis;
  /** Array of all Redis clients used by the queue */
  readonly clients: Redis.Redis[];
}

Queue Readiness

Wait for queue to be ready for operations.

/**
 * Wait for the queue to be ready for operations
 * @returns Promise that resolves to the queue instance when ready
 */
isReady(): Promise<Queue>;

Usage Example:

const queue = new Queue('my queue');

await queue.isReady();
console.log('Queue is ready for operations');

// Or using then()
queue.isReady().then((readyQueue) => {
  console.log(`${readyQueue.name} is ready`);
});

Queue Control

Control queue processing state.

/**
 * Pause the queue to stop processing new jobs
 * @param isLocal - If true, pause only this worker instance
 * @param doNotWaitActive - If true, don't wait for active jobs to finish
 * @returns Promise that resolves when queue is paused
 */
pause(isLocal?: boolean, doNotWaitActive?: boolean): Promise<void>;

/**
 * Resume the paused queue
 * @param isLocal - If true, resume only this worker instance
 * @returns Promise that resolves when queue is resumed
 */
resume(isLocal?: boolean): Promise<void>;

/**
 * Check if the queue is paused
 * @param isLocal - If true, check only this worker instance
 * @returns Promise that resolves to pause status
 */
isPaused(isLocal?: boolean): Promise<boolean>;

Usage Examples:

// Pause queue globally (affects all workers)
await queue.pause();
console.log('Queue paused');

// Pause only this worker
await queue.pause(true);

// Resume queue
await queue.resume();

// Check pause status
const isPaused = await queue.isPaused();
if (isPaused) {
  console.log('Queue is currently paused');
}

Queue Cleanup

Clean and close queue operations.

/**
 * Remove all waiting jobs from the queue
 * @returns Promise that resolves when queue is emptied
 */
empty(): Promise<void>;

/**
 * Gracefully close the queue and Redis connections
 * @param doNotWaitJobs - If true, don't wait for active jobs to finish
 * @returns Promise that resolves when queue is closed
 */
close(doNotWaitJobs?: boolean): Promise<void>;

/**
 * Completely destroy the queue and all its data (irreversible)
 * @param opts - Obliteration options
 * @returns Promise that resolves when queue is obliterated
 */
obliterate(opts?: { force: boolean }): Promise<void>;

Usage Examples:

// Remove all waiting jobs
await queue.empty();
console.log('All waiting jobs removed');

// Graceful shutdown
await queue.close();
console.log('Queue closed gracefully');

// Force close without waiting for jobs
await queue.close(true);

// Completely destroy queue (use with caution!)
await queue.obliterate({ force: true });

Redis Key Management

Generate Redis keys and manage transactions.

/**
 * Generate Redis key for specific queue type
 * @param queueType - Type of queue data structure
 * @returns Redis key string
 */
toKey(queueType: string): string;

/**
 * Create Redis multi/pipeline for atomic operations
 * @returns Redis pipeline for batched operations
 */
multi(): Redis.Pipeline;

/**
 * Get queue name encoded in base64
 * @returns Base64 encoded queue name
 */
base64Name(): string;

/**
 * Get client name with prefix
 * @returns Prefixed client name
 */
clientName(): string;

Usage Examples:

// Generate Redis keys
const waitingKey = queue.toKey('waiting');  // 'bull:myqueue:waiting'
const activeKey = queue.toKey('active');    // 'bull:myqueue:active'

// Create atomic transaction
const multi = queue.multi();
multi.lpush('somekey', 'value');
multi.exec();

// Get encoded names
const encodedName = queue.base64Name();
const clientName = queue.clientName();

Worker Management

Manage and inspect worker processes.

/**
 * Set worker name for this client
 * @returns Promise that resolves when name is set
 */
setWorkerName(): Promise<any>;

/**
 * Get information about active workers
 * @returns Promise that resolves to array of worker information
 */
getWorkers(): Promise<Array<{ [key: string]: string }>>;

/**
 * Parse Redis client list string into worker information
 * @param list - Redis CLIENT LIST response
 * @returns Parsed worker information arrays
 */
parseClientList(list: string): Array<Array<{ [key: string]: string }>>;

/**
 * Wait for all currently active jobs to finish
 * @returns Promise that resolves when all active jobs complete
 */
whenCurrentJobsFinished(): Promise<void>;

Usage Examples:

// Set worker name
await queue.setWorkerName();

// Get active workers
const workers = await queue.getWorkers();
console.log(`Active workers: ${workers.length}`);

// Wait for jobs to finish before shutdown
await queue.whenCurrentJobsFinished();
console.log('All active jobs completed');

Connection Management

Control Redis connections and queue lifecycle.

/**
 * Disconnect Redis clients without closing the queue completely
 * @returns Promise that resolves when disconnected
 */
disconnect(): Promise<void>;

/**
 * Retry all failed jobs with optional count limit
 * @param opts - Retry options
 * @returns Promise that resolves when retry is complete
 */
retryJobs(opts?: { count?: number }): Promise<void>;

/**
 * Remove a specific debounce key
 * @param id - Debounce ID to remove
 * @returns Promise that resolves to number of keys removed
 */
removeDebounceKey(id: string): Promise<number>;

/**
 * Get count of stalled jobs (useful for monitoring)
 * @returns Promise that resolves to stalled job count
 */
getStalledCount(): Promise<number>;

/**
 * Get job IDs in ranges by job status types
 * @param types - Array of job status types to retrieve
 * @param start - Start index (default: 0)
 * @param end - End index (default: -1)
 * @param asc - Ascending order (default: false)
 * @returns Promise that resolves to array of job IDs
 */
getRanges(types: JobStatus[], start?: number, end?: number, asc?: boolean): Promise<string[]>;

Usage Examples:

// Disconnect from Redis without destroying queue
await queue.disconnect();
console.log('Disconnected from Redis');

// Retry all failed jobs
await queue.retryJobs();
console.log('All failed jobs retried');

// Retry only first 10 failed jobs
await queue.retryJobs({ count: 10 });

// Remove specific debounce key
await queue.removeDebounceKey('user-email-12345');

// Monitor stalled jobs
const stalledCount = await queue.getStalledCount();
if (stalledCount > 0) {
  console.log(`Found ${stalledCount} stalled jobs`);
}

// Get job IDs by status
const activeJobIds = await queue.getRanges(['active'], 0, 10);
const waitingAndDelayedIds = await queue.getRanges(['waiting', 'delayed']);

Configuration Types

interface AdvancedSettings {
  /** Key expiration time for job locks (default: 30000) */
  lockDuration?: number;
  /** Interval to renew job locks (default: lockDuration / 2) */
  lockRenewTime?: number;
  /** How often to check for stalled jobs (default: 30000) */
  stalledInterval?: number;
  /** Max times a stalled job can be recovered (default: 1) */
  maxStalledCount?: number;
  /** Poll interval for delayed jobs (default: 5000) */
  guardInterval?: number;
  /** Delay before processing next job after error (default: 5000) */
  retryProcessDelay?: number;
  /** Timeout for drained state (default: 5) */
  drainDelay?: number;
  /** Custom backoff strategies */
  backoffStrategies?: { [key: string]: (attemptsMade: number, err: Error, options?: any) => number };
  /** Share child process pool across queues */
  isSharedChildPool?: boolean;
}

interface RateLimiter {
  /** Maximum number of jobs */
  max: number;
  /** Per duration in milliseconds */
  duration: number;
  /** When rate limited, keep jobs in waiting queue instead of delayed */
  bounceBack?: boolean;
  /** Group jobs by this key from job data */
  groupKey?: string;
}

interface MetricsOpts {
  /** Maximum number of data points to collect (default: 1440) */
  maxDataPoints?: number;
}