Map-like, concurrent promise processing for Node.js with configurable concurrency limits, error handling, and advanced features.
—
Primary promise pool functionality for creating, configuring, and executing concurrent tasks with full type safety and flexible configuration options.
Main class for creating and managing promise pools with fluent interface pattern.
/**
* Main promise pool class with fluent interface for concurrent task processing
*/
class PromisePool<T, ShouldUseCorrespondingResults extends boolean = false> {
constructor(items?: SomeIterable<T>);
// Static factory methods
static withConcurrency(concurrency: number): PromisePool<unknown>;
static withTaskTimeout(timeout: number): PromisePool<unknown>;
static for<T>(items: SomeIterable<T>): PromisePool<T>;
// Instance configuration methods
withConcurrency(concurrency: number): PromisePool<T>;
withTaskTimeout(timeout: number): PromisePool<T>;
for<ItemType>(items: SomeIterable<ItemType>): PromisePool<ItemType>;
useCorrespondingResults(): PromisePool<T, true>;
// Execution method
process<ResultType, ErrorType = any>(
callback: ProcessHandler<T, ResultType>
): Promise<ReturnValue<T, ShouldUseCorrespondingResults extends true ? ResultType | symbol : ResultType, ErrorType>>;
// Static symbols for corresponding results
static readonly notRun: symbol;
static readonly failed: symbol;
}Usage Examples:
import { PromisePool } from "@supercharge/promise-pool";
// Basic usage with default concurrency (10)
const { results } = await PromisePool
.for([1, 2, 3, 4, 5])
.process(async (num) => num * 2);
// Custom concurrency
const { results } = await PromisePool
.withConcurrency(3)
.for(urls)
.process(async (url) => fetch(url));
// With task timeout
const { results } = await PromisePool
.for(tasks)
.withTaskTimeout(5000) // 5 seconds per task
.process(async (task) => processTask(task));Configure the number of tasks that run concurrently.
/**
* Set the number of tasks to process concurrently
* @param concurrency - Number of concurrent tasks (must be >= 1)
* @returns PromisePool instance for chaining
*/
withConcurrency(concurrency: number): PromisePool<T>;
/**
* Static method to create pool with specified concurrency
* @param concurrency - Number of concurrent tasks
* @returns New PromisePool instance
*/
static withConcurrency(concurrency: number): PromisePool<unknown>;Configure timeout for individual tasks.
/**
* Set the timeout in milliseconds for each task
* @param timeout - Timeout in milliseconds (per task, not total)
* @returns PromisePool instance for chaining
*/
withTaskTimeout(timeout: number): PromisePool<T>;
/**
* Static method to create pool with task timeout
* @param timeout - Timeout in milliseconds
* @returns New PromisePool instance
*/
static withTaskTimeout(timeout: number): PromisePool<unknown>;Usage Example:
// Each task must complete within 2 seconds
const { results } = await PromisePool
.for(items)
.withTaskTimeout(2000)
.process(async (item) => {
// This task will timeout if it takes longer than 2 seconds
return await processItem(item);
});Set the items to be processed by the pool.
/**
* Set the items to be processed in the promise pool
* @param items - Array, Iterable, or AsyncIterable of items
* @returns New PromisePool instance with specified items
*/
for<ItemType>(items: SomeIterable<ItemType>): PromisePool<ItemType>;
/**
* Static method to create pool for specified items
* @param items - Items to process
* @returns New PromisePool instance
*/
static for<T>(items: SomeIterable<T>): PromisePool<T>;
type SomeIterable<T> = T[] | Iterable<T> | AsyncIterable<T>;Usage Examples:
// With arrays
await PromisePool.for([1, 2, 3]).process(async (num) => num * 2);
// With generators/iterables
function* numberGenerator() {
for (let i = 0; i < 10; i++) yield i;
}
await PromisePool.for(numberGenerator()).process(async (num) => num * 2);
// With async iterables
async function* asyncGenerator() {
for (let i = 0; i < 5; i++) {
await new Promise(resolve => setTimeout(resolve, 100));
yield i;
}
}
await PromisePool.for(asyncGenerator()).process(async (num) => num * 2);Enable result-source correspondence to maintain order between input items and results.
/**
* Configure results to correspond with source items by position
* @returns PromisePool with corresponding results enabled
*/
useCorrespondingResults(): PromisePool<T, true>;When using corresponding results, the results array will contain:
PromisePool.notRun symbol for tasks that didn't executePromisePool.failed symbol for tasks that failedUsage Example:
const { results } = await PromisePool
.for([1, 2, 3])
.withConcurrency(2)
.useCorrespondingResults()
.process(async (num, index) => {
if (num === 2) throw new Error("Skip 2");
return num * 10;
});
// Results array will be: [10, Symbol(failed), 30]
// Corresponding to input: [1, 2, 3]
// Filter for successful results only
const successfulResults = results.filter(result =>
result !== PromisePool.failed && result !== PromisePool.notRun
);Execute the promise pool with a processing function.
/**
* Process all items through the provided callback function
* @param callback - Function to process each item
* @returns Promise resolving to results and errors
*/
process<ResultType, ErrorType = any>(
callback: ProcessHandler<T, ResultType>
): Promise<ReturnValue<T, ShouldUseCorrespondingResults extends true ? ResultType | symbol : ResultType, ErrorType>>;
type ProcessHandler<T, R> = (
item: T,
index: number,
pool: Stoppable & UsesConcurrency
) => Promise<R> | R;
interface ReturnValue<T, R, E = any> {
results: R[];
errors: Array<PromisePoolError<T, E>>;
}The processing callback receives:
item: The current item being processedindex: The index of the item in the source array/iterablepool: Pool instance with control methods (stop(), isStopped(), etc.)Usage Examples:
// Basic processing
const { results, errors } = await PromisePool
.for(users)
.process(async (user, index, pool) => {
// Access to current item, index, and pool control
if (user.invalid) {
pool.stop(); // Stop processing remaining items
return null;
}
return await processUser(user);
});
// Handle errors separately
if (errors.length > 0) {
console.log("Errors occurred:", errors);
errors.forEach(error => {
console.log(`Error processing item ${error.item}:`, error.message);
});
}Install with Tessl CLI
npx tessl i tessl/npm-supercharge--promise-pool