Promise queue with concurrency control for managing asynchronous operations
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Advanced concurrency management with configurable limits, interval-based throttling, and priority handling for fine-grained control over async operation execution.
Controls the maximum number of tasks that can run simultaneously.
/**
* Get the current concurrency limit
*/
get concurrency(): number;
/**
* Set a new concurrency limit. Changes take effect immediately.
* @param newConcurrency - Number from 1 and up
* @throws TypeError if not a number >= 1
*/
set concurrency(newConcurrency: number);Usage Examples:
import PQueue from "p-queue";
const queue = new PQueue({ concurrency: 2 });
// Check current concurrency
console.log(queue.concurrency); // 2
// Add some tasks
queue.add(async () => task1());
queue.add(async () => task2());
queue.add(async () => task3());
queue.add(async () => task4());
// Initially 2 tasks run, 2 are queued
console.log(queue.pending); // 2
console.log(queue.size); // 2
// Increase concurrency dynamically
queue.concurrency = 4;
// Now all 4 tasks can run simultaneously
// Decrease concurrency
queue.concurrency = 1;
// New tasks will be limited to 1 at a timeNumber of queued items waiting to run.
/**
* Size of the queue, the number of queued items waiting to run.
*/
get size(): number;Number of running items (no longer in the queue).
/**
* Number of running items (no longer in the queue).
*/
get pending(): number;Whether the queue is currently paused.
/**
* Whether the queue is currently paused.
*/
get isPaused(): boolean;Usage Examples:
const queue = new PQueue({ concurrency: 3 });
// Monitor queue state
console.log(`Queued: ${queue.size}, Running: ${queue.pending}, Paused: ${queue.isPaused}`);
// Add tasks and monitor changes
queue.add(async () => delay(1000));
queue.add(async () => delay(1000));
queue.add(async () => delay(1000));
queue.add(async () => delay(1000));
queue.add(async () => delay(1000));
// Check state after adding tasks
console.log(`Queued: ${queue.size}, Running: ${queue.pending}`); // Queued: 2, Running: 3
// Pause and check state
queue.pause();
console.log(`Paused: ${queue.isPaused}`); // Paused: truePer-operation timeout in milliseconds. Operations fulfill once timeout elapses if they haven't already. Applies to each future operation.
/**
* Per-operation timeout in milliseconds. Operations fulfill once timeout elapses if they haven't already.
* Applies to each future operation.
*/
timeout?: number;Usage Examples:
import PQueue from "p-queue";
const queue = new PQueue({ concurrency: 2, timeout: 5000 });
// Set timeout for all future operations
queue.timeout = 3000;
// Add task that will timeout after 3 seconds
await queue.add(async () => {
// This operation will timeout after 3 seconds
await someSlowOperation();
});
// Individual tasks can override the queue timeout
await queue.add(async () => {
return fastOperation();
}, { timeout: 1000 }); // This task has 1 second timeoutGet the size of the queue filtered by specific options.
/**
* Size of the queue, filtered by the given options.
* For example, this can be used to find the number of items remaining in the queue with a specific priority level.
* @param options - Filter options to match against queued items
* @returns Number of matching items in the queue
*/
sizeBy(options: Readonly<Partial<EnqueueOptionsType>>): number;Usage Examples:
// Add tasks with different priorities
await queue.add(async () => task1(), { priority: 10 });
await queue.add(async () => task2(), { priority: 5 });
await queue.add(async () => task3(), { priority: 10 });
await queue.add(async () => task4(), { priority: 0 });
// Check queue size by priority
console.log(queue.sizeBy({ priority: 10 })); // 2 tasks with priority 10
console.log(queue.sizeBy({ priority: 5 })); // 1 task with priority 5
console.log(queue.size); // 4 total tasksUpdates the priority of a queued task by its ID, affecting execution order.
/**
* Updates the priority of a promise function by its id, affecting its execution order.
* Requires a defined concurrency limit to take effect.
*
* @param id - The unique identifier of the task
* @param priority - The new priority level (higher numbers = higher priority)
* @throws ReferenceError if no task with the given ID exists
*/
setPriority(id: string, priority: number): void;Usage Examples:
const queue = new PQueue({ concurrency: 1 });
// Add tasks with IDs and priorities
queue.add(async () => '🦄', { priority: 1 });
queue.add(async () => '🦀', { priority: 0, id: 'crab-task' });
queue.add(async () => '🦄', { priority: 1 });
queue.add(async () => '🦄', { priority: 1 });
// Before execution, increase priority of crab task
queue.setPriority('crab-task', 2);
// Now crab task will run second (after first unicorn task that's already running)
// Decrease priority example
const queue2 = new PQueue({ concurrency: 1 });
queue2.add(async () => '🦄', { priority: 1 });
queue2.add(async () => '🦀', { priority: 1, id: 'crab-task' });
queue2.add(async () => '🦄');
queue2.add(async () => '🦄', { priority: 0 });
queue2.setPriority('crab-task', -1);
// Now crab task will execute lastp-queue supports interval-based rate limiting to prevent overwhelming external services.
type Options<QueueType, QueueOptions> = {
readonly intervalCap?: number; // Max runs per interval (default: Infinity)
readonly interval?: number; // Interval length in ms (default: 0)
readonly carryoverConcurrencyCount?: boolean; // Carry over pending tasks (default: false)
// ... other options
};Usage Examples:
// Rate limiting: Max 10 requests per 1 second
const apiQueue = new PQueue({
concurrency: 5,
intervalCap: 10,
interval: 1000
});
// This will spread requests to comply with rate limits
for (let i = 0; i < 50; i++) {
apiQueue.add(async () => {
const response = await fetch(`https://api.example.com/data/${i}`);
return response.json();
});
}
// Advanced rate limiting with carryover
const restrictedQueue = new PQueue({
concurrency: 2,
intervalCap: 5,
interval: 2000,
carryoverConcurrencyCount: true // Tasks in progress count toward next interval
});
// Burst prevention: Max 3 operations per 5 seconds
const burstQueue = new PQueue({
concurrency: 10, // High concurrency allowed
intervalCap: 3, // But only 3 per interval
interval: 5000 // Every 5 seconds
});type Options<QueueType extends Queue<RunFunction, QueueOptions>, QueueOptions extends QueueAddOptions> = {
readonly concurrency?: number; // Concurrency limit (default: Infinity, min: 1)
readonly autoStart?: boolean; // Auto-execute tasks when added (default: true)
readonly queueClass?: new () => QueueType; // Custom queue implementation
readonly intervalCap?: number; // Max runs in interval (default: Infinity, min: 1)
readonly interval?: number; // Interval length in ms (default: 0, min: 0)
readonly carryoverConcurrencyCount?: boolean; // Carry over tasks to next interval (default: false)
timeout?: number; // Default timeout for all tasks
throwOnTimeout?: boolean; // Whether timeout throws exception (default: false)
};Advanced Usage Examples:
// Comprehensive configuration
const advancedQueue = new PQueue({
concurrency: 3, // Run up to 3 tasks simultaneously
autoStart: false, // Don't start automatically (manual control)
intervalCap: 10, // Max 10 operations per interval
interval: 60000, // 1 minute intervals
carryoverConcurrencyCount: true, // Count running tasks toward next interval
timeout: 30000, // 30 second default timeout
throwOnTimeout: false // Return void on timeout instead of throwing
});
// Add tasks then start manually
queue.add(async () => heavyTask1());
queue.add(async () => heavyTask2());
queue.add(async () => heavyTask3());
// Start when ready
queue.start();
// Custom queue implementation
class FIFOQueue {
constructor() {
this.queue = [];
}
get size() { return this.queue.length; }
enqueue(run, options) {
this.queue.push({ run, ...options });
}
dequeue() {
return this.queue.shift()?.run;
}
filter(options) {
return this.queue
.filter(item => item.priority === options.priority)
.map(item => item.run);
}
setPriority(id, priority) {
const item = this.queue.find(item => item.id === id);
if (item) item.priority = priority;
}
}
const fifoQueue = new PQueue({
queueClass: FIFOQueue,
concurrency: 2
});// Monitor queue performance
const queue = new PQueue({ concurrency: 5 });
function logQueueState() {
console.log({
size: queue.size,
pending: queue.pending,
isPaused: queue.isPaused,
concurrency: queue.concurrency
});
}
// Log state periodically
const monitor = setInterval(logQueueState, 1000);
// Stop monitoring when idle
queue.onIdle().then(() => {
clearInterval(monitor);
console.log('Queue processing complete');
});Install with Tessl CLI
npx tessl i tessl/npm-p-queue