Comprehensive event-driven architecture for real-time job lifecycle monitoring and queue state changes.
Monitor individual job state transitions and processing events.
/**
* Emitted when a job becomes waiting to be processed.
* @param event - 'waiting'
* @param callback - Called with the job ID only (string or number), not a Job instance
* @returns Queue, so further calls can be chained
*/
on(event: 'waiting', callback: (jobId: string | number) => void): Queue;
/**
* Emitted when a job starts processing.
* @param event - 'active'
* @param callback - Called with the job and, optionally, a JobPromise whose
* cancel() method cancels the running job
* @returns Queue, so further calls can be chained
*/
on(event: 'active', callback: (job: Job, jobPromise?: JobPromise) => void): Queue;
/**
* Emitted when a job completes successfully.
* @param event - 'completed'
* @param callback - Called with the job and its result (typed `any`)
* @returns Queue, so further calls can be chained
*/
on(event: 'completed', callback: (job: Job, result: any) => void): Queue;
/**
* Emitted when a job fails after processing.
* @param event - 'failed'
* @param callback - Called with the job and the Error describing the failure
* @returns Queue, so further calls can be chained
*/
on(event: 'failed', callback: (job: Job, error: Error) => void): Queue;
/**
* Emitted when a job's progress is updated.
* @param event - 'progress'
* @param callback - Called with the job and the progress value (typed `any`)
* @returns Queue, so further calls can be chained
*/
on(event: 'progress', callback: (job: Job, progress: any) => void): Queue;
/**
* Emitted when a job is marked as stalled.
* NOTE(review): this text previously described a stall as "processing too long";
* the exact stall condition is not visible here - confirm against the library docs.
* @param event - 'stalled'
* @param callback - Called with the stalled job
* @returns Queue, so further calls can be chained
*/
on(event: 'stalled', callback: (job: Job) => void): Queue;
/**
* Emitted when a job is removed from the queue.
* @param event - 'removed'
* @param callback - Called with the removed job
* @returns Queue, so further calls can be chained
*/
on(event: 'removed', callback: (job: Job) => void): Queue;
/**
* Emitted when a job was duplicated/deduplicated (not processed due to an existing duplicate).
* @param event - 'duplicated'
* @param callback - Called with the duplicated job's ID only (string or number)
* @returns Queue, so further calls can be chained
*/
on(event: 'duplicated', callback: (jobId: string | number) => void): Queue;
/**
* Emitted when a job was debounced (delayed due to debounce settings).
* @param event - 'debounced'
* @param callback - Called with the debounced job's ID only (string or number)
* @returns Queue, so further calls can be chained
*/
on(event: 'debounced', callback: (jobId: string | number) => void): Queue;
/**
* Emitted when extending a job's lock failed during processing.
* @param event - 'lock-extension-failed'
* @param callback - Called with the job and the lock extension Error
* @returns Queue, so further calls can be chained
*/
on(event: 'lock-extension-failed', callback: (job: Job, error: Error) => void): Queue;
interface JobPromise {
/** Cancel the running job */
cancel(): void;
}

Usage Examples:
const Queue = require('bull');
const taskQueue = new Queue('event monitoring');

// Jobs still running after this many milliseconds are cancelled.
const JOB_TIMEOUT_MS = 60000; // 1 minute
// Pending cancellation timers keyed by job ID, so a timer can be cleared as
// soon as its job settles instead of firing against a finished job.
const cancelTimers = new Map();

// Clear and forget the pending cancellation timer for a job, if there is one.
function clearCancelTimer(jobId) {
  const timer = cancelTimers.get(jobId);
  if (timer !== undefined) {
    clearTimeout(timer);
    cancelTimers.delete(jobId);
  }
}

// Monitor job lifecycle
taskQueue.on('waiting', (jobId) => {
  console.log(`Job ${jobId} is waiting to be processed`);
});

taskQueue.on('active', (job, jobPromise) => {
  console.log(`Job ${job.id} (${job.name}) started processing`);
  console.log('Job data:', job.data);
  // Optionally cancel job after timeout
  if (jobPromise) {
    // The same job ID becomes active again on a retry; drop any stale timer.
    clearCancelTimer(job.id);
    const timer = setTimeout(() => {
      cancelTimers.delete(job.id);
      jobPromise.cancel();
      console.log(`Cancelled job ${job.id} due to timeout`);
    }, JOB_TIMEOUT_MS);
    cancelTimers.set(job.id, timer);
  }
});

