or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

event-system.md, index.md, job-creation.md, job-processing.md, job-state.md, queue-management.md, queue-monitoring.md, repeatable-jobs.md, utils.md
tile.json

docs/event-system.md

Event System

Comprehensive event-driven architecture for real-time job lifecycle monitoring and queue state changes.

Capabilities

Job Lifecycle Events

Monitor individual job state transitions and processing events.

/**
 * Job becomes waiting to be processed.
 * The callback receives only the job ID, not a Job instance.
 * @param event - 'waiting'
 * @param callback - Function called with job ID
 */
on(event: 'waiting', callback: (jobId: string | number) => void): Queue;

/**
 * Job starts processing.
 * The second callback argument is optional; when present it exposes
 * `cancel()` (see JobPromise below).
 * @param event - 'active'
 * @param callback - Function called with job and optional job promise
 */
on(event: 'active', callback: (job: Job, jobPromise?: JobPromise) => void): Queue;

/**
 * Job completes successfully.
 * @param event - 'completed'
 * @param callback - Function called with job and result
 */
on(event: 'completed', callback: (job: Job, result: any) => void): Queue;

/**
 * Job fails after processing.
 * @param event - 'failed'
 * @param callback - Function called with job and error
 */
on(event: 'failed', callback: (job: Job, error: Error) => void): Queue;

/**
 * Job progress is updated.
 * The progress value is typed `any`, so it is not necessarily a number.
 * @param event - 'progress'
 * @param callback - Function called with job and progress value
 */
on(event: 'progress', callback: (job: Job, progress: any) => void): Queue;

/**
 * Job becomes stalled (processing too long).
 * @param event - 'stalled'
 * @param callback - Function called with stalled job
 */
on(event: 'stalled', callback: (job: Job) => void): Queue;

/**
 * Job is removed from queue.
 * @param event - 'removed'
 * @param callback - Function called with removed job
 */
on(event: 'removed', callback: (job: Job) => void): Queue;

/**
 * Job was duplicated/deduplicated (not processed due to existing duplicate).
 * The callback receives only the job ID.
 * @param event - 'duplicated'
 * @param callback - Function called with duplicated job ID
 */
on(event: 'duplicated', callback: (jobId: string | number) => void): Queue;

/**
 * Job was debounced (delayed due to debounce settings).
 * The callback receives only the job ID.
 * @param event - 'debounced'
 * @param callback - Function called with debounced job ID
 */
on(event: 'debounced', callback: (jobId: string | number) => void): Queue;

/**
 * Job lock extension failed during processing.
 * @param event - 'lock-extension-failed'
 * @param callback - Function called with job and lock extension error
 */
on(event: 'lock-extension-failed', callback: (job: Job, error: Error) => void): Queue;

/** Handle passed as the optional second argument of the 'active' callback. */
interface JobPromise {
  /** Cancel the running job */
  cancel(): void;
}

Usage Examples:

const Queue = require('bull');
const taskQueue = new Queue('event monitoring');

// Maximum time a job may run before this example cancels it
const JOB_TIMEOUT_MS = 60000; // 1 minute

// Pending cancellation timers keyed by job ID. Keeping the handles lets us
// clear a timer as soon as its job settles, so it never fires (and never logs
// a bogus "Cancelled" message) for a job that has already finished.
const cancelTimers = new Map();

// Stop and forget the cancellation timer for a job, if one is pending
function clearCancelTimer(jobId) {
  const timer = cancelTimers.get(jobId);
  if (timer !== undefined) {
    clearTimeout(timer);
    cancelTimers.delete(jobId);
  }
}

// Monitor job lifecycle
taskQueue.on('waiting', (jobId) => {
  console.log(`Job ${jobId} is waiting to be processed`);
});

