Real-time queue event monitoring via Redis streams for observability and debugging. The QueueEvents class provides comprehensive event tracking across all queue operations for monitoring and analytics.
The `QueueEvents` class listens to queue events via Redis streams for real-time monitoring.
/**
* QueueEvents listens to queue events via Redis streams.
*
* Signature summary only: the members below are declared without bodies.
*/
class QueueEvents {
/**
* @param queueName - Name of the queue to listen to.
* @param opts - Optional listener settings; see QueueEventsOptions.
*/
constructor(queueName: string, opts?: QueueEventsOptions);
/**
* Start listening to queue events.
* NOTE(review): when the returned promise settles is not visible here — confirm before awaiting it.
*/
run(): Promise<void>;
/** Close event listener and connections */
close(): Promise<void>;
/** Wait until event listener is ready */
waitUntilReady(): Promise<void>;
}interface QueueEventsOptions {
/** Redis connection options */
connection?: ConnectionOptions;
/** Key prefix for Redis keys */
prefix?: string;
/**
* Maximum number of events to read per batch.
* NOTE(review): the property name suggests a retry limit rather than a batch size — confirm against the library.
*/
maxRetriesPerRead?: number;
/** Starting event ID to read from */
lastEventId?: string;
/**
* Read events in blocks.
* NOTE(review): presumably the timeout of each blocking read; the unit is not shown here — TODO confirm.
*/
blockingTimeout?: number;
}Usage Examples:
import { QueueEvents } from "bullmq";
// Build a listener for the "email processing" queue on a local Redis instance
const queueEvents = new QueueEvents("email processing", {
  connection: { host: "localhost", port: 6379 },
});

// Subscribe to each queue event and log its payload
queueEvents.on("waiting", (event) => {
  console.log(`Job ${event.name} (${event.jobId}) added to queue`);
});

queueEvents.on("active", (event) => {
  console.log(`Job ${event.jobId} started processing (was ${event.prev})`);
});

queueEvents.on("completed", (event) => {
  console.log(`Job ${event.jobId} completed with result:`, event.returnvalue);
});

queueEvents.on("failed", (event) => {
  console.log(`Job ${event.jobId} failed:`, event.failedReason);
});

queueEvents.on("progress", (event) => {
  console.log(`Job ${event.jobId} progress:`, event.data);
});

queueEvents.on("removed", (event) => {
  console.log(`Job ${event.jobId} removed from queue (was ${event.prev})`);
});

queueEvents.on("cleaned", (event) => {
  console.log(`Cleaned ${event.count} ${event.type} jobs`);
});
// Wait until the listener is connected.
// A QueueEvents instance starts consuming as soon as it is constructed
// (BullMQ's `autorun` option defaults to true). Calling run() again rejects
// with "Queue Events is already running.", and a manually started run() only
// settles once the listener is closed, so it must not be awaited here.
await queueEvents.waitUntilReady();

/**
 * Map of event name to listener signature for events emitted by QueueEvents.
 *
 * Every listener except `error` receives the event payload followed by the
 * ID of the event (`id`).
 */
interface QueueEventsEmitted {
  'waiting': (args: { jobId: string; name: string }, id: string) => void;
  'active': (args: { jobId: string; prev?: string }, id: string) => void;
  'completed': (args: { jobId: string; returnvalue: any; prev?: string }, id: string) => void;
  'failed': (args: { jobId: string; failedReason: string; prev?: string }, id: string) => void;
  'progress': (args: { jobId: string; data: number | object }, id: string) => void;
  'removed': (args: { jobId: string; prev: string }, id: string) => void;
  'cleaned': (args: { count: number; type: string }, id: string) => void;
  'delayed': (args: { jobId: string; delay: number }, id: string) => void;
  // Subscribed to by the alerting example, so it belongs in the event map.
  'stalled': (args: { jobId: string }, id: string) => void;
  'paused': (args: {}, id: string) => void;
  'resumed': (args: {}, id: string) => void;
  'error': (error: Error) => void;
}
// Multiple queue monitoring
// One listener per monitored queue, each constructed without explicit options.
const orderEvents = new QueueEvents("orders");
const paymentEvents = new QueueEvents("payments");
const shippingEvents = new QueueEvents("shipping");
// Centralized event handler
/**
 * Builds console-logging handlers for one queue, keyed by event name.
 *
 * Each handler prefixes its message with `[queueName]` so output from several
 * queues can be told apart.
 *
 * @param queueName - Name shown in the log prefix.
 * @returns An object with `waiting`, `active`, `completed`, `failed` and
 *   `progress` handlers, each taking the event payload.
 */
