Map-like, concurrent promise processing for Node.js with configurable concurrency limits, error handling, and advanced features.
—
Manual pool control capabilities including stopping execution, checking pool state, and managing concurrency dynamically during runtime.
Interface for manually controlling pool execution state.
/**
 * Interface for stoppable promise pools.
 *
 * Implementations expose manual pool control: halting processing of the
 * remaining items and querying whether a stop has been requested.
 */
interface Stoppable {
  /** Stop the promise pool and prevent processing of remaining items. */
  stop(): void;

  /** Determine whether the pool is marked as stopped. */
  isStopped(): boolean;
}

Interface for managing concurrency settings at runtime.
/**
 * Interface for concurrency management.
 *
 * Allows reading and adjusting the number of concurrently processed
 * tasks at runtime.
 */
interface UsesConcurrency {
  /** Set the number of tasks to process concurrently. */
  useConcurrency(concurrency: number): this;

  /** Returns the current concurrency level. */
  concurrency(): number;
}

Stop the promise pool execution manually from within processing or error handling functions.
Usage Examples:
import { PromisePool } from "@supercharge/promise-pool";
// Stop from within process handler
await PromisePool
.for(users)
.process(async (user, index, pool) => {
if (user.requiresManualReview) {
console.log("Manual review required, stopping pool");
pool.stop();
return null;
}
return await processUser(user);
});
// Stop based on results
let processedCount = 0;
const maxToProcess = 100;
await PromisePool
.for(items)
.process(async (item, index, pool) => {
processedCount++;
if (processedCount >= maxToProcess) {
console.log(`Reached limit of ${maxToProcess} items, stopping`);
pool.stop();
return null;
}
return await processItem(item);
});
// Stop from error handler
await PromisePool
.for(items)
.handleError(async (error, item, pool) => {
if (error instanceof CriticalSystemError) {
console.error("Critical system error detected, stopping all processing");
pool.stop();
return;
}
// Handle non-critical errors normally
console.warn(`Warning: ${error.message}`);
})
.process(async (item) => processItem(item));
Check if the pool has been stopped during execution.
Usage Examples:
// Check pool state during processing
await PromisePool
.for(items)
.process(async (item, index, pool) => {
// Perform some preliminary work
const preprocessed = await preprocessItem(item);
// Check if pool was stopped by another task
if (pool.isStopped()) {
console.log("Pool was stopped, skipping remaining work");
return null;
}
// Continue with main processing
return await processItem(preprocessed);
});
// Conditional processing based on pool state
await PromisePool
.for(items)
.onTaskStarted((item, pool) => {
if (pool.isStopped()) {
console.log("Pool is stopped, task should not have started");
}
})
.process(async (item, index, pool) => {
if (pool.isStopped()) {
return null; // Skip processing if pool is stopped
}
return await processItem(item);
});
Stop processing when too many errors occur.
/**
 * Stops pool processing once the number of errors reaches a threshold.
 *
 * Fix: the error counter is reset at the start of each `process()` call so a
 * single CircuitBreaker instance can be reused — previously the count carried
 * over between runs and could trip the breaker immediately on a later run.
 */
class CircuitBreaker {
  private errorCount = 0;
  private readonly threshold: number;

  /** @param threshold number of errors tolerated before the pool is stopped */
  constructor(threshold: number = 5) {
    this.threshold = threshold;
  }

  /**
   * Process `items` with `processor`, stopping the pool once the error
   * threshold is reached. Errors below the threshold are logged as warnings.
   */
  async process<T>(items: T[], processor: (item: T) => Promise<any>) {
    this.errorCount = 0; // reset so the breaker instance is reusable

    return await PromisePool
      .for(items)
      .handleError((error, item, pool) => {
        this.errorCount++;
        if (this.errorCount >= this.threshold) {
          console.error(`Circuit breaker triggered: ${this.errorCount} errors exceeded threshold of ${this.threshold}`);
          pool.stop();
          return;
        }
        console.warn(`Error ${this.errorCount}/${this.threshold}: ${error.message}`);
      })
      .process(processor);
  }
}
// Usage
const circuitBreaker = new CircuitBreaker(3);
const { results } = await circuitBreaker.process(
items,
async (item) => processItem(item)
);
Stop processing after a certain time limit.
/**
 * Runs `processor` over `items` in a promise pool, stopping the pool once
 * `timeoutMs` milliseconds have elapsed since the call started.
 */
async function processWithTimeout<T>(
  items: T[],
  processor: (item: T) => Promise<any>,
  timeoutMs: number
) {
  // Compute the absolute deadline once, up front.
  const deadline = Date.now() + timeoutMs;

  return await PromisePool
    .for(items)
    .onTaskStarted((item, pool) => {
      // Stop launching new tasks once the deadline has passed.
      if (Date.now() > deadline) {
        console.log(`Time limit of ${timeoutMs}ms exceeded, stopping pool`);
        pool.stop();
      }
    })
    .process(processor);
}
// Usage: Stop processing after 30 seconds
const { results } = await processWithTimeout(
items,
async (item) => processItem(item),
30000
);
Stop when system resources are exhausted.
import { memoryUsage } from 'process';
async function processWithMemoryLimit<T>(
items: T[],
processor: (item: T) => Promise<any>,
maxMemoryMB: number = 500
) {
return await PromisePool
.for(items)
.onTaskStarted((item, pool) => {
const memUsage = memoryUsage();
const currentMemoryMB = memUsage.heapUsed / 1024 / 1024;
if (currentMemoryMB > maxMemoryMB) {
console.log(`Memory usage (${currentMemoryMB.toFixed(1)}MB) exceeded limit (${maxMemoryMB}MB), stopping pool`);
pool.stop();
}
})
.process(processor);
}
// Usage
const { results } = await processWithMemoryLimit(
largeDataSet,
async (item) => processLargeItem(item),
1000 // 1GB limit
);
Stop processing based on external conditions or user input.
/**
 * Processes items in a promise pool while listening on stdin: typing "stop"
 * halts the pool before the next task starts.
 */
