or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

configuration.mderror-handling.mdevent-monitoring.mdflow-orchestration.mdindex.mdjob-lifecycle.mdjob-processing.mdjob-scheduling.mdqueue-management.md
tile.json

event-monitoring.mddocs/

Event Monitoring

Real-time queue event monitoring via Redis streams for observability and debugging. The QueueEvents class provides comprehensive event tracking across all queue operations for monitoring and analytics.

Capabilities

QueueEvents Class

Listens to queue events via Redis streams for real-time monitoring.

/**
 * QueueEvents listens to queue events via Redis streams
 */
class QueueEvents {
  constructor(queueName: string, opts?: QueueEventsOptions);
  
  /** Start listening to queue events */
  run(): Promise<void>;
  
  /** Close event listener and connections */
  close(): Promise<void>;
  
  /** Wait until event listener is ready */
  waitUntilReady(): Promise<void>;
}

Queue Events Options

interface QueueEventsOptions {
  /** Redis connection options */
  connection?: ConnectionOptions;
  
  /** Key prefix for Redis keys */
  prefix?: string;
  
  /** Maximum number of events to read per batch */
  maxRetriesPerRead?: number;
  
  /** Starting event ID to read from */
  lastEventId?: string;
  
  /** Read events in blocks */
  blockingTimeout?: number;
}

Usage Examples:

import { QueueEvents } from "bullmq";

// Create queue events listener
const queueEvents = new QueueEvents("email processing", {
  connection: {
    host: "localhost",
    port: 6379,
  },
});

// Listen to all queue events
queueEvents.on("waiting", ({ jobId, name }) => {
  console.log(`Job ${name} (${jobId}) added to queue`);
});

queueEvents.on("active", ({ jobId, prev }) => {
  console.log(`Job ${jobId} started processing (was ${prev})`);
});

queueEvents.on("completed", ({ jobId, returnvalue, prev }) => {
  console.log(`Job ${jobId} completed with result:`, returnvalue);
});

queueEvents.on("failed", ({ jobId, failedReason, prev }) => {
  console.log(`Job ${jobId} failed:`, failedReason);
});

queueEvents.on("progress", ({ jobId, data: progress }) => {
  console.log(`Job ${jobId} progress:`, progress);
});

queueEvents.on("removed", ({ jobId, prev }) => {
  console.log(`Job ${jobId} removed from queue (was ${prev})`);
});

queueEvents.on("cleaned", ({ count, type }) => {
  console.log(`Cleaned ${count} ${type} jobs`);
});

// Start listening
await queueEvents.run();

Queue Events Types

interface QueueEventsEmitted {
  'waiting': (args: { jobId: string; name: string }, id: string) => void;
  'active': (args: { jobId: string; prev?: string }, id: string) => void;
  'completed': (args: { jobId: string; returnvalue: any; prev?: string }, id: string) => void;
  'failed': (args: { jobId: string; failedReason: string; prev?: string }, id: string) => void;
  'progress': (args: { jobId: string; data: number | object }, id: string) => void;
  'removed': (args: { jobId: string; prev: string }, id: string) => void;
  'cleaned': (args: { count: number; type: string }, id: string) => void;
  'delayed': (args: { jobId: string; delay: number }, id: string) => void;
  'paused': (args: {}, id: string) => void;
  'resumed': (args: {}, id: string) => void;
  'error': (error: Error) => void;
}

Advanced Event Monitoring

// Multiple queue monitoring
const orderEvents = new QueueEvents("orders");
const paymentEvents = new QueueEvents("payments");
const shippingEvents = new QueueEvents("shipping");

// Centralized event handler
function createEventLogger(queueName: string) {
  return {
    waiting: ({ jobId, name }) => 
      console.log(`[${queueName}] Job ${name} (${jobId}) waiting`),
    
    active: ({ jobId }) => 
      console.log(`[${queueName}] Job ${jobId} active`),
    
    completed: ({ jobId, returnvalue }) => 
      console.log(`[${queueName}] Job ${jobId} completed:`, returnvalue),
    
    failed: ({ jobId, failedReason }) => 
      console.log(`[${queueName}] Job ${jobId} failed:`, failedReason),
    
    progress: ({ jobId, data }) => 
      console.log(`[${queueName}] Job ${jobId} progress:`, data),
  };
}

