Redis-based distributed queue system for Node.js providing robust message and job processing capabilities with features like job scheduling, retries, and flow orchestration.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Comprehensive configuration options for queues, workers, jobs, and Redis connections. BullMQ supports advanced features like rate limiting, metrics collection, telemetry, and fine-tuned performance settings.
/**
 * Options object passed as the second argument to `new Queue(name, options)`
 * (see the usage examples). Every field is optional.
 */
interface QueueOptions {
/** Redis connection options */
connection?: ConnectionOptions;
/** Default options applied to all jobs */
defaultJobOptions?: BaseJobOptions;
/** Key prefix for Redis keys (default: 'bull') */
prefix?: string;
/** Stream configuration options */
streams?: StreamsOptions;
/** Skip updating queue metadata */
skipMetasUpdate?: boolean;
/** Skip the job scheduler; leave false to keep scheduling enabled */
skipJobScheduler?: boolean;
/** Advanced settings for stalled-job handling and retry processing (see AdvancedSettings) */
settings?: AdvancedSettings;
}
/** Stream-related settings of a queue; currently only the events stream is configurable. */
interface StreamsOptions {
/** Events stream configuration */
events?: {
/** Maximum number of events kept in the stream (e.g. 10000 keeps at most 10k events) */
maxLen?: number;
};
}
/** Advanced queue settings controlling stalled-job detection and retry processing. */
interface AdvancedSettings {
/** Interval in milliseconds between checks for stalled jobs (e.g. 30000 = every 30s) */
stalledInterval?: number;
/** Maximum number of times a job may stall before it is failed */
maxStalledCount?: number;
/** Delay in milliseconds applied to retry processing (e.g. 5000 = 5s) */
retryProcessDelay?: number;
}Usage Examples:
import { Queue } from "bullmq";
// Basic queue configuration: local Redis plus default job options shared by every job
const basicQueue = new Queue("basic-queue", {
connection: {
host: "localhost",
port: 6379,
db: 0,
},
defaultJobOptions: {
removeOnComplete: 100,
removeOnFail: 50,
attempts: 3,
// NOTE(review): the string shorthand relies on BackoffStrategy, which is not declared in this file — confirm it is accepted
backoff: "exponential",
},
});
// Advanced queue configuration: TLS connection, custom key prefix, event-stream cap, stalled-job tuning
const advancedQueue = new Queue("advanced-queue", {
connection: {
host: "redis-cluster.example.com",
port: 6379,
password: "secure-password",
tls: {
// NOTE(review): false means unauthorized certificates are NOT rejected — prefer true outside of local testing
rejectUnauthorized: false,
},
retryDelayOnFailedAttempt: 100,
maxRetriesPerRequest: 3,
},
prefix: "myapp", // Keys will be prefixed with "myapp:"
streams: {
events: {
maxLen: 10000, // Keep max 10k events
},
},
settings: {
stalledInterval: 30000, // Check for stalled jobs every 30s
maxStalledCount: 1, // Max 1 stall before failing
retryProcessDelay: 5000, // 5s delay for retry processing
},
skipMetasUpdate: false, // Keep updating queue metadata
skipJobScheduler: false, // Enable scheduling
});interface WorkerOptions {
// Options object passed as the third argument to `new Worker(name, processor, options)`; every field is optional.
/** Maximum number of jobs to process concurrently (default: 1) */
concurrency?: number;
/** Rate limiting configuration */
limiter?: RateLimiterOptions;
/** Maximum number of times a job can be stalled before failed (default: 1) */
maxStalledCount?: number;
/** Interval for checking stalled jobs in ms (default: 30000) */
stalledInterval?: number;
/** How many completed jobs to keep, by count and/or age (see KeepJobs) */
removeOnComplete?: number | boolean | KeepJobs;
/** How many failed jobs to keep, by count and/or age (see KeepJobs) */
removeOnFail?: number | boolean | KeepJobs;
/** Job lock duration in ms (default: 30000) */
lockDuration?: number;
/** Lock renewal interval in ms (default: 15000) */
lockRenewTime?: number;
/** Skip stalled job detection */
skipStalledCheck?: boolean;
/** Skip automatic lock renewal */
skipLockRenewal?: boolean;
/** Use versioned locks */
useWorkerVersioning?: boolean;
/** Redis connection options */
connection?: ConnectionOptions;
/** Key prefix for Redis keys */
prefix?: string;
/** Metrics collection options */
metrics?: MetricsOptions;
/** Telemetry configuration */
telemetry?: TelemetryOptions;
/** Sandbox options for isolated job processing */
sandbox?: SandboxOptions;
}Usage Examples:
import { Worker } from "bullmq";
// NOTE: `processor` in the examples below is assumed to be a job-handler function defined elsewhere.
// High-throughput worker configuration
const highThroughputWorker = new Worker("image-processing", processor, {
concurrency: 10,
maxStalledCount: 2,
stalledInterval: 30000,
lockDuration: 60000, // Hold each job lock for 1 minute
lockRenewTime: 30000, // Renew every 30 seconds
// Cleanup settings
removeOnComplete: {
count: 1000, // Keep 1000 completed jobs
age: 24 * 60 * 60, // or 24 hours (age is expressed in seconds)
},
removeOnFail: {
count: 100, // Keep 100 failed jobs
age: 7 * 24 * 60 * 60, // or 7 days (age is expressed in seconds)
},
// Rate limiting
limiter: {
max: 50, // Max 50 jobs
duration: 60 * 1000, // per minute
bucketSize: 10, // with burst capacity of 10
},
// Metrics collection
metrics: {
maxDataPoints: 100,
},
});
// Reliable worker configuration
const reliableWorker = new Worker("critical-tasks", processor, {
concurrency: 1, // Sequential processing
maxStalledCount: 0, // No stalling allowed
stalledInterval: 10000, // Check every 10 seconds
lockDuration: 300000, // 5 minute lock duration
skipStalledCheck: false, // Enable stalled check
skipLockRenewal: false, // Enable lock renewal
useWorkerVersioning: true, // Use versioned locks
});interface BaseJobOptions {
// Per-job options; used as `defaultJobOptions` on a queue and as the third argument to `queue.add` in the examples.
/** Job priority (0-2097152). NOTE(review): per the BullMQ prioritized-jobs guide a LOWER number means HIGHER priority (1 is the highest) — confirm */
priority?: number;
/** Delay before processing (milliseconds) */
delay?: number;
/** Number of retry attempts (default: 0) */
attempts?: number;
/** Retry backoff strategy */
backoff?: BackoffStrategy | BackoffOptions;
/** Lifo (last in, first out) processing */
lifo?: boolean;
/** Job timeout in milliseconds */
jobTimeout?: number;
/** How many completed jobs to keep, by count and/or age (see KeepJobs) */
removeOnComplete?: number | boolean | KeepJobs;
/** How many failed jobs to keep, by count and/or age (see KeepJobs) */
removeOnFail?: number | boolean | KeepJobs;
/** Parent job relationship */
parent?: ParentOptions;
/** Repeatable job configuration */
repeat?: RepeatOptions;
/** Custom job ID */
jobId?: string;
/** Job deduplication settings */
deduplication?: DeduplicationOptions;
}
/** Object form of a job's retry backoff configuration (see `BaseJobOptions.backoff`). */
interface BackoffOptions {
/** Backoff type */
type: 'fixed' | 'exponential';
/** Base delay in milliseconds */
delay?: number;
/** Add randomization to delay */
jitter?: boolean;
}
/** Retention rule for finished jobs, used by `removeOnComplete` / `removeOnFail`. */
interface KeepJobs {
/** Keep jobs by age (seconds), e.g. 3600 = 1 hour */
age?: number;
/** Keep jobs by count */
count?: number;
}Usage Examples:
// NOTE: `queue` and `data` in the examples below are assumed to be defined elsewhere.
// Priority job with retry configuration
await queue.add("urgent-task", data, {
priority: 100,
attempts: 5,
backoff: {
type: "exponential",
delay: 2000,
jitter: true,
},
removeOnComplete: 10,
removeOnFail: false, // Keep all failed jobs
});
// Delayed job with cleanup
await queue.add("scheduled-task", data, {
delay: 60000, // 1 minute delay
jobTimeout: 30000, // 30 second timeout
removeOnComplete: {
count: 50,
age: 3600, // 1 hour
},
});
// Job with deduplication
await queue.add("unique-task", data, {
jobId: "unique-task-123",
deduplication: {
id: "user-action-123",
},
});interface RateLimiterOptions {
// Rate limiter settings passed as `limiter` in WorkerOptions.
/** Maximum number of jobs to process within one `duration` window */
max: number;
/** Duration window in milliseconds */
duration: number;
/** Bucket size for token bucket algorithm */
bucketSize?: number;
/** Group key, or a function deriving one from the job, so each group is rate limited separately */
groupKey?: string | ((job: Job) => string);
}Usage Examples:
// NOTE: `processor` in the examples below is assumed to be a job-handler function defined elsewhere.
// Simple rate limiting
const rateLimitedWorker = new Worker("api-calls", processor, {
limiter: {
max: 100, // 100 requests
duration: 60 * 1000, // per minute
},
});
// Per-user rate limiting
const perUserWorker = new Worker("user-actions", processor, {
limiter: {
max: 10, // 10 actions
duration: 60 * 1000, // per minute
groupKey: (job) => job.data.userId, // per user
},
});
// Burst-capable rate limiting
const burstWorker = new Worker("batch-processing", processor, {
limiter: {
max: 1000, // 1000 jobs per hour
duration: 60 * 60 * 1000,
bucketSize: 100, // but allow bursts of 100
},
});interface ConnectionOptions {
// Redis connection settings shared by QueueOptions and WorkerOptions (`connection` field).
/** Redis host */
host?: string;
/** Redis port */
port?: number;
/** Database number */
db?: number;
/** Authentication password */
password?: string;
/** Username for ACL authentication */
username?: string;
/** Connection timeout in milliseconds (e.g. 60000 = 1 minute) */
connectTimeout?: number;
/** Command timeout */
commandTimeout?: number;
/** Retry delay on failed attempts */
retryDelayOnFailedAttempt?: number;
/** Maximum retries per request */
maxRetriesPerRequest?: number;
/** TLS configuration */
tls?: TLSOptions;
/** Keep-alive setting in milliseconds (e.g. 30000 = 30 seconds) */
keepAlive?: number;
/** Family preference (4 or 6) */
family?: number;
}
/** TLS settings for the Redis connection (`ConnectionOptions.tls`). */
interface TLSOptions {
/** Reject unauthorized certificates; false leaves them accepted */
rejectUnauthorized?: boolean;
/** Certificate authority */
ca?: string | Buffer | Array<string | Buffer>;
/** Client certificate */
cert?: string | Buffer;
/** Client private key */
key?: string | Buffer;
}Usage Examples:
// Secure Redis connection
// NOTE(review): `fs` is used below but no import is shown — presumably `node:fs` is imported by the surrounding module; confirm.
const secureConnection = {
host: "redis.example.com",
port: 6380,
password: "secure-password",
username: "bullmq-user",
db: 1,
tls: {
rejectUnauthorized: true,
ca: fs.readFileSync("ca-cert.pem"),
cert: fs.readFileSync("client-cert.pem"),
key: fs.readFileSync("client-key.pem"),
},
connectTimeout: 10000,
commandTimeout: 5000,
retryDelayOnFailedAttempt: 100,
maxRetriesPerRequest: 3,
};
// Connection with keep-alive
const keepAliveConnection = {
host: "localhost",
port: 6379,
keepAlive: 30000, // 30 second keep-alive
family: 4, // IPv4
connectTimeout: 60000, // 1 minute timeout
};interface MetricsOptions {
// Metrics settings passed as `metrics` in WorkerOptions.
/** Maximum data points to collect */
maxDataPoints?: number;
}interface TelemetryOptions {
// Telemetry settings passed as `telemetry` in WorkerOptions.
/** Telemetry tracer */
tracer?: Tracer;
/** Context manager for distributed tracing */
contextManager?: ContextManager;
}interface SandboxOptions {
// Sandbox settings passed as `sandbox` in WorkerOptions.
/** Timeout for sandboxed processes */
timeout?: number;
/** Memory limit for sandboxed processes */
memoryLimit?: number;
/** Environment variables */
env?: Record<string, string>;
}interface RepeatOptions {
// Repeat settings passed as `repeat` in BaseJobOptions; the examples use either `cron` or `every`.
/** Cron expression */
cron?: string;
/** Timezone for cron */
tz?: string;
/** Start date */
startDate?: Date | string | number;
/** End date */
endDate?: Date | string | number;
/** Maximum number of iterations */
limit?: number;
/** Interval in milliseconds */
every?: number;
/** Start immediately */
immediately?: boolean;
/** Current iteration count */
count?: number;
/** Previous run time */
prevMillis?: number;
/** Job key for deduplication */
jobId?: string;
}Usage Examples:
// NOTE: `queue` and `data` in the examples below are assumed to be defined elsewhere.
// Cron-based scheduling
await queue.add("daily-report", {}, {
repeat: {
cron: "0 9 * * *", // 9 AM daily
tz: "America/New_York",
limit: 365, // Max 365 iterations
},
});
// Interval-based scheduling
await queue.add("health-check", {}, {
repeat: {
every: 30000, // Every 30 seconds
immediately: true, // Start immediately
},
});
// Time-bounded scheduling
await queue.add("promotion", data, {
repeat: {
every: 60000, // Every minute
startDate: new Date("2024-01-01"),
endDate: new Date("2024-01-31"),
},
});// Development configuration
// Development settings: local Redis, small job history, no retries beyond the first attempt.
const devConfig = {
connection: {
host: "localhost",
port: 6379,
db: 0,
},
defaultJobOptions: {
removeOnComplete: 10,
removeOnFail: 10,
attempts: 1,
},
};
// Production configuration
const prodConfig = {
connection: {
// NOTE(review): no fallback here — host is undefined when REDIS_HOST is unset
host: process.env.REDIS_HOST,
// Falls back to 6379 when REDIS_PORT is unset or empty; a non-numeric value yields NaN
port: parseInt(process.env.REDIS_PORT || "6379"),
password: process.env.REDIS_PASSWORD,
tls: {
rejectUnauthorized: true,
},
retryDelayOnFailedAttempt: 100,
maxRetriesPerRequest: 3,
},
defaultJobOptions: {
removeOnComplete: 1000,
removeOnFail: 100,
attempts: 3,
// NOTE(review): the string shorthand relies on BackoffStrategy, which is not declared in this file — confirm it is accepted
backoff: "exponential",
},
settings: {
stalledInterval: 30000,
maxStalledCount: 1,
},
};
// Use environment-specific config: prodConfig only when NODE_ENV is exactly "production"
const config = process.env.NODE_ENV === "production" ? prodConfig : devConfig;
const queue = new Queue("my-queue", config);// Centralized configuration management
/**
 * Centralized, environment-driven configuration for BullMQ queues and workers.
 *
 * Numeric settings are read from `process.env` and parsed as base-10 integers.
 * An unset, empty, or non-numeric variable falls back to its default, so a
 * malformed value can never put `NaN` into the returned options.
 */
