tessl/npm-p-queue

Promise queue with concurrency control for managing asynchronous operations

Concurrency Control

Advanced concurrency management with configurable limits, interval-based throttling, and priority handling for fine-grained control over async operation execution.

Capabilities

Concurrency Management

concurrency Property

Controls the maximum number of tasks that can run simultaneously.

/**
 * Get the current concurrency limit
 */
get concurrency(): number;

/**
 * Set a new concurrency limit. Changes take effect immediately.
 * @param newConcurrency - Number from 1 and up
 * @throws TypeError if not a number >= 1
 */
set concurrency(newConcurrency: number);

Usage Examples:

import PQueue from "p-queue";

const queue = new PQueue({ concurrency: 2 });

// Check current concurrency
console.log(queue.concurrency); // 2

// Add some tasks
queue.add(async () => task1());
queue.add(async () => task2());
queue.add(async () => task3());
queue.add(async () => task4());

// Initially 2 tasks run, 2 are queued
console.log(queue.pending); // 2
console.log(queue.size); // 2

// Increase concurrency dynamically
queue.concurrency = 4;
// Now all 4 tasks can run simultaneously

// Decrease concurrency
queue.concurrency = 1;
// Tasks that are already running are not interrupted;
// queued tasks now start one at a time as running tasks finish
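To make the effect of the limit concrete, here is a minimal sketch of a concurrency limiter. It is an illustration only, not p-queue's implementation: `TinyLimiter`, `Task`, and `hang` are hypothetical names, and error handling is deliberately omitted.

```typescript
type Task = () => Promise<unknown>;

// Hypothetical illustration class; not part of p-queue.
class TinyLimiter {
  private waiting: Task[] = [];
  private running = 0;
  private limit: number;

  constructor(limit: number) {
    this.limit = limit;
  }

  // Tasks waiting for a free slot.
  get size(): number {
    return this.waiting.length;
  }

  // Tasks currently running.
  get pending(): number {
    return this.running;
  }

  get concurrency(): number {
    return this.limit;
  }

  set concurrency(value: number) {
    if (!(typeof value === "number" && value >= 1)) {
      throw new TypeError("Expected a number >= 1");
    }
    this.limit = value;
    this.drain(); // a raised limit lets waiting tasks start right away
  }

  add(task: Task): void {
    this.waiting.push(task);
    this.drain();
  }

  // Start waiting tasks while there is a free slot.
  private drain(): void {
    while (this.running < this.limit) {
      const task = this.waiting.shift();
      if (!task) break;
      this.running++;
      const done = (): void => {
        this.running--;
        this.drain();
      };
      task().then(done, done); // free the slot on success or failure
    }
  }
}

// A task that never settles, so the counts below stay stable.
const hang: Task = () => new Promise<void>(() => {});

const limiter = new TinyLimiter(2);
for (let i = 0; i < 4; i++) limiter.add(hang);
// Here limiter.pending is 2 and limiter.size is 2.

limiter.concurrency = 4;
// Both waiting tasks start: limiter.pending is 4, limiter.size is 0.

limiter.concurrency = 1;
// Running tasks are not interrupted: limiter.pending is still 4.
```

The setter calls `drain()` so that a raised limit takes effect immediately, while a lowered limit only affects when the next waiting task may start.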

Queue State Inspection

size Property

Number of queued items waiting to run.

/**
 * Size of the queue, the number of queued items waiting to run.
 */
get size(): number;

pending Property

Number of running items (no longer in the queue).

/**
 * Number of running items (no longer in the queue).
 */
get pending(): number;

isPaused Property

Whether the queue is currently paused.

/**
 * Whether the queue is currently paused.
 */
get isPaused(): boolean;

Usage Examples:

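import PQueue from "p-queue";

// Helper used below: resolves after the given number of milliseconds
const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

const queue = new PQueue({ concurrency: 3 });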

// Monitor queue state
console.log(`Queued: ${queue.size}, Running: ${queue.pending}, Paused: ${queue.isPaused}`);

// Add tasks and monitor changes
queue.add(async () => delay(1000));
queue.add(async () => delay(1000));
queue.add(async () => delay(1000));
queue.add(async () => delay(1000));
queue.add(async () => delay(1000));

// Check state after adding tasks
console.log(`Queued: ${queue.size}, Running: ${queue.pending}`); // Queued: 2, Running: 3

// Pause and check state
queue.pause();
console.log(`Paused: ${queue.isPaused}`); // Paused: true

timeout Property

Per-operation timeout in milliseconds. If an operation has not settled when the timeout elapses, its promise fulfills anyway, or rejects when throwOnTimeout is enabled. The value applies to each operation added afterwards.

/**
 * Per-operation timeout in milliseconds. Operations fulfill once timeout elapses if they haven't already.
 * Applies to each future operation.
 */
timeout?: number;

Usage Examples:

import PQueue from "p-queue";

const queue = new PQueue({ concurrency: 2, timeout: 5000 });

// Set timeout for all future operations
queue.timeout = 3000;

// Add task that will timeout after 3 seconds
await queue.add(async () => {
  // This operation will timeout after 3 seconds
  await someSlowOperation();
});

// Individual tasks can override the queue timeout
await queue.add(async () => {
  return fastOperation();
}, { timeout: 1000 }); // This task has 1 second timeout
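The timeout behaviour described above can be pictured as a race between the operation and a timer. The following is a minimal sketch under that assumption; `withTimeout` is a hypothetical helper, not a p-queue export, and the library's own handling may differ in detail.

```typescript
// Hypothetical helper illustrating per-operation timeout semantics.
function withTimeout<T>(
  operation: Promise<T>,
  ms: number,
  throwOnTimeout = false,
): Promise<T | undefined> {
  return new Promise<T | undefined>((resolve, reject) => {
    const timer = setTimeout(() => {
      if (throwOnTimeout) {
        reject(new Error(`Timed out after ${ms} ms`));
      } else {
        resolve(undefined); // fulfill without a value once the timeout elapses
      }
    }, ms);

    operation.then(
      (value) => {
        clearTimeout(timer);
        resolve(value);
      },
      (error) => {
        clearTimeout(timer);
        reject(error);
      },
    );
  });
}

// A slow operation that needs 50 ms, raced against a 10 ms timeout.
const slow = new Promise<string>((resolve) => {
  setTimeout(() => resolve("done"), 50);
});

withTimeout(slow, 10).then((value) => {
  console.log(value === undefined ? "timed out" : value);
});
```

Note that in this sketch the underlying operation keeps running after the timeout; only the promise handed back to the caller settles early.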

sizeBy Method

Get the size of the queue filtered by specific options.

/**
 * Size of the queue, filtered by the given options.
 * For example, this can be used to find the number of items remaining in the queue with a specific priority level.
 * @param options - Filter options to match against queued items
 * @returns Number of matching items in the queue
 */
sizeBy(options: Readonly<Partial<EnqueueOptionsType>>): number;

Usage Examples:

// Disable auto-start so every task stays queued while we inspect the queue
const queue = new PQueue({ autoStart: false });

// Add tasks with different priorities
// (not awaited: awaiting each add would wait for the task to finish)
queue.add(async () => task1(), { priority: 10 });
queue.add(async () => task2(), { priority: 5 });
queue.add(async () => task3(), { priority: 10 });
queue.add(async () => task4(), { priority: 0 });

// Check queue size by priority
console.log(queue.sizeBy({ priority: 10 })); // 2 tasks with priority 10
console.log(queue.sizeBy({ priority: 5 })); // 1 task with priority 5
console.log(queue.size); // 4 total tasks
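Because sizeBy filters the queue, it counts only items that are still waiting; a task that has already started is no longer in the queue. A minimal sketch of that distinction, using hypothetical names (`QueuedTask`, `waitingTasks`, `runningTasks`, `countWaitingByPriority`) rather than anything from p-queue:

```typescript
interface QueuedTask {
  id: string;
  priority: number;
}

// Hypothetical model: task "a" has started, the others are still waiting.
const runningTasks: QueuedTask[] = [{ id: "a", priority: 10 }];
const waitingTasks: QueuedTask[] = [
  { id: "b", priority: 5 },
  { id: "c", priority: 10 },
  { id: "d", priority: 0 },
];

// Count only the waiting tasks that match the requested priority.
function countWaitingByPriority(
  tasks: readonly QueuedTask[],
  priority: number,
): number {
  return tasks.filter((task) => task.priority === priority).length;
}

// Task "a" has priority 10 but is running, so only "c" is counted.
console.log(countWaitingByPriority(waitingTasks, 10)); // 1
console.log(runningTasks.length + waitingTasks.length); // 4 tasks overall
```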

Priority Management

setPriority Method

Updates the priority of a queued task by its ID, affecting execution order.

/**
 * Updates the priority of a promise function by its id, affecting its execution order. 
 * Requires a defined concurrency limit to take effect.
 * 
 * @param id - The unique identifier of the task
 * @param priority - The new priority level (higher numbers = higher priority)
 * @throws ReferenceError if no task with the given ID exists
 */
setPriority(id: string, priority: number): void;

Usage Examples:

const queue = new PQueue({ concurrency: 1 });

// Add tasks with IDs and priorities
queue.add(async () => '🦄', { priority: 1 });
queue.add(async () => '🦀', { priority: 0, id: 'crab-task' });
queue.add(async () => '🦄', { priority: 1 });
queue.add(async () => '🦄', { priority: 1 });

// Before execution, increase priority of crab task
queue.setPriority('crab-task', 2);
// Now crab task will run second (after first unicorn task that's already running)

// Decrease priority example
const queue2 = new PQueue({ concurrency: 1 });

queue2.add(async () => '🦄', { priority: 1 });
queue2.add(async () => '🦀', { priority: 1, id: 'crab-task' });
queue2.add(async () => '🦄');
queue2.add(async () => '🦄', { priority: 0 });

queue2.setPriority('crab-task', -1);
// Now crab task will execute last
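The ordering in the first example can be traced with a small model of the waiting list. This is a sketch that assumes higher priorities run first and equal priorities keep insertion order; `QueueEntry`, `enqueueEntry`, and `reprioritize` are hypothetical names, and the ids `unicorn-2` and `unicorn-3` are invented labels for the two unicorn tasks that are still waiting.

```typescript
interface QueueEntry {
  id: string;
  priority: number;
}

// Insert so that higher priority runs first; equal priorities stay in insertion order.
function enqueueEntry(queue: QueueEntry[], entry: QueueEntry): void {
  const index = queue.findIndex((queued) => queued.priority < entry.priority);
  if (index === -1) {
    queue.push(entry);
  } else {
    queue.splice(index, 0, entry);
  }
}

// Change a priority by removing the entry and inserting it again.
function reprioritize(queue: QueueEntry[], id: string, priority: number): void {
  const index = queue.findIndex((queued) => queued.id === id);
  if (index === -1) {
    throw new ReferenceError(`No task with id "${id}"`);
  }
  queue.splice(index, 1);
  enqueueEntry(queue, { id, priority });
}

// The first unicorn task is already running, so only three entries wait.
const waitingEntries: QueueEntry[] = [];
enqueueEntry(waitingEntries, { id: "crab-task", priority: 0 });
enqueueEntry(waitingEntries, { id: "unicorn-2", priority: 1 });
enqueueEntry(waitingEntries, { id: "unicorn-3", priority: 1 });
// Order so far: unicorn-2, unicorn-3, crab-task

reprioritize(waitingEntries, "crab-task", 2);
// Order now: crab-task, unicorn-2, unicorn-3
console.log(waitingEntries.map((entry) => entry.id).join(", "));
```

Calling `reprioritize(waitingEntries, "crab-task", -1)` instead would leave the crab task at the end of the list, which mirrors the second example.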

Interval-Based Throttling

p-queue supports interval-based rate limiting to prevent overwhelming external services.

Configuration Options

type Options<QueueType, QueueOptions> = {
  readonly intervalCap?: number;        // Max runs per interval (default: Infinity)
  readonly interval?: number;           // Interval length in ms (default: 0)
  readonly carryoverConcurrencyCount?: boolean; // Count tasks still running at the end of an interval toward the next interval (default: false)
  // ... other options
};

Usage Examples:

// Rate limiting: Max 10 requests per 1 second
const apiQueue = new PQueue({
  concurrency: 5,
  intervalCap: 10,
  interval: 1000
});

// This will spread requests to comply with rate limits
for (let i = 0; i < 50; i++) {
  apiQueue.add(async () => {
    const response = await fetch(`https://api.example.com/data/${i}`);
    return response.json();
  });
}

// Advanced rate limiting with carryover
const restrictedQueue = new PQueue({
  concurrency: 2,
  intervalCap: 5,
  interval: 2000,
  carryoverConcurrencyCount: true // Tasks in progress count toward next interval
});

// Burst prevention: Max 3 operations per 5 seconds
const burstQueue = new PQueue({
  concurrency: 10, // High concurrency allowed
  intervalCap: 3,  // But only 3 per interval
  interval: 5000   // Every 5 seconds
});
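A quick way to reason about these settings is to compute a lower bound on when a given task can start. The sketch below assumes carryoverConcurrencyCount is false, that tasks finish well within an interval, and that concurrency is not the bottleneck; `earliestStartMs` and `minimumSpreadMs` are hypothetical helpers, not p-queue functions.

```typescript
// Lower bound, in ms after the first interval begins, on when the task
// at the given 0-based position may start.
function earliestStartMs(
  index: number,
  intervalCap: number,
  interval: number,
): number {
  return Math.floor(index / intervalCap) * interval;
}

// Lower bound on the time needed before all tasks have been started.
function minimumSpreadMs(
  taskCount: number,
  intervalCap: number,
  interval: number,
): number {
  return taskCount === 0
    ? 0
    : earliestStartMs(taskCount - 1, intervalCap, interval);
}

// 50 tasks at 10 per 1000 ms: the last task cannot start before 4000 ms.
console.log(minimumSpreadMs(50, 10, 1000)); // 4000

// 10 tasks at 3 per 5000 ms: the last task cannot start before 15000 ms.
console.log(minimumSpreadMs(10, 3, 5000)); // 15000
```

Real start times can be later than these bounds, for example when the concurrency limit is reached before the interval cap.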

Advanced Configuration

Queue Initialization Options

type Options<QueueType extends Queue<RunFunction, QueueOptions>, QueueOptions extends QueueAddOptions> = {
  readonly concurrency?: number;              // Concurrency limit (default: Infinity, min: 1)
  readonly autoStart?: boolean;               // Auto-execute tasks when added (default: true)
  readonly queueClass?: new () => QueueType;  // Custom queue implementation
  readonly intervalCap?: number;              // Max runs in interval (default: Infinity, min: 1)  
  readonly interval?: number;                 // Interval length in ms (default: 0, min: 0)
  readonly carryoverConcurrencyCount?: boolean; // Count tasks still running toward the next interval (default: false)
  timeout?: number;                          // Default timeout for all tasks
  throwOnTimeout?: boolean;                  // Whether timeout throws exception (default: false)
};

Advanced Usage Examples:

// Comprehensive configuration
const advancedQueue = new PQueue({
  concurrency: 3,           // Run up to 3 tasks simultaneously
  autoStart: false,         // Don't start automatically (manual control)
  intervalCap: 10,          // Max 10 operations per interval
  interval: 60000,          // 1 minute intervals
  carryoverConcurrencyCount: true, // Count running tasks toward next interval
  timeout: 30000,           // 30 second default timeout
  throwOnTimeout: false     // Resolve with undefined on timeout instead of throwing
});

// Add tasks, then start manually (autoStart is false, so nothing runs yet)
advancedQueue.add(async () => heavyTask1());
advancedQueue.add(async () => heavyTask2());
advancedQueue.add(async () => heavyTask3());

// Start when ready
advancedQueue.start();

// Custom queue implementation
class FIFOQueue {
  constructor() {
    this.queue = [];
  }
  
  get size() { return this.queue.length; }
  
  enqueue(run, options) {
    this.queue.push({ run, ...options });
  }
  
  dequeue() {
    return this.queue.shift()?.run;
  }
  
  filter(options) {
    return this.queue
      .filter(item => item.priority === options.priority)
      .map(item => item.run);
  }
  
  setPriority(id, priority) {
    const item = this.queue.find(item => item.id === id);
    if (item) item.priority = priority;
  }
}

const fifoQueue = new PQueue({
  queueClass: FIFOQueue,
  concurrency: 2
});

Performance Monitoring

// Monitor queue performance
const queue = new PQueue({ concurrency: 5 });

function logQueueState() {
  console.log({
    size: queue.size,
    pending: queue.pending,
    isPaused: queue.isPaused,
    concurrency: queue.concurrency
  });
}

// Log state periodically
const monitor = setInterval(logQueueState, 1000);

// Stop monitoring when idle
queue.onIdle().then(() => {
  clearInterval(monitor);
  console.log('Queue processing complete');
});
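For log output that is easier to scan, the same four properties can be folded into one line with a utilization figure. This is a sketch only: `QueueLike` and `describeQueue` are hypothetical names, and the function accepts any object exposing these properties.

```typescript
// The subset of queue state this helper reads.
interface QueueLike {
  size: number;
  pending: number;
  isPaused: boolean;
  concurrency: number;
}

// Format a one-line summary; utilization is running tasks divided by the limit.
function describeQueue(queue: QueueLike): string {
  const utilization = isFinite(queue.concurrency)
    ? `${Math.round((queue.pending / queue.concurrency) * 100)}%`
    : "n/a"; // an unlimited queue has no meaningful utilization
  return (
    `queued=${queue.size} running=${queue.pending} ` +
    `paused=${queue.isPaused} utilization=${utilization}`
  );
}

// Example with a plain object standing in for a queue.
console.log(
  describeQueue({ size: 2, pending: 3, isPaused: false, concurrency: 5 }),
);
```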

Install with Tessl CLI

npx tessl i tessl/npm-p-queue
