or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

concurrency-control.mdevents.mdindex.mdqueue-management.md
tile.json

tessl/npm-p-queue

Promise queue with concurrency control for managing asynchronous operations

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
npmpkg:npm/p-queue@8.1.x

To install, run

npx @tessl/cli install tessl/npm-p-queue@8.1.0

index.mddocs/

p-queue

p-queue is a sophisticated promise queue library with fine-grained concurrency control for managing asynchronous operations. It enables developers to limit concurrent operations, implement rate limiting, priority-based execution, and interval-based throttling to prevent overwhelming APIs or system resources.

Package Information

  • Package Name: p-queue
  • Package Type: npm
  • Language: TypeScript
  • Installation: npm install p-queue

Core Imports

import PQueue from "p-queue";

Note: This package is ESM-only and does not provide CommonJS exports. Projects using CommonJS need to convert to ESM.

Basic Usage

import PQueue from "p-queue";

// Create queue with concurrency limit
const queue = new PQueue({ concurrency: 2 });

// Add tasks to the queue
await queue.add(async () => {
  const response = await fetch('https://api.example.com/data');
  return response.json();
});

// Add multiple tasks
const results = await queue.addAll([
  async () => processFile('file1.txt'),
  async () => processFile('file2.txt'),
  async () => processFile('file3.txt'),
]);

// Wait for all tasks to complete
await queue.onIdle();

Architecture

p-queue is built around several key components:

  • PQueue Class: Main queue class extending EventEmitter3 for real-time monitoring
  • Concurrency Control: Sophisticated limiting of simultaneous operations with interval-based throttling
  • Priority System: Configurable priority-based execution order with dynamic priority updates
  • Event System: Comprehensive event emission for monitoring queue state and task lifecycle
  • Queue Implementations: Pluggable queue classes with default PriorityQueue implementation
  • Task Management: Flexible task handling with timeout support and abort signal integration

Capabilities

Queue Management

Core queue operations for adding, controlling, and monitoring promise-based tasks. Includes methods for adding individual tasks, batch operations, and queue state management.

class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsType> = PriorityQueue, EnqueueOptionsType extends QueueAddOptions = QueueAddOptions> extends EventEmitter<EventName> {
  constructor(options?: Options<QueueType, EnqueueOptionsType>);
  
  // Core task management
  add<TaskResultType>(function_: Task<TaskResultType>, options?: Partial<EnqueueOptionsType>): Promise<TaskResultType | void>;
  add<TaskResultType>(function_: Task<TaskResultType>, options: {throwOnTimeout: true} & Exclude<EnqueueOptionsType, 'throwOnTimeout'>): Promise<TaskResultType>;
  addAll<TaskResultsType>(functions: ReadonlyArray<Task<TaskResultsType>>, options?: Partial<EnqueueOptionsType>): Promise<Array<TaskResultsType | void>>;
  
  // Queue control
  start(): this;
  pause(): void;
  clear(): void;
  
  // State monitoring
  onEmpty(): Promise<void>;
  onIdle(): Promise<void>;
  onSizeLessThan(limit: number): Promise<void>;
}

Queue Management

Concurrency Control

Advanced concurrency management with configurable limits, interval-based throttling, and priority handling for fine-grained control over async operation execution.

// Concurrency properties and methods
class PQueue {
  get concurrency(): number;
  set concurrency(newConcurrency: number);
  
  timeout?: number;
  
  get size(): number;
  get pending(): number;
  get isPaused(): boolean;
  
  sizeBy(options: Readonly<Partial<EnqueueOptionsType>>): number;
  setPriority(id: string, priority: number): void;
}

Concurrency Control

Event System

Comprehensive event emission system for monitoring queue state, task lifecycle, and execution progress. Built on EventEmitter3 for efficient event handling.

type EventName = 'active' | 'idle' | 'empty' | 'add' | 'next' | 'completed' | 'error';

// Event emission (inherited from EventEmitter3)
class PQueue extends EventEmitter<EventName> {
  on(event: EventName, listener: (...args: any[]) => void): this;
  off(event: EventName, listener: (...args: any[]) => void): this;
  emit(event: EventName, ...args: any[]): boolean;
}

Event System

Types

Core Types

type Task<TaskResultType> = 
  | ((options: TaskOptions) => PromiseLike<TaskResultType>)
  | ((options: TaskOptions) => TaskResultType);

type EventName = 'active' | 'idle' | 'empty' | 'add' | 'next' | 'completed' | 'error';

type RunFunction = () => Promise<unknown>;

Configuration Options

type Options<QueueType extends Queue<RunFunction, QueueOptions>, QueueOptions extends QueueAddOptions> = {
  readonly concurrency?: number;
  readonly autoStart?: boolean;
  readonly queueClass?: new () => QueueType;
  readonly intervalCap?: number;
  readonly interval?: number;
  readonly carryoverConcurrencyCount?: boolean;
  timeout?: number;
  throwOnTimeout?: boolean;
};

type QueueAddOptions = {
  readonly priority?: number;
  id?: string;
  readonly signal?: AbortSignal;
  timeout?: number;
  throwOnTimeout?: boolean;
};

type TaskOptions = {
  readonly signal?: AbortSignal;
};

Queue Interface

interface Queue<Element, Options> {
  size: number;
  filter: (options: Readonly<Partial<Options>>) => Element[];
  dequeue: () => Element | undefined;
  enqueue: (run: Element, options?: Partial<Options>) => void;
  setPriority: (id: string, priority: number) => void;
}