Promise queue with concurrency control for managing asynchronous operations
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Comprehensive event emission system for monitoring queue state, task lifecycle, and execution progress. Built on EventEmitter3 for efficient event handling and real-time queue monitoring.
p-queue emits various events throughout the task lifecycle and queue state changes.
type EventName = 'active' | 'idle' | 'empty' | 'add' | 'next' | 'completed' | 'error';

p-queue extends EventEmitter3, providing standard event listener methods.
/**
* Event listener methods inherited from EventEmitter3
*/
class PQueue extends EventEmitter<EventName> {
on(event: EventName, listener: (...args: any[]) => void): this;
off(event: EventName, listener: (...args: any[]) => void): this;
once(event: EventName, listener: (...args: any[]) => void): this;
emit(event: EventName, ...args: any[]): boolean;
}

Emitted when a task is added to the queue.
/**
* Emitted when a task is added to the queue via add() or addAll()
* No parameters passed to listeners
*/
queue.on('add', () => void);

Usage Examples:
import PQueue from "p-queue";
const queue = new PQueue({ concurrency: 2 });
queue.on('add', () => {
console.log(`Task added. Queue size: ${queue.size}, Pending: ${queue.pending}`);
});
// This will trigger the 'add' event
queue.add(async () => someTask());
queue.add(async () => anotherTask());

Emitted when a task starts running (becomes active).
/**
* Emitted when a task begins execution
* No parameters passed to listeners
*/
queue.on('active', () => void);

Usage Examples:
queue.on('active', () => {
console.log(`Task started. Running: ${queue.pending}, Queued: ${queue.size}`);
});
// Monitor active tasks
let activeTasks = 0;
queue.on('active', () => {
activeTasks++;
console.log(`Active tasks: ${activeTasks}`);
});
queue.on('next', () => {
activeTasks--;
console.log(`Active tasks: ${activeTasks}`);
});

Emitted when a task completes and the next task can start.
/**
* Emitted when a task finishes (successfully or with error) and the next task can begin
* No parameters passed to listeners
*/
queue.on('next', () => void);

Usage Examples:
queue.on('next', () => {
console.log(`Task finished. ${queue.pending} still running, ${queue.size} queued`);
});
// Track task completion rate
let completedTasks = 0;
queue.on('next', () => {
completedTasks++;
if (completedTasks % 10 === 0) {
console.log(`${completedTasks} tasks completed`);
}
});

Emitted when a task completes successfully.
/**
* Emitted when a task completes successfully
* @param result - The result returned by the task
*/
queue.on('completed', (result: unknown) => void);

Usage Examples:
queue.on('completed', (result) => {
console.log('Task completed with result:', result);
});
// Collect results
const results = [];
queue.on('completed', (result) => {
results.push(result);
});
// Add tasks
await queue.add(async () => ({ id: 1, data: 'first' }));
await queue.add(async () => ({ id: 2, data: 'second' }));

Emitted when a task throws an error.
/**
* Emitted when a task throws an error or times out (if throwOnTimeout is true)
* @param error - The error thrown by the task
*/
queue.on('error', (error: unknown) => void);

Usage Examples:
queue.on('error', (error) => {
console.error('Task failed:', error);
// Log error details
if (error instanceof Error) {
console.error('Error message:', error.message);
console.error('Stack trace:', error.stack);
}
});
// Error counting and reporting
let errorCount = 0;
queue.on('error', (error) => {
errorCount++;
console.log(`Total errors: ${errorCount}`);
// Send to error tracking service
errorTracker.captureException(error);
});
// Add tasks that might fail
queue.add(async () => {
if (Math.random() < 0.3) {
throw new Error('Random failure');
}
return 'success';
});

Emitted when the queue becomes empty (no more tasks waiting to run).
/**
* Emitted when the queue becomes empty (queue.size === 0)
* Note: Some tasks may still be running (pending > 0)
* No parameters passed to listeners
*/
queue.on('empty', () => void);

Usage Examples:
queue.on('empty', () => {
console.log('Queue is empty - no more tasks waiting');
console.log(`But ${queue.pending} tasks are still running`);
});
// Trigger actions when queue empties
queue.on('empty', () => {
// Maybe add more tasks dynamically
if (shouldAddMoreTasks()) {
addMoreTasks();
}
});

Emitted when the queue becomes idle (empty and no tasks running).
/**
* Emitted when the queue becomes completely idle (queue.size === 0 && queue.pending === 0)
* All work has finished
* No parameters passed to listeners
*/
queue.on('idle', () => void);

Usage Examples:
queue.on('idle', () => {
console.log('Queue is completely idle - all work finished');
});
// Cleanup when all work is done
queue.on('idle', () => {
// Close database connections, save state, etc.
cleanup();
});
// Trigger next phase of work
queue.on('idle', async () => {
console.log('Phase 1 complete, starting phase 2');
await startPhase2();
});

