or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

event-system.md · index.md · job-creation.md · job-processing.md · job-state.md · queue-management.md · queue-monitoring.md · repeatable-jobs.md · utils.md
tile.json

docs/queue-monitoring.md

Queue Monitoring

Comprehensive monitoring, metrics collection, and queue introspection capabilities.

Capabilities

Job Count Statistics

Get comprehensive statistics about jobs in different states.

/**
 * Get counts of jobs in all states
 * Takes no arguments; the result is a single object rather than one call per state.
 * @returns Promise that resolves to JobCounts object with all state counts
 *          (one numeric field per state declared on JobCounts)
 */
getJobCounts(): Promise<JobCounts>;

/**
 * Get count of jobs in specific states
 * @param types - Single JobStatus or array of JobStatus values; only the
 *                literals in the JobStatus union are accepted by this signature
 * @returns Promise that resolves to total count of jobs in specified states
 *          (a single combined number, not a per-state breakdown)
 */
getJobCountByTypes(types: JobStatus | JobStatus[]): Promise<number>;

/**
 * Shape of the object resolved by getJobCounts(): one numeric count per state.
 *
 * NOTE(review): JobStatus also contains 'paused', but there is no `paused`
 * field here — confirm against the library whether that count is reported.
 */
interface JobCounts {
  active: number;
  completed: number;
  failed: number;
  delayed: number;
  waiting: number;
}

/** State names accepted by getJobCountByTypes() and getJobs(). */
type JobStatus = 'completed' | 'waiting' | 'active' | 'delayed' | 'failed' | 'paused';

Usage Examples:

const Queue = require('bull');
const taskQueue = new Queue('monitoring');

// Get comprehensive job counts
const counts = await taskQueue.getJobCounts();
console.log('Queue statistics:', counts);
// Output: { active: 5, completed: 150, failed: 12, delayed: 3, waiting: 8 }

// Get count of jobs in specific states.
// Only JobStatus values are valid here: 'completed', 'waiting', 'active',
// 'delayed', 'failed', 'paused'. 'stalled' is not a job state, so it must not
// be passed to getJobCountByTypes().
const activeCount = await taskQueue.getJobCountByTypes('active');
const problemCount = await taskQueue.getJobCountByTypes(['failed']);
const pendingCount = await taskQueue.getJobCountByTypes(['waiting', 'delayed']);

console.log(`Active: ${activeCount}, Problems: ${problemCount}, Pending: ${pendingCount}`);

Individual State Counts

Get counts for specific job states.

// Per-state counters: each takes no arguments and resolves to a single number.

/**
 * Get count of completed jobs
 * @returns Promise that resolves to number of completed jobs
 */
getCompletedCount(): Promise<number>;

/**
 * Get count of failed jobs
 * @returns Promise that resolves to number of failed jobs
 */
getFailedCount(): Promise<number>;

/**
 * Get count of delayed jobs
 * @returns Promise that resolves to number of delayed jobs
 */
getDelayedCount(): Promise<number>;

/**
 * Get count of waiting jobs
 * @returns Promise that resolves to number of waiting jobs
 */
getWaitingCount(): Promise<number>;

/**
 * Get count of paused jobs
 * Note: JobCounts (the getJobCounts() result) declares no `paused` field in
 * this listing, so this getter covers a state that object does not.
 * @returns Promise that resolves to number of paused jobs
 */
getPausedCount(): Promise<number>;

/**
 * Get count of active jobs
 * @returns Promise that resolves to number of active jobs
 */
getActiveCount(): Promise<number>;

/**
 * Get count of repeatable jobs
 * Note: there is no matching member in the JobStatus union, so this count is
 * only available through this getter in the API shown here.
 * @returns Promise that resolves to number of repeatable jobs
 */
getRepeatableCount(): Promise<number>;

Usage Examples:

// Monitor queue health
async function monitorQueueHealth(queue) {
  const waiting = await queue.getWaitingCount();
  const active = await queue.getActiveCount();
  const failed = await queue.getFailedCount();
  const completed = await queue.getCompletedCount();
  
  console.log(`Queue Health Report:`);
  console.log(`  Waiting: ${waiting}`);
  console.log(`  Active: ${active}`);
  console.log(`  Failed: ${failed}`);
  console.log(`  Completed: ${completed}`);
  
  // Alert if too many failures
  if (failed > 100) {
    console.warn('High failure rate detected!');
  }
  
  // Alert if queue is backing up
  if (waiting > 1000) {
    console.warn('Queue backlog detected!');
  }
}

