Map-like, concurrent promise processing for Node.js with configurable concurrency limits, error handling, and advanced features.
—
Real-time progress monitoring with task lifecycle callbacks and comprehensive statistics for monitoring promise pool execution and building progress indicators.
Register callbacks to be notified when tasks start and finish processing.
/**
* Register a callback to be called when a task starts processing
* @param handler - Callback function receiving item and pool instance
* @returns PromisePool instance for chaining
*/
onTaskStarted(handler: OnProgressCallback<T>): PromisePool<T>;
/**
* Register a callback to be called when a task finishes processing
* @param handler - Callback function receiving item and pool instance
* @returns PromisePool instance for chaining
*/
onTaskFinished(handler: OnProgressCallback<T>): PromisePool<T>;
type OnProgressCallback<T> = (
item: T,
pool: Stoppable & Statistics<T> & UsesConcurrency
) => void;Key Features:
Usage Examples:
import { PromisePool } from "@supercharge/promise-pool";
// Basic progress tracking
await PromisePool
.for(users)
.onTaskStarted((item, pool) => {
console.log(`Started processing: ${item.name}`);
console.log(`Progress: ${pool.processedPercentage().toFixed(1)}%`);
console.log(`Active tasks: ${pool.activeTasksCount()}/${pool.concurrency()}`);
})
.onTaskFinished((item, pool) => {
console.log(`Finished processing: ${item.name}`);
console.log(`Completed: ${pool.processedCount()} items`);
})
.process(async (user) => await processUser(user));
// Multiple progress handlers
await PromisePool
.for(items)
.onTaskStarted((item, pool) => {
// Handler 1: Update progress bar
updateProgressBar(pool.processedPercentage());
})
.onTaskStarted((item, pool) => {
// Handler 2: Log to console
console.log(`Processing ${item.id}...`);
})
.onTaskFinished((item, pool) => {
// Handler 1: Update status
updateStatus(`Processed ${pool.processedCount()} items`);
})
.onTaskFinished((item, pool) => {
// Handler 2: Track metrics
trackMetrics(item, pool);
})
.process(async (item) => processItem(item));Access real-time statistics about pool execution state.
/**
* Interface providing pool execution statistics
*/
interface Statistics<T> {
/** Returns the number of currently active tasks */
activeTasksCount(): number;
/** Returns the number of currently active tasks (deprecated - use activeTasksCount) */
activeTaskCount(): number;
/** Returns the list of processed items */
processedItems(): T[];
/** Returns the number of processed items */
processedCount(): number;
/** Returns the percentage progress of items that have been processed */
processedPercentage(): number;
}Usage Examples:
// Progress monitoring during task execution
await PromisePool
.for(largeDataSet)
.onTaskStarted((item, pool) => {
const stats = {
activeCount: pool.activeTasksCount(),
processedCount: pool.processedCount(),
processedPercentage: pool.processedPercentage(),
totalProcessed: pool.processedItems().length
};
console.log(`Pool Stats:`, stats);
})
.process(async (item) => processItem(item));
// Real-time progress updates
let progressInterval;
const { results } = await PromisePool
.for(items)
.onTaskStarted((item, pool) => {
// Start progress monitoring on first task
if (!progressInterval) {
progressInterval = setInterval(() => {
const progress = pool.processedPercentage();
const active = pool.activeTasksCount();
const processed = pool.processedCount();
console.log(`Progress: ${progress.toFixed(1)}% (${processed} completed, ${active} active)`);
}, 1000);
}
})
.onTaskFinished((item, pool) => {
// Clear progress monitoring when done
if (pool.processedPercentage() === 100) {
clearInterval(progressInterval);
console.log("Processing complete!");
}
})
.process(async (item) => processItem(item));Basic progress indication with percentage completion.
function createProgressBar(total: number) {
let current = 0;
return {
update: (processed: number) => {
current = processed;
const percentage = (current / total * 100).toFixed(1);
const filled = Math.floor(current / total * 50);
const empty = 50 - filled;
const bar = '█'.repeat(filled) + '░'.repeat(empty);
process.stdout.write(`\r[${bar}] ${percentage}% (${current}/${total})`);
},
complete: () => {
process.stdout.write('\n✓ Complete!\n');
}
};
}
const progressBar = createProgressBar(items.length);
await PromisePool
.for(items)
.onTaskFinished((item, pool) => {
progressBar.update(pool.processedCount());
if (pool.processedPercentage() === 100) {
progressBar.complete();
}
})
.process(async (item) => processItem(item));Comprehensive monitoring with multiple metrics.
class ProgressDashboard {
private startTime = Date.now();
private lastUpdate = Date.now();
update(item: any, pool: Statistics<any> & UsesConcurrency) {
const now = Date.now();
const elapsed = now - this.startTime;
const processed = pool.processedCount();
const percentage = pool.processedPercentage();
const rate = processed / (elapsed / 1000); // items per second
console.clear();
console.log('📊 Promise Pool Progress Dashboard');
console.log('─'.repeat(40));
console.log(`Progress: ${percentage.toFixed(1)}%`);
console.log(`Processed: ${processed} items`);
console.log(`Active: ${pool.activeTasksCount()}/${pool.concurrency()}`);
console.log(`Rate: ${rate.toFixed(2)} items/sec`);
console.log(`Elapsed: ${(elapsed / 1000).toFixed(1)}s`);
if (percentage > 0) {
const eta = (elapsed / percentage * (100 - percentage)) / 1000;
console.log(`ETA: ${eta.toFixed(1)}s`);
}
}
}
const dashboard = new ProgressDashboard();
await PromisePool
.for(items)
.onTaskFinished((item, pool) => {
dashboard.update(item, pool);
})
.process(async (item) => processItem(item));Emit events for external progress monitoring systems.
import EventEmitter from 'events';
class PoolProgressEmitter extends EventEmitter {
track<T>(pool: PromisePool<T>) {
return pool
.onTaskStarted((item, pool) => {
this.emit('taskStarted', {
item,
activeCount: pool.activeTasksCount(),
processedCount: pool.processedCount(),
percentage: pool.processedPercentage()
});
})
.onTaskFinished((item, pool) => {
this.emit('taskFinished', {
item,
activeCount: pool.activeTasksCount(),
processedCount: pool.processedCount(),
percentage: pool.processedPercentage()
});
if (pool.processedPercentage() === 100) {
this.emit('completed', {
totalProcessed: pool.processedCount(),
processedItems: pool.processedItems()
});
}
});
}
}
// Usage
const progressEmitter = new PoolProgressEmitter();
progressEmitter.on('taskStarted', (data) => {
console.log(`Task started: ${data.percentage.toFixed(1)}% complete`);
});
progressEmitter.on('taskFinished', (data) => {
console.log(`Task finished: ${data.processedCount} items processed`);
});
progressEmitter.on('completed', (data) => {
console.log(`All done! Processed ${data.totalProcessed} items`);
});
await progressEmitter
.track(PromisePool.for(items))
.process(async (item) => processItem(item));Combine progress tracking with error monitoring.
class ProgressWithErrors {
private errors: any[] = [];
private processed = 0;
trackProgress<T>(pool: PromisePool<T>) {
return pool
.onTaskFinished((item, pool) => {
this.processed++;
this.logProgress(pool);
})
.handleError((error, item, pool) => {
this.errors.push({ error, item });
console.error(`❌ Error processing ${item}: ${error.message}`);
// Continue processing other items
return;
});
}
private logProgress(pool: Statistics<any>) {
const percentage = pool.processedPercentage();
const errorRate = (this.errors.length / this.processed * 100).toFixed(1);
console.log(`✅ Progress: ${percentage.toFixed(1)}% (${this.errors.length} errors, ${errorRate}% error rate)`);
}
getSummary() {
return {
totalProcessed: this.processed,
totalErrors: this.errors.length,
errorRate: (this.errors.length / this.processed * 100).toFixed(1) + '%',
errors: this.errors
};
}
}
// Usage
const tracker = new ProgressWithErrors();
await tracker
.trackProgress(PromisePool.for(items))
.process(async (item) => {
// Some items may throw errors
if (Math.random() < 0.1) {
throw new Error(`Random error for ${item}`);
}
return processItem(item);
});
console.log('Final Summary:', tracker.getSummary());Install with Tessl CLI
npx tessl i tessl/npm-supercharge--promise-pool