CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/npm-bullmq

Redis-based distributed queue system for Node.js providing robust message and job processing capabilities with features like job scheduling, retries, and flow orchestration.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/job-scheduling.md

Job Scheduling

Advanced job scheduling and repeatable job management with the JobScheduler class. Enables cron-based scheduling, interval-based jobs, and complex recurring patterns for automated job execution.

Capabilities

JobScheduler Class

Manages repeatable jobs with advanced scheduling patterns and lifecycle management.

/**
 * JobScheduler manages repeatable jobs with advanced scheduling patterns.
 *
 * This is a declaration-only summary of the public surface: method bodies are
 * intentionally omitted.
 */
class JobScheduler {
  /**
   * @param name - Name passed to the scheduler instance.
   *   NOTE(review): presumably the queue the schedulers belong to — confirm.
   * @param opts - Redis connection and key-prefix settings.
   */
  constructor(name: string, opts: RepeatBaseOptions);
  
  /**
   * Create or update a scheduled job.
   *
   * @param jobSchedulerId - Identifier of the scheduler to create or update.
   * @param repeatOpts - When and how often the job repeats.
   * @param jobName - Name carried by the returned job (type parameter `N`).
   * @param jobData - Payload carried by the returned job (type parameter `T`).
   * @param opts - Job options template for the scheduled jobs.
   * @param options - `override` flag plus an optional `producerId`.
   *   NOTE(review): the exact effect of `override: false` is not shown here — confirm.
   * @returns A promise for the resulting job, or `undefined`.
   *   NOTE(review): the condition that yields `undefined` is not visible here.
   */
  upsertJobScheduler<T, R, N extends string>(
    jobSchedulerId: string,
    repeatOpts: RepeatOptions,
    jobName: N,
    jobData: T,
    opts: JobSchedulerTemplateOptions,
    options: { override: boolean; producerId?: string }
  ): Promise<Job<T, R, N> | undefined>;
  
  /**
   * Remove a scheduled job.
   *
   * @param jobSchedulerId - Identifier of the scheduler to remove.
   * @returns A promise for a number; the management example treats a truthy
   *   value as "removed". NOTE(review): confirm what the number counts.
   */
  removeJobScheduler(jobSchedulerId: string): Promise<number>;
  
  /**
   * Get a specific scheduler by ID.
   *
   * @param id - Identifier of the scheduler to look up.
   * @returns A promise for the scheduler description, or `undefined`
   *   (presumably when no scheduler has that ID — confirm).
   */
  getScheduler<D>(id: string): Promise<JobSchedulerJson<D> | undefined>;
  
  /**
   * Get all job schedulers.
   *
   * @param start - Optional lower bound of the range to return.
   * @param end - Optional upper bound of the range to return.
   *   NOTE(review): presumably zero-based, inclusive indices — confirm.
   * @param asc - Optional ordering flag (presumably ascending when true — confirm
   *   the sort key).
   */
  getJobSchedulers<D>(start?: number, end?: number, asc?: boolean): Promise<JobSchedulerJson<D>[]>;
  
  /** Get count of active schedulers */
  getSchedulersCount(): Promise<number>;
  
  /** Close scheduler and cleanup connections */
  close(): Promise<void>;
}

Usage Examples:

import { JobScheduler } from "bullmq";

// Create job scheduler (connects to the Redis instance described by `connection`)
const scheduler = new JobScheduler("email-scheduler", {
  connection: {
    host: "localhost",
    port: 6379,
  },
});

// Schedule daily reports
await scheduler.upsertJobScheduler(
  "daily-report",
  {
    cron: "0 9 * * *", // 9 AM daily
    tz: "America/New_York",
  },
  "generate-report",
  { reportType: "daily", recipients: ["admin@company.com"] },
  {
    attempts: 3,
    // Backoff is given as an options object, not a bare string: `type`
    // selects the strategy and `delay` is the base wait in milliseconds.
    backoff: {
      type: "exponential",
      delay: 60000, // Start with 1 minute
    },
  },
  {
    override: true,
  }
);

// Schedule weekly maintenance
await scheduler.upsertJobScheduler(
  "weekly-maintenance",
  {
    cron: "0 2 * * 0", // 2 AM every Sunday
    limit: 52, // Run for one year
  },
  "system-maintenance",
  { type: "full-maintenance" },
  {
    priority: 10,
    attempts: 1,
  },
  {
    override: false,
  }
);

// Schedule interval-based health checks
await scheduler.upsertJobScheduler(
  "health-check",
  {
    every: 30000, // Every 30 seconds
    immediately: true,
  },
  "health-check",
  { services: ["database", "redis", "api"] },
  {}, // No job option overrides for this scheduler
  {
    override: true,
  }
);

Repeat Options

/**
 * Describes when and how often a scheduled job repeats.
 * The examples in this document use either `cron` or `every` per scheduler.
 */