taskQueue.on('active', (job, jobPromise) => {
  console.log(`Job ${job.id} (${job.name}) started processing`);
  console.log('Job data:', job.data);

  // jobPromise is optional; without it there is nothing to cancel
  if (!jobPromise) {
    return;
  }

  // A job that becomes active again must not keep a stale timer around
  clearCancelTimer(job.id);

  // Optionally cancel job after timeout
  const timer = setTimeout(() => {
    cancelTimers.delete(job.id);
    jobPromise.cancel();
    console.log(`Cancelled job ${job.id} due to timeout`);
  }, JOB_TIMEOUT_MS);
  cancelTimers.set(job.id, timer);
});

taskQueue.on('completed', (job, result) => {
  clearCancelTimer(job.id);

  console.log(`Job ${job.id} completed successfully`);
  console.log('Result:', result);

  // Log completion metrics
  const processingTime = job.finishedOn - job.processedOn;
  console.log(`Processing time: ${processingTime}ms`);
});

taskQueue.on('failed', (job, err) => {
  clearCancelTimer(job.id);

  console.error(`Job ${job.id} failed: ${err.message}`);
  console.error(`Attempts made: ${job.attemptsMade}/${job.opts.attempts}`);

  // Send alert for important job failures
  // (sendAlert is an application-provided helper, not defined in this example)
  if (job.data.priority === 'critical') {
    sendAlert(`Critical job ${job.id} failed: ${err.message}`);
  }
});

taskQueue.on('progress', (job, progress) => {
  console.log(`Job ${job.id} progress: ${progress}%`);
});

taskQueue.on('stalled', (job) => {
  // The pending timer refers to the stalled run; drop it before any retry
  clearCancelTimer(job.id);
  console.warn(`Job ${job.id} stalled and will be retried`);
});

Queue State Events

Monitor overall queue state changes and management operations.

/**
 * Queue is paused.
 * The callback takes no arguments.
 * @param event - 'paused'
 * @param callback - Function called when queue is paused
 */
on(event: 'paused', callback: () => void): Queue;

/**
 * Queue is resumed.
 * The callback takes no arguments.
 * @param event - 'resumed'
 * @param callback - Function called when queue is resumed
 */
on(event: 'resumed', callback: () => void): Queue;

/**
 * Queue has processed all waiting jobs.
 * The callback takes no arguments.
 * @param event - 'drained'
 * @param callback - Function called when queue is drained
 */
on(event: 'drained', callback: () => void): Queue;

/**
 * Old jobs have been cleaned from queue.
 * @param event - 'cleaned'
 * @param callback - Function called with cleaned jobs and their status
 */
on(event: 'cleaned', callback: (jobs: Job[], status: JobStatusClean) => void): Queue;

/** Status values that can be reported to the 'cleaned' callback. */
type JobStatusClean = 'completed' | 'wait' | 'active' | 'delayed' | 'failed' | 'paused';

Usage Examples:

// React to queue-wide state transitions with named handlers

// Announce that processing has been suspended
function handleQueuePaused() {
  console.log('Queue has been paused');
  sendNotification('Task queue paused - no new jobs will be processed');
}

// Announce that processing has picked up again
function handleQueueResumed() {
  console.log('Queue has been resumed');
  sendNotification('Task queue resumed - processing continuing');
}

// All waiting jobs are done: a good moment for maintenance or shutdown
function handleQueueDrained() {
  console.log('Queue is drained - all waiting jobs have been processed');
  performMaintenanceTasks();
}

// Report how many jobs of which status were cleaned
function handleQueueCleaned(cleanedJobs, cleanedStatus) {
  const count = cleanedJobs.length;
  console.log(`Cleaned ${count} ${cleanedStatus} jobs from queue`);
}

taskQueue.on('paused', handleQueuePaused);
taskQueue.on('resumed', handleQueueResumed);
taskQueue.on('drained', handleQueueDrained);
taskQueue.on('cleaned', handleQueueCleaned);

Global Events

Monitor job and queue events across all workers and queue instances using global event variants.

/**
 * Global variants of job lifecycle events (fired across all workers).
 * These events are published to all queue instances, not just the local one.
 *
 * Signatures differ from the local events: global listeners receive the job
 * ID instead of a Job instance, and no job promise is passed to
 * 'global:active'. Use `queue.getJob(jobId)` when the full job is needed.
 * For 'global:completed' the result is delivered in serialized form, and for
 * 'global:failed' the failure is delivered as the reason text rather than an
 * Error instance.
 */