class BullMQConfig {
  /**
   * Builds the options for a queue.
   *
   * Environment variables: `BULLMQ_PREFIX` (default "bull"), `KEEP_COMPLETED`
   * (default 100), `KEEP_FAILED` (default 50), `DEFAULT_ATTEMPTS` (default 3),
   * plus the connection variables read by `getConnectionConfig`.
   *
   * @param queueName - Name of the queue. Currently unused: every queue
   *   receives the same options. Kept so existing callers keep working.
   * @returns Queue options with connection, key prefix, and default job options.
   */
  static getQueueConfig(queueName: string): QueueOptions {
    return {
      // Reference the class rather than `this` so the method still works when detached.
      connection: BullMQConfig.getConnectionConfig(),
      prefix: process.env.BULLMQ_PREFIX || "bull",
      defaultJobOptions: {
        removeOnComplete: BullMQConfig.envInt("KEEP_COMPLETED", 100),
        removeOnFail: BullMQConfig.envInt("KEEP_FAILED", 50),
        attempts: BullMQConfig.envInt("DEFAULT_ATTEMPTS", 3),
        backoff: "exponential",
      },
    };
  }

  /**
   * Builds the options for a worker.
   *
   * Environment variables: `MAX_STALLED` (default 1), `STALLED_INTERVAL`
   * (default 30000), `LOCK_DURATION` (default 30000), plus the connection
   * variables read by `getConnectionConfig`.
   *
   * @param concurrency - Number of jobs processed concurrently (default 1).
   * @returns Worker options with connection, concurrency, and stalled/lock settings.
   */
  static getWorkerConfig(concurrency: number = 1): WorkerOptions {
    return {
      connection: BullMQConfig.getConnectionConfig(),
      concurrency,
      maxStalledCount: BullMQConfig.envInt("MAX_STALLED", 1),
      stalledInterval: BullMQConfig.envInt("STALLED_INTERVAL", 30000),
      lockDuration: BullMQConfig.envInt("LOCK_DURATION", 30000),
    };
  }