function createEventLogger(queueName: string) {
  return {
    waiting: ({ jobId, name }: { jobId: string; name: string }): void =>
      console.log(`[${queueName}] Job ${name} (${jobId}) waiting`),
    active: ({ jobId }: { jobId: string }): void =>
      console.log(`[${queueName}] Job ${jobId} active`),
    completed: ({ jobId, returnvalue }: { jobId: string; returnvalue: unknown }): void =>
      console.log(`[${queueName}] Job ${jobId} completed:`, returnvalue),
    failed: ({ jobId, failedReason }: { jobId: string; failedReason: string }): void =>
      console.log(`[${queueName}] Job ${jobId} failed:`, failedReason),
    progress: ({ jobId, data }: { jobId: string; data: unknown }): void =>
      console.log(`[${queueName}] Job ${jobId} progress:`, data),
  };
}
// Apply event handlers
// Object.entries() widens the event names to plain strings, hence the casts
// when registering each handler.
const orderLogger = createEventLogger("orders");
for (const [event, handler] of Object.entries(orderLogger)) {
  orderEvents.on(event as any, handler as any);
}

const paymentLogger = createEventLogger("payments");
for (const [event, handler] of Object.entries(paymentLogger)) {
  paymentEvents.on(event as any, handler as any);
}

// The shipping listener was created and started but never given handlers,
// so its events were silently dropped; attach the same logger to it.
const shippingLogger = createEventLogger("shipping");
for (const [event, handler] of Object.entries(shippingLogger)) {
  shippingEvents.on(event as any, handler as any);
}

// Wait for all listeners to be ready.
// Each QueueEvents instance starts consuming on construction (BullMQ's
// `autorun` defaults to true); calling run() again would reject with
// "Queue Events is already running.", so readiness is awaited instead.
await Promise.all([
  orderEvents.waitUntilReady(),
  paymentEvents.waitUntilReady(),
  shippingEvents.waitUntilReady(),
]);
// Event-driven metrics collection
/**
 * Keeps running counters for one queue by subscribing to its events.
 *
 * Counts added, completed and failed jobs, tracks which job IDs are currently
 * active, and accumulates the `processingTime` reported in job return values.
 */
class QueueMetrics {
  private readonly stats = {
    jobsAdded: 0,
    jobsCompleted: 0,
    jobsFailed: 0,
    totalProcessingTime: 0,
    activeJobs: new Set<string>(),
  };

  /** Retained so the listener can be closed via close(). */
  private readonly events: QueueEvents;

  /**
   * @param queueName - Name of the queue to collect metrics for.
   */
  constructor(queueName: string) {
    // The listener starts consuming on construction (BullMQ's `autorun`
    // defaults to true), so run() is not called here: a second run() rejects
    // with "Queue Events is already running.".
    this.events = new QueueEvents(queueName);

    // Without an "error" listener a connection failure would surface as an
    // unhandled error instead of a log line.
    this.events.on("error", (error: Error) => {
      console.error(`QueueMetrics listener error for ${queueName}:`, error);
    });

    this.events.on("waiting", () => {
      this.stats.jobsAdded++;
      console.log(`Total jobs added: ${this.stats.jobsAdded}`);
    });

    this.events.on("active", ({ jobId }: { jobId: string }) => {
      this.stats.activeJobs.add(jobId);
      console.log(`Active jobs: ${this.stats.activeJobs.size}`);
    });

    this.events.on(
      "completed",
      ({ jobId, returnvalue }: { jobId: string; returnvalue: any }) => {
        this.stats.jobsCompleted++;
        this.stats.activeJobs.delete(jobId);

        // Only add numeric durations: a string here would turn the running
        // total into a concatenated string.
        const processingTime: unknown = returnvalue?.processingTime;
        if (typeof processingTime === "number" && Number.isFinite(processingTime)) {
          this.stats.totalProcessingTime += processingTime;
        }

        console.log(`Completion rate: ${this.getCompletionRate()}%`);
      },
    );

    this.events.on("failed", ({ jobId }: { jobId: string }) => {
      this.stats.jobsFailed++;
      this.stats.activeJobs.delete(jobId);
      console.log(`Failure rate: ${this.getFailureRate()}%`);
    });
  }

  /** Stops listening and closes the underlying event listener. */
  close(): Promise<void> {
    return this.events.close();
  }

  /** @returns Completed jobs as a percentage of finished jobs, or 0 when none have finished. */
  getCompletionRate(): number {
    const total = this.stats.jobsCompleted + this.stats.jobsFailed;
    return total > 0 ? (this.stats.jobsCompleted / total) * 100 : 0;
  }

