Individual job state management, progress tracking, retry logic, and job manipulation.
Access job data and metadata.
class Job {
/** Unique job identifier */
readonly id: string | number;
/** Job name/type */
readonly name: string;
/** Job payload data */
readonly data: any;
/** Job configuration options */
readonly opts: JobOptions;
/** Reference to parent queue */
readonly queue: Queue;
/** Job creation timestamp */
readonly timestamp: number;
/** Processing start timestamp (if started) */
readonly processedOn?: number;
/** Completion timestamp (if completed) */
readonly finishedOn?: number;
/** Number of processing attempts made */
readonly attemptsMade: number;
/** Failure reason (if failed) */
readonly failedReason?: string;
/** Error stack traces from failed attempts */
readonly stacktrace: string[];
/** Return value from successful completion */
readonly returnvalue: any;
/** Timestamp when job was retried (if retried) */
readonly retriedOn?: number;
/** Debounce identifier for the job (if debounced) */
readonly debounceId?: string;
}Check current job state with boolean methods.
/**
* Check if job is completed
* @returns Promise that resolves to true if job is completed
*/
isCompleted(): Promise<boolean>;
/**
* Check if job has failed
* @returns Promise that resolves to true if job has failed
*/
isFailed(): Promise<boolean>;
/**
* Check if job is delayed
* @returns Promise that resolves to true if job is delayed
*/
isDelayed(): Promise<boolean>;
/**
* Check if job is currently active/processing
* @returns Promise that resolves to true if job is active
*/
isActive(): Promise<boolean>;
/**
* Check if job is waiting to be processed
* @returns Promise that resolves to true if job is waiting
*/
isWaiting(): Promise<boolean>;
/**
* Check if job is paused
* @returns Promise that resolves to true if job is paused
*/
isPaused(): Promise<boolean>;
/**
* Check if job is stuck/stalled
* @returns Promise that resolves to true if job is stuck
*/
isStuck(): Promise<boolean>;
/**
* Check if job is discarded (won't be retried)
* @returns Boolean indicating if job is discarded
*/
isDiscarded(): boolean;
/**
* Get the current job state
* @returns Promise that resolves to current JobStatus or 'stuck'
*/
getState(): Promise<JobStatus | 'stuck'>;
type JobStatus = 'completed' | 'waiting' | 'active' | 'delayed' | 'failed' | 'paused';Usage Examples:
const Queue = require('bull');
const taskQueue = new Queue('tasks');
// Check job states
const job = await taskQueue.getJob(123);
if (job) {
const isComplete = await job.isCompleted();
const isFailed = await job.isFailed();
const isActive = await job.isActive();
console.log(`Job ${job.id} - Completed: ${isComplete}, Failed: ${isFailed}, Active: ${isActive}`);
// Get detailed state
const state = await job.getState();
console.log(`Current state: ${state}`);
}
// Monitor job state changes
const newJob = await taskQueue.add('monitor-me', { data: 'test' });
// Poll job state
const checkState = setInterval(async () => {
const state = await newJob.getState();
console.log(`Job ${newJob.id} state: ${state}`);
if (state === 'completed' || state === 'failed') {
clearInterval(checkState);
}
}, 1000);Track and update job progress during processing.
/**
* Get current job progress
* @returns Current progress value (any type)
*/
progress(): any;
/**
* Update job progress (call from within job processor)
* @param value - Progress value (number, object, or any data)
* @returns Promise that resolves when progress is updated
*/
progress(value: any): Promise<void>;Usage Examples:
// Inside a job processor
taskQueue.process('file-upload', async (job) => {
const { fileUrl, chunks } = job.data;
// Initial progress
await job.progress(0);
for (let i = 0; i < chunks.length; i++) {
await processChunk(chunks[i]);
// Update progress as percentage
const progressPercent = Math.round((i + 1) / chunks.length * 100);
await job.progress(progressPercent);
}
return { uploaded: true, chunks: chunks.length };
});
// Monitor progress from outside
taskQueue.on('progress', (job, progress) => {
console.log(`Job ${job.id} progress: ${progress}%`);
});
// Get current progress of existing job
const job = await taskQueue.getJob(456);
const currentProgress = job.progress();
console.log(`Current progress: ${currentProgress}`);Control job lifecycle and state transitions.
/**
* Remove job from queue and all associated data
* @returns Promise that resolves when job is removed
*/
remove(): Promise<void>;
/**
* Retry a failed job
* @returns Promise that resolves when job is scheduled for retry
*/
retry(): Promise<void>;
/**
* Discard job to prevent further retry attempts
* @returns Promise that resolves when job is discarded
*/
discard(): Promise<void>;
/**
* Promote a delayed job to waiting state
* @returns Promise that resolves when job is promoted
*/
promote(): Promise<void>;
/**
* Update job data
* @param data - New job data to replace existing data
* @returns Promise that resolves when data is updated
*/
update(data: any): Promise<void>;Usage Examples:
const job = await taskQueue.getJob(789);
if (job) {
const state = await job.getState();
switch (state) {
case 'failed':
// Retry failed job
console.log(`Retrying failed job ${job.id}`);
await job.retry();
break;
case 'delayed':
// Promote delayed job to run immediately
console.log(`Promoting delayed job ${job.id}`);
await job.promote();
break;
case 'waiting':
// Update job data
const updatedData = { ...job.data, priority: 'high' };
await job.update(updatedData);
break;
case 'completed':
// Remove completed job
console.log(`Removing completed job ${job.id}`);
await job.remove();
break;
}
}
// Discard job to prevent retries
const problematicJob = await taskQueue.getJob(999);
if (problematicJob && await problematicJob.isFailed()) {
await problematicJob.discard();
console.log('Job discarded, will not retry');
}Wait for job completion and handle results.
/**
* Wait for job to finish and get the result
* @returns Promise that resolves to job result or rejects with error
*/
finished(): Promise<any>;Usage Examples:
// Create job and wait for completion
const job = await taskQueue.add('process-data', {
data: 'important data'
});
try {
// Wait for job to complete
const result = await job.finished();
console.log(`Job ${job.id} completed with result:`, result);
} catch (error) {
console.error(`Job ${job.id} failed:`, error.message);
}
// Multiple jobs with Promise.all
const jobs = await Promise.all([
taskQueue.add('task1', { id: 1 }),
taskQueue.add('task2', { id: 2 }),
taskQueue.add('task3', { id: 3 })
]);
// Wait for all jobs to complete
try {
const results = await Promise.all(
jobs.map(job => job.finished())
);
console.log('All jobs completed:', results);
} catch (error) {
console.error('One or more jobs failed:', error);
}Add log entries to jobs for debugging and monitoring.
/**
* Add a log entry to the job
* @param row - Log message string
* @returns Promise that resolves when log is added
*/
log(row: string): Promise<any>;Usage Examples:
// Inside job processor
taskQueue.process('detailed-task', async (job) => {
await job.log('Task started');
try {
await job.log('Step 1: Validating input');
validateInput(job.data);
await job.log('Step 2: Processing data');
const result = await processData(job.data);
await job.log('Step 3: Saving results');
await saveResults(result);
await job.log('Task completed successfully');
return result;
} catch (error) {
await job.log(`Error: ${error.message}`);
throw error;
}
});
// View job logs (see Queue Monitoring documentation)
const logs = await taskQueue.getJobLogs(job.id);
console.log('Job logs:', logs.logs);Advanced job state manipulation for complex scenarios.
/**
* Move job to completed state manually
* @param returnValue - Value to set as job result
* @param ignoreLock - Whether to ignore job lock
* @param notFetch - Whether to skip fetching next job
* @returns Promise that resolves to next job data or null
*/
moveToCompleted(returnValue?: string, ignoreLock?: boolean, notFetch?: boolean): Promise<[any, string | number] | null>;
/**
* Move job to failed state manually
* @param errorInfo - Error information object
* @param ignoreLock - Whether to ignore job lock
* @returns Promise that resolves to next job data or null
*/
moveToFailed(errorInfo: { message: string }, ignoreLock?: boolean): Promise<[any, string | number] | null>;
/**
* Move job to delayed state with specific timestamp
* @param timestamp - Unix timestamp when job should be processed
* @param ignoreLock - Whether to ignore job lock
* @returns Promise that resolves when job is moved to delayed state
*/
moveToDelayed(timestamp: number, ignoreLock?: boolean): Promise<any>;Usage Examples:
// Manual job state management (advanced usage)
const job = await taskQueue.getJob(123);
// Manually complete job
await job.moveToCompleted('Success', true);
// Manually fail job
await job.moveToFailed({ message: 'Manual failure' }, true);Manage job locks for coordination between workers.
/**
* Take a lock on the job
* @returns Promise that resolves to lock identifier or false if failed
*/
takeLock(): Promise<number | false>;
/**
* Release the job lock
* @returns Promise that resolves when lock is released
*/
releaseLock(): Promise<void>;
/**
* Extend the job lock duration
* @param duration - Lock extension duration in milliseconds
* @returns Promise that resolves to new lock expiration time
*/
extendLock(duration: number): Promise<number>;
/**
* Get the lock key for this job
* @returns Lock key string
*/
lockKey(): string;Usage Examples:
// Advanced job locking (typically not needed in normal usage)
const job = await taskQueue.getJob(456);
// Take lock
const lockId = await job.takeLock();
if (lockId) {
console.log(`Acquired lock ${lockId} for job ${job.id}`);
try {
// Perform work while holding lock
await performWork(job.data);
// Extend lock if needed
await job.extendLock(30000); // Extend by 30 seconds
} finally {
// Always release lock
await job.releaseLock();
}
} else {
console.log(`Could not acquire lock for job ${job.id}`);
}Convert job to JSON representation.
/**
* Convert job to JSON object
* @returns Job data as plain object
*/
toJSON(): {
id: string | number;
name: string;
data: any;
opts: JobOptions;
progress: number;
delay: number;
timestamp: number;
attemptsMade: number;
failedReason: any;
stacktrace: string[] | null;
returnvalue: any;
finishedOn: number | null;
processedOn: number | null;
};Usage Examples:
const job = await taskQueue.getJob(789);
const jobData = job.toJSON();
console.log('Job as JSON:', jobData);
// Save job state for external processing
const jobState = {
...jobData,
queueName: job.queue.name,
currentState: await job.getState()
};
await saveJobState(jobState);