on(event: 'global:waiting', callback: (jobId: string | number) => void): Queue;
on(event: 'global:active', callback: (jobId: string | number) => void): Queue;
on(event: 'global:completed', callback: (jobId: string | number, result: any) => void): Queue;
on(event: 'global:failed', callback: (jobId: string | number, failedReason: string) => void): Queue;
on(event: 'global:progress', callback: (jobId: string | number, progress: any) => void): Queue;
on(event: 'global:stalled', callback: (jobId: string | number) => void): Queue;
on(event: 'global:duplicated', callback: (jobId: string | number) => void): Queue;
on(event: 'global:debounced', callback: (jobId: string | number) => void): Queue;

/**
 * Global queue state events.
 * The callbacks take no arguments.
 */
on(event: 'global:paused', callback: () => void): Queue;
on(event: 'global:resumed', callback: () => void): Queue;
on(event: 'global:drained', callback: () => void): Queue;

Usage Examples:

// Monitor events across all workers.
// Global listeners receive the job ID, not a Job instance, so the job is
// loaded with getJob() when its details are needed.
taskQueue.on('global:completed', async (jobId, result) => {
  console.log(`Job ${jobId} completed on any worker`);

  try {
    // getJob resolves to null when the job no longer exists
    // (for example, it was removed on completion)
    const job = await taskQueue.getJob(jobId);
    if (job) {
      // Update shared metrics or database.
      // Note: the global result arrives in serialized form.
      updateGlobalMetrics('completed', job, result);
    }
  } catch (lookupError) {
    console.error(`Could not load completed job ${jobId}:`, lookupError);
  }
});

taskQueue.on('global:failed', async (jobId, failedReason) => {
  // The failure is normally delivered as text; tolerate an Error as well
  const reason = failedReason instanceof Error
    ? failedReason.message
    : String(failedReason);
  console.log(`Job ${jobId} failed on any worker: ${reason}`);

  try {
    const job = await taskQueue.getJob(jobId);
    if (job) {
      // Send notifications that work across instances
      notifyFailure(job, new Error(reason));
    }
  } catch (lookupError) {
    console.error(`Could not load failed job ${jobId}:`, lookupError);
  }
});

// Monitor queue state across all instances
taskQueue.on('global:paused', () => {
  console.log('Queue paused globally across all workers');
});

taskQueue.on('global:drained', () => {
  console.log('All queues are drained across all workers');
  // Safe to perform system maintenance
  scheduleMaintenanceTask();
});

// Track duplicated jobs globally
taskQueue.on('global:duplicated', (jobId) => {
  console.log(`Job ${jobId} was duplicated and skipped globally`);
});

Error Handling Events

Handle queue-level errors and system issues.

/**
 * Error occurred in queue operations.
 * The callback receives the Error object as its only argument.
 * @param event - 'error'
 * @param callback - Function called with error details
 */
on(event: 'error', callback: (error: Error) => void): Queue;

Usage Examples:

// Queue-level error handler: print, alert, then persist a structured record
function handleQueueError(error) {
  const message = error.message;
  const stack = error.stack;

  console.error('Queue error occurred:', message);
  console.error('Stack trace:', stack);

  // Raise a critical alert before writing the analysis record
  sendCriticalAlert(`Queue error: ${message}`);

  // Structured entry for later analysis
  const entry = {
    queue: taskQueue.name,
    error: message,
    stack: stack,
    timestamp: new Date().toISOString()
  };
  logError('queue_error', entry);
}

taskQueue.on('error', handleQueueError);

Event-Driven Job Processing

Use events for advanced job processing patterns.

// Example: Rate limiting based on events
// The handlers below only observe load; they do not throttle the queue.
let activeJobs = 0;
const MAX_CONCURRENT_JOBS = 5;

// IDs of jobs currently running. Counting distinct IDs (instead of blindly
// incrementing/decrementing) keeps the figure correct when a job is reported
// active more than once or settles without a matching 'active' event, so the
// counter can neither drift upward nor go negative.
const activeJobIds = new Set();

