Individual job management including state transitions, progress tracking, retry logic, and metadata handling. The Job class represents work units with comprehensive lifecycle management capabilities.
Represents individual jobs with state management and lifecycle methods.
/**
* Represents a job in the queue with lifecycle management
*/
class Job<T = any, R = any, N extends string = string> {
/** Unique job identifier */
readonly id: string;
/** Job name/type */
readonly name: N;
/** Job data payload */
readonly data: T;
/** Job options */
readonly opts: JobsOptions;
/** Current progress (0-100 or custom object) */
progress: number | object;
/** Job return value after completion */
returnvalue?: R;
/** Error stack traces from failed attempts */
stacktrace?: string[];
/** Delay before processing (milliseconds) */
delay?: number;
/** Job priority (higher = more priority) */
priority?: number;
/** Job creation timestamp */
timestamp: number;
/** Processing start timestamp */
processedOn?: number;
/** Completion timestamp */
finishedOn?: number;
/** Number of attempts made */
attemptsMade: number;
/** Update job progress */
updateProgress(progress: number | object): Promise<void>;
/** Add log entry to job */
log(row: string): Promise<number>;
/** Get job logs */
getState(): Promise<JobState>;
/** Move job to completed state */
moveToCompleted(returnvalue: R, token: string, fetchNext?: boolean): Promise<JobNode>;
/** Move job to failed state */
moveToFailed(errorInfo: ErrorInfo, token: string, fetchNext?: boolean): Promise<JobNode>;
/** Retry failed job */
retry(state?: 'completed' | 'failed'): Promise<void>;
/** Remove job from queue */
remove(): Promise<void>;
/** Discard job (mark as failed without retry) */
discard(): Promise<void>;
/** Promote delayed/prioritized job to waiting */
promote(): Promise<void>;
/** Update job data */
update(data: T): Promise<void>;
/** Check if job is waiting */
isWaiting(): Promise<boolean>;
/** Check if job is active */
isActive(): Promise<boolean>;
/** Check if job is completed */
isCompleted(): Promise<boolean>;
/** Check if job is failed */
isFailed(): Promise<boolean>;
/** Check if job is delayed */
isDelayed(): Promise<boolean>;
/** Get job duration in milliseconds */
duration(): number;
}Usage Examples:
import { Job } from "bullmq";
// In a worker processor
const worker = new Worker("data processing", async (job: Job) => {
console.log(`Processing job ${job.id}`);
console.log("Job data:", job.data);
console.log("Created at:", new Date(job.timestamp));
// Update progress
await job.updateProgress(25);
await job.log("Started data validation");
// Process data
const validatedData = await validateData(job.data);
await job.updateProgress(50);
await job.log("Data validation completed");
const processedData = await processData(validatedData);
await job.updateProgress(75);
await job.log("Data processing completed");
const result = await saveResults(processedData);
await job.updateProgress(100);
await job.log("Results saved successfully");
return result;
});
// Working with jobs outside of processors
const queue = new Queue("my queue");
const job = await queue.add("process data", { items: [1, 2, 3] });
// Check job state
const state = await job.getState();
console.log(`Job ${job.id} is in state: ${state}`);
// Wait for completion and get result
const worker = new Worker("my queue", async (job) => {
return { processed: job.data.items.length };
});
await worker.run();
// Manually manage job lifecycle
if (await job.isWaiting()) {
console.log("Job is waiting to be processed");
}
if (await job.isActive()) {
console.log("Job is currently being processed");
}
if (await job.isCompleted()) {
console.log("Job completed with result:", job.returnvalue);
}interface JobsOptions extends BaseJobOptions {
/** Job deduplication settings */
deduplication?: DeduplicationOptions;
}
interface BaseJobOptions {
/** Job priority (0-2097152, higher = more priority) */
priority?: number;
/** Delay before processing (milliseconds) */
delay?: number;
/** Number of retry attempts (default: 0) */
attempts?: number;
/** Retry backoff strategy */
backoff?: BackoffStrategy | BackoffOptions;
/** Lifo (last in, first out) processing */
lifo?: boolean;
/** Job timeout in milliseconds */
jobTimeout?: number;
/** Keep completed job count/age */
removeOnComplete?: number | boolean | KeepJobs;
/** Keep failed job count/age */
removeOnFail?: number | boolean | KeepJobs;
/** Parent job relationship */
parent?: ParentOptions;
/** Repeatable job configuration */
repeat?: RepeatOptions;
/** Job metadata */
jobId?: string;
}type JobState =
| 'completed' // Job finished successfully
| 'failed' // Job failed permanently
| 'active' // Job is currently being processed
| 'delayed' // Job is delayed until future time
| 'prioritized' // Job is in priority queue
| 'waiting' // Job is waiting to be processed
| 'waiting-children'; // Job is waiting for child jobs to complete// Numeric progress (0-100)
await job.updateProgress(50);
// Object progress with detailed information
await job.updateProgress({
percentage: 75,
stage: "processing",
itemsProcessed: 150,
totalItems: 200,
estimatedTimeRemaining: 30000,
});
// Listen to progress events
worker.on("progress", (job, progress) => {
if (typeof progress === "number") {
console.log(`Job ${job.id}: ${progress}% complete`);
} else {
console.log(`Job ${job.id} progress:`, progress);
}
});// Add log entries during processing
await job.log("Starting data download");
await job.log("Downloaded 1000 records");
await job.log("Processing complete");
// Get job logs (requires separate method - not part of Job class)
// Logs are typically accessed via queue.getJobLogs(jobId)// Check various job states
const isWaiting = await job.isWaiting();
const isActive = await job.isActive();
const isCompleted = await job.isCompleted();
const isFailed = await job.isFailed();
const isDelayed = await job.isDelayed();
// Get current state
const currentState = await job.getState();
console.log(`Job is currently: ${currentState}`);
// Manual state transitions (typically done by workers)
// These require proper tokens and are usually internal operations// Retry a failed job
const failedJob = await queue.getJob("failed-job-id");
if (failedJob && await failedJob.isFailed()) {
await failedJob.retry();
console.log("Job queued for retry");
}
// Promote delayed job to immediate processing
const delayedJob = await queue.getJob("delayed-job-id");
if (delayedJob && await delayedJob.isDelayed()) {
await delayedJob.promote();
console.log("Job promoted to waiting queue");
}// Update job data (for waiting jobs)
const waitingJob = await queue.getJob("waiting-job-id");
if (waitingJob && await waitingJob.isWaiting()) {
await waitingJob.update({
...waitingJob.data,
updatedField: "new value",
timestamp: Date.now(),
});
}// Remove completed job
const completedJob = await queue.getJob("completed-job-id");
if (completedJob && await completedJob.isCompleted()) {
await completedJob.remove();
console.log("Job removed from queue");
}
// Discard active job (mark as failed without retry)
const activeJob = await queue.getJob("active-job-id");
if (activeJob && await activeJob.isActive()) {
await activeJob.discard();
console.log("Job discarded");
}// Calculate job processing time
if (job.processedOn && job.finishedOn) {
const processingTime = job.finishedOn - job.processedOn;
console.log(`Job took ${processingTime}ms to process`);
}
// Total job duration (from creation to completion)
const totalDuration = job.duration();
console.log(`Total job lifecycle: ${totalDuration}ms`);
// Wait time (from creation to processing start)
if (job.processedOn) {
const waitTime = job.processedOn - job.timestamp;
console.log(`Job waited ${waitTime}ms before processing`);
}interface ErrorInfo {
message: string;
stack?: string;
}When jobs fail, error information is stored:
// Access error information for failed jobs
if (await job.isFailed() && job.stacktrace) {
console.log("Failure reasons:");
job.stacktrace.forEach((trace, index) => {
console.log(`Attempt ${index + 1}: ${trace}`);
});
}interface ParentOptions {
/** Parent job ID */
id: string;
/** Parent job queue name */
queue: string;
}// Create child job with parent relationship
const parentJob = await queue.add("parent task", { items: [1, 2, 3] });
const childJob = await queue.add("child task", { item: 1 }, {
parent: {
id: parentJob.id,
queue: "my queue",
},
});