or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

configuration.md, error-handling.md, event-monitoring.md, flow-orchestration.md, index.md, job-lifecycle.md, job-processing.md, job-scheduling.md, queue-management.md
tile.json

docs/configuration.md

Configuration

Comprehensive configuration options for queues, workers, jobs, and Redis connections. BullMQ supports advanced features like rate limiting, metrics collection, telemetry, and fine-tuned performance settings.

Capabilities

Queue Configuration

/** Options passed as the second argument when constructing a `Queue`. */
interface QueueOptions {
  /** Redis connection options */
  connection?: ConnectionOptions;
  
  /** Default options applied to all jobs */
  defaultJobOptions?: BaseJobOptions;
  
  /** Key prefix for Redis keys (default: 'bull') */
  prefix?: string;
  
  /** Stream configuration options */
  streams?: StreamsOptions;
  
  /** Skip updating queue metadata (false keeps the queue metadata) */
  skipMetasUpdate?: boolean;
  
  /** Skip the job scheduler (false keeps scheduling enabled) */
  skipJobScheduler?: boolean;
  
  /** Advanced settings: stalled-job checks and retry processing delay */
  settings?: AdvancedSettings;
}

/** Stream settings for a queue (the `streams` field of QueueOptions). */
interface StreamsOptions {
  /** Events stream configuration */
  events?: {
    /** Maximum number of events kept in the stream (e.g. 10000 keeps at most 10k events) */
    maxLen?: number;
  };
}

/** Advanced queue settings (the `settings` field of QueueOptions). */
interface AdvancedSettings {
  /** Interval in milliseconds between checks for stalled jobs */
  stalledInterval?: number;
  
  /** Maximum number of times a job may stall before it is failed */
  maxStalledCount?: number;
  
  /** Delay in milliseconds applied to retry processing */
  retryProcessDelay?: number;
}

Usage Examples:

import { Queue } from "bullmq";

// Basic queue configuration: local Redis plus default retention and retry options
const basicQueue = new Queue("basic-queue", {
  connection: {
    host: "localhost",
    port: 6379,
    db: 0,
  },
  defaultJobOptions: {
    removeOnComplete: 100,
    removeOnFail: 50,
    attempts: 3,
    backoff: "exponential",
  },
});

// Advanced queue configuration: remote Redis over TLS, custom key prefix,
// bounded events stream and explicit stalled-job settings
const advancedQueue = new Queue("advanced-queue", {
  connection: {
    host: "redis-cluster.example.com",
    port: 6379,
    password: "secure-password",
    tls: {
      // Verify the server certificate. Turning this off would let anyone on the
      // network path impersonate Redis and capture the password sent above.
      rejectUnauthorized: true,
    },
    retryDelayOnFailedAttempt: 100,
    maxRetriesPerRequest: 3,
  },
  prefix: "myapp", // Keys will be prefixed with "myapp:"
  streams: {
    events: {
      maxLen: 10000, // Keep max 10k events
    },
  },
  settings: {
    stalledInterval: 30000,    // Check for stalled jobs every 30s
    maxStalledCount: 1,        // Max 1 stall before failing
    retryProcessDelay: 5000,   // 5s delay for retry processing
  },
  skipMetasUpdate: false,      // Keep queue metadata
  skipJobScheduler: false,     // Enable scheduling
});

Worker Configuration

/** Options passed as the third argument when constructing a `Worker`. */
interface WorkerOptions {
  /** Maximum number of jobs to process concurrently (default: 1) */
  concurrency?: number;
  
  /** Rate limiting configuration */
  limiter?: RateLimiterOptions;
  
  /** Maximum number of times a job can be stalled before failed (default: 1) */
  maxStalledCount?: number;
  
  /** Interval for checking stalled jobs in ms (default: 30000) */
  stalledInterval?: number;
  
  /** Retention of completed jobs, by count and/or age */
  removeOnComplete?: number | boolean | KeepJobs;
  
  /** Retention of failed jobs, by count and/or age */
  removeOnFail?: number | boolean | KeepJobs;
  
  /** Job lock duration in ms (default: 30000) */
  lockDuration?: number;
  
  /** Lock renewal interval in ms (default: 15000) */
  lockRenewTime?: number;
  
