Advanced job scheduling and repeatable job management with the JobScheduler class. Enables cron-based scheduling, interval-based jobs, and complex recurring patterns for automated job execution.
Manages repeatable jobs with advanced scheduling patterns and lifecycle management.
/**
 * JobScheduler manages repeatable jobs with advanced scheduling patterns.
 *
 * This is an API summary: members are listed as signatures only.
 */
class JobScheduler {
  /**
   * @param name - Name given to the scheduler instance.
   * @param opts - Base options (`RepeatBaseOptions`), e.g. the Redis connection.
   */
  constructor(name: string, opts: RepeatBaseOptions);

  /**
   * Create or update a scheduled job.
   *
   * @typeParam T - Type of the job data.
   * @typeParam R - Result type carried by the returned `Job`.
   * @typeParam N - String type of the job name.
   * @param jobSchedulerId - Identifier of the scheduler to create or update.
   * @param repeatOpts - Scheduling settings (`RepeatOptions`).
   * @param jobName - Name passed through to the returned `Job` type.
   * @param jobData - Data passed through to the returned `Job` type.
   * @param opts - Template options (`JobSchedulerTemplateOptions`).
   * @param options - `override` flag plus an optional `producerId`.
   * @returns A promise resolving to a `Job`, or `undefined`.
   */
  upsertJobScheduler<T, R, N extends string>(
    jobSchedulerId: string,
    repeatOpts: RepeatOptions,
    jobName: N,
    jobData: T,
    opts: JobSchedulerTemplateOptions,
    options: { override: boolean; producerId?: string }
  ): Promise<Job<T, R, N> | undefined>;

  /**
   * Remove a scheduled job.
   *
   * @param jobSchedulerId - Identifier of the scheduler to remove.
   * @returns A promise resolving to a number.
   *   NOTE(review): presumably non-zero when something was removed — confirm.
   */
  removeJobScheduler(jobSchedulerId: string): Promise<number>;

  /**
   * Get a specific scheduler by ID.
   *
   * @typeParam D - Type of the scheduler's job data.
   * @param id - Scheduler identifier.
   * @returns A promise resolving to the scheduler's JSON form, or `undefined`.
   */
  getScheduler<D>(id: string): Promise<JobSchedulerJson<D> | undefined>;

  /**
   * Get all job schedulers.
   *
   * @typeParam D - Type of the schedulers' job data.
   * @param start - Optional range start. NOTE(review): index semantics not shown here — confirm.
   * @param end - Optional range end. NOTE(review): index semantics not shown here — confirm.
   * @param asc - Optional flag; presumably ascending order when `true` — confirm.
   * @returns A promise resolving to an array of scheduler JSON objects.
   */
  getJobSchedulers<D>(start?: number, end?: number, asc?: boolean): Promise<JobSchedulerJson<D>[]>;

  /** Get count of active schedulers. */
  getSchedulersCount(): Promise<number>;

  /** Close scheduler and cleanup connections. */
  close(): Promise<void>;
}

// Usage Examples:
import { JobScheduler } from "bullmq";
// Create job scheduler
const scheduler = new JobScheduler("email-scheduler", {
  connection: {
    host: "localhost",
    port: 6379,
  },
});

// Schedule daily reports
await scheduler.upsertJobScheduler(
  "daily-report",
  {
    pattern: "0 9 * * *", // 9 AM daily
    tz: "America/New_York",
  },
  "generate-report",
  { reportType: "daily", recipients: ["admin@company.com"] },
  {
    attempts: 3,
    // Backoff takes an options object (strategy type + base delay in ms),
    // not a bare strategy name.
    backoff: { type: "exponential", delay: 1000 },
  },
  {
    override: true,
  }
);

// Schedule weekly maintenance
await scheduler.upsertJobScheduler(
  "weekly-maintenance",
  {
    pattern: "0 2 * * 0", // 2 AM every Sunday
    limit: 52, // Run for one year
  },
  "system-maintenance",
  { type: "full-maintenance" },
  {
    priority: 10,
    attempts: 1,
  },
  {
    override: false,
  }
);