  /** @returns Failed jobs as a percentage of finished jobs, or 0 when none have finished. */
  getFailureRate(): number {
    const total = this.stats.jobsCompleted + this.stats.jobsFailed;
    return total > 0 ? (this.stats.jobsFailed / total) * 100 : 0;
  }

  /** @returns Accumulated processing time divided by completed jobs, or 0 when none have completed. */
  getAverageProcessingTime(): number {
    return this.stats.jobsCompleted > 0
      ? this.stats.totalProcessingTime / this.stats.jobsCompleted
      : 0;
  }

  /**
   * @returns A snapshot of the counters, with `activeJobs` reported as a count
   *   and the derived rates and average included.
   */
  getStats() {
    return {
      ...this.stats,
      activeJobs: this.stats.activeJobs.size,
      completionRate: this.getCompletionRate(),
      failureRate: this.getFailureRate(),
      averageProcessingTime: this.getAverageProcessingTime(),
    };
  }
}
// Create the metrics collector for the "orders" queue
const orderMetrics = new QueueMetrics("orders");

// Log a stats snapshot every 30 seconds
setInterval(function reportQueueStats() {
  const snapshot = orderMetrics.getStats();
  console.log("Queue stats:", snapshot);
}, 30000);
// Alert system based on queue events
/**
 * Raises alerts from queue events: a WARNING for every stalled job and a
 * CRITICAL alert once the number of failures counted since the last reset
 * reaches `maxFailures`. The failure counter is reset every `timeWindow` ms.
 */
class QueueAlerting {
  private failureCount = 0;
  private readonly maxFailures = 10;
  private readonly timeWindow = 60000; // 1 minute

  /** Retained so the listener can be closed via close(). */
  private readonly events: QueueEvents;
  /** Handle of the periodic counter reset, cleared in close(). */
  private readonly resetTimer: ReturnType<typeof setInterval>;

  /**
   * @param queueName - Name of the queue to watch.
   */
  constructor(queueName: string) {
    // The listener starts consuming on construction (BullMQ's `autorun`
    // defaults to true), so run() is not called here: a second run() rejects
    // with "Queue Events is already running.".
    this.events = new QueueEvents(queueName);

    this.events.on("error", (error: Error) => {
      console.error(`QueueAlerting listener error for ${queueName}:`, error);
    });

    this.events.on(
      "failed",
      ({ jobId, failedReason }: { jobId: string; failedReason: string }) => {
        this.handleJobFailure(jobId, failedReason);
      },
    );

    this.events.on("stalled", ({ jobId }: { jobId: string }) => {
      this.sendAlert("WARNING", `Job ${jobId} stalled in ${queueName}`);
    });

    // Reset failure count periodically
    this.resetTimer = setInterval(() => {
      this.failureCount = 0;
    }, this.timeWindow);
  }

  /**
   * Stops the periodic reset and closes the event listener. Without this the
   * interval keeps the process alive indefinitely.
   */
  async close(): Promise<void> {
    clearInterval(this.resetTimer);
    await this.events.close();
  }

  /** Counts a failure and raises a CRITICAL alert once the threshold is reached. */
  private handleJobFailure(jobId: string, reason: string): void {
    this.failureCount++;
    console.log(`Job ${jobId} failed: ${reason}`);

    if (this.failureCount >= this.maxFailures) {
      this.sendAlert(
        "CRITICAL",
        `High failure rate: ${this.failureCount} failures in ${this.timeWindow}ms`,
      );
    }
  }

  /** Emits an alert; currently only logs it to the console. */
  private sendAlert(level: string, message: string): void {
    console.log(`[${level}] ALERT: ${message}`);
    // Send to alerting system (email, Slack, PagerDuty, etc.)
    // await sendNotification(level, message);
  }
}
// Set up alerting
const orderAlerting = new QueueAlerting("orders");
// Persist events to database for historical analysis
/**
 * Writes queue events to a database, one record per event.
 *
 * Each record holds the queue name, event type, event ID, the event payload
 * and the time the handler ran.
 */
class EventLogger {
  /** Retained so the listener can be closed via close(). */
  private readonly events: QueueEvents;