// Forget a settled job and report the remaining load
function releaseJob(job, outcome) {
  activeJobIds.delete(job.id);
  activeJobs = activeJobIds.size;
  console.log(`Job ${outcome}. Active jobs: ${activeJobs}`);
}

taskQueue.on('active', (job) => {
  activeJobIds.add(job.id);
  activeJobs = activeJobIds.size;
  console.log(`Active jobs: ${activeJobs}/${MAX_CONCURRENT_JOBS}`);

  if (activeJobs >= MAX_CONCURRENT_JOBS) {
    console.log('Maximum concurrent jobs reached');
  }
});

taskQueue.on('completed', (job) => {
  releaseJob(job, 'completed');
});

taskQueue.on('failed', (job) => {
  releaseJob(job, 'failed');
});

// Example: Job dependency tracking
// Maps a parent job ID to the list of { name, data } descriptors that should
// be enqueued once that parent completes.
const jobDependencies = new Map();

taskQueue.on('completed', (job, result) => {
  // Check if other jobs were waiting for this one
  const dependentJobs = jobDependencies.get(job.id);
  if (!dependentJobs) {
    return;
  }

  // Drop the entry first so the dependents are enqueued at most once
  jobDependencies.delete(job.id);

  console.log(`Job ${job.id} completed, triggering ${dependentJobs.length} dependent jobs`);

  // Enqueue all dependents in parallel. Each add() gets its own catch so a
  // failure is logged instead of surfacing as an unhandled promise rejection
  // (which is what an async callback inside forEach would produce).
  dependentJobs.forEach((dependentJobData) => {
    Promise.resolve()
      .then(() => taskQueue.add(dependentJobData.name, {
        ...dependentJobData.data,
        parentResult: result
      }))
      .catch((addError) => {
        console.error(
          `Failed to enqueue dependent job "${dependentJobData.name}" of job ${job.id}:`,
          addError
        );
      });
  });
});

Event-Based Metrics Collection

Collect detailed metrics using events.

// Metrics collection using events
/**
 * Accumulates completion, failure and timing statistics for a queue by
 * listening to its 'completed', 'failed' and 'stalled' events.
 */
class QueueMetrics {
  queue;
  metrics;

  /**
   * @param queue - Event emitter (queue) whose job events are observed
   */
  constructor(queue) {
    this.queue = queue;
    this.metrics = {
      completed: 0,
      failed: 0,
      // Number of completed jobs that carried usable timestamps
      timedCompleted: 0,
      totalProcessingTime: 0,
      averageProcessingTime: 0,
      errorsByType: new Map(),
      jobsByName: new Map()
    };

    this.setupEventListeners();
  }

  /** Register the listeners that feed the counters. */
  setupEventListeners() {
    this.queue.on('completed', (job, result) => {
      this.metrics.completed++;

      // Only jobs with both timestamps contribute to timing; a single
      // NaN would otherwise poison the running total permanently.
      const processingTime = job.finishedOn - job.processedOn;
      if (Number.isFinite(processingTime)) {
        this.metrics.timedCompleted++;
        this.metrics.totalProcessingTime += processingTime;
        this.metrics.averageProcessingTime =
          this.metrics.totalProcessingTime / this.metrics.timedCompleted;
      }

      // Track by job name
      const nameCount = this.metrics.jobsByName.get(job.name) || 0;
      this.metrics.jobsByName.set(job.name, nameCount + 1);

      console.log(`Avg processing time: ${this.metrics.averageProcessingTime.toFixed(2)}ms`);
    });

    this.queue.on('failed', (job, error) => {
      this.metrics.failed++;

      // Track error types
      const errorType = (error && error.name) || 'Unknown';
      const errorCount = this.metrics.errorsByType.get(errorType) || 0;
      this.metrics.errorsByType.set(errorType, errorCount + 1);

      console.log(`Success rate: ${this.getSuccessRate().toFixed(2)}%`);
    });

    this.queue.on('stalled', (job) => {
      console.warn(`Job ${job.id} stalled - possible worker issue`);
    });
  }