// Schedule interval-based health checks
await scheduler.upsertJobScheduler(
  "health-check",
  {
    every: 30000, // Every 30 seconds
    immediately: true,
  },
  "health-check",
  { services: ["database", "redis", "api"] },
  {},
  {
    override: true,
  }
);

/** Scheduling settings accepted by `upsertJobScheduler`. */
interface RepeatOptions {
  /** Cron expression for scheduling (use either `pattern` or `every`) */
  pattern?: string;
  /** Timezone for cron (default: UTC) */
  tz?: string;
  /** Start date for scheduling */
  startDate?: Date | string | number;
  /** End date for scheduling */
  endDate?: Date | string | number;
  /** Maximum number of iterations */
  limit?: number;
  /** Interval in milliseconds (use either `every` or `pattern`) */
  every?: number;
  /** Start immediately */
  immediately?: boolean;
  /** Current iteration count */
  count?: number;
  /** Previous run time */
  prevMillis?: number;
  /** Job key for deduplication */
  jobId?: string;
}

/** Options applied to every job a scheduler produces. */
interface JobSchedulerTemplateOptions {
  /** Job priority (0-2097152; a lower number means higher priority) */
  priority?: number;
  /** Number of retry attempts */
  attempts?: number;
  /** Retry backoff strategy */
  backoff?: BackoffStrategy | BackoffOptions;
  /**
   * Job timeout in milliseconds.
   * NOTE(review): confirm this option exists in the installed BullMQ version.
   */
  jobTimeout?: number;
  /** Keep completed job count/age */
  removeOnComplete?: number | boolean | KeepJobs;
  /** Keep failed job count/age */
  removeOnFail?: number | boolean | KeepJobs;
}

/** JSON form of a scheduler, as returned by `getScheduler` / `getJobSchedulers`. */
interface JobSchedulerJson<D = any> {
  id: string;
  name: string;
  data: D;
  opts: JobSchedulerTemplateOptions;
  repeatOpts: RepeatOptions;
  nextMillis: number;
  endDate?: number;
  tz?: string;
  pattern?: string;
}

/** Base options passed to the `JobScheduler` constructor. */
interface RepeatBaseOptions {
  /** Redis connection options */
  connection?: ConnectionOptions;
  /** Key prefix for Redis keys */
  prefix?: string;
}

// Complex cron patterns
// The cron expression goes in the `pattern` field of the repeat options.
await scheduler.upsertJobScheduler(
  "business-hours-reminder",
  {
    pattern: "0 9-17 * * 1-5", // Every hour 9-5, Monday-Friday
    tz: "America/New_York",
  },
  "hourly-reminder",
  { type: "business-hours" },
  {},
  { override: true }
);

// Quarterly reports
await scheduler.upsertJobScheduler(
  "quarterly-report",
  {
    pattern: "0 0 1 */3 *", // First day of every 3rd month
    startDate: new Date("2024-01-01"),
    endDate: new Date("2026-12-31"),
  },
  "generate-quarterly-report",
  { format: "pdf", recipients: ["ceo@company.com"] },
  {
    attempts: 5,
    // Backoff takes an options object (strategy type + base delay in ms),
    // not a bare strategy name.
    backoff: { type: "exponential", delay: 1000 },
  },
  { override: true }
);

// Monthly cleanup with limit
await scheduler.upsertJobScheduler(
  "monthly-cleanup",
  {
    pattern: "0 3 1 * *", // 3 AM on first day of month
    limit: 12, // Run for 12 months
  },
  "cleanup-old-data",
  { olderThan: "90days" },
  {},
  { override: true }
);

// Get all schedulers
const schedulers = await scheduler.getJobSchedulers();
console.log(
  `${schedulers.length} active schedulers`
);

