tessl/npm-p-queue

Promise queue with concurrency control for managing asynchronous operations

p-queue

p-queue is a promise queue library with concurrency control for managing asynchronous operations. It lets you limit how many operations run at once, cap how many tasks start per time interval, and run tasks in priority order, so that APIs and system resources are not overwhelmed.

Package Information

  • Package Name: p-queue
  • Package Type: npm
  • Language: TypeScript
  • Installation: npm install p-queue

Core Imports

import PQueue from "p-queue";

Note: This package is ESM-only and provides no CommonJS export. CommonJS projects must either migrate to ESM or load the package with a dynamic import().

Basic Usage

import PQueue from "p-queue";

// Create queue with concurrency limit
const queue = new PQueue({ concurrency: 2 });

// Add tasks to the queue
await queue.add(async () => {
  const response = await fetch('https://api.example.com/data');
  return response.json();
});

// Add multiple tasks (processFile is a placeholder for your own function)
const results = await queue.addAll([
  async () => processFile('file1.txt'),
  async () => processFile('file2.txt'),
  async () => processFile('file3.txt'),
]);

// Wait for all tasks to complete
await queue.onIdle();

Architecture

p-queue is built around several key components:

  • PQueue Class: Main queue class extending EventEmitter3 for real-time monitoring
  • Concurrency Control: Caps the number of tasks running at once, with optional interval-based throttling
  • Priority System: Configurable priority-based execution order with dynamic priority updates
  • Event System: Comprehensive event emission for monitoring queue state and task lifecycle
  • Queue Implementations: Pluggable queue classes with default PriorityQueue implementation
  • Task Management: Flexible task handling with timeout support and abort signal integration
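To make the concurrency-control component more concrete, here is a minimal sketch of a concurrency-limited runner. It is an illustration of the general idea only, not p-queue's source: `TinyQueue`, `SimpleTask`, and `job` are hypothetical names, and priorities, intervals, timeouts, and events are all left out.

```typescript
// Illustrative only: a tiny concurrency-limited runner, not p-queue's implementation.
type SimpleTask<T> = () => Promise<T> | T;

class TinyQueue {
  private running = 0;
  private readonly waiting: Array<() => void> = [];
  private readonly concurrency: number;

  constructor(concurrency: number) {
    this.concurrency = concurrency;
  }

  // Schedule a task; the returned promise settles with the task's outcome.
  add<T>(task: SimpleTask<T>): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const run = (): void => {
        this.running++;
        // The async wrapper turns a synchronous throw into a rejection.
        (async () => task())()
          .then(resolve, reject)
          .finally(() => {
            this.running--;
            // Start the next waiting task, if there is one.
            this.waiting.shift()?.();
          });
      };

      if (this.running < this.concurrency) {
        run();
      } else {
        this.waiting.push(run);
      }
    });
  }
}

// Usage: five jobs, at most two in flight at any moment.
let active = 0;
let peak = 0;
const queue = new TinyQueue(2);

const job = (value: number) => async (): Promise<number> => {
  active++;
  peak = Math.max(peak, active);
  await new Promise<void>((resolve) => setTimeout(resolve, 10));
  active--;
  return value * 2;
};

const demo: Promise<number[]> = Promise.all(
  [1, 2, 3, 4, 5].map((n) => queue.add(job(n))),
);
```

Tasks beyond the limit are parked as closures and started one at a time as running tasks settle, which is the behaviour the `concurrency` option is described as providing.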

Capabilities

Queue Management

Core queue operations for adding, controlling, and monitoring promise-based tasks. Includes methods for adding individual tasks, batch operations, and queue state management.

class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsType> = PriorityQueue, EnqueueOptionsType extends QueueAddOptions = QueueAddOptions> extends EventEmitter<EventName> {
  constructor(options?: Options<QueueType, EnqueueOptionsType>);
  
  // Core task management
  // The stricter overload is listed first so that TypeScript can select it
  add<TaskResultType>(function_: Task<TaskResultType>, options: {throwOnTimeout: true} & Exclude<EnqueueOptionsType, 'throwOnTimeout'>): Promise<TaskResultType>;
  add<TaskResultType>(function_: Task<TaskResultType>, options?: Partial<EnqueueOptionsType>): Promise<TaskResultType | void>;
  addAll<TaskResultsType>(functions: ReadonlyArray<Task<TaskResultsType>>, options?: Partial<EnqueueOptionsType>): Promise<Array<TaskResultsType | void>>;
  
  // Queue control
  start(): this;
  pause(): void;
  clear(): void;
  
  // State monitoring
  onEmpty(): Promise<void>;
  onIdle(): Promise<void>;
  onSizeLessThan(limit: number): Promise<void>;
}

Concurrency Control

Advanced concurrency management with configurable limits, interval-based throttling, and priority handling for fine-grained control over async operation execution.

// Concurrency properties and methods
class PQueue {
  get concurrency(): number;
  set concurrency(newConcurrency: number);
  
  timeout?: number;
  
  get size(): number;
  get pending(): number;
  get isPaused(): boolean;
  
  sizeBy(options: Readonly<Partial<EnqueueOptionsType>>): number;
  setPriority(id: string, priority: number): void;
}
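The priority-related members above (`sizeBy`, `setPriority`) are easier to follow with a small model of the ordering rule. The sketch below assumes that a higher `priority` number is dequeued first and that equal priorities keep insertion order. `PriorityList` and `QueuedEntry` are hypothetical names, and the sketch stores ids rather than task functions to stay short.

```typescript
// Illustrative model of priority ordering; not p-queue's PriorityQueue.
type QueuedEntry = { id: string; priority: number };

class PriorityList {
  private readonly entries: QueuedEntry[] = [];

  get size(): number {
    return this.entries.length;
  }

  enqueue(id: string, priority = 0): void {
    // Insert before the first entry with a strictly lower priority,
    // so entries with equal priority stay in insertion (FIFO) order.
    const index = this.entries.findIndex((entry) => entry.priority < priority);
    const entry: QueuedEntry = { id, priority };
    if (index === -1) {
      this.entries.push(entry);
    } else {
      this.entries.splice(index, 0, entry);
    }
  }

  // Move an already queued entry to the position its new priority implies.
  setPriority(id: string, priority: number): void {
    const index = this.entries.findIndex((entry) => entry.id === id);
    if (index === -1) {
      throw new Error(`No queued entry with id "${id}"`);
    }
    this.entries.splice(index, 1);
    this.enqueue(id, priority);
  }

  // Count queued entries that match the given priority.
  sizeBy(options: { priority: number }): number {
    return this.entries.filter((entry) => entry.priority === options.priority).length;
  }

  dequeue(): string | undefined {
    return this.entries.shift()?.id;
  }
}

const list = new PriorityList();
list.enqueue("a"); // priority 0
list.enqueue("b", 1);
list.enqueue("c"); // priority 0, queued behind "a"
const lowPriorityCount = list.sizeBy({ priority: 0 });
list.setPriority("c", 2); // "c" moves to the front
const order = [list.dequeue(), list.dequeue(), list.dequeue()];
```

Only tasks that are still waiting can be reordered this way; a task that has already started is no longer in the queue.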

Event System

Comprehensive event emission system for monitoring queue state, task lifecycle, and execution progress. Built on EventEmitter3 for efficient event handling.

type EventName = 'active' | 'idle' | 'empty' | 'add' | 'next' | 'completed' | 'error';

// Event emission (inherited from EventEmitter3)
class PQueue extends EventEmitter<EventName> {
  on(event: EventName, listener: (...args: any[]) => void): this;
  off(event: EventName, listener: (...args: any[]) => void): this;
  emit(event: EventName, ...args: any[]): boolean;
}
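The `on` / `off` / `emit` contract above can be tried out with a small stand-in emitter. This is a sketch under stated assumptions: `TinyEmitter` is a hypothetical class written for this example, not EventEmitter3, and the payload passed with `'completed'` (the task result) is an assumption about how the event is used.

```typescript
type EventName = 'active' | 'idle' | 'empty' | 'add' | 'next' | 'completed' | 'error';
type Listener = (...args: unknown[]) => void;

// Hypothetical stand-in for the emitter base class; not EventEmitter3 itself.
class TinyEmitter {
  private readonly listeners = new Map<EventName, Set<Listener>>();

  on(event: EventName, listener: Listener): this {
    const set = this.listeners.get(event) ?? new Set<Listener>();
    set.add(listener);
    this.listeners.set(event, set);
    return this;
  }

  off(event: EventName, listener: Listener): this {
    this.listeners.get(event)?.delete(listener);
    return this;
  }

  // Returns true when at least one listener was called.
  emit(event: EventName, ...args: unknown[]): boolean {
    const set = this.listeners.get(event);
    if (!set || set.size === 0) {
      return false;
    }
    // Copy first so listeners may unsubscribe while being called.
    for (const listener of Array.from(set)) {
      listener(...args);
    }
    return true;
  }
}

const emitter = new TinyEmitter();
const log: string[] = [];
const onCompleted: Listener = (result) => {
  log.push(`completed:${String(result)}`);
};

emitter.on('completed', onCompleted).on('idle', () => {
  log.push('idle');
});
emitter.emit('completed', 42);
emitter.emit('idle');
emitter.off('completed', onCompleted);
const delivered = emitter.emit('completed', 43); // no listener is left for this event
```

Keeping a reference to the listener, as `onCompleted` does here, is what makes a later `off` call possible.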

Types

Core Types

type Task<TaskResultType> = 
  | ((options: TaskOptions) => PromiseLike<TaskResultType>)
  | ((options: TaskOptions) => TaskResultType);

type EventName = 'active' | 'idle' | 'empty' | 'add' | 'next' | 'completed' | 'error';

type RunFunction = () => Promise<unknown>;

Configuration Options

type Options<QueueType extends Queue<RunFunction, QueueOptions>, QueueOptions extends QueueAddOptions> = {
  readonly concurrency?: number;
  readonly autoStart?: boolean;
  readonly queueClass?: new () => QueueType;
  readonly intervalCap?: number;
  readonly interval?: number;
  readonly carryoverConcurrencyCount?: boolean;
  timeout?: number;
  throwOnTimeout?: boolean;
};

type QueueAddOptions = {
  readonly priority?: number;
  id?: string;
  readonly signal?: AbortSignal;
  timeout?: number;
  throwOnTimeout?: boolean;
};

type TaskOptions = {
  readonly signal?: AbortSignal;
};
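The `intervalCap` and `interval` options describe a rate limit: at most `intervalCap` task starts per `interval` milliseconds. The sketch below models that rule as a simple fixed window. It is a simplification for illustration, not the library's scheduling code; `IntervalWindow` and `tryStart` are hypothetical names, and timestamps are passed in explicitly so the example is deterministic.

```typescript
// Simplified fixed-window model of `intervalCap` / `interval`.
class IntervalWindow {
  private windowStart = 0;
  private started = 0;
  private readonly intervalCap: number;
  private readonly interval: number;

  constructor(intervalCap: number, interval: number) {
    this.intervalCap = intervalCap;
    this.interval = interval;
  }

  // `now` is a timestamp in milliseconds. Returns true if a task may start.
  tryStart(now: number): boolean {
    if (now - this.windowStart >= this.interval) {
      // A new window begins: reset the start counter.
      this.windowStart = now;
      this.started = 0;
    }
    if (this.started >= this.intervalCap) {
      return false;
    }
    this.started++;
    return true;
  }
}

// At most 2 starts per 1000 ms.
const limiter = new IntervalWindow(2, 1000);
const decisions = [0, 100, 200, 1000, 1100, 1200].map((t) => limiter.tryStart(t));
// decisions: [true, true, false, true, true, false]
```

A real queue would hold the rejected task and retry it when the next window opens rather than dropping it.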

Queue Interface

interface Queue<Element, Options> {
  size: number;
  filter: (options: Readonly<Partial<Options>>) => Element[];
  dequeue: () => Element | undefined;
  enqueue: (run: Element, options?: Partial<Options>) => void;
  setPriority: (id: string, priority: number) => void;
}
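Because queue classes are pluggable, any class with this shape could in principle replace the default. The sketch below is a hypothetical last-in, first-out queue written against a local copy of the interface; `StackQueue` and `StackOptions` are names invented for this example, and the class ignores priorities entirely.

```typescript
type RunFunction = () => Promise<unknown>;

// Local copy of the interface shown above, so the sketch is self-contained.
interface Queue<Element, Options> {
  size: number;
  filter: (options: Readonly<Partial<Options>>) => Element[];
  dequeue: () => Element | undefined;
  enqueue: (run: Element, options?: Partial<Options>) => void;
  setPriority: (id: string, priority: number) => void;
}

type StackOptions = { id?: string };

// Hypothetical LIFO queue: the most recently added function runs first.
class StackQueue implements Queue<RunFunction, StackOptions> {
  private readonly items: Array<{ run: RunFunction; id?: string }> = [];

  get size(): number {
    return this.items.length;
  }

  enqueue(run: RunFunction, options?: Partial<StackOptions>): void {
    this.items.push({ run, id: options?.id });
  }

  dequeue(): RunFunction | undefined {
    return this.items.pop()?.run;
  }

  filter(options: Readonly<Partial<StackOptions>>): RunFunction[] {
    return this.items
      .filter((item) => item.id === options.id)
      .map((item) => item.run);
  }

  setPriority(_id: string, _priority: number): void {
    // No-op: a stack has no notion of priority.
  }
}

const stack = new StackQueue();
const first: RunFunction = async () => "first";
const second: RunFunction = async () => "second";
stack.enqueue(first, { id: "first" });
stack.enqueue(second, { id: "second" });
const matches = stack.filter({ id: "first" });
const next = stack.dequeue(); // `second`, the last function added
```

With the real library, such a class would presumably be supplied through the `queueClass` option listed under Configuration Options, with its options type extended to satisfy `QueueAddOptions`.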

Install with Tessl CLI

npx tessl i tessl/npm-p-queue