  /**
   * Percentage of finished jobs that completed successfully.
   * @returns A value in [0, 100]; 0 when no job has finished yet
   *          (avoids the NaN produced by 0 / 0)
   */
  getSuccessRate() {
    const finished = this.metrics.completed + this.metrics.failed;
    if (finished === 0) {
      return 0;
    }
    return this.metrics.completed / finished * 100;
  }

  /**
   * Build a plain-object snapshot of the collected metrics.
   * @returns Summary counters plus per-error-type and per-job-name counts
   */
  getReport() {
    return {
      summary: {
        completed: this.metrics.completed,
        failed: this.metrics.failed,
        successRate: this.getSuccessRate(),
        averageProcessingTime: this.metrics.averageProcessingTime
      },
      errorsByType: Object.fromEntries(this.metrics.errorsByType),
      jobsByName: Object.fromEntries(this.metrics.jobsByName)
    };
  }
}

// Attach a metrics collector to the task queue
const metrics = new QueueMetrics(taskQueue);

// Print a pretty-printed JSON snapshot of the metrics
function printMetricsReport() {
  const snapshot = JSON.stringify(metrics.getReport(), null, 2);
  console.log('Queue Metrics Report:', snapshot);
}

// Emit the report once per minute
const METRICS_REPORT_INTERVAL_MS = 60000;
setInterval(printMetricsReport, METRICS_REPORT_INTERVAL_MS);

Event-Based Job Monitoring Dashboard

Create real-time monitoring using events.

/**
 * Console dashboard that mirrors a queue's state from its events:
 * currently active jobs, the most recent completed/failed jobs and
 * whether the queue is paused.
 */
class QueueDashboard {
  queue;
  state;
  // Handle of the periodic refresh timer, or null when it is not running
  reportTimer = null;

  /**
   * @param queue - Event emitter (queue) to observe; its `name` is shown in the header
   */
  constructor(queue) {
    this.queue = queue;
    this.state = {
      activeJobs: new Map(),
      recentCompleted: [],
      recentFailed: [],
      queueState: 'active'
    };

    this.setupEventListeners();
    this.startPeriodicReport();
  }

  /** Register the listeners that keep `state` in sync with the queue. */
  setupEventListeners() {
    this.queue.on('active', (job) => {
      this.state.activeJobs.set(job.id, {
        id: job.id,
        name: job.name,
        startTime: Date.now(),
        progress: 0
      });
      this.updateDisplay();
    });

    this.queue.on('progress', (job, progress) => {
      // Ignore progress for jobs this dashboard never saw become active
      const activeJob = this.state.activeJobs.get(job.id);
      if (activeJob) {
        activeJob.progress = progress;
        this.updateDisplay();
      }
    });

    this.queue.on('completed', (job, result) => {
      this.state.activeJobs.delete(job.id);
      this.state.recentCompleted.unshift({
        id: job.id,
        name: job.name,
        completedAt: Date.now(),
        result: result
      });

      // Keep only last 10 completed jobs
      this.state.recentCompleted = this.state.recentCompleted.slice(0, 10);
      this.updateDisplay();
    });

    this.queue.on('failed', (job, error) => {
      this.state.activeJobs.delete(job.id);
      this.state.recentFailed.unshift({
        id: job.id,
        name: job.name,
        failedAt: Date.now(),
        error: error.message
      });

      // Keep only last 10 failed jobs
      this.state.recentFailed = this.state.recentFailed.slice(0, 10);
      this.updateDisplay();
    });

    this.queue.on('paused', () => {
      this.state.queueState = 'paused';
      this.updateDisplay();
    });

    this.queue.on('resumed', () => {
      this.state.queueState = 'active';
      this.updateDisplay();
    });

    this.queue.on('drained', () => {
      console.log('\n🎉 Queue is drained - all jobs processed!');
    });
  }

