Promise queue with concurrency control for managing asynchronous operations
Core queue operations for adding, controlling, and monitoring promise-based tasks. Provides methods for adding individual tasks, batch operations, and comprehensive queue state management.
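To make the idea of concurrency control concrete, here is a minimal sketch of a queue that never runs more than a fixed number of tasks at once. It is a simplified model written for illustration, not p-queue's implementation; MiniQueue, startNext, and demo are hypothetical names, and features such as priorities, timeouts, pausing, and abort signals are deliberately left out.

```typescript
// Simplified model of a concurrency-limited promise queue.
// Not p-queue's implementation: MiniQueue and demo are hypothetical names.
type Task<T> = () => T | PromiseLike<T>;

class MiniQueue {
  private readonly waiting: Array<() => void> = [];
  private readonly concurrency: number;
  private running = 0;

  constructor(concurrency: number) {
    this.concurrency = concurrency;
  }

  /** Queue a sync or async task; always returns a promise for its result. */
  add<T>(task: Task<T>): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const finish = (): void => {
        this.running--;
        this.startNext();
      };
      const run = (): void => {
        this.running++;
        // The inner promise turns sync return values and sync throws into a promise.
        new Promise<T>((settle) => settle(task())).then(
          (value) => { finish(); resolve(value); },
          (error: unknown) => { finish(); reject(error); },
        );
      };
      this.waiting.push(run);
      this.startNext();
    });
  }

  /** Start waiting tasks while a concurrency slot is free. */
  private startNext(): void {
    while (this.running < this.concurrency && this.waiting.length > 0) {
      const run = this.waiting.shift()!;
      run();
    }
  }
}

// Run five tasks through a queue of concurrency 2 and record the peak overlap.
async function demo(): Promise<{ results: number[]; maxRunning: number }> {
  const queue = new MiniQueue(2);
  let active = 0;
  let maxRunning = 0;
  const makeTask = (value: number) => async (): Promise<number> => {
    active++;
    maxRunning = Math.max(maxRunning, active);
    await Promise.resolve(); // yield once so that tasks can overlap
    active--;
    return value * 2;
  };
  const results = await Promise.all([1, 2, 3, 4, 5].map((n) => queue.add(makeTask(n))));
  return { results, maxRunning };
}

demo().then(({ results, maxRunning }) => console.log(results, maxRunning));
```

In this model the peak number of overlapping tasks equals the concurrency limit, while the results still come back in the order the tasks were added.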
Adds a sync or async task to the queue and returns a promise that resolves with the task result.
/**
* Adds a sync or async task to the queue. Always returns a promise.
* @param function_ - The task function to execute
* @param options - Optional configuration for the task
* @returns Promise that resolves with task result or void
*/
add<TaskResultType>(function_: Task<TaskResultType>, options?: Partial<EnqueueOptionsType>): Promise<TaskResultType | void>;
/**
* Adds a task with throwOnTimeout enabled, guaranteeing a result
* @param function_ - The task function to execute
* @param options - Configuration with throwOnTimeout: true
* @returns Promise that resolves with task result (never void)
*/
add<TaskResultType>(function_: Task<TaskResultType>, options: {throwOnTimeout: true} & Exclude<EnqueueOptionsType, 'throwOnTimeout'>): Promise<TaskResultType>;

Usage Examples:
import PQueue from "p-queue";

const queue = new PQueue({ concurrency: 2 });

// Basic task addition
const result = await queue.add(async () => {
  const response = await fetch('https://api.example.com/users');
  return response.json();
});

// Task with priority and ID
await queue.add(
  async ({ signal }) => {
    // Task can check for abort signal
    if (signal?.aborted) throw signal.reason;
    return processImportantData();
  },
  {
    priority: 10,
    id: 'important-task',
    timeout: 5000
  }
);

// Task with abort signal
const controller = new AbortController();
const taskPromise = queue.add(
  async ({ signal }) => {
    // Handle abortion within task
    signal?.addEventListener('abort', () => cleanup());
    return longRunningOperation();
  },
  { signal: controller.signal }
);

// Can abort the task
setTimeout(() => controller.abort(), 1000);

Adds multiple sync or async tasks to the queue simultaneously.
/**
* Same as .add(), but accepts an array of sync or async functions
* @param functions - Array of task functions to execute
* @param options - Optional configuration applied to all tasks
* @returns Promise that resolves when all functions are resolved
*/
addAll<TaskResultsType>(
functions: ReadonlyArray<Task<TaskResultsType>>,
options?: Partial<EnqueueOptionsType>
): Promise<Array<TaskResultsType | void>>;
/**
* Adds multiple tasks with throwOnTimeout enabled
* @param functions - Array of task functions to execute
* @param options - Configuration with throwOnTimeout: true
* @returns Promise with guaranteed results (no void values)
*/
addAll<TaskResultsType>(
functions: ReadonlyArray<Task<TaskResultsType>>,
options?: {throwOnTimeout: true} & Partial<Exclude<EnqueueOptionsType, 'throwOnTimeout'>>
): Promise<TaskResultsType[]>;

Usage Examples:
// Process multiple files
const files = ['file1.txt', 'file2.txt', 'file3.txt'];
const results = await queue.addAll(
  files.map(filename => async () => {
    const data = await readFile(filename);
    return processFileData(data);
  }),
  { priority: 5 }
);

// Batch API requests with error handling
const urls = ['api/users', 'api/posts', 'api/comments'];
const responses = await queue.addAll(
  urls.map(url => async ({ signal }) => {
    try {
      const response = await fetch(`https://api.example.com/${url}`, {
        signal
      });
      // Await here so that a JSON parsing failure is also caught below
      return await response.json();
    } catch (error) {
      console.error(`Failed to fetch ${url}:`, error);
      return null;
    }
  })
);

Starts or resumes executing enqueued tasks within the concurrency limit.
/**
* Start (or resume) executing enqueued tasks within concurrency limit.
* No need to call this if the queue is not paused (via options.autoStart = false or the .pause() method).
* @returns The queue instance for chaining
*/
start(): this;

Usage Examples:
// Create paused queue
const queue = new PQueue({
  concurrency: 2,
  autoStart: false
});

// Add tasks while paused
queue.add(async () => task1());
queue.add(async () => task2());
queue.add(async () => task3());
console.log(queue.size); // 3 (tasks are queued but not running)
console.log(queue.pending); // 0 (no tasks executing)

// Start processing
queue.start();
console.log(queue.pending); // 2 (up to concurrency limit)

Puts queue execution on hold, preventing new tasks from starting.
/**
* Put queue execution on hold.
*/
pause(): void;

Usage Examples:
const queue = new PQueue({ concurrency: 3 });

// Add some tasks
queue.add(async () => longTask1());
queue.add(async () => longTask2());
queue.add(async () => longTask3());

// Pause after 1 second
setTimeout(() => {
  queue.pause();
  console.log('Queue paused');
  // Resume after another 2 seconds
  setTimeout(() => {
    queue.start();
    console.log('Queue resumed');
  }, 2000);
}, 1000);

Clears all queued tasks that haven't started executing yet.
/**
* Clear the queue.
*/
clear(): void;

Usage Examples:
const queue = new PQueue({ concurrency: 1 });

// Add tasks
queue.add(async () => task1());
queue.add(async () => task2());
queue.add(async () => task3());
console.log(queue.size); // 2 (one task running, two queued)

// Clear queued tasks
queue.clear();
console.log(queue.size); // 0 (queued tasks cleared)
console.log(queue.pending); // 1 (running task continues)

Returns a promise that resolves when the queue becomes empty (no more tasks waiting).
/**
* Can be called multiple times, which is useful if you add more items at a later time.
* @returns A promise that settles when the queue becomes empty.
*/
onEmpty(): Promise<void>;

Returns a promise that resolves when the queue becomes empty and all promises have completed.
/**
* The difference with .onEmpty is that .onIdle guarantees that all work from the queue has finished.
* .onEmpty merely signals that the queue is empty, but it could mean that some promises haven't completed yet.
* @returns A promise that settles when the queue becomes empty, and all promises have completed; queue.size === 0 && queue.pending === 0.
*/
onIdle(): Promise<void>;

Returns a promise that resolves when the queue size is less than the given limit.
/**
* @param limit - The size limit to wait for
* @returns A promise that settles when the queue size is less than the given limit: queue.size < limit.
*
* If you want to avoid having the queue grow beyond a certain size you can await queue.onSizeLessThan() before adding a new item.
*
* Note that this only limits the number of items waiting to start. There could still be up to concurrency jobs already running that this call does not include in its calculation.
*/
onSizeLessThan(limit: number): Promise<void>;

Usage Examples:
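As a rough model to read alongside the examples that follow, the conditions documented for onEmpty, onIdle, and onSizeLessThan can be written as predicates over queue.size and queue.pending. The QueueState type and the helper names are hypothetical and only mirror the documented conditions; they are not part of p-queue.

```typescript
// Hypothetical snapshot of the two counters the documentation refers to.
type QueueState = {
  size: number;    // tasks waiting to start
  pending: number; // tasks currently running
};

// onEmpty: nothing is waiting, although tasks may still be running.
const isEmpty = (q: QueueState): boolean => q.size === 0;
// onIdle: nothing is waiting and nothing is running.
const isIdle = (q: QueueState): boolean => q.size === 0 && q.pending === 0;
// onSizeLessThan(limit): fewer than `limit` tasks are waiting; running tasks are not counted.
const isSizeLessThan = (q: QueueState, limit: number): boolean => q.size < limit;

// Concurrency 2 with two tasks added: both have started, none are waiting.
const bothRunning: QueueState = { size: 0, pending: 2 };
// Later, both tasks have finished.
const allDone: QueueState = { size: 0, pending: 0 };
// Concurrency 2 with twelve tasks added: two running, ten waiting.
const backlog: QueueState = { size: 10, pending: 2 };

console.log(isEmpty(bothRunning), isIdle(bothRunning)); // true false
console.log(isEmpty(allDone), isIdle(allDone)); // true true
console.log(isSizeLessThan(backlog, 10)); // false
```

The first case is the one that separates the two promises: the queue is already empty while work is still in flight, so only waiting for idle guarantees that every result is available.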
const queue = new PQueue({ concurrency: 2 });

// Monitor queue states
queue.add(async () => {
  await delay(1000);
  return 'task1';
});
queue.add(async () => {
  await delay(2000);
  return 'task2';
});

// Wait for queue to be empty (no more tasks waiting)
await queue.onEmpty();
console.log('No more tasks in queue');

// Wait for all work to complete
await queue.onIdle();
console.log('All tasks completed');

// Limit queue growth
async function addTaskSafely(taskFn) {
  // Wait if queue has too many items
  await queue.onSizeLessThan(10);
  return queue.add(taskFn);
}

Tasks added to the queue can be configured with various options:
type QueueAddOptions = {
  readonly priority?: number;
  id?: string;
  readonly signal?: AbortSignal;
  timeout?: number;
  throwOnTimeout?: boolean;
};

type TaskOptions = {
  readonly signal?: AbortSignal;
};

type Task<TaskResultType> =
  | ((options: TaskOptions) => PromiseLike<TaskResultType>)
  | ((options: TaskOptions) => TaskResultType);

Configuration Examples:
// Priority-based execution (higher numbers = higher priority)
// Added without awaiting each call, so the tasks can wait in the queue together
queue.add(async () => criticalTask(), { priority: 10 });
queue.add(async () => normalTask(), { priority: 0 });
queue.add(async () => lowPriorityTask(), { priority: -5 });
// Task with timeout
await queue.add(
  async () => {
    // This task times out after 5 seconds
    return slowOperation();
  },
  {
    timeout: 5000,
    throwOnTimeout: true // Will throw TimeoutError instead of returning void
  }
);
// Task with unique ID for later priority updates
// (not awaited, so the task can still be waiting when its priority changes)
queue.add(
  async () => updateUserProfile(userId),
  { id: `profile-update-${userId}`, priority: 5 }
);
// Later, increase priority of specific task
queue.setPriority(`profile-update-${userId}`, 15);
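The priority rules used in the configuration examples (higher numbers run first, and a waiting task can be moved by its ID) can be illustrated with a small ordered list. This is a sketch under stated assumptions rather than p-queue's internal data structure: PriorityList, its methods, and the choices to keep equal priorities in insertion order and to throw for an unknown ID are all part of this hypothetical model.

```typescript
// Hypothetical model of priority ordering among waiting tasks.
type Entry = { id: string; priority: number };

class PriorityList {
  private readonly entries: Entry[] = [];

  /** Insert after all entries of equal or higher priority (FIFO among equals). */
  enqueue(id: string, priority = 0): void {
    const index = this.entries.findIndex((entry) => entry.priority < priority);
    const entry: Entry = { id, priority };
    if (index === -1) {
      this.entries.push(entry);
    } else {
      this.entries.splice(index, 0, entry);
    }
  }

  /** Re-position a waiting entry; in this model an unknown id is an error. */
  setPriority(id: string, priority: number): void {
    const index = this.entries.findIndex((entry) => entry.id === id);
    if (index === -1) {
      throw new ReferenceError(`No waiting entry with id "${id}"`);
    }
    this.entries.splice(index, 1);
    this.enqueue(id, priority);
  }

  /** Remove and return the id that would run next. */
  dequeue(): string | undefined {
    return this.entries.shift()?.id;
  }

  /** Current execution order, first to last. */
  order(): string[] {
    return this.entries.map((entry) => entry.id);
  }
}

const list = new PriorityList();
list.enqueue("normal", 0);
list.enqueue("low", -5);
list.enqueue("critical", 10);
list.enqueue("profile-update", 5);
const before = list.order(); // critical, profile-update, normal, low

list.setPriority("profile-update", 15);
const after = list.order(); // profile-update, critical, normal, low

console.log(before, after);
```

The model also shows why a priority update only makes sense for a task that has not started yet: once an entry has been dequeued, there is nothing left in the list to move.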