// Run monitoring once up front
await monitorQueueHealth(taskQueue);

// Periodic monitoring.
// Nothing awaits the timer callback, so a failed check must be caught here;
// otherwise it surfaces as an unhandled promise rejection.
setInterval(async () => {
  try {
    await monitorQueueHealth(taskQueue);
  } catch (err) {
    console.error('Queue health check failed:', err);
  }
}, 60000); // Check every minute

Job Retrieval by State

Retrieve jobs in specific states for inspection.

/**
 * Get waiting jobs
 * @param start - Start index (default: 0)
 * @param end - End index (default: -1 for all)
 * @param opts - Optional options object; its fields are not described in this
 *               listing — TODO confirm against the library reference
 * @returns Promise that resolves to array of waiting jobs
 */
getWaiting(start?: number, end?: number, opts?: object): Promise<Job[]>;

/**
 * Get active jobs
 * @param start - Start index (default: 0)
 * @param end - End index (default: -1 for all)
 * @param opts - Optional options object; its fields are not described in this
 *               listing — TODO confirm against the library reference
 * @returns Promise that resolves to array of active jobs
 */
getActive(start?: number, end?: number, opts?: object): Promise<Job[]>;

/**
 * Get delayed jobs
 * @param start - Start index (default: 0)
 * @param end - End index (default: -1 for all)
 * @param opts - Optional options object; its fields are not described in this
 *               listing — TODO confirm against the library reference
 * @returns Promise that resolves to array of delayed jobs
 */
getDelayed(start?: number, end?: number, opts?: object): Promise<Job[]>;

/**
 * Get completed jobs
 * @param start - Start index (default: 0)
 * @param end - End index (default: -1 for all)
 * @param opts - Optional options object; its fields are not described in this
 *               listing — TODO confirm against the library reference
 * @returns Promise that resolves to array of completed jobs
 */
getCompleted(start?: number, end?: number, opts?: object): Promise<Job[]>;

/**
 * Get failed jobs
 * NOTE(review): unlike the sibling getters above, this signature declares no
 * `opts` parameter — confirm whether that omission is intentional.
 * @param start - Start index (default: 0)
 * @param end - End index (default: -1 for all)
 * @returns Promise that resolves to array of failed jobs
 */
getFailed(start?: number, end?: number): Promise<Job[]>;

/**
 * Get jobs by multiple states
 * @param types - Array of job states to retrieve (always an array here; a bare
 *                JobStatus is not accepted by this signature)
 * @param start - Start index (default: 0)
 * @param end - End index (default: -1 for all)
 * @param asc - Sort ascending by timestamp (default: false)
 * @returns Promise that resolves to array of jobs in specified states
 */
getJobs(types: JobStatus[], start?: number, end?: number, asc?: boolean): Promise<Job[]>;

Usage Examples:

// Get recent failed jobs for analysis
const recentFailures = await taskQueue.getFailed(0, 10);
for (const job of recentFailures) {
  console.log(`Failed job ${job.id}: ${job.failedReason}`);
  console.log('Stack trace:', job.stacktrace);
}

// Get currently active jobs
const activeJobs = await taskQueue.getActive();
console.log(`Currently processing ${activeJobs.length} jobs:`);
activeJobs.forEach(job => {
  console.log(`  Job ${job.id}: ${job.name} (${job.progress()}% complete)`);
});

// Get waiting jobs with pagination.
// The end index is inclusive, so indices 0..49 yield the first 50 jobs.
const waitingJobs = await taskQueue.getWaiting(0, 49); // First 50 waiting jobs
console.log(`Next ${waitingJobs.length} jobs to process:`);

// Get jobs by state. Only JobStatus values are valid in the types array;
// 'stalled' is not a job state, so it must not be passed to getJobs().
const problemJobs = await taskQueue.getJobs(['failed'], 0, 20);
console.log(`Found ${problemJobs.length} problematic jobs`);

// Get jobs in chronological order
const recentJobs = await taskQueue.getJobs(['completed', 'failed'], 0, 100, true);

Job Log Retrieval

Access job logs for debugging and monitoring.