taskQueue.on('completed', (job, result) => {
  // The job finished in time, so the timeout must not fire any more.
  clearCancelTimer(job.id);
  console.log(`Job ${job.id} completed successfully`);
  console.log('Result:', result);
  // Log completion metrics
  const processingTime = job.finishedOn - job.processedOn;
  console.log(`Processing time: ${processingTime}ms`);
});

taskQueue.on('failed', (job, err) => {
  // A failed job has settled as well; cancel its pending timeout.
  clearCancelTimer(job.id);
  console.error(`Job ${job.id} failed: ${err.message}`);
  console.error(`Attempts made: ${job.attemptsMade}/${job.opts.attempts}`);
  // Send alert for important job failures
  if (job.data.priority === 'critical') {
    sendAlert(`Critical job ${job.id} failed: ${err.message}`);
  }
});

taskQueue.on('progress', (job, progress) => {
  console.log(`Job ${job.id} progress: ${progress}%`);
});
taskQueue.on('stalled', (job) => {
console.warn(`Job ${job.id} stalled and will be retried`);
});

Monitor overall queue state changes and management operations.
/**
* Emitted when the queue is paused.
* @param event - 'paused'
* @param callback - Called with no arguments when the queue is paused
* @returns Queue, so further calls can be chained
*/
on(event: 'paused', callback: () => void): Queue;
/**
* Emitted when the queue is resumed.
* @param event - 'resumed'
* @param callback - Called with no arguments when the queue is resumed
* @returns Queue, so further calls can be chained
*/
on(event: 'resumed', callback: () => void): Queue;
/**
* Emitted when the queue has processed all waiting jobs.
* @param event - 'drained'
* @param callback - Called with no arguments when the queue is drained
* @returns Queue, so further calls can be chained
*/
on(event: 'drained', callback: () => void): Queue;
/**
* Emitted when old jobs have been cleaned from the queue.
* @param event - 'cleaned'
* @param callback - Called with the array of cleaned jobs and the JobStatusClean
* value naming the status they were cleaned from
* @returns Queue, so further calls can be chained
*/
on(event: 'cleaned', callback: (jobs: Job[], status: JobStatusClean) => void): Queue;
type JobStatusClean = 'completed' | 'wait' | 'active' | 'delayed' | 'failed' | 'paused';

Usage Examples:
// React to queue-level state transitions.
// Every listener logs the transition; pause and resume also send a
// notification, and a drained queue triggers maintenance work.

function onQueuePaused() {
  console.log('Queue has been paused');
  sendNotification('Task queue paused - no new jobs will be processed');
}

function onQueueResumed() {
  console.log('Queue has been resumed');
  sendNotification('Task queue resumed - processing continuing');
}

function onQueueDrained() {
  console.log('Queue is drained - all waiting jobs have been processed');
  // An empty queue is a good moment for maintenance or shutdown
  performMaintenanceTasks();
}

taskQueue.on('paused', onQueuePaused);
taskQueue.on('resumed', onQueueResumed);
taskQueue.on('drained', onQueueDrained);
taskQueue.on('cleaned', (jobs, status) => {
console.log(`Cleaned ${jobs.length} ${status} jobs from queue`);
});

Monitor job and queue events across all workers and queue instances using global event variants.
/**
* Global variants of job lifecycle events (fired across all workers).
* These events are published to all queue instances, not just the local one.
*
* Unlike the local events, global listeners receive the job ID instead of a
* Job instance, because the job may be running in another process. Look the
* job up by ID when the full instance is needed.
*/
on(event: 'global:waiting', callback: (jobId: string | number) => void): Queue;
/** The local-only `jobPromise` argument is not part of the global signature. */
on(event: 'global:active', callback: (jobId: string | number) => void): Queue;
/**
* NOTE(review): `result` travels through the cross-process event channel and is
* presumably delivered in serialized form - confirm before relying on its shape.
*/
on(event: 'global:completed', callback: (jobId: string | number, result: any) => void): Queue;
/**
* NOTE(review): `error` is presumably the failure reason text rather than an
* Error instance - narrow it before reading `.message`.
*/
on(event: 'global:failed', callback: (jobId: string | number, error: Error | string) => void): Queue;
on(event: 'global:progress', callback: (jobId: string | number, progress: any) => void): Queue;
on(event: 'global:stalled', callback: (jobId: string | number) => void): Queue;
on(event: 'global:duplicated', callback: (jobId: string | number) => void): Queue;
on(event: 'global:debounced', callback: (jobId: string | number) => void): Queue;
/**
* Global queue state events
*/
on(event: 'global:paused', callback: () => void): Queue;
on(event: 'global:resumed', callback: () => void): Queue;
on(event: 'global:drained', callback: () => void): Queue;

