Comprehensive configuration options for queues, workers, jobs, and Redis connections. BullMQ supports advanced features like rate limiting, metrics collection, telemetry, and fine-tuned performance settings.
interface QueueOptions {
/** Redis connection options */
connection?: ConnectionOptions;
/** Default options applied to all jobs */
defaultJobOptions?: BaseJobOptions;
/** Key prefix for Redis keys (default: 'bull') */
prefix?: string;
/** Stream configuration options */
streams?: StreamsOptions;
/** Skip updating queue metadata */
skipMetasUpdate?: boolean;
/** Enable job scheduler */
skipJobScheduler?: boolean;
/** Settings for the job scheduler */
settings?: AdvancedSettings;
}
interface StreamsOptions {
/** Events stream configuration */
events?: {
/** Maximum stream length */
maxLen?: number;
};
}
interface AdvancedSettings {
/** Retry delay in milliseconds for stalled jobs */
stalledInterval?: number;
/** Maximum delay for retries */
maxStalledCount?: number;
/** Retry delay for failed jobs */
retryProcessDelay?: number;
}Usage Examples:
import { Queue } from "bullmq";
// Basic queue configuration
const basicQueue = new Queue("basic-queue", {
connection: {
host: "localhost",
port: 6379,
db: 0,
},
defaultJobOptions: {
removeOnComplete: 100,
removeOnFail: 50,
attempts: 3,
backoff: "exponential",
},
});
// Advanced queue configuration
const advancedQueue = new Queue("advanced-queue", {
connection: {
host: "redis-cluster.example.com",
port: 6379,
password: "secure-password",
tls: {
rejectUnauthorized: false,
},
retryDelayOnFailedAttempt: 100,
maxRetriesPerRequest: 3,
},
prefix: "myapp", // Keys will be prefixed with "myapp:"
streams: {
events: {
maxLen: 10000, // Keep max 10k events
},
},
settings: {
stalledInterval: 30000, // Check for stalled jobs every 30s
maxStalledCount: 1, // Max 1 stall before failing
retryProcessDelay: 5000, // 5s delay for retry processing
},
skipMetasUpdate: false, // Keep queue metadata
skipJobScheduler: false, // Enable scheduling
});interface WorkerOptions {
/** Maximum number of jobs to process concurrently (default: 1) */
concurrency?: number;
/** Rate limiting configuration */
limiter?: RateLimiterOptions;
/** Maximum number of times a job can be stalled before failed (default: 1) */
maxStalledCount?: number;
/** Interval for checking stalled jobs in ms (default: 30000) */
stalledInterval?: number;
/** Keep completed jobs count/age */
removeOnComplete?: number | boolean | KeepJobs;
/** Keep failed jobs count/age */
removeOnFail?: number | boolean | KeepJobs;
/** Job lock duration in ms (default: 30000) */
lockDuration?: number;
/** Lock renewal interval in ms (default: 15000) */
lockRenewTime?: number;
/** Skip stalled job detection */
skipStalledCheck?: boolean;
/** Skip automatic lock renewal */
skipLockRenewal?: boolean;
/** Use versioned locks */
useWorkerVersioning?: boolean;
/** Redis connection options */
connection?: ConnectionOptions;
/** Key prefix for Redis keys */
prefix?: string;
/** Metrics collection options */
metrics?: MetricsOptions;
/** Telemetry configuration */
telemetry?: TelemetryOptions;
/** Sandbox options for isolated job processing */
sandbox?: SandboxOptions;
}Usage Examples:
import { Worker } from "bullmq";
// High-throughput worker configuration
const highThroughputWorker = new Worker("image-processing", processor, {
concurrency: 10,
maxStalledCount: 2,
stalledInterval: 30000,
lockDuration: 60000, // 1 minute job timeout
lockRenewTime: 30000, // Renew every 30 seconds
// Cleanup settings
removeOnComplete: {
count: 1000, // Keep 1000 completed jobs
age: 24 * 60 * 60, // or 24 hours
},
removeOnFail: {
count: 100, // Keep 100 failed jobs
age: 7 * 24 * 60 * 60, // or 7 days
},
// Rate limiting
limiter: {
max: 50, // Max 50 jobs
duration: 60 * 1000, // per minute
bucketSize: 10, // with burst capacity of 10
},
// Metrics collection
metrics: {
maxDataPoints: 100,
},
});
// Reliable worker configuration
const reliableWorker = new Worker("critical-tasks", processor, {
concurrency: 1, // Sequential processing
maxStalledCount: 0, // No stalling allowed
stalledInterval: 10000, // Check every 10 seconds
lockDuration: 300000, // 5 minute timeout
skipStalledCheck: false, // Enable stalled check
skipLockRenewal: false, // Enable lock renewal
useWorkerVersioning: true, // Use versioned locks
});interface BaseJobOptions {
/** Job priority (0-2097152, higher = more priority) */
priority?: number;
/** Delay before processing (milliseconds) */
delay?: number;
/** Number of retry attempts (default: 0) */
attempts?: number;
/** Retry backoff strategy */
backoff?: BackoffStrategy | BackoffOptions;
/** Lifo (last in, first out) processing */
lifo?: boolean;
/** Job timeout in milliseconds */
jobTimeout?: number;
/** Keep completed job count/age */
removeOnComplete?: number | boolean | KeepJobs;
/** Keep failed job count/age */
removeOnFail?: number | boolean | KeepJobs;
/** Parent job relationship */
parent?: ParentOptions;
/** Repeatable job configuration */
repeat?: RepeatOptions;
/** Custom job ID */
jobId?: string;
/** Job deduplication settings */
deduplication?: DeduplicationOptions;
}
interface BackoffOptions {
/** Backoff type */
type: 'fixed' | 'exponential';
/** Base delay in milliseconds */
delay?: number;
/** Add randomization to delay */
jitter?: boolean;
}
interface KeepJobs {
/** Keep jobs by age (seconds) */
age?: number;
/** Keep jobs by count */
count?: number;
}Usage Examples:
// Priority job with retry configuration
await queue.add("urgent-task", data, {
priority: 100,
attempts: 5,
backoff: {
type: "exponential",
delay: 2000,
jitter: true,
},
removeOnComplete: 10,
removeOnFail: false, // Keep all failed jobs
});
// Delayed job with cleanup
await queue.add("scheduled-task", data, {
delay: 60000, // 1 minute delay
jobTimeout: 30000, // 30 second timeout
removeOnComplete: {
count: 50,
age: 3600, // 1 hour
},
});
// Job with deduplication
await queue.add("unique-task", data, {
jobId: "unique-task-123",
deduplication: {
id: "user-action-123",
},
});interface RateLimiterOptions {
/** Maximum number of jobs to process */
max: number;
/** Duration window in milliseconds */
duration: number;
/** Bucket size for token bucket algorithm */
bucketSize?: number;
/** Group jobs by a function for separate rate limiting */
groupKey?: string | ((job: Job) => string);
}Usage Examples:
// Simple rate limiting
const rateLimitedWorker = new Worker("api-calls", processor, {
limiter: {
max: 100, // 100 requests
duration: 60 * 1000, // per minute
},
});
// Per-user rate limiting
const perUserWorker = new Worker("user-actions", processor, {
limiter: {
max: 10, // 10 actions
duration: 60 * 1000, // per minute
groupKey: (job) => job.data.userId, // per user
},
});
// Burst-capable rate limiting
const burstWorker = new Worker("batch-processing", processor, {
limiter: {
max: 1000, // 1000 jobs per hour
duration: 60 * 60 * 1000,
bucketSize: 100, // but allow bursts of 100
},
});interface ConnectionOptions {
/** Redis host */
host?: string;
/** Redis port */
port?: number;
/** Database number */
db?: number;
/** Authentication password */
password?: string;
/** Username for ACL authentication */
username?: string;
/** Connection timeout */
connectTimeout?: number;
/** Command timeout */
commandTimeout?: number;
/** Retry delay on failed attempts */
retryDelayOnFailedAttempt?: number;
/** Maximum retries per request */
maxRetriesPerRequest?: number;
/** TLS configuration */
tls?: TLSOptions;
/** Keep alive settings */
keepAlive?: number;
/** Family preference (4 or 6) */
family?: number;
}
interface TLSOptions {
/** Reject unauthorized certificates */
rejectUnauthorized?: boolean;
/** Certificate authority */
ca?: string | Buffer | Array<string | Buffer>;
/** Client certificate */
cert?: string | Buffer;
/** Client private key */
key?: string | Buffer;
}Usage Examples:
// Secure Redis connection
const secureConnection = {
host: "redis.example.com",
port: 6380,
password: "secure-password",
username: "bullmq-user",
db: 1,
tls: {
rejectUnauthorized: true,
ca: fs.readFileSync("ca-cert.pem"),
cert: fs.readFileSync("client-cert.pem"),
key: fs.readFileSync("client-key.pem"),
},
connectTimeout: 10000,
commandTimeout: 5000,
retryDelayOnFailedAttempt: 100,
maxRetriesPerRequest: 3,
};
// Connection with keep-alive
const keepAliveConnection = {
host: "localhost",
port: 6379,
keepAlive: 30000, // 30 second keep-alive
family: 4, // IPv4
connectTimeout: 60000, // 1 minute timeout
};interface MetricsOptions {
/** Maximum data points to collect */
maxDataPoints?: number;
}interface TelemetryOptions {
/** Telemetry tracer */
tracer?: Tracer;
/** Context manager for distributed tracing */
contextManager?: ContextManager;
}interface SandboxOptions {
/** Timeout for sandboxed processes */
timeout?: number;
/** Memory limit for sandboxed processes */
memoryLimit?: number;
/** Environment variables */
env?: Record<string, string>;
}interface RepeatOptions {
/** Cron expression */
cron?: string;
/** Timezone for cron */
tz?: string;
/** Start date */
startDate?: Date | string | number;
/** End date */
endDate?: Date | string | number;
/** Maximum number of iterations */
limit?: number;
/** Interval in milliseconds */
every?: number;
/** Start immediately */
immediately?: boolean;
/** Current iteration count */
count?: number;
/** Previous run time */
prevMillis?: number;
/** Job key for deduplication */
jobId?: string;
}Usage Examples:
// Cron-based scheduling
await queue.add("daily-report", {}, {
repeat: {
cron: "0 9 * * *", // 9 AM daily
tz: "America/New_York",
limit: 365, // Max 365 iterations
},
});
// Interval-based scheduling
await queue.add("health-check", {}, {
repeat: {
every: 30000, // Every 30 seconds
immediately: true, // Start immediately
},
});
// Time-bounded scheduling
await queue.add("promotion", data, {
repeat: {
every: 60000, // Every minute
startDate: new Date("2024-01-01"),
endDate: new Date("2024-01-31"),
},
});// Development configuration
const devConfig = {
connection: {
host: "localhost",
port: 6379,
db: 0,
},
defaultJobOptions: {
removeOnComplete: 10,
removeOnFail: 10,
attempts: 1,
},
};
// Production configuration
const prodConfig = {
connection: {
host: process.env.REDIS_HOST,
port: parseInt(process.env.REDIS_PORT || "6379"),
password: process.env.REDIS_PASSWORD,
tls: {
rejectUnauthorized: true,
},
retryDelayOnFailedAttempt: 100,
maxRetriesPerRequest: 3,
},
defaultJobOptions: {
removeOnComplete: 1000,
removeOnFail: 100,
attempts: 3,
backoff: "exponential",
},
settings: {
stalledInterval: 30000,
maxStalledCount: 1,
},
};
// Use environment-specific config
const config = process.env.NODE_ENV === "production" ? prodConfig : devConfig;
const queue = new Queue("my-queue", config);// Centralized configuration management
class BullMQConfig {
static getQueueConfig(queueName: string): QueueOptions {
return {
connection: this.getConnectionConfig(),
prefix: process.env.BULLMQ_PREFIX || "bull",
defaultJobOptions: {
removeOnComplete: parseInt(process.env.KEEP_COMPLETED || "100"),
removeOnFail: parseInt(process.env.KEEP_FAILED || "50"),
attempts: parseInt(process.env.DEFAULT_ATTEMPTS || "3"),
backoff: "exponential",
},
};
}
static getWorkerConfig(concurrency: number = 1): WorkerOptions {
return {
connection: this.getConnectionConfig(),
concurrency,
maxStalledCount: parseInt(process.env.MAX_STALLED || "1"),
stalledInterval: parseInt(process.env.STALLED_INTERVAL || "30000"),
lockDuration: parseInt(process.env.LOCK_DURATION || "30000"),
};
}
private static getConnectionConfig(): ConnectionOptions {
return {
host: process.env.REDIS_HOST || "localhost",
port: parseInt(process.env.REDIS_PORT || "6379"),
password: process.env.REDIS_PASSWORD,
db: parseInt(process.env.REDIS_DB || "0"),
retryDelayOnFailedAttempt: 100,
maxRetriesPerRequest: 3,
};
}
}
// Use centralized config
const queue = new Queue("orders", BullMQConfig.getQueueConfig("orders"));
const worker = new Worker("orders", processor, BullMQConfig.getWorkerConfig(5));