/**
 * Get logs for a specific job
 * @param jobId - Job identifier (string or number)
 * @param start - Start index for log entries (default: 0)
 * @param end - End index for log entries (default: -1 for all)
 * @returns Promise that resolves to object with logs array and total count:
 *          `logs` holds the returned entries as strings, `count` is the total.
 *          NOTE(review): presumably `count` can exceed `logs.length` when a
 *          start/end range is given — confirm.
 */
getJobLogs(jobId: string | number, start?: number, end?: number): Promise<{
  logs: string[];
  count: number;
}>;

Usage Examples:

// Fetch every log entry recorded for one job and print them numbered from 1
const jobLogs = await taskQueue.getJobLogs(123);
console.log(`Job 123 has ${jobLogs.count} log entries:`);
for (const [index, log] of jobLogs.logs.entries()) {
  console.log(`  ${index + 1}: ${log}`);
}

// Fetch a bounded range of log entries (pagination via start/end)
const recentLogs = await taskQueue.getJobLogs(456, 0, 10);
console.log('Recent log entries:', recentLogs.logs);

// Dump the logs of the most recent failed jobs, one job at a time
const failedJobs = await taskQueue.getFailed(0, 5);
for (const { id } of failedJobs) {
  const { logs: lines } = await taskQueue.getJobLogs(id);
  console.log(`\nLogs for failed job ${id}:`);
  console.log(lines.join('\n'));
}

Metrics and Performance Data

Collect performance metrics for queue analysis.

/**
 * Get performance metrics for completed or failed jobs
 * @param type - Metric type ('completed' or 'failed')
 * @param start - Start data point index (default: 0)
 * @param end - End data point index (default: -1 for all)
 * @returns Promise that resolves to metrics data object with a `meta` block,
 *          a `data` array of numeric data points, and a top-level `count`.
 *          NOTE(review): `meta.count` and the top-level `count` are separate
 *          fields; confirm which one is the job total and which one is the
 *          number of data points before using either in a rate calculation.
 */
getMetrics(type: 'completed' | 'failed', start?: number, end?: number): Promise<{
  meta: {
    count: number;
    prevTS: number;
    prevCount: number;
  };
  data: number[];
  count: number;
}>;

Usage Examples:

// Get completion metrics.
// The job total lives in meta.count; the top-level count is the number of
// data points held in the metrics list.
const completionMetrics = await taskQueue.getMetrics('completed');
console.log('Completion metrics:', {
  totalCompleted: completionMetrics.meta.count,
  dataPoints: completionMetrics.data.length,
  recentRate: completionMetrics.data.slice(-10) // Last 10 data points
});

// Get failure metrics
const failureMetrics = await taskQueue.getMetrics('failed');
console.log('Failure rate:', failureMetrics.data);

// Calculate success rate
const completed = await taskQueue.getCompletedCount();
const failed = await taskQueue.getFailedCount();
const finished = completed + failed;
if (finished > 0) {
  const successRate = completed / finished * 100;
  console.log(`Success rate: ${successRate.toFixed(2)}%`);
} else {
  // Nothing has finished yet: dividing would produce NaN, so report it explicitly
  console.log('Success rate: n/a (no finished jobs)');
}

Priority-based Monitoring

Monitor jobs by priority levels.

/**
 * Get job counts by priority levels
 * @param priorities - Array of priority numbers to check
 * @returns Promise that resolves to object mapping priorities to counts;
 *          keys are strings (look up with '1', not 1-as-number semantics)
 *          and values are numeric counts
 */
getCountsPerPriority(priorities: number[]): Promise<{ [priority: string]: number }>;

Usage Examples:

// Count jobs for each priority level of interest
const priorities = [1, 2, 3, 4, 5]; // 1 = highest priority
const priorityCounts = await taskQueue.getCountsPerPriority(priorities);

// Human-readable labels for the named levels; anything else is 'Lowest'
const levelNames = new Map([
  ['1', 'Critical'],
  ['2', 'High'],
  ['3', 'Medium'],
  ['4', 'Low']
]);

console.log('Jobs by priority:');
for (const [priority, count] of Object.entries(priorityCounts)) {
  const level = levelNames.has(priority) ? levelNames.get(priority) : 'Lowest';
  console.log(`  ${level} (${priority}): ${count} jobs`);
}