// Look up one scheduler by its ID; the lookup may come back empty
const dailyReport = await scheduler.getScheduler("daily-report");
if (dailyReport) {
  const nextRun = new Date(dailyReport.nextMillis);
  console.log(`Next run: ${nextRun}`);
}

// Ask for the number of schedulers
const count = await scheduler.getSchedulersCount();
console.log(
  `Total schedulers: ${count}`
);

// Delete a scheduler and report when the call returns a truthy result
const removed = await scheduler.removeJobScheduler("old-scheduler");
if (removed) {
  console.log("Scheduler removed successfully");
}

// Schedule for specific date range
// Interval-based schedule bounded by a start and end date
await scheduler.upsertJobScheduler(
  "holiday-promotion",
  {
    every: 60 * 60 * 1000, // Every hour
    startDate: new Date("2024-12-01"),
    endDate: new Date("2024-12-31"),
  },
  "send-promotion",
  { campaign: "holiday-2024" },
  {},
  { override: true }
);

// Schedule with timezone awareness
// (the cron expression goes in the `pattern` field of the repeat options)
await scheduler.upsertJobScheduler(
  "regional-newsletter",
  {
    pattern: "0 8 * * *", // 8 AM daily
    tz: "Europe/London",
  },
  "send-newsletter",
  { region: "UK" },
  {},
  { override: true }
);

// Schedule with comprehensive error handling
// The cron expression goes in the `pattern` field of the repeat options.
await scheduler.upsertJobScheduler(
  "critical-backup",
  {
    pattern: "0 2 * * *", // 2 AM daily
  },
  "database-backup",
  { type: "full-backup" },
  {
    attempts: 3,
    backoff: {
      type: "exponential",
      delay: 60000, // Start with 1 minute
    },
    removeOnComplete: 7, // Keep the 7 most recent completed jobs (a count, not days)
    removeOnFail: false, // Keep all failed attempts
  },
  {
    override: true,
    producerId: "backup-service",
  }
);

/**
 * Monitor scheduler health: logs every scheduler's next run time and warns
 * about any scheduler whose next run time is already in the past.
 */
const monitorScheduler = async () => {
  const schedulers = await scheduler.getJobSchedulers();
  // One reference timestamp per pass so every scheduler is compared alike
  const now = new Date();
  for (const sched of schedulers) {
    const nextRun = new Date(sched.nextMillis);
    if (nextRun < now) {
      console.warn(`Scheduler ${sched.id} may be behind schedule`);
    }
    console.log(`${sched.id}: next run at ${nextRun.toISOString()}`);
  }
};

// Run monitoring every 5 minutes.
// setInterval ignores the returned promise, so rejections are caught here
// rather than surfacing as unhandled rejections.
setInterval(() => {
  monitorScheduler().catch((error) => {
    console.error("Scheduler monitoring failed:", error);
  });
}, 5 * 60 * 1000);

// Graceful shutdown
// On SIGINT, close the scheduler and then exit. A failed close is logged and
// reported through a non-zero exit code instead of an unhandled rejection.
process.on("SIGINT", async () => {
  console.log("Shutting down job scheduler...");
  let exitCode = 0;
  try {
    await scheduler.close();
  } catch (error) {
    console.error("Failed to close job scheduler:", error);
    exitCode = 1;
  }
  process.exit(exitCode);
});

// Handle scheduler errors
scheduler.on("error", (error) => {
  console.error("JobScheduler error:", error);
});

// Use meaningful scheduler IDs
// `service`, `environment` and `jobType` are placeholders supplied by the caller.
const schedulerId = `${service}-${environment}-${jobType}`;

// Always specify timezone for cron jobs
// (the cron expression goes in the `pattern` field of the repeat options)
const cronOpts = {
  pattern: "0 9 * * *",
  tz: process.env.TIMEZONE || "UTC",
};

// Set reasonable limits for recurring jobs
const limitedSchedule = {
  every: 60000, // 1 minute
  limit: 1440, // 24 hours worth
};

// Use override carefully
const updateOptions = {
  override: process.env.NODE_ENV === "development", // Only in dev
};