
# Event System

Comprehensive event emission system for monitoring queue state, task lifecycle, and execution progress. Built on EventEmitter3 for efficient event handling and real-time queue monitoring.

## Capabilities

### Event Types

p-queue emits various events throughout the task lifecycle and queue state changes.

```typescript { .api }
type EventName = 'active' | 'idle' | 'empty' | 'add' | 'next' | 'completed' | 'error';
```
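
When an event name arrives as a plain string (for example from configuration), a runtime check against the union can be useful. The following is a minimal sketch: `EVENT_NAMES` and `isEventName` are hypothetical names introduced here for illustration and are not part of p-queue.

```typescript
// The seven event names listed in the EventName union above.
const EVENT_NAMES = ['active', 'idle', 'empty', 'add', 'next', 'completed', 'error'] as const;
type EventName = (typeof EVENT_NAMES)[number];

// Hypothetical helper (not a p-queue API): narrows a string to EventName at runtime.
function isEventName(value: string): value is EventName {
  return (EVENT_NAMES as readonly string[]).indexOf(value) !== -1;
}

console.log(isEventName('idle'));     // true
console.log(isEventName('finished')); // false
```

Deriving the type from the array keeps the runtime list and the compile-time union in one place, at the cost of redefining a type the library already describes.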

### Event Listening

p-queue extends EventEmitter3, providing standard event listener methods.

```typescript { .api }
/**
 * Event listener methods inherited from EventEmitter3
 */
class PQueue extends EventEmitter<EventName> {
  on(event: EventName, listener: (...args: any[]) => void): this;
  off(event: EventName, listener: (...args: any[]) => void): this;
  once(event: EventName, listener: (...args: any[]) => void): this;
  emit(event: EventName, ...args: any[]): boolean;
}
```
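
Because `on` and `off` take the same listener reference, it is often convenient to pair them in one helper. The sketch below assumes only the `on`/`off` methods shown above; `EventSource` and `subscribe` are hypothetical names, not part of p-queue or EventEmitter3.

```typescript
// Minimal structural type: anything exposing on/off, as PQueue does.
interface EventSource {
  on(event: string, listener: (...args: any[]) => void): unknown;
  off(event: string, listener: (...args: any[]) => void): unknown;
}

// Hypothetical helper: attaches one listener to several events and returns
// a function that detaches it from all of them again.
function subscribe(
  source: EventSource,
  events: string[],
  listener: (event: string, ...args: any[]) => void,
): () => void {
  const bound = events.map((event) => {
    // Keep a reference to each wrapper so off() can remove exactly this handler
    const handler = (...args: any[]) => listener(event, ...args);
    source.on(event, handler);
    return { event, handler };
  });

  return () => bound.forEach(({ event, handler }) => source.off(event, handler));
}
```

With a queue this might be used as `const stop = subscribe(queue, ['add', 'next'], (event) => console.log(event));`, followed by `stop()` once monitoring is no longer needed.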

### Event Descriptions

#### 'add' Event

Emitted when a task is added to the queue.

```typescript { .api }
/**
 * Emitted when a task is added to the queue via add() or addAll()
 * No parameters passed to listeners
 */
queue.on('add', () => void);
```

**Usage Examples:**

```typescript
import PQueue from "p-queue";

const queue = new PQueue({ concurrency: 2 });

queue.on('add', () => {
  console.log(`Task added. Queue size: ${queue.size}, Pending: ${queue.pending}`);
});

// This will trigger the 'add' event
queue.add(async () => someTask());
queue.add(async () => anotherTask());
```

#### 'active' Event

Emitted when a task starts running (becomes active).

```typescript { .api }
/**
 * Emitted when a task begins execution
 * No parameters passed to listeners
 */
queue.on('active', () => void);
```

**Usage Examples:**

```typescript
queue.on('active', () => {
  console.log(`Task started. Running: ${queue.pending}, Queued: ${queue.size}`);
});

// Monitor active tasks
let activeTasks = 0;
queue.on('active', () => {
  activeTasks++;
  console.log(`Active tasks: ${activeTasks}`);
});

queue.on('next', () => {
  activeTasks--;
  console.log(`Active tasks: ${activeTasks}`);
});
```

#### 'next' Event

Emitted when a task completes and the next task can start.

```typescript { .api }
/**
 * Emitted when a task finishes (successfully or with error) and the next task can begin
 * No parameters passed to listeners
 */
queue.on('next', () => void);
```

**Usage Examples:**

```typescript
queue.on('next', () => {
  console.log(`Task finished. ${queue.pending} still running, ${queue.size} queued`);
});

// Track task completion rate
let completedTasks = 0;
queue.on('next', () => {
  completedTasks++;
  if (completedTasks % 10 === 0) {
    console.log(`${completedTasks} tasks completed`);
  }
});
```

#### 'completed' Event

Emitted when a task completes successfully.

```typescript { .api }
/**
 * Emitted when a task completes successfully
 * @param result - The result returned by the task
 */
queue.on('completed', (result: unknown) => void);
```

**Usage Examples:**

```typescript
queue.on('completed', (result) => {
  console.log('Task completed with result:', result);
});

// Collect results
const results: unknown[] = [];
queue.on('completed', (result) => {
  results.push(result);
});

// Add tasks
await queue.add(async () => ({ id: 1, data: 'first' }));
await queue.add(async () => ({ id: 2, data: 'second' }));
```

#### 'error' Event

Emitted when a task throws an error.

```typescript { .api }
/**
 * Emitted when a task throws an error or times out (if throwOnTimeout is true)
 * @param error - The error thrown by the task
 */
queue.on('error', (error: unknown) => void);
```

**Usage Examples:**

```typescript
queue.on('error', (error) => {
  console.error('Task failed:', error);

  // Log error details
  if (error instanceof Error) {
    console.error('Error message:', error.message);
    console.error('Stack trace:', error.stack);
  }
});

// Error counting and reporting
let errorCount = 0;
queue.on('error', (error) => {
  errorCount++;
  console.log(`Total errors: ${errorCount}`);

  // Send to error tracking service
  // (errorTracker is a placeholder for your own tracking client)
  errorTracker.captureException(error);
});

// Add tasks that might fail
queue.add(async () => {
  if (Math.random() < 0.3) {
    throw new Error('Random failure');
  }
  return 'success';
}).catch(() => {
  // add() also rejects on failure; the listeners above already report the error
});
```

#### 'empty' Event

Emitted when the queue becomes empty (no more tasks waiting to run).

```typescript { .api }
/**
 * Emitted when the queue becomes empty (queue.size === 0)
 * Note: Some tasks may still be running (pending > 0)
 * No parameters passed to listeners
 */
queue.on('empty', () => void);
```

**Usage Examples:**

```typescript
queue.on('empty', () => {
  console.log('Queue is empty - no more tasks waiting');
  console.log(`But ${queue.pending} tasks are still running`);
});

// Trigger actions when queue empties
queue.on('empty', () => {
  // Maybe add more tasks dynamically
  if (shouldAddMoreTasks()) {
    addMoreTasks();
  }
});
```

#### 'idle' Event

Emitted when the queue becomes idle (empty and no tasks running).

```typescript { .api }
/**
 * Emitted when the queue becomes completely idle (queue.size === 0 && queue.pending === 0)
 * All work has finished
 * No parameters passed to listeners
 */
queue.on('idle', () => void);
```

**Usage Examples:**

```typescript
queue.on('idle', () => {
  console.log('Queue is completely idle - all work finished');
});

// Cleanup when all work is done
queue.on('idle', () => {
  // Close database connections, save state, etc.
  cleanup();
});

// Trigger next phase of work
queue.on('idle', async () => {
  console.log('Phase 1 complete, starting phase 2');
  await startPhase2();
});
```
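
A one-off event such as `'idle'` can also be awaited by wrapping `once` in a promise. The sketch below is an illustration only: `OnceSource` and `waitForEvent` are hypothetical names, and the helper relies solely on the `once` method listed under Event Listening. p-queue also provides its own `onIdle()` method, which is likely the simpler choice when only the idle state matters.

```typescript
// Minimal structural type: anything exposing once(), as PQueue does.
interface OnceSource {
  once(event: string, listener: (...args: any[]) => void): unknown;
}

// Hypothetical helper: resolves with the listener arguments the next time
// `event` fires. It never rejects and has no timeout.
function waitForEvent(source: OnceSource, event: string): Promise<unknown[]> {
  return new Promise((resolve) => {
    source.once(event, (...args: any[]) => resolve(args));
  });
}
```

Inside an async function this could be used as `await waitForEvent(queue, 'idle');`. The listener is attached when the helper is called, so an event that fired earlier is not observed.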

### Comprehensive Event Monitoring

**Complete event monitoring example:**

```typescript
import PQueue from "p-queue";

// Small helper so the example is self-contained
const delay = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

const queue = new PQueue({ concurrency: 3 });

// Monitor all events
queue.on('add', () => {
  console.log(`📝 Task added (Queue: ${queue.size}, Running: ${queue.pending})`);
});

queue.on('active', () => {
  console.log(`🏃 Task started (Queue: ${queue.size}, Running: ${queue.pending})`);
});

queue.on('next', () => {
  console.log(`⏭️ Task finished (Queue: ${queue.size}, Running: ${queue.pending})`);
});

queue.on('completed', (result) => {
  console.log(`✅ Task completed:`, result);
});

queue.on('error', (error) => {
  console.error(`❌ Task failed:`, error.message);
});

queue.on('empty', () => {
  console.log(`📭 Queue empty (${queue.pending} still running)`);
});

queue.on('idle', () => {
  console.log(`😴 Queue idle - all work complete`);
});

// Add some tasks to see events in action
queue.add(async () => {
  await delay(1000);
  return 'Task 1 result';
});

queue.add(async () => {
  await delay(500);
  throw new Error('Task 2 failed');
}).catch(() => {
  // The rejection is already reported by the 'error' listener above
});

queue.add(async () => {
  await delay(800);
  return 'Task 3 result';
});
```

### Event-Based Patterns

#### Progress Tracking

```typescript
import PQueue from "p-queue";

class QueueProgressTracker {
  private readonly queue: PQueue;
  private totalTasks = 0;
  private completedTasks = 0;
  private failedTasks = 0;

  constructor(queue: PQueue) {
    this.queue = queue;
    this.setupEventListeners();
  }

  setupEventListeners() {
    this.queue.on('add', () => {
      this.totalTasks++;
      this.updateProgress();
    });

    this.queue.on('completed', () => {
      this.completedTasks++;
      this.updateProgress();
    });

    this.queue.on('error', () => {
      this.failedTasks++;
      this.updateProgress();
    });
  }

  updateProgress() {
    // A task counts as finished whether it succeeded or failed
    const finished = this.completedTasks + this.failedTasks;
    const progress = this.totalTasks > 0 ? (finished / this.totalTasks) * 100 : 0;

    console.log(`Progress: ${progress.toFixed(1)}% (${finished}/${this.totalTasks})`);
    console.log(`✅ ${this.completedTasks} completed, ❌ ${this.failedTasks} failed`);
  }
}

const queue = new PQueue({ concurrency: 5 });
const tracker = new QueueProgressTracker(queue);
```
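
The percentage in `updateProgress()` can be checked in isolation. The following sketch restates the same arithmetic as a pure function; `ProgressCounts` and `progressPercent` are hypothetical names used only for this illustration.

```typescript
interface ProgressCounts {
  total: number;     // tasks added so far ('add' events)
  completed: number; // tasks that succeeded ('completed' events)
  failed: number;    // tasks that threw ('error' events)
}

// Same formula as the tracker: finished tasks over total, guarded against total === 0.
function progressPercent({ total, completed, failed }: ProgressCounts): number {
  const finished = completed + failed;
  return total > 0 ? (finished / total) * 100 : 0;
}

console.log(progressPercent({ total: 8, completed: 3, failed: 1 }).toFixed(1)); // "50.0"
console.log(progressPercent({ total: 0, completed: 0, failed: 0 }).toFixed(1)); // "0.0"
```

Because `total` grows whenever tasks are added, the reported percentage can go down as well as up while work is still being enqueued.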

#### Adaptive Concurrency

```typescript
import PQueue from "p-queue";

class AdaptiveConcurrency {
  private readonly queue: PQueue;
  private errorRate = 0;
  private errorCount = 0;
  private successCount = 0;

  constructor(queue: PQueue) {
    this.queue = queue;
    this.setupEventListeners();
  }

  setupEventListeners() {
    this.queue.on('completed', () => {
      this.successCount++;
      this.adjustConcurrency();
    });

    this.queue.on('error', () => {
      this.errorCount++;
      this.adjustConcurrency();
    });
  }

  adjustConcurrency() {
    const total = this.errorCount + this.successCount;
    if (total < 10) return; // Need enough data

    // Cumulative error rate over all tasks seen so far
    this.errorRate = this.errorCount / total;

    if (this.errorRate > 0.1) {
      // High error rate, reduce concurrency
      const newConcurrency = Math.max(1, Math.floor(this.queue.concurrency * 0.8));
      this.queue.concurrency = newConcurrency;
      console.log(`High error rate (${(this.errorRate * 100).toFixed(1)}%), reducing concurrency to ${newConcurrency}`);
    } else if (this.errorRate < 0.02 && this.queue.concurrency < 10) {
      // Low error rate, can increase concurrency
      const newConcurrency = Math.min(10, this.queue.concurrency + 1);
      this.queue.concurrency = newConcurrency;
      console.log(`Low error rate (${(this.errorRate * 100).toFixed(1)}%), increasing concurrency to ${newConcurrency}`);
    }
  }
}

const queue = new PQueue({ concurrency: 2 });
const adaptive = new AdaptiveConcurrency(queue);
```
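
To see how the thresholds behave, the adjustment rule can be restated as a pure function and evaluated on a few inputs. This is a sketch of the same policy, not a p-queue feature; `nextConcurrency` is a hypothetical name.

```typescript
// Mirrors adjustConcurrency(): shrink by 20% above a 10% error rate,
// grow by one below 2% (capped at 10), otherwise leave unchanged.
function nextConcurrency(current: number, errorCount: number, successCount: number): number {
  const total = errorCount + successCount;
  if (total < 10) return current; // not enough data yet

  const errorRate = errorCount / total;
  if (errorRate > 0.1) {
    return Math.max(1, Math.floor(current * 0.8));
  }
  if (errorRate < 0.02 && current < 10) {
    return Math.min(10, current + 1);
  }
  return current;
}

console.log(nextConcurrency(5, 2, 8));  // 4  (20% errors, reduce)
console.log(nextConcurrency(5, 0, 10)); // 6  (0% errors, increase)
console.log(nextConcurrency(1, 5, 5));  // 1  (never drops below 1)
console.log(nextConcurrency(5, 0, 5));  // 5  (fewer than 10 samples)
```

Keeping the policy separate from the event wiring makes it easier to unit test and to swap for a windowed error rate later.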

### Error Recovery Patterns

```typescript
import PQueue from "p-queue";

// Retry failed tasks with exponential backoff.
// The 'error' event passes only the error to listeners, not the task that
// failed, so the retry logic wraps add() instead of living in a listener.
class RetryQueue extends PQueue {
  maxRetries = 3;

  async addWithRetry<T>(id: string, task: () => Promise<T>) {
    for (let attempt = 0; ; attempt++) {
      try {
        // add() rejects when the task throws (the 'error' event fires as well)
        return await this.add(task);
      } catch (error) {
        if (attempt >= this.maxRetries) {
          console.error(`Task ${id} failed after ${this.maxRetries} retries`);
          throw error;
        }

        // Exponential backoff: 1s, 2s, 4s, ...
        const backoffMs = Math.pow(2, attempt) * 1000;
        console.log(`Retrying task ${id} in ${backoffMs}ms (retry ${attempt + 1})`);
        await new Promise((resolve) => setTimeout(resolve, backoffMs));
      }
    }
  }
}
```
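
The backoff schedule is easy to verify on its own. The sketch below computes the delay for a given retry attempt; `backoffDelay` is a hypothetical helper, and the upper bound `maxMs` is an assumption added here so that delays do not grow without limit.

```typescript
// Delay before retry number `attempt` (0-based): baseMs * 2^attempt, capped at maxMs.
// The cap is an illustrative addition, not part of the pattern above.
function backoffDelay(attempt: number, baseMs = 1000, maxMs = 30000): number {
  return Math.min(maxMs, Math.pow(2, attempt) * baseMs);
}

console.log([0, 1, 2, 3].map((attempt) => backoffDelay(attempt))); // [ 1000, 2000, 4000, 8000 ]
console.log(backoffDelay(10)); // 30000 (capped)
```

In practice a small random jitter is often added to each delay so that many failed tasks do not retry at the same instant.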