Usage Examples:
// Monitor events across all workers.
// Global listeners are handed the job ID rather than a Job instance; when the
// full job is needed it can be looked up with taskQueue.getJob(jobId).
taskQueue.on('global:completed', (jobId, result) => {
  console.log(`Job ${jobId} completed on any worker`);
  // Update shared metrics or database
  updateGlobalMetrics('completed', jobId, result);
});

taskQueue.on('global:failed', (jobId, error) => {
  // The failure may arrive as plain text instead of an Error instance.
  const reason = error instanceof Error ? error.message : String(error);
  console.log(`Job ${jobId} failed on any worker: ${reason}`);
  // Send notifications that work across instances
  notifyFailure(jobId, reason);
});

// Monitor queue state across all instances
taskQueue.on('global:paused', () => {
  console.log('Queue paused globally across all workers');
});

taskQueue.on('global:drained', () => {
  console.log('All queues are drained across all workers');
  // Safe to perform system maintenance
  scheduleMaintenanceTask();
});
// Track duplicated jobs globally
taskQueue.on('global:duplicated', (jobId) => {
console.log(`Job ${jobId} was duplicated and skipped globally`);
});

Handle queue-level errors and system issues.
/**
* Error occurred in queue operations
* @param event - 'error'
* @param callback - Function called with error details
*/
on(event: 'error', callback: (error: Error) => void): Queue;

Usage Examples:
// Handle queue errors
taskQueue.on('error', (error) => {
console.error('Queue error occurred:', error.message);
console.error('Stack trace:', error.stack);
// Send critical alert
sendCriticalAlert(`Queue error: ${error.message}`);
// Log error for analysis
logError('queue_error', {
queue: taskQueue.name,
error: error.message,
stack: error.stack,
timestamp: new Date().toISOString()
});
});Use events for advanced job processing patterns.
// Example: tracking concurrency with events.
// The counter only observes and reports how many jobs are in flight; it does
// not hold jobs back when the limit is reached.
const MAX_CONCURRENT_JOBS = 5;
let activeJobs = 0;

taskQueue.on('active', () => {
  activeJobs += 1;
  console.log(`Active jobs: ${activeJobs}/${MAX_CONCURRENT_JOBS}`);

  const limitReached = activeJobs >= MAX_CONCURRENT_JOBS;
  if (limitReached) {
    console.log('Maximum concurrent jobs reached');
  }
});

// A job leaves the in-flight set whether it succeeded or failed.
taskQueue.on('completed', () => {
  activeJobs -= 1;
  console.log(`Job completed. Active jobs: ${activeJobs}`);
});

taskQueue.on('failed', () => {
  activeJobs -= 1;
  console.log(`Job failed. Active jobs: ${activeJobs}`);
});
// Example: Job dependency tracking
const jobDependencies = new Map();
taskQueue.on('completed', (job, result) => {
// Check if other jobs were waiting for this one
const dependentJobs = jobDependencies.get(job.id);
if (dependentJobs) {
console.log(`Job ${job.id} completed, triggering ${dependentJobs.length} dependent jobs`);
dependentJobs.forEach(async (dependentJobData) => {
await taskQueue.add(dependentJobData.name, {
...dependentJobData.data,
parentResult: result
});
});
jobDependencies.delete(job.id);
}
});Collect detailed metrics using events.
// Metrics collection using events
/**
 * Collects completion and failure statistics for a queue by listening to its
 * 'completed', 'failed' and 'stalled' events.
 */
class QueueMetrics {
  /**
   * @param queue - Event source exposing `on(event, callback)`; the listeners
   *   are attached immediately.
   */
  constructor(queue) {
    this.queue = queue;
    this.metrics = {
      completed: 0,
      failed: 0,
      totalProcessingTime: 0,
      averageProcessingTime: 0,
      errorsByType: new Map(),
      jobsByName: new Map()
    };
    this.setupEventListeners();
  }