// Apply event handlers
const orderLogger = createEventLogger("orders");
Object.entries(orderLogger).forEach(([event, handler]) => {
  orderEvents.on(event as any, handler as any);
});

const paymentLogger = createEventLogger("payments");
Object.entries(paymentLogger).forEach(([event, handler]) => {
  paymentEvents.on(event as any, handler as any);
});

// Start all listeners
await Promise.all([
  orderEvents.run(),
  paymentEvents.run(),
  shippingEvents.run(),
]);

Event Analytics and Metrics

// Event-driven metrics collection
class QueueMetrics {
  private stats = {
    jobsAdded: 0,
    jobsCompleted: 0,
    jobsFailed: 0,
    totalProcessingTime: 0,
    activeJobs: new Set<string>(),
  };

  constructor(queueName: string) {
    const events = new QueueEvents(queueName);
    
    events.on("waiting", ({ jobId }) => {
      this.stats.jobsAdded++;
      console.log(`Total jobs added: ${this.stats.jobsAdded}`);
    });
    
    events.on("active", ({ jobId }) => {
      this.stats.activeJobs.add(jobId);
      console.log(`Active jobs: ${this.stats.activeJobs.size}`);
    });
    
    events.on("completed", ({ jobId, returnvalue }) => {
      this.stats.jobsCompleted++;
      this.stats.activeJobs.delete(jobId);
      
      // Calculate processing time if available
      if (returnvalue && returnvalue.processingTime) {
        this.stats.totalProcessingTime += returnvalue.processingTime;
      }
      
      console.log(`Completion rate: ${this.getCompletionRate()}%`);
    });
    
    events.on("failed", ({ jobId }) => {
      this.stats.jobsFailed++;
      this.stats.activeJobs.delete(jobId);
      console.log(`Failure rate: ${this.getFailureRate()}%`);
    });
    
    events.run();
  }
  
  getCompletionRate(): number {
    const total = this.stats.jobsCompleted + this.stats.jobsFailed;
    return total > 0 ? (this.stats.jobsCompleted / total) * 100 : 0;
  }
  
  getFailureRate(): number {
    const total = this.stats.jobsCompleted + this.stats.jobsFailed;
    return total > 0 ? (this.stats.jobsFailed / total) * 100 : 0;
  }
  
  getAverageProcessingTime(): number {
    return this.stats.jobsCompleted > 0 
      ? this.stats.totalProcessingTime / this.stats.jobsCompleted 
      : 0;
  }
  
  getStats() {
    return {
      ...this.stats,
      activeJobs: this.stats.activeJobs.size,
      completionRate: this.getCompletionRate(),
      failureRate: this.getFailureRate(),
      averageProcessingTime: this.getAverageProcessingTime(),
    };
  }
}

// Use metrics collector
const orderMetrics = new QueueMetrics("orders");

// Periodic stats reporting
setInterval(() => {
  console.log("Queue stats:", orderMetrics.getStats());
}, 30000); // Every 30 seconds

Event-Driven Alerting

// Alert system based on queue events
class QueueAlerting {
  private failureCount = 0;
  private readonly maxFailures = 10;
  private readonly timeWindow = 60000; // 1 minute

  constructor(queueName: string) {
    const events = new QueueEvents(queueName);
    
    events.on("failed", ({ jobId, failedReason }) => {
      this.handleJobFailure(jobId, failedReason);
    });
    
    events.on("stalled", ({ jobId }) => {
      this.sendAlert("WARNING", `Job ${jobId} stalled in ${queueName}`);
    });
    
    // Reset failure count periodically
    setInterval(() => {
      this.failureCount = 0;
    }, this.timeWindow);
    
    events.run();
  }
  
  private handleJobFailure(jobId: string, reason: string) {
    this.failureCount++;
    
    console.log(`Job ${jobId} failed: ${reason}`);
    
    if (this.failureCount >= this.maxFailures) {
      this.sendAlert("CRITICAL", 
        `High failure rate: ${this.failureCount} failures in ${this.timeWindow}ms`
      );
    }
  }
  
  private sendAlert(level: string, message: string) {
    console.log(`[${level}] ALERT: ${message}`);
    // Send to alerting system (email, Slack, PagerDuty, etc.)
    // await sendNotification(level, message);
  }
}