  /**
   * @param queueName - Name of the queue whose events are persisted.
   * @param database - Object exposing `events.insert(record)`; the insert may
   *   return a promise.
   */
  constructor(queueName: string, database: any) {
    // The listener starts consuming on construction (BullMQ's `autorun`
    // defaults to true), so run() is not called here: a second run() rejects
    // with "Queue Events is already running.".
    this.events = new QueueEvents(queueName);

    this.events.on("error", (error: Error) => {
      console.error(`EventLogger listener error for ${queueName}:`, error);
    });

    // Log all events to database
    const eventTypes = [
      'waiting', 'active', 'completed', 'failed',
      'progress', 'removed', 'cleaned', 'delayed',
    ] as const;

    for (const eventType of eventTypes) {
      this.events.on(eventType as any, async (data: any, id: string) => {
        // The emitter ignores the promise returned by an async handler, so a
        // failed insert must be caught here or it becomes an unhandled rejection.
        try {
          await database.events.insert({
            queue: queueName,
            eventType,
            eventId: id,
            data,
            timestamp: new Date(),
          });
        } catch (error: unknown) {
          console.error(
            `Failed to persist ${eventType} event ${id} for ${queueName}:`,
            error,
          );
        }
      });
    }
  }

  /** Stops listening and closes the underlying event listener. */
  close(): Promise<void> {
    return this.events.close();
  }
}
// Usage with database
// const eventLogger = new EventLogger("orders", database);
// Real-time event streaming
/**
 * Fans events from several queues out to in-process subscribers.
 *
 * `completed`, `failed` and `progress` events are republished as
 * `job-completed`, `job-failed` and `job-progress`, with the queue name and
 * event ID merged into the payload.
 */
class EventStream {
  private readonly subscribers = new Map<string, Array<(data: any) => void>>();
  /** Retained so every listener can be closed via close(). */
  private readonly listeners: QueueEvents[] = [];

  /**
   * @param queueNames - Names of the queues to stream events from.
   */
  constructor(queueNames: string[]) {
    for (const queueName of queueNames) {
      // Each listener starts consuming on construction (BullMQ's `autorun`
      // defaults to true), so run() is not called here: a second run()
      // rejects with "Queue Events is already running.".
      const events = new QueueEvents(queueName);
      this.listeners.push(events);

      events.on("error", (error: Error) => {
        console.error(`EventStream listener error for ${queueName}:`, error);
      });

      events.on("completed", (data: object, id: string) => {
        this.emit("job-completed", { queueName, ...data, id });
      });

      events.on("failed", (data: object, id: string) => {
        this.emit("job-failed", { queueName, ...data, id });
      });

      events.on("progress", (data: object, id: string) => {
        this.emit("job-progress", { queueName, ...data, id });
      });
    }
  }

  /**
   * Registers a callback for one stream event type.
   *
   * @param eventType - Stream event name, e.g. `job-completed`.
   * @param callback - Invoked with the merged event payload.
   * @returns A function that removes this subscription when called.
   */
  subscribe(eventType: string, callback: (data: any) => void): () => void {
    const callbacks = this.subscribers.get(eventType) ?? [];
    this.subscribers.set(eventType, callbacks);
    callbacks.push(callback);

    return () => {
      const index = callbacks.indexOf(callback);
      if (index !== -1) {
        callbacks.splice(index, 1);
      }
    };
  }

  /** Closes every underlying event listener. */
  async close(): Promise<void> {
    await Promise.all(this.listeners.map((events) => events.close()));
  }

  /** Delivers `data` to every callback subscribed to `eventType`. */
  private emit(eventType: string, data: any): void {
    // Iterate over a copy so a callback may unsubscribe during dispatch.
    const callbacks = [...(this.subscribers.get(eventType) ?? [])];
    callbacks.forEach((callback) => callback(data));
  }
}
// Stream events from three queues and log completions and failures
const eventStream = new EventStream(["orders", "payments", "shipping"]);

eventStream.subscribe("job-completed", (event) => {
  const { queueName, jobId } = event;
  console.log(`Job completed in ${queueName}:`, jobId);
});

eventStream.subscribe("job-failed", (event) => {
  const { queueName, failedReason } = event;
  console.log(`Job failed in ${queueName}:`, failedReason);
});
// Graceful shutdown of event listeners
/** Listeners created by setupEventMonitoring, kept so they can be closed later. */
const eventListeners: QueueEvents[] = [];

/**
 * Creates one event listener per queue, logs job completions, and resolves
 * once every listener is ready.
 *
 * Each listener is added to `eventListeners` before readiness is awaited, so
 * it can still be closed if the connection never becomes ready.
 *
 * @param queueNames - Names of the queues to monitor.
 */