interface RepeatOptions {
  /**
   * Cron expression for scheduling.
   * NOTE(review): JobSchedulerJson in this document exposes the schedule as
   * `pattern`; confirm that the targeted BullMQ version accepts `cron` here.
   */
  cron?: string;
  
  /** Timezone for cron (default: UTC), e.g. "America/New_York" */
  tz?: string;
  
  /** Start date for scheduling (Date, date string, or numeric timestamp) */
  startDate?: Date | string | number;
  
  /** End date for scheduling (Date, date string, or numeric timestamp) */
  endDate?: Date | string | number;
  
  /** Maximum number of iterations */
  limit?: number;
  
  /** Interval in milliseconds */
  every?: number;
  
  /** Start immediately */
  immediately?: boolean;
  
  /**
   * Current iteration count.
   * NOTE(review): presumably maintained by the library rather than set by callers — confirm.
   */
  count?: number;
  
  /**
   * Previous run time.
   * NOTE(review): presumably an epoch timestamp in milliseconds — confirm.
   */
  prevMillis?: number;
  
  /** Job key for deduplication */
  jobId?: string;
}

JobScheduler Template Options

/**
 * Job options template passed to `upsertJobScheduler` for the jobs a scheduler produces.
 */
interface JobSchedulerTemplateOptions {
  /** Job priority (0-2097152). In BullMQ a LOWER number means a HIGHER priority. */
  priority?: number;
  
  /** Total number of attempts to run the job, counting the first one (not only retries) */
  attempts?: number;
  
  /** Retry backoff strategy, e.g. `{ type: "exponential", delay: 60000 }` */
  backoff?: BackoffStrategy | BackoffOptions;
  
  /**
   * Job timeout in milliseconds.
   * NOTE(review): confirm this option exists in the targeted BullMQ version.
   */
  jobTimeout?: number;
  
  /**
   * Retention of completed jobs: `true` removes the job once it completes,
   * a number keeps at most that many jobs, and a KeepJobs object limits by
   * age and/or count.
   */
  removeOnComplete?: number | boolean | KeepJobs;
  
  /** Retention of failed jobs; same forms as `removeOnComplete` */
  removeOnFail?: number | boolean | KeepJobs;
}

JobScheduler JSON Interface

/**
 * Description of a job scheduler as returned by `getScheduler` and
 * `getJobSchedulers`. `D` is the type of the job payload.
 */
interface JobSchedulerJson<D = any> {
  /** Scheduler identifier (logged as `sched.id` in the monitoring example) */
  id: string;
  /** NOTE(review): presumably the name of the jobs the scheduler produces — confirm */
  name: string;
  /** Job payload */
  data: D;
  /** Job options template */
  opts: JobSchedulerTemplateOptions;
  /** Repeat settings */
  repeatOpts: RepeatOptions;
  /** Time of the next run as a numeric timestamp; the examples pass it to `new Date(...)` */
  nextMillis: number;
  /** NOTE(review): presumably the schedule's end as a numeric timestamp — confirm */
  endDate?: number;
  /** NOTE(review): presumably the timezone applied to the schedule — confirm */
  tz?: string;
  /** NOTE(review): presumably the cron expression of the schedule — confirm */
  pattern?: string;
}

Repeat Base Options

/**
 * Options accepted by the `JobScheduler` constructor.
 */
interface RepeatBaseOptions {
  /** Redis connection options (the usage example passes `host` and `port`) */
  connection?: ConnectionOptions;
  
  /** Key prefix for Redis keys */
  prefix?: string;
}

Advanced Scheduling Patterns

// Complex cron patterns
await scheduler.upsertJobScheduler(
  "business-hours-reminder",
  {
    cron: "0 9-17 * * 1-5", // On the hour from 9 AM through 5 PM, Monday-Friday
    tz: "America/New_York",
  },
  "hourly-reminder",
  { type: "business-hours" },
  {},
  { override: true }
);

// Quarterly reports
await scheduler.upsertJobScheduler(
  "quarterly-report",
  {
    cron: "0 0 1 */3 *", // First day of every 3rd month
    startDate: new Date("2024-01-01"),
    endDate: new Date("2026-12-31"),
  },
  "generate-quarterly-report",
  { format: "pdf", recipients: ["ceo@company.com"] },
  {
    attempts: 5,
    // Backoff is given as an options object, not a bare string: `type`
    // selects the strategy and `delay` is the base wait in milliseconds.
    backoff: {
      type: "exponential",
      delay: 60000, // Start with 1 minute
    },
  },
  { override: true }
);

// Monthly cleanup with limit
await scheduler.upsertJobScheduler(
  "monthly-cleanup",
  {
    cron: "0 3 1 * *", // 3 AM on first day of month
    limit: 12, // Run for 12 months
  },
  "cleanup-old-data",
  { olderThan: "90days" },
  {},
  { override: true }
);

Scheduler Management