  /**
   * Percentage of settled jobs (completed + failed) that completed.
   * @returns A value from 0 to 100; 0 while no job has settled yet, so callers
   *   never see NaN (which JSON.stringify would turn into null).
   */
  getSuccessRate() {
    const settled = this.metrics.completed + this.metrics.failed;
    if (settled === 0) {
      return 0;
    }
    return (this.metrics.completed / settled) * 100;
  }

  /** Attach the listeners that keep `this.metrics` up to date. */
  setupEventListeners() {
    this.queue.on('completed', (job, result) => {
      this.metrics.completed++;
      const processingTime = job.finishedOn - job.processedOn;
      this.metrics.totalProcessingTime += processingTime;
      this.metrics.averageProcessingTime =
        this.metrics.totalProcessingTime / this.metrics.completed;
      // Track by job name
      const nameCount = this.metrics.jobsByName.get(job.name) || 0;
      this.metrics.jobsByName.set(job.name, nameCount + 1);
      console.log(`Avg processing time: ${this.metrics.averageProcessingTime.toFixed(2)}ms`);
    });

    this.queue.on('failed', (job, error) => {
      this.metrics.failed++;
      // Track error types
      const errorType = error.name || 'Unknown';
      const errorCount = this.metrics.errorsByType.get(errorType) || 0;
      this.metrics.errorsByType.set(errorType, errorCount + 1);
      console.log(`Success rate: ${this.getSuccessRate().toFixed(2)}%`);
    });

    this.queue.on('stalled', (job) => {
      console.warn(`Job ${job.id} stalled - possible worker issue`);
    });
  }

  /**
   * Snapshot of the collected metrics as plain objects.
   * @returns `{ summary, errorsByType, jobsByName }`, with the two Maps
   *   converted to plain objects.
   */
  getReport() {
    return {
      summary: {
        completed: this.metrics.completed,
        failed: this.metrics.failed,
        successRate: this.getSuccessRate(),
        averageProcessingTime: this.metrics.averageProcessingTime
      },
      errorsByType: Object.fromEntries(this.metrics.errorsByType),
      jobsByName: Object.fromEntries(this.metrics.jobsByName)
    };
  }
}
// Use metrics collector
const metrics = new QueueMetrics(taskQueue);
// Generate report periodically
setInterval(() => {
const report = metrics.getReport();
console.log('Queue Metrics Report:', JSON.stringify(report, null, 2));
}, 60000); // Every minuteCreate real-time monitoring using events.
/**
 * Console dashboard that redraws the state of a queue from its events:
 * currently active jobs, the most recent completed and failed jobs, and
 * whether the queue is paused.
 */
class QueueDashboard {
  /**
   * @param queue - Event source exposing `on(event, callback)` and a `name`;
   *   listeners are attached and the periodic refresh is started immediately.
   */
  constructor(queue) {
    this.queue = queue;
    this.state = {
      activeJobs: new Map(),
      recentCompleted: [],
      recentFailed: [],
      queueState: 'active'
    };
    // Handle of the periodic refresh timer; null while no timer is running.
    this.reportTimer = null;
    this.setupEventListeners();
    this.startPeriodicReport();
  }

  /** Attach the listeners that update `this.state` and trigger a redraw. */
  setupEventListeners() {
    this.queue.on('active', (job) => {
      this.state.activeJobs.set(job.id, {
        id: job.id,
        name: job.name,
        startTime: Date.now(),
        progress: 0
      });
      this.updateDisplay();
    });

    this.queue.on('progress', (job, progress) => {
      const activeJob = this.state.activeJobs.get(job.id);
      if (activeJob) {
        activeJob.progress = progress;
        this.updateDisplay();
      }
    });

    this.queue.on('completed', (job, result) => {
      this.state.activeJobs.delete(job.id);
      this.state.recentCompleted.unshift({
        id: job.id,
        name: job.name,
        completedAt: Date.now(),
        result: result
      });
      // Keep only last 10 completed jobs
      this.state.recentCompleted = this.state.recentCompleted.slice(0, 10);
      this.updateDisplay();
    });

    this.queue.on('failed', (job, error) => {
      this.state.activeJobs.delete(job.id);
      this.state.recentFailed.unshift({
        id: job.id,
        name: job.name,
        failedAt: Date.now(),
        error: error.message
      });
      // Keep only last 10 failed jobs
      this.state.recentFailed = this.state.recentFailed.slice(0, 10);
      this.updateDisplay();
    });

    this.queue.on('paused', () => {
      this.state.queueState = 'paused';
      this.updateDisplay();
    });

    this.queue.on('resumed', () => {
      this.state.queueState = 'active';
      this.updateDisplay();
    });

    this.queue.on('drained', () => {
      console.log('\n🎉 Queue is drained - all jobs processed!');
    });
  }