class InteractiveProcessor {
  private shouldStop = false;

  constructor() {
    // Watch stdin for a user-issued "stop" command.
    process.stdin.on('data', (data) => {
      if (data.toString().trim() === 'stop') {
        console.log('User requested stop');
        this.shouldStop = true;
      }
    });
  }

  /** Run `processor` over `items`, honoring a user-requested stop. */
  async process<T>(items: T[], processor: (item: T) => Promise<any>) {
    console.log('Processing started. Type "stop" to halt execution.');

    return await PromisePool
      .for(items)
      .onTaskStarted((item, pool) => {
        // The flag is checked before each task; stop the pool once it is set.
        if (this.shouldStop) {
          console.log('Stopping due to user request');
          pool.stop();
        }
      })
      .process(processor);
  }
}
// Usage
const interactiveProcessor = new InteractiveProcessor();
const { results } = await interactiveProcessor.process(
items,
async (item) => {
// Long-running processing
await new Promise(resolve => setTimeout(resolve, 1000));
return processItem(item);
}
);
Stop processing and perform cleanup operations.
/**
 * Runs a promise pool with graceful-shutdown support: a SIGINT stops the
 * pool, and tracked resources are always cleaned up afterwards.
 *
 * Fixes over the previous version:
 * - the SIGINT handler is registered with `process.once` and removed in the
 *   `finally` block, so repeated calls no longer accumulate listeners;
 * - the tracked resource list is cleared after cleanup, so a later call does
 *   not re-clean resources from an earlier run.
 */
class GracefulProcessor {
  private resources: any[] = [];
  private isShuttingDown = false;

  /**
   * Process `items` with `processor`, stopping early on SIGINT and always
   * running cleanup, even when processing throws.
   */
  async processWithCleanup<T>(
    items: T[],
    processor: (item: T) => Promise<any>
  ) {
    // Flag-only handler; the pool itself is stopped from onTaskStarted below.
    const onSigint = () => {
      console.log('Received SIGINT, initiating graceful shutdown...');
      this.isShuttingDown = true;
    };
    process.once('SIGINT', onSigint);

    try {
      const { results, errors } = await PromisePool
        .for(items)
        .onTaskStarted((item, pool) => {
          if (this.isShuttingDown && !pool.isStopped()) {
            console.log('Graceful shutdown initiated, stopping pool');
            pool.stop();
          }
        })
        .onTaskFinished((item, pool) => {
          // Track finished items so they can be cleaned up later.
          this.resources.push(item);
        })
        .process(async (item, index, pool) => {
          if (pool.isStopped()) {
            return null; // skip items dispatched after the stop
          }
          return await processor(item);
        });
      return { results, errors };
    } finally {
      process.removeListener('SIGINT', onSigint); // avoid listener leak
      // Always perform cleanup, regardless of success, failure, or early stop.
      await this.cleanup();
    }
  }

  /** Clean up every tracked resource, logging (not rethrowing) failures. */
  private async cleanup() {
    console.log(`Cleaning up ${this.resources.length} resources...`);
    for (const resource of this.resources) {
      try {
        await this.cleanupResource(resource);
      } catch (error) {
        console.error(`Error cleaning up resource:`, error);
      }
    }
    this.resources = []; // don't re-clean these on a later run
    console.log('Cleanup completed');
  }

  /** Placeholder for resource-specific cleanup logic. */
  private async cleanupResource(resource: any) {
    console.log(`Cleaning up resource: ${resource}`);
  }
}
// Usage
const gracefulProcessor = new GracefulProcessor();
const { results, errors } = await gracefulProcessor.processWithCleanup(
items,
async (item) => processItem(item)
);
Adjust concurrency during runtime based on system performance or conditions.
Note: Direct runtime concurrency adjustment is available through the internal executor, but the public PromisePool interface doesn't expose this directly. However, you can implement adaptive patterns:
// Pattern: Restart with different concurrency based on performance
/**
 * Processes `items` in batches of up to 100, tuning the pool concurrency
 * between batches based on the observed throughput (items per second).
 */
async function adaptiveConcurrencyProcessing<T>(
  items: T[],
  processor: (item: T) => Promise<any>
) {
  let concurrency = 5; // moderate starting point
  const queue = [...items]; // work on a copy; never mutate the caller's array
  const allResults: any[] = [];

  while (queue.length > 0) {
    const batchStartedAt = Date.now();
    const batchSize = Math.min(100, queue.length);
    const currentBatch = queue.splice(0, batchSize);

    console.log(`Processing batch of ${currentBatch.length} items with concurrency ${concurrency}`);

    const { results } = await PromisePool
      .withConcurrency(concurrency)
      .for(currentBatch)
      .process(processor);
    allResults.push(...results);

    // Throughput-based tuning: ramp up when fast, back off when slow.
    const elapsed = Date.now() - batchStartedAt;
    const itemsPerSecond = batchSize / (elapsed / 1000);
    if (itemsPerSecond > 10 && concurrency < 20) {
      concurrency += 2;
      console.log(`Increasing concurrency to ${concurrency}`);
    } else if (itemsPerSecond < 2 && concurrency > 1) {
      concurrency = Math.max(1, concurrency - 1);
      console.log(`Decreasing concurrency to ${concurrency}`);
    }
  }

  return allResults;
}
// Usage
const results = await adaptiveConcurrencyProcessing(
items,
async (item) => processItem(item)
);
Install with Tessl CLI
npx tessl i tessl/npm-supercharge--promise-pool