Complete event monitoring example:
import PQueue from "p-queue";
const queue = new PQueue({ concurrency: 3 });
// Monitor all events
queue.on('add', () => {
console.log(`📝 Task added (Queue: ${queue.size}, Running: ${queue.pending})`);
});
queue.on('active', () => {
console.log(`🏃 Task started (Queue: ${queue.size}, Running: ${queue.pending})`);
});
queue.on('next', () => {
console.log(`⏭️ Task finished (Queue: ${queue.size}, Running: ${queue.pending})`);
});
queue.on('completed', (result) => {
console.log(`✅ Task completed:`, result);
});
queue.on('error', (error) => {
  // error is unknown — narrow before accessing .message (see the 'error' example above)
  console.error(`❌ Task failed:`, error instanceof Error ? error.message : error);
});
queue.on('empty', () => {
console.log(`📭 Queue empty (${queue.pending} still running)`);
});
queue.on('idle', () => {
console.log(`😴 Queue idle - all work complete`);
});
// Add some tasks to see events in action
queue.add(async () => {
await delay(1000);
return 'Task 1 result';
});
queue.add(async () => {
await delay(500);
throw new Error('Task 2 failed');
});
queue.add(async () => {
await delay(800);
return 'Task 3 result';
});class QueueProgressTracker {
constructor(queue) {
this.queue = queue;
this.totalTasks = 0;
this.completedTasks = 0;
this.failedTasks = 0;
this.setupEventListeners();
}
setupEventListeners() {
this.queue.on('add', () => {
this.totalTasks++;
this.updateProgress();
});
this.queue.on('completed', () => {
this.completedTasks++;
this.updateProgress();
});
this.queue.on('error', () => {
this.failedTasks++;
this.updateProgress();
});
}
updateProgress() {
const finished = this.completedTasks + this.failedTasks;
const progress = this.totalTasks > 0 ? (finished / this.totalTasks) * 100 : 0;
console.log(`Progress: ${progress.toFixed(1)}% (${finished}/${this.totalTasks})`);
console.log(`✅ ${this.completedTasks} completed, ❌ ${this.failedTasks} failed`);
}
}
const queue = new PQueue({ concurrency: 5 });
const tracker = new QueueProgressTracker(queue);

class AdaptiveConcurrency {
constructor(queue) {
this.queue = queue;
this.errorRate = 0;
this.errorCount = 0;
this.successCount = 0;
this.setupEventListeners();
}
setupEventListeners() {
this.queue.on('completed', () => {
this.successCount++;
this.adjustConcurrency();
});
this.queue.on('error', () => {
this.errorCount++;
this.adjustConcurrency();
});
}
adjustConcurrency() {
const total = this.errorCount + this.successCount;
if (total < 10) return; // Need enough data
this.errorRate = this.errorCount / total;
if (this.errorRate > 0.1) {
// High error rate, reduce concurrency
const newConcurrency = Math.max(1, Math.floor(this.queue.concurrency * 0.8));
this.queue.concurrency = newConcurrency;
console.log(`High error rate (${(this.errorRate * 100).toFixed(1)}%), reducing concurrency to ${newConcurrency}`);
} else if (this.errorRate < 0.02 && this.queue.concurrency < 10) {
// Low error rate, can increase concurrency
const newConcurrency = Math.min(10, this.queue.concurrency + 1);
this.queue.concurrency = newConcurrency;
console.log(`Low error rate (${(this.errorRate * 100).toFixed(1)}%), increasing concurrency to ${newConcurrency}`);
}
}
}
const queue = new PQueue({ concurrency: 2 });
const adaptive = new AdaptiveConcurrency(queue);

// Retry failed tasks with exponential backoff
class RetryQueue extends PQueue {
constructor(options = {}) {
super(options);
this.retryAttempts = new Map();
this.maxRetries = options.maxRetries || 3;
this.on('error', this.handleError.bind(this));
}
// NOTE: p-queue's 'error' event passes only the error (see the signature above);
// taskInfo is not provided by p-queue itself and requires custom wiring
// (e.g. re-emitting the error together with task metadata).
async handleError(error, taskInfo) {
if (!taskInfo?.id) return;
const attempts = this.retryAttempts.get(taskInfo.id) || 0;
if (attempts < this.maxRetries) {
this.retryAttempts.set(taskInfo.id, attempts + 1);
// Exponential backoff
const delay = Math.pow(2, attempts) * 1000;
setTimeout(() => {
console.log(`Retrying task ${taskInfo.id} (attempt ${attempts + 1})`);
this.add(taskInfo.task, {
...taskInfo.options,
id: taskInfo.id
});
}, delay);
} else {
console.error(`Task ${taskInfo.id} failed after ${this.maxRetries} attempts`);
}
}
}

Install with Tessl CLI
npx tessl i tessl/npm-p-queue