// List every registered scheduler and report how many came back
const allSchedulers = await scheduler.getJobSchedulers();
console.log(`${allSchedulers.length} active schedulers`);

// Look up a single scheduler by its ID; the result may be undefined
const dailyReport = await scheduler.getScheduler("daily-report");
if (dailyReport) {
  const nextRun = new Date(dailyReport.nextMillis);
  console.log(`Next run: ${nextRun}`);
}

// Ask for the number of schedulers
const schedulerTotal = await scheduler.getSchedulersCount();
console.log(`Total schedulers: ${schedulerTotal}`);

// Remove a scheduler; a truthy numeric result is treated as success
const removalResult = await scheduler.removeJobScheduler("old-scheduler");
if (removalResult) {
  console.log("Scheduler removed successfully");
}

Time-Based Scheduling

// Schedule for specific date range
await scheduler.upsertJobScheduler(
  "holiday-promotion",
  {
    every: 60 * 60 * 1000, // Every hour
    startDate: new Date("2024-12-01"),
    // A date-only string such as "2024-12-31" is parsed as midnight UTC, which
    // would end the schedule before the last day begins. Spell out the final
    // millisecond so December 31 is included in the range.
    endDate: new Date("2024-12-31T23:59:59.999Z"),
  },
  "send-promotion",
  { campaign: "holiday-2024" },
  {},
  { override: true }
);

// Schedule with timezone awareness
await scheduler.upsertJobScheduler(
  "regional-newsletter",
  {
    cron: "0 8 * * *", // 8 AM daily
    tz: "Europe/London", // The cron expression is evaluated in this timezone
  },
  "send-newsletter",
  { region: "UK" },
  {},
  { override: true }
);

Error Handling and Monitoring

// Schedule with comprehensive error handling: retries with exponential
// backoff plus explicit retention of completed and failed jobs
await scheduler.upsertJobScheduler(
  "critical-backup",
  {
    cron: "0 2 * * *", // 2 AM daily
  },
  "database-backup",
  { type: "full-backup" },
  {
    attempts: 3,
    backoff: {
      type: "exponential",
      delay: 60000, // Start with 1 minute
    },
    removeOnComplete: 7, // Keep the 7 most recent completed jobs (a count, not days)
    removeOnFail: false, // Keep all failed attempts
  },
  {
    override: true,
    producerId: "backup-service",
  }
);

// Monitor scheduler health: warn about every scheduler whose next run time is
// already in the past, and log the next run time of each scheduler.
const monitorScheduler = async () => {
  const registered = await scheduler.getJobSchedulers();

  registered.forEach((entry) => {
    const dueAt = new Date(entry.nextMillis);
    const isOverdue = dueAt.getTime() < Date.now();

    if (isOverdue) {
      console.warn(`Scheduler ${entry.id} may be behind schedule`);
    }

    console.log(`${entry.id}: next run at ${dueAt.toISOString()}`);
  });
};

// Run monitoring every 5 minutes. The monitor is async, so its promise is
// handled here: a failed check is logged instead of becoming an unhandled
// promise rejection.
setInterval(() => {
  monitorScheduler().catch((error) => {
    console.error("Scheduler monitoring failed:", error);
  });
}, 5 * 60 * 1000);

Lifecycle Management

// Graceful shutdown
process.on("SIGINT", async () => {
  console.log("Shutting down job scheduler...");

  // If close() rejects, the handler must still terminate the process;
  // otherwise the rejection goes unhandled and process.exit() is never
  // reached. Report the failure through a non-zero exit code instead.
  let exitCode = 0;
  try {
    await scheduler.close();
  } catch (error) {
    console.error("Failed to close job scheduler:", error);
    exitCode = 1;
  }
  process.exit(exitCode);
});

// Handle scheduler errors
scheduler.on("error", (error) => {
  console.error("JobScheduler error:", error);
});

Best Practices

// Use meaningful scheduler IDs
// (`service`, `environment` and `jobType` are placeholders assumed to be
// defined by the surrounding application)
const schedulerId = `${service}-${environment}-${jobType}`;

// Always specify timezone for cron jobs
// (falls back to "UTC" when the TIMEZONE environment variable is unset or empty)
const cronOpts = {
  cron: "0 9 * * *",
  tz: process.env.TIMEZONE || "UTC",
};

// Set reasonable limits for recurring jobs
// (1440 one-minute iterations add up to 24 hours)
const limitedSchedule = {
  every: 60000, // 1 minute
  limit: 1440, // 24 hours worth
};

// Use override carefully
// (here it is enabled only when NODE_ENV is exactly "development")
const updateOptions = {
  override: process.env.NODE_ENV === "development", // Only in dev
};

Install with Tessl CLI

npx tessl i tessl/npm-bullmq

docs

configuration.md

error-handling.md

event-monitoring.md

flow-orchestration.md

index.md

job-lifecycle.md

job-processing.md

job-scheduling.md

queue-management.md

tile.json