  /**
   * Builds the Redis connection options shared by queues and workers.
   *
   * Environment variables: `REDIS_HOST` (default "localhost"), `REDIS_PORT`
   * (default 6379), `REDIS_PASSWORD` (no default; undefined when unset),
   * `REDIS_DB` (default 0).
   */
  private static getConnectionConfig(): ConnectionOptions {
    return {
      host: process.env.REDIS_HOST || "localhost",
      port: BullMQConfig.envInt("REDIS_PORT", 6379),
      password: process.env.REDIS_PASSWORD,
      db: BullMQConfig.envInt("REDIS_DB", 0),
      retryDelayOnFailedAttempt: 100,
      maxRetriesPerRequest: 3,
    };
  }

  /**
   * Reads an integer from `process.env`.
   *
   * @param name - Environment variable name.
   * @param fallback - Value returned when the variable is unset, empty, or
   *   does not start with a base-10 integer.
   * @returns The parsed integer, or `fallback`.
   */
  private static envInt(name: string, fallback: number): number {
    const raw = process.env[name];
    if (!raw) {
      return fallback;
    }
    // Explicit radix: without it a "0x"-prefixed value would be read as hexadecimal.
    const parsed = Number.parseInt(raw, 10);
    return Number.isNaN(parsed) ? fallback : parsed;
  }
}
// Use centralized config: the queue and its worker share the name "orders"; the worker runs with concurrency 5.
// NOTE: `processor` is assumed to be a job-handler function defined elsewhere.
const queue = new Queue("orders", BullMQConfig.getQueueConfig("orders"));
const worker = new Worker("orders", processor, BullMQConfig.getWorkerConfig(5));Install with Tessl CLI
npx tessl i tessl/npm-bullmq