  /** Skip stalled job detection (false keeps the stalled check enabled) */
  skipStalledCheck?: boolean;
  
  /** Skip automatic lock renewal (false keeps lock renewal enabled) */
  skipLockRenewal?: boolean;
  
  /** Use versioned locks */
  useWorkerVersioning?: boolean;
  
  /** Redis connection options */
  connection?: ConnectionOptions;
  
  /** Key prefix for Redis keys */
  prefix?: string;
  
  /** Metrics collection options */
  metrics?: MetricsOptions;
  
  /** Telemetry configuration */
  telemetry?: TelemetryOptions;
  
  /** Sandbox options for isolated job processing */
  sandbox?: SandboxOptions;
}

Usage Examples:

import { Worker } from "bullmq";

// Retention ages are expressed in seconds
const ONE_DAY_SECONDS = 24 * 60 * 60;
const ONE_WEEK_SECONDS = 7 * 24 * 60 * 60;

// Cleanup policies: keep up to 1000 completed jobs (or 24 hours)
// and up to 100 failed jobs (or 7 days)
const keepCompleted = { count: 1000, age: ONE_DAY_SECONDS };
const keepFailed = { count: 100, age: ONE_WEEK_SECONDS };

// Rate limit: max 50 jobs per minute, with burst capacity of 10
const throughputLimiter = { max: 50, duration: 60 * 1000, bucketSize: 10 };

// High-throughput worker: 10 concurrent jobs, a 1 minute job lock
// that is renewed every 30 seconds, plus cleanup, rate limiting and metrics
const highThroughputWorker = new Worker("image-processing", processor, {
  concurrency: 10,
  maxStalledCount: 2,
  stalledInterval: 30000,
  lockDuration: 60000,
  lockRenewTime: 30000,
  removeOnComplete: keepCompleted,
  removeOnFail: keepFailed,
  limiter: throughputLimiter,
  metrics: { maxDataPoints: 100 },
});

// Reliable worker: one job at a time, no stalls tolerated, stalled check
// every 10 seconds, 5 minute job lock, stalled check and lock renewal left
// enabled, versioned locks
const reliableWorkerOptions = {
  concurrency: 1,
  maxStalledCount: 0,
  stalledInterval: 10000,
  lockDuration: 300000,
  skipStalledCheck: false,
  skipLockRenewal: false,
  useWorkerVersioning: true,
};
const reliableWorker = new Worker("critical-tasks", processor, reliableWorkerOptions);

Job Configuration

/** Per-job options, passed to `queue.add` or set as a queue's `defaultJobOptions`. */
interface BaseJobOptions {
  /** Job priority (0-2097152); in BullMQ a lower number means a higher priority */
  priority?: number;
  
  /** Delay before processing (milliseconds) */
  delay?: number;
  
  /** Number of retry attempts (default: 0) */
  attempts?: number;
  
  /** Retry backoff strategy */
  backoff?: BackoffStrategy | BackoffOptions;
  
  /** Lifo (last in, first out) processing */
  lifo?: boolean;
  
  /** Job timeout in milliseconds */
  jobTimeout?: number;
  
  /** Retention of completed jobs, by count and/or age */
  removeOnComplete?: number | boolean | KeepJobs;
  
  /** Retention of failed jobs, by count and/or age (false keeps all failed jobs) */
  removeOnFail?: number | boolean | KeepJobs;
  
  /** Parent job relationship */
  parent?: ParentOptions;
  
  /** Repeatable job configuration */
  repeat?: RepeatOptions;
  
  /** Custom job ID */
  jobId?: string;
  
  /** Job deduplication settings */
  deduplication?: DeduplicationOptions;
}

/** Object form of the `backoff` job option used between retry attempts. */
interface BackoffOptions {
  /** Backoff type: 'fixed' or 'exponential' */
  type: 'fixed' | 'exponential';
  
  /** Base delay in milliseconds */
  delay?: number;
  
  /** Add randomization to delay */
  jitter?: boolean;
}

/** Retention limits for finished jobs (used by `removeOnComplete` / `removeOnFail`). */
interface KeepJobs {
  /** Keep jobs by age, in seconds (e.g. 3600 = 1 hour) */
  age?: number;
  