// Set up alerting
const orderAlerting = new QueueAlerting("orders");

Event Log Persistence

// Persist events to database for historical analysis
class EventLogger {
  constructor(queueName: string, database: any) {
    const events = new QueueEvents(queueName);
    
    // Log all events to database
    const eventTypes = [
      'waiting', 'active', 'completed', 'failed', 
      'progress', 'removed', 'cleaned', 'delayed'
    ];
    
    eventTypes.forEach(eventType => {
      events.on(eventType as any, async (data: any, id: string) => {
        await database.events.insert({
          queue: queueName,
          eventType,
          eventId: id,
          data,
          timestamp: new Date(),
        });
      });
    });
    
    events.run();
  }
}

// Usage with database
// const eventLogger = new EventLogger("orders", database);

Event Streaming and Processing

// Real-time event streaming
class EventStream {
  private subscribers: Map<string, Function[]> = new Map();
  
  constructor(queueNames: string[]) {
    queueNames.forEach(queueName => {
      const events = new QueueEvents(queueName);
      
      events.on("completed", (data, id) => {
        this.emit("job-completed", { queueName, ...data, id });
      });
      
      events.on("failed", (data, id) => {
        this.emit("job-failed", { queueName, ...data, id });
      });
      
      events.on("progress", (data, id) => {
        this.emit("job-progress", { queueName, ...data, id });
      });
      
      events.run();
    });
  }
  
  subscribe(eventType: string, callback: Function) {
    if (!this.subscribers.has(eventType)) {
      this.subscribers.set(eventType, []);
    }
    this.subscribers.get(eventType)!.push(callback);
  }
  
  private emit(eventType: string, data: any) {
    const callbacks = this.subscribers.get(eventType) || [];
    callbacks.forEach(callback => callback(data));
  }
}

// Use event stream
const eventStream = new EventStream(["orders", "payments", "shipping"]);

eventStream.subscribe("job-completed", (data) => {
  console.log(`Job completed in ${data.queueName}:`, data.jobId);
});

eventStream.subscribe("job-failed", (data) => {
  console.log(`Job failed in ${data.queueName}:`, data.failedReason);
});

Event Cleanup and Management

// Graceful shutdown of event listeners
const eventListeners: QueueEvents[] = [];

async function setupEventMonitoring(queueNames: string[]) {
  for (const queueName of queueNames) {
    const events = new QueueEvents(queueName);
    
    // Add event handlers
    events.on("completed", ({ jobId }) => {
      console.log(`${queueName}: Job ${jobId} completed`);
    });
    
    await events.run();
    eventListeners.push(events);
  }
}

// Cleanup function
async function shutdown() {
  console.log("Shutting down event listeners...");
  
  await Promise.all(
    eventListeners.map(events => events.close())
  );
  
  console.log("All event listeners closed");
}

// Handle process termination
process.on("SIGINT", shutdown);
process.on("SIGTERM", shutdown);

// Start monitoring
await setupEventMonitoring(["orders", "payments", "notifications"]);

Performance Considerations

// Efficient event processing with batching
class BatchedEventProcessor {
  private eventBatch: any[] = [];
  private readonly batchSize = 100;
  private readonly flushInterval = 5000; // 5 seconds
  
  constructor(queueName: string) {
    const events = new QueueEvents(queueName);
    
    // Batch all events
    const eventTypes = ['waiting', 'active', 'completed', 'failed'];
    eventTypes.forEach(eventType => {
      events.on(eventType as any, (data: any, id: string) => {
        this.addToBatch({ eventType, data, id, timestamp: Date.now() });
      });
    });
    
    // Periodic flush
    setInterval(() => this.flushBatch(), this.flushInterval);
    
    events.run();
  }
  
  private addToBatch(event: any) {
    this.eventBatch.push(event);
    
    if (this.eventBatch.length >= this.batchSize) {
      this.flushBatch();
    }
  }
  
  private async flushBatch() {
    if (this.eventBatch.length === 0) return;
    
    const batch = [...this.eventBatch];
    this.eventBatch = [];
    
    // Process batch
    await this.processBatch(batch);
  }
  
  private async processBatch(batch: any[]) {
    console.log(`Processing batch of ${batch.length} events`);
    // Bulk insert to database, send to analytics, etc.
  }
}