  /** Clear the console and print the current dashboard state. */
  updateDisplay() {
    console.clear();
    console.log(`\n=== Queue Dashboard: ${this.queue.name} ===`);
    console.log(`Status: ${this.state.queueState.toUpperCase()}`);
    console.log(`\nActive Jobs (${this.state.activeJobs.size}):`);
    this.state.activeJobs.forEach(job => {
      const runningTime = Date.now() - job.startTime;
      console.log(` ${job.id}: ${job.name} (${job.progress}%) - ${Math.round(runningTime/1000)}s`);
    });

    if (this.state.recentCompleted.length > 0) {
      console.log(`\nRecent Completed Jobs:`);
      // Up to 10 entries are retained, but only the newest 5 are shown
      this.state.recentCompleted.slice(0, 5).forEach(job => {
        const timeAgo = Date.now() - job.completedAt;
        console.log(` ✅ ${job.id}: ${job.name} (${Math.round(timeAgo/1000)}s ago)`);
      });
    }

    if (this.state.recentFailed.length > 0) {
      console.log(`\nRecent Failed Jobs:`);
      // Up to 10 entries are retained, but only the newest 3 are shown
      this.state.recentFailed.slice(0, 3).forEach(job => {
        const timeAgo = Date.now() - job.failedAt;
        console.log(` ❌ ${job.id}: ${job.name} - ${job.error} (${Math.round(timeAgo/1000)}s ago)`);
      });
    }
    console.log(`\n${'='.repeat(50)}`);
  }

  /**
   * Start redrawing every 2 seconds while there are active jobs, so their
   * running times keep advancing between events. Replaces a timer that is
   * already running rather than stacking a second one.
   */
  startPeriodicReport() {
    this.stop();
    this.reportTimer = setInterval(() => {
      if (this.state.activeJobs.size > 0) {
        this.updateDisplay();
      }
    }, 2000);
  }

  /** Stop the periodic redraw; safe to call when no timer is running. */
  stop() {
    if (this.reportTimer !== null) {
      clearInterval(this.reportTimer);
      this.reportTimer = null;
    }
  }
}
// Use dashboard
const dashboard = new QueueDashboard(taskQueue);

// Example of robust event handling with error recovery
/**
 * Attach 'completed' and 'failed' listeners whose bodies are guarded by
 * try/catch, plus a SIGTERM hook that detaches all listeners and closes the
 * queue.
 * @param queue - Object providing `on()`, `removeAllListeners()` and `close()`
 */
function setupRobustEventHandlers(queue) {
  // Wrap event handlers in try-catch to prevent crashes
  queue.on('completed', (job, result) => {
    try {
      logJobCompletion(job, result);
      updateMetrics('completed', job);
      notifyJobCompletion(job, result);
    } catch (error) {
      console.error('Error in completed handler:', error);
    }
  });

  queue.on('failed', (job, error) => {
    try {
      logJobFailure(job, error);
      updateMetrics('failed', job);
      // Conditional alerting: only once the configured attempts are used up
      if (job.opts.attempts && job.attemptsMade >= job.opts.attempts) {
        sendFailureAlert(job, error);
      }
    } catch (handlerError) {
      console.error('Error in failed handler:', handlerError);
    }
  });

  // Remove event listeners when shutting down
  process.on('SIGTERM', () => {
    console.log('Removing event listeners...');
    queue.removeAllListeners();
    // close() may return a promise; report a failed close instead of letting
    // the rejection go unhandled during shutdown.
    Promise.resolve(queue.close()).catch((closeError) => {
      console.error('Error closing queue:', closeError);
    });
  });
}
// Use robust handlers: attaches the guarded 'completed'/'failed' listeners
// and the SIGTERM cleanup hook to taskQueue
setupRobustEventHandlers(taskQueue);