  /** Keep jobs by count (e.g. 50 keeps 50 jobs) */
  count?: number;
}

Usage Examples:

// Priority job with retry configuration: 5 attempts, jittered exponential
// backoff from a 2000 ms base delay
const urgentTaskOptions = {
  priority: 100,
  attempts: 5,
  backoff: { type: "exponential" as const, delay: 2000, jitter: true },
  removeOnComplete: 10,
  removeOnFail: false, // Keep all failed jobs
};
await queue.add("urgent-task", data, urgentTaskOptions);

// Delayed job with cleanup: 1 minute delay, 30 second timeout,
// completed jobs kept by count (50) and age (1 hour)
const scheduledTaskOptions = {
  delay: 60000,
  jobTimeout: 30000,
  removeOnComplete: { count: 50, age: 3600 },
};
await queue.add("scheduled-task", data, scheduledTaskOptions);

// Job with deduplication: custom job ID plus a deduplication ID
const uniqueTaskOptions = {
  jobId: "unique-task-123",
  deduplication: { id: "user-action-123" },
};
await queue.add("unique-task", data, uniqueTaskOptions);

Rate Limiting Configuration

/** Rate limiting settings for a worker (the `limiter` field of WorkerOptions). */
interface RateLimiterOptions {
  /** Maximum number of jobs to process within one `duration` window */
  max: number;
  
  /** Duration window in milliseconds */
  duration: number;
  
  /** Bucket size for token bucket algorithm (burst capacity) */
  bucketSize?: number;
  
  /** Group key, given as a string or a function of the job, for separate rate limiting per group */
  groupKey?: string | ((job: Job) => string);
}

Usage Examples:

// Rate-limit windows, in milliseconds
const ONE_MINUTE_MS = 60 * 1000;
const ONE_HOUR_MS = 60 * 60 * 1000;

// Simple rate limiting: 100 requests per minute
const rateLimitedWorker = new Worker("api-calls", processor, {
  limiter: { max: 100, duration: ONE_MINUTE_MS },
});

// Per-user rate limiting: 10 actions per minute, counted separately per userId
const perUserWorker = new Worker("user-actions", processor, {
  limiter: {
    max: 10,
    duration: ONE_MINUTE_MS,
    groupKey: (job) => job.data.userId,
  },
});

// Burst-capable rate limiting: 1000 jobs per hour, but allow bursts of 100
const burstWorker = new Worker("batch-processing", processor, {
  limiter: { max: 1000, duration: ONE_HOUR_MS, bucketSize: 100 },
});

Connection Configuration

/** Redis connection settings (the `connection` field of queue and worker options). */
interface ConnectionOptions {
  /** Redis host */
  host?: string;
  
  /** Redis port */
  port?: number;
  
  /** Database number */
  db?: number;
  
  /** Authentication password */
  password?: string;
  
  /** Username for ACL authentication */
  username?: string;
  
  /** Connection timeout in milliseconds (e.g. 60000 = 1 minute) */
  connectTimeout?: number;
  
  /** Command timeout (presumably milliseconds, like connectTimeout — TODO confirm) */
  commandTimeout?: number;
  
  /** Retry delay on failed attempts */
  retryDelayOnFailedAttempt?: number;
  
  /** Maximum retries per request */
  maxRetriesPerRequest?: number;
  
  /** TLS configuration */
  tls?: TLSOptions;
  
  /** Keep-alive setting in milliseconds (e.g. 30000 = 30 seconds) */
  keepAlive?: number;
  
  /** IP family preference (4 for IPv4, or 6) */
  family?: number;
}

/** TLS settings for the Redis connection (the `tls` field of ConnectionOptions). */
interface TLSOptions {
  /** Reject unauthorized certificates */
  rejectUnauthorized?: boolean;
  
  /** Certificate authority: a single value or an array of them */
  ca?: string | Buffer | Array<string | Buffer>;
  
  /** Client certificate */
  cert?: string | Buffer;
  
  /** Client private key */
  key?: string | Buffer;
}

Usage Examples:

// `fs` is needed below to load the TLS certificate files
import * as fs from "node:fs";

