Map-like, concurrent promise processing for Node.js with configurable concurrency limits, error handling, and advanced features.
npx @tessl/cli install tessl/npm-supercharge--promise-pool@3.2.0Promise Pool provides map-like, concurrent promise processing for Node.js with configurable concurrency limits, error handling, and advanced features. It enables controlled parallel execution of asynchronous operations while maintaining fine-grained control over execution flow, error handling, and result ordering.
npm install @supercharge/promise-poolimport { PromisePool } from "@supercharge/promise-pool";Default import:
import PromisePool from "@supercharge/promise-pool";CommonJS:
const { PromisePool } = require("@supercharge/promise-pool");import { PromisePool } from "@supercharge/promise-pool";
const users = [
{ name: "Marcus" },
{ name: "Norman" },
{ name: "Christian" }
];
const { results, errors } = await PromisePool
.withConcurrency(2)
.for(users)
.process(async (userData, index, pool) => {
const user = await User.createIfNotExisting(userData);
return user;
});Promise Pool is built around several key components:
PromisePool class providing chainable method calls for configurationPromisePoolExecutor handles the internal processing logicPrimary promise pool functionality for creating, configuring, and executing concurrent tasks with full type safety.
class PromisePool<T, ShouldUseCorrespondingResults extends boolean = false> {
constructor(items?: SomeIterable<T>);
static withConcurrency(concurrency: number): PromisePool<unknown>;
static withTaskTimeout(timeout: number): PromisePool<unknown>;
static for<T>(items: SomeIterable<T>): PromisePool<T>;
withConcurrency(concurrency: number): PromisePool<T>;
withTaskTimeout(timeout: number): PromisePool<T>;
for<ItemType>(items: SomeIterable<ItemType>): PromisePool<ItemType>;
useCorrespondingResults(): PromisePool<T, true>;
process<ResultType, ErrorType = any>(
callback: ProcessHandler<T, ResultType>
): Promise<ReturnValue<T, ShouldUseCorrespondingResults extends true ? ResultType | symbol : ResultType, ErrorType>>;
}Comprehensive error handling system with custom error handlers, error wrapping, and selective error processing.
handleError(handler: ErrorHandler<T>): PromisePool<T>;
type ErrorHandler<T> = (
error: Error,
item: T,
pool: Stoppable & UsesConcurrency
) => Promise<void> | void;
class PromisePoolError<T, E = any> extends Error {
item: T;
raw: E;
constructor(error: E, item: T);
static createFrom<T, E = any>(error: E, item: T): PromisePoolError<T>;
}Real-time progress monitoring with task lifecycle callbacks and comprehensive statistics.
onTaskStarted(handler: OnProgressCallback<T>): PromisePool<T>;
onTaskFinished(handler: OnProgressCallback<T>): PromisePool<T>;
type OnProgressCallback<T> = (
item: T,
pool: Stoppable & Statistics<T> & UsesConcurrency
) => void;
interface Statistics<T> {
activeTasksCount(): number;
activeTaskCount(): number; // deprecated - use activeTasksCount
processedItems(): T[];
processedCount(): number;
processedPercentage(): number;
}Manual pool control capabilities including stopping execution and checking pool state.
interface Stoppable {
stop(): void;
isStopped(): boolean;
}
interface UsesConcurrency {
useConcurrency(concurrency: number): this;
concurrency(): number;
}type SomeIterable<T> = T[] | Iterable<T> | AsyncIterable<T>;
type ProcessHandler<T, R> = (
item: T,
index: number,
pool: Stoppable & UsesConcurrency
) => Promise<R> | R;
interface ReturnValue<T, R, E = any> {
results: R[];
errors: Array<PromisePoolError<T, E>>;
}
// Special symbols for corresponding results
static readonly notRun: symbol;
static readonly failed: symbol;