Promise queue with concurrency control for managing asynchronous operations
npx @tessl/cli install tessl/npm-p-queue@8.1.0

p-queue is a promise queue library with fine-grained concurrency control for managing asynchronous operations. It lets developers limit concurrent operations and apply rate limiting, priority-based execution, and interval-based throttling, so that APIs and system resources are not overwhelmed.
npm install p-queue

import PQueue from "p-queue";

Note: This package is ESM-only and does not provide CommonJS exports. Projects using CommonJS need to convert to ESM.
import PQueue from "p-queue";

// Create queue with concurrency limit
const queue = new PQueue({ concurrency: 2 });

// Add tasks to the queue
await queue.add(async () => {
  const response = await fetch('https://api.example.com/data');
  return response.json();
});

// Add multiple tasks (processFile stands in for your own function)
const results = await queue.addAll([
  async () => processFile('file1.txt'),
  async () => processFile('file2.txt'),
  async () => processFile('file3.txt'),
]);

// Wait for all tasks to complete
await queue.onIdle();

p-queue is built around several key components:
Core queue operations for adding, controlling, and monitoring promise-based tasks. Includes methods for adding individual tasks, batch operations, and queue state management.
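As a rough sketch of the queue-control and state-monitoring methods in this section, the following creates a paused queue, enqueues work, and then resumes it. It assumes p-queue 8.x is installed; the names `delay` and `runPausedBatch` are illustrative helpers, not part of the library.

```typescript
import PQueue from 'p-queue';

// Hypothetical helper: resolves after `ms` milliseconds.
const delay = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms));

// Hypothetical helper: enqueues three tasks on a paused queue, then runs them.
async function runPausedBatch(): Promise<string[]> {
  const log: string[] = [];

  // autoStart: false creates the queue in a paused state.
  const queue = new PQueue({ concurrency: 1, autoStart: false });

  for (const name of ['a', 'b', 'c']) {
    // Not awaited here: the returned promise settles only after the task has run.
    void queue.add(async () => {
      await delay(10);
      log.push(name);
    });
  }

  // While paused, every task is still waiting in the queue.
  log.push(`queued:${queue.size}`);

  queue.start();        // begin processing queued tasks
  await queue.onIdle(); // resolves once the queue is empty and nothing is running

  log.push(`left:${queue.size + queue.pending}`);
  return log;
}
```

With a concurrency of 1 and equal priorities, the tasks should run in the order they were added. `pause()` can be called later to stop new tasks from starting, and `clear()` to drop tasks that have not started yet.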
class PQueue<
  QueueType extends Queue<RunFunction, EnqueueOptionsType> = PriorityQueue,
  EnqueueOptionsType extends QueueAddOptions = QueueAddOptions
> extends EventEmitter<EventName> {
  constructor(options?: Options<QueueType, EnqueueOptionsType>);

  // Core task management
  add<TaskResultType>(function_: Task<TaskResultType>, options?: Partial<EnqueueOptionsType>): Promise<TaskResultType | void>;
  add<TaskResultType>(function_: Task<TaskResultType>, options: {throwOnTimeout: true} & Exclude<EnqueueOptionsType, 'throwOnTimeout'>): Promise<TaskResultType>;
  addAll<TaskResultsType>(functions: ReadonlyArray<Task<TaskResultsType>>, options?: Partial<EnqueueOptionsType>): Promise<Array<TaskResultsType | void>>;

  // Queue control
  start(): this;
  pause(): void;
  clear(): void;

  // State monitoring
  onEmpty(): Promise<void>;
  onIdle(): Promise<void>;
  onSizeLessThan(limit: number): Promise<void>;
}

Advanced concurrency management with configurable limits, interval-based throttling, and priority handling for fine-grained control over async operation execution.
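To make the priority handling concrete, here is a minimal sketch that enqueues three tasks with different priorities and then re-prioritizes one by its id. It assumes p-queue 8.1.0 is installed; `runByPriority`, the task labels, and the id `job-normal` are made up for illustration.

```typescript
import PQueue from 'p-queue';

// Hypothetical helper: records the order in which tasks start.
async function runByPriority(): Promise<{ order: string[]; atPriority10: number }> {
  const order: string[] = [];

  // concurrency: 1 runs one task at a time, so the start order is easy to observe.
  // autoStart: false keeps tasks queued until start() is called.
  const queue = new PQueue({ concurrency: 1, autoStart: false });

  const task = (label: string) => async () => {
    order.push(label);
  };

  void queue.add(task('low'), { priority: 0 });
  void queue.add(task('high'), { priority: 10 });
  void queue.add(task('normal'), { priority: 5, id: 'job-normal' });

  // Promote a queued task using the id it was added with.
  queue.setPriority('job-normal', 20);

  // Count the queued tasks whose priority is exactly 10.
  const atPriority10 = queue.sizeBy({ priority: 10 });

  queue.start();
  await queue.onIdle();
  return { order, atPriority10 };
}
```

Higher priority values run first, so after the promotion the expected start order is `normal`, `high`, `low`. Interval-based throttling is configured separately through the `interval` and `intervalCap` constructor options.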
// Concurrency properties and methods
class PQueue {
  get concurrency(): number;
  set concurrency(newConcurrency: number);
  timeout?: number;
  get size(): number;
  get pending(): number;
  get isPaused(): boolean;
  sizeBy(options: Readonly<Partial<EnqueueOptionsType>>): number;
  setPriority(id: string, priority: number): void;
}

Comprehensive event emission system for monitoring queue state, task lifecycle, and execution progress. Built on EventEmitter3 for efficient event handling.
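The event system can be sketched as follows: listeners are attached for `completed`, `error`, and `idle`, and one successful and one failing task are run. This assumes p-queue 8.x is installed; `collectEvents` and the task bodies are illustrative only.

```typescript
import PQueue from 'p-queue';

type EventSummary = { completed: unknown[]; errors: string[]; idleCount: number };

// Hypothetical helper: runs two tasks and records the events they trigger.
async function collectEvents(): Promise<EventSummary> {
  const queue = new PQueue({ concurrency: 2 });
  const summary: EventSummary = { completed: [], errors: [], idleCount: 0 };

  // 'completed' receives the task result, 'error' receives the thrown error.
  queue.on('completed', (result: unknown) => {
    summary.completed.push(result);
  });
  queue.on('error', (error: unknown) => {
    summary.errors.push(error instanceof Error ? error.message : String(error));
  });
  queue.on('idle', () => {
    summary.idleCount++;
  });

  const ok = queue.add(async () => 'done');
  // add() also rejects for a failing task, so the rejection is handled here.
  const failing = queue
    .add(async () => {
      throw new Error('boom');
    })
    .catch(() => undefined);

  await Promise.all([ok, failing]);
  await queue.onIdle();
  return summary;
}
```

Listeners are a convenient place for logging or metrics. The promise returned by `add()` remains the way to get an individual task's result or failure.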
type EventName = 'active' | 'idle' | 'empty' | 'add' | 'next' | 'completed' | 'error';

// Event emission (inherited from EventEmitter3)
class PQueue extends EventEmitter<EventName> {
  on(event: EventName, listener: (...args: any[]) => void): this;
  off(event: EventName, listener: (...args: any[]) => void): this;
  emit(event: EventName, ...args: any[]): boolean;
}

type Task<TaskResultType> =
  | ((options: TaskOptions) => PromiseLike<TaskResultType>)
  | ((options: TaskOptions) => TaskResultType);

type EventName = 'active' | 'idle' | 'empty' | 'add' | 'next' | 'completed' | 'error';

type RunFunction = () => Promise<unknown>;

type Options<QueueType extends Queue<RunFunction, QueueOptions>, QueueOptions extends QueueAddOptions> = {
  readonly concurrency?: number;
  readonly autoStart?: boolean;
  readonly queueClass?: new () => QueueType;
  readonly intervalCap?: number;
  readonly interval?: number;
  readonly carryoverConcurrencyCount?: boolean;
  timeout?: number;
  throwOnTimeout?: boolean;
};

type QueueAddOptions = {
  readonly priority?: number;
  id?: string;
  readonly signal?: AbortSignal;
  timeout?: number;
  throwOnTimeout?: boolean;
};

type TaskOptions = {
  readonly signal?: AbortSignal;
};

interface Queue<Element, Options> {
  size: number;
  filter: (options: Readonly<Partial<Options>>) => Element[];
  dequeue: () => Element | undefined;
  enqueue: (run: Element, options?: Partial<Options>) => void;
  setPriority: (id: string, priority: number) => void;
}
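As an illustration of the `Queue` interface, the sketch below implements a simple first-in, first-out queue that satisfies it. The types are local copies (trimmed to the fields used here) so the block compiles on its own, and `FifoQueue` is a hypothetical class, not something shipped with p-queue.

```typescript
// Local copies of the types listed above, reduced to what this sketch needs.
type RunFunction = () => Promise<unknown>;
type QueueAddOptions = { readonly priority?: number; id?: string };

interface Queue<Element, Options> {
  size: number;
  filter: (options: Readonly<Partial<Options>>) => Element[];
  dequeue: () => Element | undefined;
  enqueue: (run: Element, options?: Partial<Options>) => void;
  setPriority: (id: string, priority: number) => void;
}

type FifoItem = { run: RunFunction; priority: number; id: string | undefined };

// Hypothetical FIFO queue: stores priorities but always dequeues in insertion order.
class FifoQueue implements Queue<RunFunction, QueueAddOptions> {
  private readonly items: FifoItem[] = [];

  get size(): number {
    return this.items.length;
  }

  enqueue(run: RunFunction, options?: Partial<QueueAddOptions>): void {
    this.items.push({ run, priority: options?.priority ?? 0, id: options?.id });
  }

  dequeue(): RunFunction | undefined {
    return this.items.shift()?.run;
  }

  // Returns the queued functions whose stored priority matches exactly.
  filter(options: Readonly<Partial<QueueAddOptions>>): RunFunction[] {
    return this.items
      .filter(item => item.priority === options.priority)
      .map(item => item.run);
  }

  // Updates the stored priority; ordering is unaffected in this FIFO sketch.
  setPriority(id: string, priority: number): void {
    const item = this.items.find(entry => entry.id === id);
    if (!item) {
      throw new ReferenceError(`No queued task with id "${id}"`);
    }
    item.priority = priority;
  }
}
```

Assuming p-queue is installed, a class like this should be usable through the `queueClass` option, for example `new PQueue({ queueClass: FifoQueue })`. Because this sketch ignores priority when dequeuing, tasks would then run strictly in the order they were added.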