// Warn when the two most urgent levels together exceed 50 jobs
const urgentLevels = ['1', '2'];
const highPriorityCount = urgentLevels.reduce(
  (sum, key) => sum + (priorityCounts[key] || 0),
  0
);
if (highPriorityCount > 50) {
  console.warn(`High priority backlog: ${highPriorityCount} jobs`);
}

Queue Cleaning and Maintenance

Clean old jobs and maintain queue health.

/**
 * Clean old jobs from the queue
 * @param grace - Grace period in milliseconds (jobs older than this are cleaned)
 * @param status - Job status to clean ('completed', 'wait', 'active', 'paused', 'delayed', 'failed').
 *                 Note the spelling 'wait' here, whereas JobStatus uses 'waiting'.
 * @param limit - Maximum number of jobs to clean per call
 * @returns Promise that resolves to array of cleaned jobs
 */
clean(grace: number, status?: 'completed' | 'wait' | 'active' | 'paused' | 'delayed' | 'failed', limit?: number): Promise<Job[]>;

Usage Examples:

// Remove completed jobs once they are more than one hour old
const hourMs = 60 * 60 * 1000;
const removedCompleted = await taskQueue.clean(hourMs, 'completed');
console.log(`Cleaned ${removedCompleted.length} completed jobs`);

// Remove failed jobs older than one day, at most 100 per call
const dayMs = 24 * 60 * 60 * 1000;
const removedFailed = await taskQueue.clean(dayMs, 'failed', 100);
console.log(`Cleaned ${removedFailed.length} failed jobs`);

// Regular maintenance
async function performMaintenance(queue) {
  const oneWeek = 7 * 24 * 60 * 60 * 1000;
  
  // Clean old completed jobs
  const completed = await queue.clean(oneWeek, 'completed', 500);
  
  // Clean old failed jobs
  const failed = await queue.clean(oneWeek, 'failed', 100);
  
  console.log(`Maintenance: cleaned ${completed.length} completed and ${failed.length} failed jobs`);
}

// Run maintenance daily.
// Nothing awaits the promise returned inside the timer callback, so catch
// failures here; otherwise a rejected clean() becomes an unhandled rejection.
setInterval(() => {
  performMaintenance(taskQueue).catch(err => {
    console.error('Queue maintenance failed:', err);
  });
}, 24 * 60 * 60 * 1000);

Comprehensive Queue Dashboard

Example of a comprehensive monitoring dashboard.

async function createQueueDashboard(queue) {
  // Get all statistics at once
  const [
    counts,
    waiting,
    active, 
    completionMetrics,
    failureMetrics,
    recentFailed
  ] = await Promise.all([
    queue.getJobCounts(),
    queue.getWaiting(0, 5),
    queue.getActive(),
    queue.getMetrics('completed').catch(() => null),
    queue.getMetrics('failed').catch(() => null),
    queue.getFailed(0, 3)
  ]);
  
  console.log('\n=== Queue Dashboard ===');
  console.log(`Queue: ${queue.name}`);
  console.log('\nJob Counts:');
  console.log(`  Active: ${counts.active}`);
  console.log(`  Waiting: ${counts.waiting}`);
  console.log(`  Completed: ${counts.completed}`);
  console.log(`  Failed: ${counts.failed}`);
  console.log(`  Delayed: ${counts.delayed}`);
  
  console.log('\nActive Jobs:');
  active.slice(0, 3).forEach(job => {
    console.log(`  ${job.id}: ${job.name} (${job.progress()}%)`);
  });
  
  console.log('\nNext Waiting Jobs:');
  waiting.forEach(job => {
    console.log(`  ${job.id}: ${job.name}`);
  });
  
  if (recentFailed.length > 0) {
    console.log('\nRecent Failures:');
    recentFailed.forEach(job => {
      console.log(`  ${job.id}: ${job.failedReason}`);
    });
  }
  
  if (completionMetrics) {
    console.log(`\nCompletion Rate: ${completionMetrics.count} total`);
  }
  
  if (failureMetrics) {
    const failureRate = failureMetrics.count / (counts.completed + counts.failed) * 100;
    console.log(`Failure Rate: ${failureRate.toFixed(2)}%`);
  }
  
  console.log('========================\n');
}

// Use the dashboard: print one snapshot of taskQueue to the console
await createQueueDashboard(taskQueue);