// Secure Redis connection: ACL username/password, database 1, and TLS with
// certificate verification using a custom CA plus a client certificate and key
const secureConnection = {
  host: "redis.example.com",
  port: 6380,
  password: "secure-password",
  username: "bullmq-user",
  db: 1,
  tls: {
    rejectUnauthorized: true,
    // Certificate files are read synchronously when this object is created
    ca: fs.readFileSync("ca-cert.pem"),
    cert: fs.readFileSync("client-cert.pem"),
    key: fs.readFileSync("client-key.pem"),
  },
  connectTimeout: 10000,
  commandTimeout: 5000,
  retryDelayOnFailedAttempt: 100,
  maxRetriesPerRequest: 3,
};

// Connection with keep-alive
const keepAliveConnection = {
  host: "localhost",
  port: 6379,
  keepAlive: 30000,        // 30 second keep-alive
  family: 4,               // IPv4
  connectTimeout: 60000,   // 1 minute timeout
};

Metrics Configuration

/** Metrics collection settings for a worker (the `metrics` field of WorkerOptions). */
interface MetricsOptions {
  /** Maximum data points to collect */
  maxDataPoints?: number;
}

Telemetry Configuration

/**
 * Telemetry settings for a worker (the `telemetry` field of WorkerOptions).
 * NOTE(review): `Tracer` and `ContextManager` are not declared in this document;
 * presumably they come from the tracing library in use — confirm.
 */
interface TelemetryOptions {
  /** Telemetry tracer */
  tracer?: Tracer;
  
  /** Context manager for distributed tracing */
  contextManager?: ContextManager;
}

Sandbox Configuration

/** Settings for sandboxed job processing (the `sandbox` field of WorkerOptions). */
interface SandboxOptions {
  /** Timeout for sandboxed processes (unit not stated here — TODO confirm) */
  timeout?: number;
  
  /** Memory limit for sandboxed processes (unit not stated here — TODO confirm) */
  memoryLimit?: number;
  
  /** Environment variables, as name/value string pairs */
  env?: Record<string, string>;
}

Repeat/Schedule Configuration

/** Repeat/schedule settings for a job (the `repeat` field of BaseJobOptions). */
interface RepeatOptions {
  /** Cron expression (e.g. "0 9 * * *" for 9 AM daily) */
  cron?: string;
  
  /** Timezone for cron (e.g. "America/New_York") */
  tz?: string;
  
  /** Start date */
  startDate?: Date | string | number;
  
  /** End date */
  endDate?: Date | string | number;
  
  /** Maximum number of iterations */
  limit?: number;
  
  /** Interval in milliseconds between runs (e.g. 30000 = every 30 seconds) */
  every?: number;
  
  /** Start immediately */
  immediately?: boolean;
  
  /** Current iteration count */
  count?: number;
  
  /** Previous run time */
  prevMillis?: number;
  
  /** Job key for deduplication */
  jobId?: string;
}

Usage Examples:

// Cron-based scheduling: 9 AM daily in New York time, at most 365 iterations
const dailyReportRepeat = {
  cron: "0 9 * * *",
  tz: "America/New_York",
  limit: 365,
};
await queue.add("daily-report", {}, { repeat: dailyReportRepeat });

// Interval-based scheduling: every 30 seconds, starting immediately
const healthCheckRepeat = { every: 30000, immediately: true };
await queue.add("health-check", {}, { repeat: healthCheckRepeat });

// Time-bounded scheduling: every minute between the start and end dates
const promotionRepeat = {
  every: 60000,
  startDate: new Date("2024-01-01"),
  endDate: new Date("2024-01-31"),
};
await queue.add("promotion", data, { repeat: promotionRepeat });

Environment-Specific Configurations

// Development configuration: local Redis, small job history, a single attempt
const devConfig = {
  connection: {
    host: "localhost",
    port: 6379,
    db: 0,
  },
  defaultJobOptions: {
    removeOnComplete: 10,
    removeOnFail: 10,
    attempts: 1,
  },
};

// Parse REDIS_PORT in base 10. `||` (not `??`) is deliberate so that an empty
// string also falls back to the default port.
const parsedRedisPort = Number.parseInt(process.env.REDIS_PORT || "6379", 10);

