
tessl/npm-supercharge--promise-pool

Map-like, concurrent promise processing for Node.js with configurable concurrency limits, error handling, and advanced features.


Core Pool Management

Primary promise pool functionality for creating, configuring, and executing concurrent tasks with full type safety and flexible configuration options.

Capabilities

PromisePool Class

Main class for creating and managing promise pools through a fluent (chainable) interface.

/**
 * Main promise pool class with fluent interface for concurrent task processing
 */
class PromisePool<T, ShouldUseCorrespondingResults extends boolean = false> {
  constructor(items?: SomeIterable<T>);
  
  // Static factory methods
  static withConcurrency(concurrency: number): PromisePool<unknown>;
  static withTaskTimeout(timeout: number): PromisePool<unknown>;
  static for<T>(items: SomeIterable<T>): PromisePool<T>;
  
  // Instance configuration methods
  withConcurrency(concurrency: number): PromisePool<T>;
  withTaskTimeout(timeout: number): PromisePool<T>;
  for<ItemType>(items: SomeIterable<ItemType>): PromisePool<ItemType>;
  useCorrespondingResults(): PromisePool<T, true>;
  
  // Execution method
  process<ResultType, ErrorType = any>(
    callback: ProcessHandler<T, ResultType>
  ): Promise<ReturnValue<T, ShouldUseCorrespondingResults extends true ? ResultType | symbol : ResultType, ErrorType>>;
  
  // Static symbols for corresponding results
  static readonly notRun: symbol;
  static readonly failed: symbol;
}

Usage Examples:

import { PromisePool } from "@supercharge/promise-pool";

// Basic usage with the default concurrency (10)
const { results: doubled } = await PromisePool
  .for([1, 2, 3, 4, 5])
  .process(async (num) => num * 2);

// Custom concurrency: at most 3 requests in flight at once
// (`urls` is assumed to be an array of URL strings defined elsewhere)
const { results: responses } = await PromisePool
  .withConcurrency(3)
  .for(urls)
  .process(async (url) => fetch(url));

// With task timeout
// (`tasks` and `processTask` are assumed to be defined elsewhere)
const { results: processed } = await PromisePool
  .for(tasks)
  .withTaskTimeout(5000) // 5 seconds per task
  .process(async (task) => processTask(task));

Concurrency Configuration

Configure the number of tasks that run concurrently.

/**
 * Set the number of tasks to process concurrently
 * @param concurrency - Number of concurrent tasks (must be >= 1)
 * @returns PromisePool instance for chaining
 */
withConcurrency(concurrency: number): PromisePool<T>;

/**
 * Static method to create pool with specified concurrency
 * @param concurrency - Number of concurrent tasks
 * @returns New PromisePool instance
 */
static withConcurrency(concurrency: number): PromisePool<unknown>;
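
To make the idea of a concurrency limit concrete, here is a minimal sketch of one way such a limit can be enforced: a fixed number of workers each pull the next unclaimed item until none remain. The helper name `mapWithConcurrency` is hypothetical, and this is not the library's implementation.

```typescript
// Hypothetical helper, not part of @supercharge/promise-pool.
// Runs `handler` over `items` with at most `concurrency` tasks in flight.
async function mapWithConcurrency<T, R>(
  items: T[],
  concurrency: number,
  handler: (item: T, index: number) => Promise<R>
): Promise<R[]> {
  if (!(concurrency >= 1)) {
    throw new RangeError("concurrency must be a number >= 1");
  }

  const results: R[] = new Array(items.length);
  let next = 0; // index of the next unclaimed item

  // Runs a single item and stores its result at the item's own index.
  const runOne = (index: number): Promise<void> =>
    handler(items[index], index).then((value) => {
      results[index] = value;
    });

  // Each worker claims items one at a time until the list is exhausted.
  async function worker(): Promise<void> {
    while (next < items.length) {
      await runOne(next++);
    }
  }

  const workerCount = Math.min(Math.floor(concurrency), items.length);
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}
```

Unlike the pool described on this page, the sketch rejects as soon as any handler rejects and does not collect errors separately; it only illustrates the limiting mechanism.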

Task Timeout Configuration

Configure timeout for individual tasks.

/**
 * Set the timeout in milliseconds for each task
 * @param timeout - Timeout in milliseconds (per task, not total)
 * @returns PromisePool instance for chaining
 */
withTaskTimeout(timeout: number): PromisePool<T>;

/**
 * Static method to create pool with task timeout
 * @param timeout - Timeout in milliseconds
 * @returns New PromisePool instance
 */
static withTaskTimeout(timeout: number): PromisePool<unknown>;

Usage Example:

// Each task must complete within 2 seconds
const { results } = await PromisePool
  .for(items)
  .withTaskTimeout(2000)
  .process(async (item) => {
    // This task times out if it takes longer than 2 seconds
    return await processItem(item);
  });
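
A per-task timeout can be pictured as a timer that rejects first when the task is too slow. The sketch below shows that general pattern under the assumption that each task is an async function; `withTimeout` and its error message are hypothetical and not taken from the library.

```typescript
// Hypothetical helper, not part of @supercharge/promise-pool.
// Rejects if `task` does not settle within `timeoutMs` milliseconds.
function withTimeout<R>(task: () => Promise<R>, timeoutMs: number): Promise<R> {
  return new Promise<R>((resolve, reject) => {
    // If the timer fires first, the outer promise rejects.
    const timer = setTimeout(
      () => reject(new Error(`Task timed out after ${timeoutMs}ms`)),
      timeoutMs
    );

    // If the task settles first, its outcome wins. The timer is cleared
    // either way so it does not keep the process alive.
    task()
      .then(resolve, reject)
      .finally(() => clearTimeout(timer));
  });
}
```

A timeout built this way only stops waiting for the task; the underlying work is not cancelled and may keep running in the background.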

Items Configuration

Set the items to be processed by the pool.

/**
 * Set the items to be processed in the promise pool
 * @param items - Array, Iterable, or AsyncIterable of items
 * @returns New PromisePool instance with specified items
 */
for<ItemType>(items: SomeIterable<ItemType>): PromisePool<ItemType>;

/**
 * Static method to create pool for specified items
 * @param items - Items to process
 * @returns New PromisePool instance
 */
static for<T>(items: SomeIterable<T>): PromisePool<T>;

type SomeIterable<T> = T[] | Iterable<T> | AsyncIterable<T>;

Usage Examples:

// With arrays
await PromisePool.for([1, 2, 3]).process(async (num) => num * 2);

// With generators/iterables
function* numberGenerator() {
  for (let i = 0; i < 10; i++) yield i;
}
await PromisePool.for(numberGenerator()).process(async (num) => num * 2);

// With async iterables
async function* asyncGenerator() {
  for (let i = 0; i < 5; i++) {
    await new Promise(resolve => setTimeout(resolve, 100));
    yield i;
  }
}
await PromisePool.for(asyncGenerator()).process(async (num) => num * 2);
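
One reason a single `SomeIterable` type can cover all three sources is that `for await...of` accepts synchronous and asynchronous iterables alike. The sketch below demonstrates this with a hypothetical `doubleAll` helper; it does not claim to show how the library consumes its items.

```typescript
type SomeIterable<T> = T[] | Iterable<T> | AsyncIterable<T>;

// Hypothetical helper: doubles every number from any supported source.
async function doubleAll(items: SomeIterable<number>): Promise<number[]> {
  const doubled: number[] = [];
  // `for await` works for arrays, sync iterables, and async iterables.
  for await (const item of items) {
    doubled.push(item * 2);
  }
  return doubled;
}
```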

Corresponding Results

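By default, results are collected in the order in which tasks finish, so a result's position need not match the position of its input item. Enabling corresponding results keeps each result at the same index as its source item.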

/**
 * Configure results to correspond with source items by position
 * @returns PromisePool with corresponding results enabled
 */
useCorrespondingResults(): PromisePool<T, true>;

When using corresponding results, the results array will contain:

  • The actual result value for successful tasks
  • PromisePool.notRun symbol for tasks that didn't execute
  • PromisePool.failed symbol for tasks that failed

Usage Example:

const { results } = await PromisePool
  .for([1, 2, 3])
  .withConcurrency(2)
  .useCorrespondingResults()
  .process(async (num, index) => {
    if (num === 2) throw new Error("Skip 2");
    return num * 10;
  });

// Results array will be: [10, Symbol(failed), 30]
// Corresponding to input: [1, 2, 3]

// Filter for successful results only
const successfulResults = results.filter(result => 
  result !== PromisePool.failed && result !== PromisePool.notRun
);
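
In TypeScript, a plain comparison inside `filter` may leave the array typed as `ResultType | symbol`. A type guard can narrow it, as in the sketch below. `successfulOnly` is a hypothetical helper, and it assumes that legitimate results are never symbols themselves.

```typescript
// Hypothetical helper: keeps real values and drops symbol placeholders
// such as PromisePool.failed and PromisePool.notRun.
function successfulOnly<R>(results: Array<R | symbol>): R[] {
  return results.filter((result): result is R => typeof result !== "symbol");
}
```

Called as `successfulOnly(results)`, this returns an array typed without the symbol placeholders, which avoids casts further down the line.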

Process Execution

Execute the promise pool with a processing function.

/**
 * Process all items through the provided callback function
 * @param callback - Function to process each item
 * @returns Promise resolving to results and errors
 */
process<ResultType, ErrorType = any>(
  callback: ProcessHandler<T, ResultType>
): Promise<ReturnValue<T, ShouldUseCorrespondingResults extends true ? ResultType | symbol : ResultType, ErrorType>>;

type ProcessHandler<T, R> = (
  item: T, 
  index: number, 
  pool: Stoppable & UsesConcurrency
) => Promise<R> | R;

interface ReturnValue<T, R, E = any> {
  results: R[];
  errors: Array<PromisePoolError<T, E>>;
}

The processing callback receives:

  • item: The current item being processed
  • index: The index of the item in the source array/iterable
  • pool: The running pool, typed as Stoppable & UsesConcurrency, which exposes control methods such as stop() and isStopped()

Usage Examples:

// Basic processing
const { results, errors } = await PromisePool
  .for(users)
  .process(async (user, index, pool) => {
    // Access to current item, index, and pool control
    if (user.invalid) {
      // Stop processing remaining items and end this task
      return pool.stop();
    }
    return await processUser(user);
  });

// Handle errors separately
if (errors.length > 0) {
  console.log("Errors occurred:", errors);
  errors.forEach(error => {
    // JSON.stringify avoids printing "[object Object]" for object items
    console.log(`Error processing item ${JSON.stringify(error.item)}:`, error.message);
  });
}
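
Beyond logging, the `errors` array can feed a retry pass or a short report. The sketch below relies only on the `item` and `message` fields used in the example above; the `PoolErrorLike` and `ErrorSummary` types and the `summarizeErrors` helper are hypothetical names introduced for illustration.

```typescript
// Minimal structural view of a pool error, limited to the two fields
// used in the example above; the real error type may carry more.
interface PoolErrorLike<T> {
  item: T;
  message: string;
}

interface ErrorSummary<T> {
  failedItems: T[]; // inputs that could be retried
  countsByMessage: Map<string, number>; // how often each message occurred
}

// Hypothetical helper: collects failed items and counts errors by message.
function summarizeErrors<T>(errors: PoolErrorLike<T>[]): ErrorSummary<T> {
  const countsByMessage = new Map<string, number>();
  for (const { message } of errors) {
    countsByMessage.set(message, (countsByMessage.get(message) ?? 0) + 1);
  }
  return { failedItems: errors.map((error) => error.item), countsByMessage };
}
```

The `failedItems` array could then be passed to a second `PromisePool.for(...)` run, for example with a lower concurrency.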

Install with Tessl CLI

npx tessl i tessl/npm-supercharge--promise-pool