async function setupEventMonitoring(queueNames: string[]): Promise<void> {
  const ready = queueNames.map((queueName) => {
    // The listener starts consuming on construction (BullMQ's `autorun`
    // defaults to true). Awaiting run() here would either reject with
    // "Queue Events is already running." or, for a manually started listener,
    // not settle until the listener is closed — so later queues were never
    // set up and nothing was registered for shutdown.
    const events = new QueueEvents(queueName);

    // Add event handlers
    events.on("completed", ({ jobId }: { jobId: string }) => {
      console.log(`${queueName}: Job ${jobId} completed`);
    });

    eventListeners.push(events);
    return events.waitUntilReady();
  });

  // Listeners are independent, so wait for them in parallel.
  await Promise.all(ready);
}
// Cleanup function
/**
 * Closes every registered event listener.
 *
 * All listeners are given the chance to close even if one of them fails;
 * failures are logged rather than thrown, so using this function directly as
 * a signal handler cannot produce an unhandled rejection.
 */
async function shutdown(): Promise<void> {
  console.log("Shutting down event listeners...");

  const results = await Promise.allSettled(
    eventListeners.map((events) => events.close()),
  );

  const failures = results.filter(
    (result): result is PromiseRejectedResult => result.status === "rejected",
  );
  for (const failure of failures) {
    console.error("Failed to close event listener:", failure.reason);
  }

  if (failures.length === 0) {
    console.log("All event listeners closed");
  } else {
    console.log(`Event listeners closed with ${failures.length} failure(s)`);
  }
}
// Close the listeners when the process is asked to terminate
for (const signal of ["SIGINT", "SIGTERM"] as const) {
  process.on(signal, shutdown);
}

// Begin monitoring the three queues
await setupEventMonitoring(["orders", "payments", "notifications"]);
// Efficient event processing with batching
/**
 * Buffers queue events and processes them in batches.
 *
 * A batch is flushed when it reaches `batchSize` events or every
 * `flushInterval` milliseconds, whichever comes first.
 */
class BatchedEventProcessor {
  private eventBatch: Array<{
    eventType: string;
    data: unknown;
    id: string;
    timestamp: number;
  }> = [];
  private readonly batchSize = 100;
  private readonly flushInterval = 5000; // 5 seconds

  /** Retained so the listener can be closed via close(). */
  private readonly events: QueueEvents;
  /** Handle of the periodic flush, cleared in close(). */
  private readonly flushTimer: ReturnType<typeof setInterval>;

  /**
   * @param queueName - Name of the queue whose events are batched.
   */
  constructor(queueName: string) {
    // The listener starts consuming on construction (BullMQ's `autorun`
    // defaults to true), so run() is not called here: a second run() rejects
    // with "Queue Events is already running.".
    this.events = new QueueEvents(queueName);

    this.events.on("error", (error: Error) => {
      console.error(`BatchedEventProcessor listener error for ${queueName}:`, error);
    });

    // Batch all events
    const eventTypes = ['waiting', 'active', 'completed', 'failed'] as const;
    for (const eventType of eventTypes) {
      this.events.on(eventType as any, (data: unknown, id: string) => {
        this.addToBatch({ eventType, data, id, timestamp: Date.now() });
      });
    }

    // Periodic flush; flushBatch() handles its own errors, so the promise
    // can be safely discarded.
    this.flushTimer = setInterval(() => {
      void this.flushBatch();
    }, this.flushInterval);
  }

  /**
   * Stops the periodic flush, closes the listener, then processes whatever
   * is still buffered so no collected events are dropped.
   */
  async close(): Promise<void> {
    clearInterval(this.flushTimer);
    await this.events.close();
    await this.flushBatch();
  }

  /** Buffers one event and flushes when the batch is full. */
  private addToBatch(event: {
    eventType: string;
    data: unknown;
    id: string;
    timestamp: number;
  }): void {
    this.eventBatch.push(event);

    if (this.eventBatch.length >= this.batchSize) {
      void this.flushBatch();
    }
  }

  /**
   * Hands the buffered events to processBatch(). Never rejects: a processing
   * failure is logged, and the failed batch is not retried.
   */
  private async flushBatch(): Promise<void> {
    if (this.eventBatch.length === 0) return;

    // Swap the buffer first so events arriving during processing start a new batch.
    const batch = this.eventBatch;
    this.eventBatch = [];

    try {
      await this.processBatch(batch);
    } catch (error: unknown) {
      console.error(`Failed to process batch of ${batch.length} events:`, error);
    }
  }

  /** Processes one batch; currently only logs its size. */
  private async processBatch(batch: unknown[]): Promise<void> {
    console.log(`Processing batch of ${batch.length} events`);
    // Bulk insert to database, send to analytics, etc.
  }
}