// Production configuration: Redis location and credentials come from the
// environment, TLS certificates are verified, failed jobs are retried
const prodConfig = {
  connection: {
    host: process.env.REDIS_HOST,
    // A non-numeric REDIS_PORT would otherwise produce a NaN port
    port: Number.isNaN(parsedRedisPort) ? 6379 : parsedRedisPort,
    password: process.env.REDIS_PASSWORD,
    tls: {
      rejectUnauthorized: true,
    },
    retryDelayOnFailedAttempt: 100,
    maxRetriesPerRequest: 3,
  },
  defaultJobOptions: {
    removeOnComplete: 1000,
    removeOnFail: 100,
    attempts: 3,
    backoff: "exponential",
  },
  settings: {
    stalledInterval: 30000,
    maxStalledCount: 1,
  },
};

// Use environment-specific config
const config = process.env.NODE_ENV === "production" ? prodConfig : devConfig;
const queue = new Queue("my-queue", config);

Configuration Best Practices

// Centralized configuration management
/**
 * Builds queue, worker and Redis connection options from environment
 * variables so that every queue and worker shares one source of settings.
 *
 * Numeric variables are parsed in base 10; a value that is unset, empty or
 * not numeric falls back to the documented default instead of yielding NaN.
 */
class BullMQConfig {
  /**
   * Builds the options for a queue.
   *
   * @param queueName Name of the queue; currently not used to vary the result.
   * @returns Connection settings, key prefix (BULLMQ_PREFIX, default "bull")
   *   and default job options (KEEP_COMPLETED, KEEP_FAILED, DEFAULT_ATTEMPTS).
   */
  static getQueueConfig(queueName: string): QueueOptions {
    return {
      // Referencing the class by name (not `this`) keeps the method usable
      // when it is detached, e.g. passed around as a callback.
      connection: BullMQConfig.getConnectionConfig(),
      prefix: process.env.BULLMQ_PREFIX || "bull",
      defaultJobOptions: {
        removeOnComplete: BullMQConfig.intFromEnv("KEEP_COMPLETED", 100),
        removeOnFail: BullMQConfig.intFromEnv("KEEP_FAILED", 50),
        attempts: BullMQConfig.intFromEnv("DEFAULT_ATTEMPTS", 3),
        backoff: "exponential",
      },
    };
  }
  
  /**
   * Builds the options for a worker.
   *
   * @param concurrency Number of jobs processed concurrently (default 1).
   * @returns Connection settings plus stalled-job and lock settings read from
   *   MAX_STALLED, STALLED_INTERVAL and LOCK_DURATION.
   */
  static getWorkerConfig(concurrency: number = 1): WorkerOptions {
    return {
      connection: BullMQConfig.getConnectionConfig(),
      concurrency,
      maxStalledCount: BullMQConfig.intFromEnv("MAX_STALLED", 1),
      stalledInterval: BullMQConfig.intFromEnv("STALLED_INTERVAL", 30000),
      lockDuration: BullMQConfig.intFromEnv("LOCK_DURATION", 30000),
    };
  }
  
  /**
   * Builds the Redis connection settings from REDIS_HOST, REDIS_PORT,
   * REDIS_PASSWORD and REDIS_DB.
   */
  private static getConnectionConfig(): ConnectionOptions {
    return {
      host: process.env.REDIS_HOST || "localhost",
      port: BullMQConfig.intFromEnv("REDIS_PORT", 6379),
      password: process.env.REDIS_PASSWORD,
      db: BullMQConfig.intFromEnv("REDIS_DB", 0),
      retryDelayOnFailedAttempt: 100,
      maxRetriesPerRequest: 3,
    };
  }

  /**
   * Reads an integer from an environment variable.
   *
   * @param name Environment variable name.
   * @param fallback Value returned when the variable is unset, empty or does
   *   not start with a base-10 integer.
   */
  private static intFromEnv(name: string, fallback: number): number {
    const parsed = Number.parseInt(process.env[name] ?? "", 10);
    return Number.isNaN(parsed) ? fallback : parsed;
  }
}

// Build the "orders" queue and its worker (concurrency 5) from the shared
// BullMQConfig factory
const ordersQueueOptions = BullMQConfig.getQueueConfig("orders");
const queue = new Queue("orders", ordersQueueOptions);
const ordersWorkerOptions = BullMQConfig.getWorkerConfig(5);
const worker = new Worker("orders", processor, ordersWorkerOptions);