# Event System

Comprehensive event emission system for monitoring queue state, task lifecycle, and execution progress. Built on EventEmitter3 for efficient event handling and real-time queue monitoring.

## Capabilities

### Event Types

p-queue emits events at each stage of the task lifecycle and whenever the queue's state changes.

```typescript { .api }
type EventName = 'active' | 'idle' | 'empty' | 'add' | 'next' | 'completed' | 'error';
```
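
When event names come from configuration or user input, it can help to validate them before subscribing. The following is a minimal sketch of a runtime guard for the `EventName` union above; `EVENT_NAMES` and `isEventName` are hypothetical helper names and are not part of p-queue.

```typescript
// Hypothetical helper, not exported by p-queue: the event names listed in
// the EventName union, kept as a runtime array so they can be checked.
const EVENT_NAMES = ['active', 'idle', 'empty', 'add', 'next', 'completed', 'error'] as const;

type EventName = (typeof EVENT_NAMES)[number];

// Type guard: narrows an arbitrary string to EventName when it matches.
function isEventName(value: string): value is EventName {
  return EVENT_NAMES.some((name) => name === value);
}

console.log(isEventName('idle'));     // true
console.log(isEventName('finished')); // false
```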

### Event Listening

p-queue extends EventEmitter3, providing standard event listener methods.

```typescript { .api }
/**
 * Event listener methods inherited from EventEmitter3
 */
class PQueue extends EventEmitter<EventName> {
  on(event: EventName, listener: (...args: any[]) => void): this;
  off(event: EventName, listener: (...args: any[]) => void): this;
  once(event: EventName, listener: (...args: any[]) => void): this;
  emit(event: EventName, ...args: any[]): boolean;
}
```
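
To illustrate how `on`, `once`, and `off` fit together, here is a minimal sketch that collects `'completed'` results until the first `'idle'` event and then unsubscribes. `QueueEventSource`, `collectUntilIdle`, and `StubEmitter` are hypothetical names introduced for this example. The stub is only a stand-in so the block runs without installing p-queue; the assumption is that a real `PQueue` instance can be passed wherever `QueueEventSource` is expected.

```typescript
type Listener = (...args: any[]) => void;

// Structural subset of the listener methods shown above (an assumption made
// for this sketch, not a type exported by p-queue).
interface QueueEventSource {
  on(event: string, listener: Listener): unknown;
  off(event: string, listener: Listener): unknown;
  once(event: string, listener: Listener): unknown;
}

// Hypothetical helper: push every 'completed' result into `sink` until the
// first 'idle' event, then remove the 'completed' listener.
function collectUntilIdle(source: QueueEventSource, sink: unknown[]): void {
  const onCompleted: Listener = (result) => {
    sink.push(result);
  };
  source.on('completed', onCompleted);
  // once() fires a single time; off() needs the same function reference
  // that was passed to on().
  source.once('idle', () => {
    source.off('completed', onCompleted);
  });
}

// Minimal stand-in emitter so the sketch is runnable on its own.
class StubEmitter implements QueueEventSource {
  private listeners: Record<string, Listener[]> = {};

  on(event: string, listener: Listener): this {
    this.listeners[event] = (this.listeners[event] || []).concat(listener);
    return this;
  }

  off(event: string, listener: Listener): this {
    this.listeners[event] = (this.listeners[event] || []).filter((l) => l !== listener);
    return this;
  }

  once(event: string, listener: Listener): this {
    const wrapper: Listener = (...args) => {
      this.off(event, wrapper);
      listener(...args);
    };
    return this.on(event, wrapper);
  }

  emit(event: string, ...args: unknown[]): boolean {
    const current = this.listeners[event] || [];
    current.forEach((l) => l(...args));
    return current.length > 0;
  }
}

const stub = new StubEmitter();
const collected: unknown[] = [];
collectUntilIdle(stub, collected);

stub.emit('completed', 'a');
stub.emit('completed', 'b');
stub.emit('idle');
stub.emit('completed', 'ignored'); // the listener was removed on 'idle'

console.log(collected); // [ 'a', 'b' ]
```

Keeping the listener in a named constant is the important design choice here: `off` can only remove a listener when it receives the same function reference that was registered.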

### Event Descriptions

#### 'add' Event

Emitted when a task is added to the queue.

```typescript { .api }
/**
 * Emitted when a task is added to the queue via add() or addAll()
 * No parameters passed to listeners
 */
queue.on('add', () => void);
```

**Usage Examples:**

```typescript
import PQueue from "p-queue";

const queue = new PQueue({ concurrency: 2 });

queue.on('add', () => {
  console.log(`Task added. Queue size: ${queue.size}, Pending: ${queue.pending}`);
});

// Each call triggers the 'add' event
queue.add(async () => someTask());
queue.add(async () => anotherTask());
```

#### 'active' Event

Emitted when a task starts running (becomes active).

```typescript { .api }
/**
 * Emitted when a task begins execution
 * No parameters passed to listeners
 */
queue.on('active', () => void);
```

**Usage Examples:**

```typescript
queue.on('active', () => {
  console.log(`Task started. Running: ${queue.pending}, Queued: ${queue.size}`);
});

// Monitor active tasks
let activeTasks = 0;
queue.on('active', () => {
  activeTasks++;
  console.log(`Active tasks: ${activeTasks}`);
});

queue.on('next', () => {
  activeTasks--;
  console.log(`Active tasks: ${activeTasks}`);
});
```

#### 'next' Event

Emitted when a task completes and the next task can start.

```typescript { .api }
/**
 * Emitted when a task finishes (successfully or with error) and the next task can begin
 * No parameters passed to listeners
 */
queue.on('next', () => void);
```

**Usage Examples:**

```typescript
queue.on('next', () => {
  console.log(`Task finished. ${queue.pending} still running, ${queue.size} queued`);
});

// Track task completion rate
let completedTasks = 0;
queue.on('next', () => {
  completedTasks++;
  if (completedTasks % 10 === 0) {
    console.log(`${completedTasks} tasks completed`);
  }
});
```

#### 'completed' Event

Emitted when a task completes successfully.

```typescript { .api }
/**
 * Emitted when a task completes successfully
 * @param result - The result returned by the task
 */
queue.on('completed', (result: unknown) => void);
```

**Usage Examples:**

```typescript
queue.on('completed', (result) => {
  console.log('Task completed with result:', result);
});

// Collect results
const results: unknown[] = [];
queue.on('completed', (result) => {
  results.push(result);
});

// Add tasks
await queue.add(async () => ({ id: 1, data: 'first' }));
await queue.add(async () => ({ id: 2, data: 'second' }));
```

#### 'error' Event

Emitted when a task throws an error.

```typescript { .api }
/**
 * Emitted when a task throws an error or times out (if throwOnTimeout is true)
 * @param error - The error thrown by the task
 */
queue.on('error', (error: unknown) => void);
```

**Usage Examples:**

```typescript
queue.on('error', (error) => {
  console.error('Task failed:', error);

  // Log error details
  if (error instanceof Error) {
    console.error('Error message:', error.message);
    console.error('Stack trace:', error.stack);
  }
});

// Error counting and reporting
let errorCount = 0;
queue.on('error', (error) => {
  errorCount++;
  console.log(`Total errors: ${errorCount}`);

  // Send to error tracking service
  // (errorTracker stands in for your own error-reporting client)
  errorTracker.captureException(error);
});

// Add tasks that might fail.
// The promise returned by add() rejects when the task throws, so it needs
// its own catch handler to avoid an unhandled rejection.
queue.add(async () => {
  if (Math.random() < 0.3) {
    throw new Error('Random failure');
  }
  return 'success';
}).catch(() => {
  // Already reported by the 'error' listeners above
});
```

#### 'empty' Event

Emitted when the queue becomes empty (no more tasks waiting to run).

```typescript { .api }
/**
 * Emitted when the queue becomes empty (queue.size === 0)
 * Note: Some tasks may still be running (pending > 0)
 * No parameters passed to listeners
 */
queue.on('empty', () => void);
```

**Usage Examples:**

```typescript
queue.on('empty', () => {
  console.log('Queue is empty - no more tasks waiting');
  console.log(`But ${queue.pending} tasks are still running`);
});

// Trigger actions when queue empties
queue.on('empty', () => {
  // Maybe add more tasks dynamically
  if (shouldAddMoreTasks()) {
    addMoreTasks();
  }
});
```

#### 'idle' Event

Emitted when the queue becomes idle (empty and no tasks running).

```typescript { .api }
/**
 * Emitted when the queue becomes completely idle (queue.size === 0 && queue.pending === 0)
 * All work has finished
 * No parameters passed to listeners
 */
queue.on('idle', () => void);
```

**Usage Examples:**

```typescript
queue.on('idle', () => {
  console.log('Queue is completely idle - all work finished');
});

// Cleanup when all work is done
queue.on('idle', () => {
  // Close database connections, save state, etc.
  cleanup();
});

// Trigger next phase of work
queue.on('idle', async () => {
  console.log('Phase 1 complete, starting phase 2');
  await startPhase2();
});
```
```
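
The difference between `'empty'` and `'idle'` comes down to the two counters named in the descriptions above: `queue.size` (tasks waiting) and `queue.pending` (tasks running). As a rough sketch, the conditions can be written as a pure function. `describeState` and the `'busy'` label are hypothetical names used only for illustration and are not part of p-queue.

```typescript
type QueueState = 'busy' | 'empty' | 'idle';

// Classify a queue from its two counters, following the conditions stated
// for the 'empty' and 'idle' events.
function describeState(size: number, pending: number): QueueState {
  if (size > 0) {
    return 'busy'; // tasks are still waiting to run
  }
  // Nothing is waiting: 'empty' if tasks are still running, otherwise 'idle'.
  return pending > 0 ? 'empty' : 'idle';
}

console.log(describeState(3, 2)); // busy
console.log(describeState(0, 2)); // empty
console.log(describeState(0, 0)); // idle
```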

### Comprehensive Event Monitoring

**Complete event monitoring example:**

```typescript
import { setTimeout as delay } from "node:timers/promises";
import PQueue from "p-queue";

const queue = new PQueue({ concurrency: 3 });

// Monitor all events
queue.on('add', () => {
  console.log(`Task added (Queue: ${queue.size}, Running: ${queue.pending})`);
});

queue.on('active', () => {
  console.log(`Task started (Queue: ${queue.size}, Running: ${queue.pending})`);
});

queue.on('next', () => {
  console.log(`Task finished (Queue: ${queue.size}, Running: ${queue.pending})`);
});

queue.on('completed', (result) => {
  console.log('Task completed:', result);
});

queue.on('error', (error) => {
  console.error('Task failed:', error instanceof Error ? error.message : error);
});

queue.on('empty', () => {
  console.log(`Queue empty (${queue.pending} still running)`);
});

queue.on('idle', () => {
  console.log('Queue idle - all work complete');
});

// Add some tasks to see events in action
queue.add(async () => {
  await delay(1000);
  return 'Task 1 result';
});

// add() returns a promise that rejects when the task throws, so the failing
// task gets a catch handler; the 'error' listener above still fires.
queue.add(async () => {
  await delay(500);
  throw new Error('Task 2 failed');
}).catch(() => {});

queue.add(async () => {
  await delay(800);
  return 'Task 3 result';
});
```

### Event-Based Patterns

#### Progress Tracking

```typescript
class QueueProgressTracker {
  constructor(queue) {
    this.queue = queue;
    this.totalTasks = 0;
    this.completedTasks = 0;
    this.failedTasks = 0;

    this.setupEventListeners();
  }

  setupEventListeners() {
    this.queue.on('add', () => {
      this.totalTasks++;
      this.updateProgress();
    });

    this.queue.on('completed', () => {
      this.completedTasks++;
      this.updateProgress();
    });

    this.queue.on('error', () => {
      this.failedTasks++;
      this.updateProgress();
    });
  }

  updateProgress() {
    const finished = this.completedTasks + this.failedTasks;
    const progress = this.totalTasks > 0 ? (finished / this.totalTasks) * 100 : 0;

    console.log(`Progress: ${progress.toFixed(1)}% (${finished}/${this.totalTasks})`);
    console.log(`${this.completedTasks} completed, ${this.failedTasks} failed`);
  }
}

const queue = new PQueue({ concurrency: 5 });
const tracker = new QueueProgressTracker(queue);
```

#### Adaptive Concurrency

```typescript
class AdaptiveConcurrency {
  constructor(queue) {
    this.queue = queue;
    this.errorRate = 0;
    this.errorCount = 0;
    this.successCount = 0;

    this.setupEventListeners();
  }

  setupEventListeners() {
    this.queue.on('completed', () => {
      this.successCount++;
      this.adjustConcurrency();
    });

    this.queue.on('error', () => {
      this.errorCount++;
      this.adjustConcurrency();
    });
  }

  adjustConcurrency() {
    const total = this.errorCount + this.successCount;
    if (total < 10) return; // Need enough data

    this.errorRate = this.errorCount / total;

    if (this.errorRate > 0.1) {
      // High error rate, reduce concurrency
      const newConcurrency = Math.max(1, Math.floor(this.queue.concurrency * 0.8));
      this.queue.concurrency = newConcurrency;
      console.log(`High error rate (${(this.errorRate * 100).toFixed(1)}%), reducing concurrency to ${newConcurrency}`);
    } else if (this.errorRate < 0.02 && this.queue.concurrency < 10) {
      // Low error rate, can increase concurrency
      const newConcurrency = Math.min(10, this.queue.concurrency + 1);
      this.queue.concurrency = newConcurrency;
      console.log(`Low error rate (${(this.errorRate * 100).toFixed(1)}%), increasing concurrency to ${newConcurrency}`);
    }
  }
}

const queue = new PQueue({ concurrency: 2 });
const adaptive = new AdaptiveConcurrency(queue);
```
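
To make the adjustment rule easier to follow, here is the same arithmetic as a standalone function with a few worked inputs. This is only a sketch: `nextConcurrency` is a hypothetical helper, and the thresholds (10% and 2%) and the cap of 10 are copied from the example above rather than taken from p-queue.

```typescript
// Pure version of the adjustment rule: shrink by 20% (never below 1) when
// the error rate exceeds 10%, grow by 1 (never above `max`) when it is
// under 2%, otherwise leave the value unchanged.
function nextConcurrency(current: number, errorRate: number, max = 10): number {
  if (errorRate > 0.1) {
    return Math.max(1, Math.floor(current * 0.8));
  }
  if (errorRate < 0.02 && current < max) {
    return Math.min(max, current + 1);
  }
  return current;
}

console.log(nextConcurrency(5, 0.2));  // 4
console.log(nextConcurrency(1, 0.5));  // 1 (floor(0.8) is 0, clamped to 1)
console.log(nextConcurrency(5, 0.01)); // 6
console.log(nextConcurrency(10, 0));   // 10 (already at the cap)
console.log(nextConcurrency(5, 0.05)); // 5 (between thresholds, unchanged)
```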

### Error Recovery Patterns

```typescript
import PQueue from "p-queue";

// Retry failed tasks with exponential backoff.
// The 'error' event passes only the error to listeners, not the task that
// failed, so the retry is driven by wrapping add() rather than by a listener.
class RetryQueue extends PQueue {
  constructor(options = {}) {
    const { maxRetries = 3, ...queueOptions } = options;
    super(queueOptions);
    this.maxRetries = maxRetries;
  }

  // Add a task and re-add it after a failure, up to maxRetries times.
  // The 'error' event still fires once for every failed attempt.
  async addWithRetry(task, options = {}) {
    for (let attempts = 0; ; attempts++) {
      try {
        return await this.add(task, options);
      } catch (error) {
        if (attempts >= this.maxRetries) {
          console.error(`Task failed after ${this.maxRetries} retries`);
          throw error;
        }

        // Exponential backoff
        const delay = Math.pow(2, attempts) * 1000;
        await new Promise((resolve) => setTimeout(resolve, delay));
        console.log(`Retrying task (attempt ${attempts + 1})`);
      }
    }
  }
}
```
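
For reference, the backoff formula above (2 to the power of the attempt count, times 1000 ms) produces the schedule below. `backoffDelays` is a hypothetical helper written for this illustration and is not part of p-queue.

```typescript
// Compute the wait before each retry: baseMs * 2^attempt for attempt 0..n-1.
function backoffDelays(maxRetries: number, baseMs = 1000): number[] {
  const delays: number[] = [];
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    delays.push(Math.pow(2, attempt) * baseMs);
  }
  return delays;
}

const schedule = backoffDelays(3);
const totalWait = schedule.reduce((sum, ms) => sum + ms, 0);

console.log(schedule);  // [ 1000, 2000, 4000 ]
console.log(totalWait); // 7000
```

With the default of three retries, a task that keeps failing waits 7 seconds in total before the final error is reported.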