  /** Clear the console and redraw the whole dashboard from `state`. */
  updateDisplay() {
    console.clear();
    console.log(`\n=== Queue Dashboard: ${this.queue.name} ===`);
    console.log(`Status: ${this.state.queueState.toUpperCase()}`);

    console.log(`\nActive Jobs (${this.state.activeJobs.size}):`);
    this.state.activeJobs.forEach(job => {
      const runningTime = Date.now() - job.startTime;
      console.log(`  ${job.id}: ${job.name} (${job.progress}%) - ${Math.round(runningTime/1000)}s`);
    });

    // Up to 10 entries are retained, but only the newest 5 are shown
    if (this.state.recentCompleted.length > 0) {
      console.log(`\nRecent Completed Jobs:`);
      this.state.recentCompleted.slice(0, 5).forEach(job => {
        const timeAgo = Date.now() - job.completedAt;
        console.log(`  ✅ ${job.id}: ${job.name} (${Math.round(timeAgo/1000)}s ago)`);
      });
    }

    // Up to 10 entries are retained, but only the newest 3 are shown
    if (this.state.recentFailed.length > 0) {
      console.log(`\nRecent Failed Jobs:`);
      this.state.recentFailed.slice(0, 3).forEach(job => {
        const timeAgo = Date.now() - job.failedAt;
        console.log(`  ❌ ${job.id}: ${job.name} - ${job.error} (${Math.round(timeAgo/1000)}s ago)`);
      });
    }

    console.log(`\n${'='.repeat(50)}`);
  }

  /**
   * Start redrawing every 2 seconds while there are active jobs, so the
   * running-time figures stay current. Calling it again while the timer is
   * already running is a no-op.
   */
  startPeriodicReport() {
    if (this.reportTimer !== null) {
      return;
    }

    // Update display every 2 seconds if there are active jobs
    this.reportTimer = setInterval(() => {
      if (this.state.activeJobs.size > 0) {
        this.updateDisplay();
      }
    }, 2000);
  }

  /**
   * Stop the periodic refresh started by startPeriodicReport().
   * Event listeners stay registered; only the timer is cleared.
   */
  stop() {
    if (this.reportTimer !== null) {
      clearInterval(this.reportTimer);
      this.reportTimer = null;
    }
  }
}

// Use dashboard: constructing it registers the queue event listeners and
// starts the periodic display refresh for taskQueue
const dashboard = new QueueDashboard(taskQueue);

Event Handler Best Practices

// Example of robust event handling with error recovery
/**
 * Attach 'completed' and 'failed' handlers whose bodies are wrapped in
 * try/catch, and register a one-time SIGTERM handler that closes the queue.
 *
 * @param queue - Queue to attach the handlers to; must provide on(),
 *                removeAllListeners() and close()
 */
function setupRobustEventHandlers(queue) {
  // Wrap event handlers in try-catch to prevent crashes
  queue.on('completed', (job, result) => {
    try {
      logJobCompletion(job, result);
      updateMetrics('completed', job);
      notifyJobCompletion(job, result);
    } catch (error) {
      console.error('Error in completed handler:', error);
    }
  });

  queue.on('failed', (job, error) => {
    try {
      logJobFailure(job, error);
      updateMetrics('failed', job);

      // Conditional alerting: only once the configured attempts are exhausted
      if (job.opts.attempts && job.attemptsMade >= job.opts.attempts) {
        sendFailureAlert(job, error);
      }
    } catch (handlerError) {
      console.error('Error in failed handler:', handlerError);
    }
  });

  // Close the queue, then detach listeners. Listeners are removed only after
  // close() settles so events emitted while closing are still handled, and a
  // rejected close() is logged instead of becoming an unhandled rejection.
  const shutdown = async () => {
    try {
      await queue.close();
    } catch (closeError) {
      console.error('Error closing queue:', closeError);
    } finally {
      queue.removeAllListeners();
    }
  };

  // Remove event listeners when shutting down.
  // once() keeps a repeated signal from starting a second shutdown.
  process.once('SIGTERM', () => {
    console.log('Removing event listeners...');
    void shutdown();
  });
}

// Use robust handlers: attach the guarded handlers and the SIGTERM hook to taskQueue
setupRobustEventHandlers(taskQueue);