Core queue creation, configuration, and lifecycle management for Redis-based job queues.
Creates a new Queue instance with Redis backend for reliable job processing.
/**
* Creates a new Queue instance with Redis backend
* @param name - Unique name for the queue
* @param url - Redis connection URL (optional)
* @param opts - Queue configuration options
* @returns Queue instance
*/
function Queue(name: string, url?: string, opts?: QueueOptions): Queue;
function Queue(name: string, opts?: QueueOptions): Queue;
// Can also be called with 'new' keyword
new Queue(name: string, url?: string, opts?: QueueOptions): Queue;
new Queue(name: string, opts?: QueueOptions): Queue;
interface QueueOptions {
/** Redis connection options or connection string */
redis?: RedisOptions | string;
/** Key prefix for Redis keys (default: 'bull') */
prefix?: string;
/** Advanced queue settings */
settings?: AdvancedSettings;
/** Rate limiting configuration */
limiter?: RateLimiter;
/** Default options for all jobs in this queue */
defaultJobOptions?: JobOptions;
/** Metrics collection configuration */
metrics?: MetricsOpts;
/** Custom Redis client factory function */
createClient?: (type: 'client' | 'subscriber' | 'bclient', redisOpts?: RedisOptions) => Redis.Redis;
/** Skip Redis version validation check */
skipVersionCheck?: boolean;
}
Usage Examples:
const Queue = require('bull');
// Basic queue with default Redis connection
const basicQueue = new Queue('my queue');
// Queue with Redis URL
const remoteQueue = new Queue('remote queue', 'redis://user:pass@redis-server:6379');
// Queue with detailed configuration
const configuredQueue = new Queue('configured queue', {
redis: {
host: 'localhost',
port: 6379,
db: 0,
password: 'secret'
},
prefix: 'myapp',
settings: {
stalledInterval: 30000,
maxStalledCount: 1
},
defaultJobOptions: {
removeOnComplete: 10,
removeOnFail: 5
}
});
Access queue properties and Redis clients.
interface Queue {
/** Queue name */
readonly name: string;
/** Main Redis client for queue operations */
readonly client: Redis.Redis;
/** Array of all Redis clients used by the queue */
readonly clients: Redis.Redis[];
}
Wait for the queue to be ready for operations.
/**
* Wait for the queue to be ready for operations
* @returns Promise that resolves to the queue instance when ready
*/
isReady(): Promise<Queue>;
Usage Example:
const queue = new Queue('my queue');
await queue.isReady();
console.log('Queue is ready for operations');
// Or using then()
queue.isReady().then((readyQueue) => {
console.log(`${readyQueue.name} is ready`);
});
Control queue processing state.
/**
* Pause the queue to stop processing new jobs
* @param isLocal - If true, pause only this worker instance
* @param doNotWaitActive - If true, don't wait for active jobs to finish
* @returns Promise that resolves when queue is paused
*/
pause(isLocal?: boolean, doNotWaitActive?: boolean): Promise<void>;
/**
* Resume the paused queue
* @param isLocal - If true, resume only this worker instance
* @returns Promise that resolves when queue is resumed
*/
resume(isLocal?: boolean): Promise<void>;
/**
* Check if the queue is paused
* @param isLocal - If true, check only this worker instance
* @returns Promise that resolves to pause status
*/
isPaused(isLocal?: boolean): Promise<boolean>;
Usage Examples:
// Pause queue globally (affects all workers)
await queue.pause();
console.log('Queue paused');
// Pause only this worker
await queue.pause(true);
// Resume queue
await queue.resume();
// Check pause status
const isPaused = await queue.isPaused();
if (isPaused) {
console.log('Queue is currently paused');
}
Clean and close queue operations.
/**
* Remove all waiting jobs from the queue
* @returns Promise that resolves when queue is emptied
*/
empty(): Promise<void>;
/**
* Gracefully close the queue and Redis connections
* @param doNotWaitJobs - If true, don't wait for active jobs to finish
* @returns Promise that resolves when queue is closed
*/
close(doNotWaitJobs?: boolean): Promise<void>;
/**
* Completely destroy the queue and all its data (irreversible)
* @param opts - Obliteration options
* @returns Promise that resolves when queue is obliterated
*/
obliterate(opts?: { force: boolean }): Promise<void>;
Usage Examples:
// Remove all waiting jobs
await queue.empty();
console.log('All waiting jobs removed');
// Graceful shutdown
await queue.close();
console.log('Queue closed gracefully');
// Force close without waiting for jobs
await queue.close(true);
// Completely destroy queue (use with caution!)
await queue.obliterate({ force: true });
Generate Redis keys and manage transactions.
/**
* Generate Redis key for specific queue type
* @param queueType - Type of queue data structure
* @returns Redis key string
*/
toKey(queueType: string): string;
/**
* Create Redis multi/pipeline for atomic operations
* @returns Redis pipeline for batched operations
*/
multi(): Redis.Pipeline;
/**
* Get queue name encoded in base64
* @returns Base64 encoded queue name
*/
base64Name(): string;
/**
* Get client name with prefix
* @returns Prefixed client name
*/
clientName(): string;
Usage Examples:
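// Generate Redis keys (format '<prefix>:<queue name>:<type>'; results shown assume the default 'bull' prefix and a queue named 'myqueue')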
const waitingKey = queue.toKey('waiting'); // 'bull:myqueue:waiting'
const activeKey = queue.toKey('active'); // 'bull:myqueue:active'
// Create atomic transaction
const multi = queue.multi();
multi.lpush('somekey', 'value');
multi.exec();
// Get encoded names
const encodedName = queue.base64Name();
const clientName = queue.clientName();
Manage and inspect worker processes.
/**
* Set worker name for this client
* @returns Promise that resolves when name is set
*/
setWorkerName(): Promise<any>;
/**
* Get information about active workers
* @returns Promise that resolves to array of worker information
*/
getWorkers(): Promise<Array<{ [key: string]: string }>>;
/**
* Parse Redis client list string into worker information
* @param list - Redis CLIENT LIST response
* @returns Parsed worker information arrays
*/
parseClientList(list: string): Array<Array<{ [key: string]: string }>>;
/**
* Wait for all currently active jobs to finish
* @returns Promise that resolves when all active jobs complete
*/
whenCurrentJobsFinished(): Promise<void>;
Usage Examples:
// Set worker name
await queue.setWorkerName();
// Get active workers
const workers = await queue.getWorkers();
console.log(`Active workers: ${workers.length}`);
// Wait for jobs to finish before shutdown
await queue.whenCurrentJobsFinished();
console.log('All active jobs completed');
Control Redis connections and queue lifecycle.
/**
* Disconnect Redis clients without closing the queue completely
* @returns Promise that resolves when disconnected
*/
disconnect(): Promise<void>;
/**
* Retry all failed jobs with optional count limit
* @param opts - Retry options
* @returns Promise that resolves when retry is complete
*/
retryJobs(opts?: { count?: number }): Promise<void>;
/**
* Remove a specific debounce key
* @param id - Debounce ID to remove
* @returns Promise that resolves to number of keys removed
*/
removeDebounceKey(id: string): Promise<number>;
/**
* Get count of stalled jobs (useful for monitoring)
* @returns Promise that resolves to stalled job count
*/
getStalledCount(): Promise<number>;
/**
* Get job IDs in ranges by job status types
* @param types - Array of job status types to retrieve
* @param start - Start index (default: 0)
* @param end - End index (default: -1)
* @param asc - Ascending order (default: false)
* @returns Promise that resolves to array of job IDs
*/
getRanges(types: JobStatus[], start?: number, end?: number, asc?: boolean): Promise<string[]>;
Usage Examples:
// Disconnect from Redis without destroying queue
await queue.disconnect();
console.log('Disconnected from Redis');
// Retry all failed jobs
await queue.retryJobs();
console.log('All failed jobs retried');
// Retry only first 10 failed jobs
await queue.retryJobs({ count: 10 });
// Remove specific debounce key
await queue.removeDebounceKey('user-email-12345');
// Monitor stalled jobs
const stalledCount = await queue.getStalledCount();
if (stalledCount > 0) {
console.log(`Found ${stalledCount} stalled jobs`);
}
// Get job IDs by status
const activeJobIds = await queue.getRanges(['active'], 0, 10);
const waitingAndDelayedIds = await queue.getRanges(['waiting', 'delayed']);
interface AdvancedSettings {
/** Key expiration time for job locks (default: 30000) */
lockDuration?: number;
/** Interval to renew job locks (default: lockDuration / 2) */
lockRenewTime?: number;
/** How often to check for stalled jobs (default: 30000) */
stalledInterval?: number;
/** Max times a stalled job can be recovered (default: 1) */
maxStalledCount?: number;
/** Poll interval for delayed jobs (default: 5000) */
guardInterval?: number;
/** Delay before processing next job after error (default: 5000) */
retryProcessDelay?: number;
/** Timeout, in seconds (not milliseconds), used while the queue is drained, i.e. empty and waiting for jobs (default: 5) */
drainDelay?: number;
/** Custom backoff strategies */
backoffStrategies?: { [key: string]: (attemptsMade: number, err: Error, options?: any) => number };
/** Share child process pool across queues */
isSharedChildPool?: boolean;
}
interface RateLimiter {
/** Maximum number of jobs */
max: number;
/** Per duration in milliseconds */
duration: number;
/** When rate limited, keep jobs in waiting queue instead of delayed */
bounceBack?: boolean;
/** Group jobs by this key from job data */
groupKey?: string;
}
interface MetricsOpts {
/** Maximum number of data points to collect (default: 1440) */
maxDataPoints?: number;
}