Map-like, concurrent promise processing for Node.js with configurable concurrency limits, error handling, and advanced features.
—
Comprehensive error handling system with custom error handlers, error wrapping, and selective error processing for fine-grained control over error management in promise pools.
Set a custom error handler to process errors selectively instead of collecting them automatically.
/**
* Set a custom error handler function
* @param handler - Function to handle errors, or undefined to disable
* @returns PromisePool instance for chaining
*/
handleError(handler: ErrorHandler<T>): PromisePool<T>;
type ErrorHandler<T> = (
error: Error,
item: T,
pool: Stoppable & UsesConcurrency
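) => Promise<void> | void;
Key Behaviors:
- When a custom error handler is set, errors are passed to the handler instead of being collected automatically.
- Returning normally from the handler lets the pool continue processing the remaining items.
- Throwing an error from the handler immediately stops the pool.
- Calling pool.stop() inside the handler stops processing of the remaining items.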
Usage Examples:
import { PromisePool } from "@supercharge/promise-pool";
// Collect specific errors, rethrow others
const collectedErrors = [];
try {
const { results } = await PromisePool
.for(users)
.withConcurrency(4)
.handleError(async (error, user, pool) => {
if (error instanceof ValidationError) {
collectedErrors.push({ error, user });
return; // Continue processing
}
if (error instanceof ThrottleError) {
await retryUser(user); // Custom retry logic
return;
}
// Uncaught errors will immediately stop the pool
throw error;
})
.process(async (user) => {
return await processUser(user);
});
await handleCollectedErrors(collectedErrors);
} catch (error) {
await handleCriticalError(error);
}
// Stop pool on critical errors
const { results } = await PromisePool
.for(items)
.handleError(async (error, item, pool) => {
if (error instanceof CriticalError) {
pool.stop(); // Stop processing remaining items
return;
}
console.warn(`Non-critical error for item ${item}:`, error.message);
})
.process(async (item) => processItem(item));
Wrapper class that provides context about which item caused an error.
/**
* Error wrapper that includes the item that caused the error
*/
class PromisePoolError<T, E = any> extends Error {
/** The item that caused this error */
item: T;
/** The original, raw error instance */
raw: E;
constructor(error: E, item: T);
/**
* Create a new promise pool error instance wrapping the error and item
* @param error - The original error
* @param item - The item that caused the error
* @returns New PromisePoolError instance
*/
static createFrom<T, E = any>(error: E, item: T): PromisePoolError<T>;
}
Usage Examples:
// Without custom error handler (default behavior)
const { results, errors } = await PromisePool
.for(users)
.process(async (user) => {
if (!user.email) {
throw new Error("Email is required");
}
return await processUser(user);
});
// Handle collected errors
errors.forEach(poolError => {
console.log(`Error processing user ${poolError.item.name}:`);
console.log(` Original error: ${poolError.raw.message}`);
console.log(` Item data:`, poolError.item);
});
// Create custom PromisePoolError
try {
await processItem(item);
} catch (error) {
const poolError = PromisePoolError.createFrom(error, item);
throw poolError;
}
Error class specifically for validation failures in pool configuration.
/**
* Error class for validation failures
*/
class ValidationError extends Error {
constructor(message?: string);
/**
* Create a validation error with the given message
* @param message - Error message
* @returns New ValidationError instance
*/
static createFrom(message: string): ValidationError;
}
Validation errors are thrown for invalid pool configuration, such as a concurrency value below 1.
Usage Examples:
try {
await PromisePool
.withConcurrency(-1) // Invalid: must be >= 1
.for(items)
.process(async (item) => item);
} catch (error) {
if (error instanceof ValidationError) {
console.log("Configuration error:", error.message);
// Handle validation error
}
}
Special error class used internally when the pool is stopped manually.
/**
* Special error class for stopping pool execution
*/
class StopThePromisePoolError extends Error {
constructor();
}
This error is thrown internally when pool.stop() is called and typically should not be handled directly by user code.
The simplest approach - let the pool collect all errors automatically.
const { results, errors } = await PromisePool
.for(items)
.process(async (item) => {
// Any thrown errors are automatically collected
return await processItem(item);
});
if (errors.length > 0) {
console.log(`${errors.length} errors occurred`);
errors.forEach(error => {
console.log(`Item: ${error.item}, Error: ${error.message}`);
});
}
Handle specific error types differently while collecting others.
const collectedErrors = [];
const { results } = await PromisePool
.for(items)
.handleError(async (error, item, pool) => {
if (error instanceof NetworkError) {
// Retry network errors
await retryWithBackoff(() => processItem(item));
return;
}
if (error instanceof ValidationError) {
// Collect validation errors for later processing
collectedErrors.push({ error, item });
return;
}
// Rethrow unknown errors to stop the pool
throw error;
})
.process(async (item) => processItem(item));
Stop processing when critical errors occur.
const { results } = await PromisePool
.for(items)
.handleError(async (error, item, pool) => {
if (error instanceof DatabaseConnectionError) {
console.error("Database connection lost, stopping pool");
pool.stop();
return;
}
// Log non-critical errors but continue processing
console.warn(`Non-critical error for item ${item}:`, error.message);
})
.process(async (item) => processItem(item));
Advanced error handling with recovery mechanisms.
const retryCount = new Map();
const maxRetries = 3;
const { results } = await PromisePool
.for(items)
.handleError(async (error, item, pool) => {
const currentRetries = retryCount.get(item) || 0;
if (currentRetries < maxRetries && isRetryableError(error)) {
retryCount.set(item, currentRetries + 1);
// Add item back to processing queue (pseudo-code)
// In practice, you might need to track failed items separately
console.log(`Retrying item ${item} (attempt ${currentRetries + 1})`);
return;
}
// Max retries reached or non-retryable error
console.error(`Failed to process item ${item} after ${maxRetries} attempts`);
})
.process(async (item) => processItem(item));
Install with Tessl CLI
npx tessl i tessl/npm-supercharge--promise-pool