
tessl/npm-supercharge--promise-pool

Map-like, concurrent promise processing for Node.js with configurable concurrency limits, error handling, and advanced features.


Pool Control

Manually control a running promise pool: stop execution, check whether the pool has been stopped, and adjust concurrency at runtime.

Capabilities

Stoppable Interface

Interface for manually controlling pool execution state.

/**
 * Interface for stoppable promise pools
 */
interface Stoppable {
  /** Stop the promise pool and prevent processing of remaining items */
  stop(): void;
  
  /** Determine whether the pool is marked as stopped */
  isStopped(): boolean;
}
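
To make the contract concrete, here is a minimal sketch of an object that satisfies `Stoppable`, plus a sequential loop that honors it. `StopFlag` and `runUntilStopped` are hypothetical names used only for illustration. They are not part of the library, and the library's own executor is more involved than this.

```typescript
// Local copy of the interface so the sketch is self-contained.
interface Stoppable {
  stop(): void;
  isStopped(): boolean;
}

// Hypothetical helper: a plain boolean flag behind the Stoppable contract.
class StopFlag implements Stoppable {
  private stopped = false;

  stop(): void {
    this.stopped = true;
  }

  isStopped(): boolean {
    return this.stopped;
  }
}

// Hypothetical sequential runner: checks the flag before starting each item.
async function runUntilStopped<T, R>(
  items: T[],
  handler: (item: T, pool: Stoppable) => Promise<R>
): Promise<R[]> {
  const pool = new StopFlag();
  const results: R[] = [];

  for (const item of items) {
    if (pool.isStopped()) {
      break; // Remaining items are never started
    }
    results.push(await handler(item, pool));
  }

  return results;
}
```

In this sketch the item whose handler calls `stop()` still finishes and contributes its result. Only the items after it are skipped.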

UsesConcurrency Interface

Interface for managing concurrency settings at runtime.

/**
 * Interface for concurrency management
 */
interface UsesConcurrency {
  /** Set the number of tasks to process concurrently */
  useConcurrency(concurrency: number): this;
  
  /** Returns the current concurrency level */
  concurrency(): number;
}
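
The `this` return type means `useConcurrency()` can be chained. The sketch below shows one way an object could satisfy the interface. `ConcurrencySetting` is a hypothetical class, and the validation rule (positive integers only) is an assumption of this sketch, not a statement about how the library validates its input.

```typescript
// Local copy of the interface so the sketch is self-contained.
interface UsesConcurrency {
  useConcurrency(concurrency: number): this;
  concurrency(): number;
}

// Hypothetical holder for a concurrency limit.
class ConcurrencySetting implements UsesConcurrency {
  private limit: number;

  constructor(initial: number) {
    this.limit = initial;
  }

  useConcurrency(concurrency: number): this {
    // Assumption for this sketch: reject anything that is not a positive integer.
    if (!Number.isInteger(concurrency) || concurrency < 1) {
      throw new TypeError(`Concurrency must be a positive integer, got ${concurrency}`);
    }

    this.limit = concurrency;
    return this; // Returning `this` allows chaining
  }

  concurrency(): number {
    return this.limit;
  }
}

// Chained usage: set a new limit, then read it back.
const setting = new ConcurrencySetting(5);
const current = setting.useConcurrency(4).concurrency();
```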

Manual Pool Stopping

Stop the promise pool manually from within the processing or error-handling function. Calling `pool.stop()` prevents the remaining items from being processed.

Usage Examples:

import { PromisePool } from "@supercharge/promise-pool";

// Stop from within process handler
await PromisePool
  .for(users)
  .process(async (user, index, pool) => {
    if (user.requiresManualReview) {
      console.log("Manual review required, stopping pool");
      pool.stop();
      return null;
    }
    
    return await processUser(user);
  });

// Stop after a fixed number of items
let processedCount = 0;
const maxToProcess = 100;

await PromisePool
  .for(items)
  .process(async (item, index, pool) => {
    // Check the limit before counting, so that exactly maxToProcess items are processed
    if (processedCount >= maxToProcess) {
      console.log(`Reached limit of ${maxToProcess} items, stopping`);
      pool.stop();
      return null;
    }
    
    processedCount++;
    return await processItem(item);
  });

// Stop from error handler
await PromisePool
  .for(items)
  .handleError(async (error, item, pool) => {
    if (error instanceof CriticalSystemError) {
      console.error("Critical system error detected, stopping all processing");
      pool.stop();
      return;
    }
    
    // Handle non-critical errors normally
    console.warn(`Warning: ${error.message}`);
  })
  .process(async (item) => processItem(item));

Pool State Checking

Check if the pool has been stopped during execution.

Usage Examples:

// Check pool state during processing
await PromisePool
  .for(items)
  .process(async (item, index, pool) => {
    // Perform some preliminary work
    const preprocessed = await preprocessItem(item);
    
    // Check if pool was stopped by another task
    if (pool.isStopped()) {
      console.log("Pool was stopped, skipping remaining work");
      return null;
    }
    
    // Continue with main processing
    return await processItem(preprocessed);
  });

// Conditional processing based on pool state
await PromisePool
  .for(items)
  .onTaskStarted((item, pool) => {
    if (pool.isStopped()) {
      console.log("Pool is stopped, task should not have started");
    }
  })
  .process(async (item, index, pool) => {
    if (pool.isStopped()) {
      return null; // Skip processing if pool is stopped
    }
    
    return await processItem(item);
  });

Pool Control Patterns

Pattern 1: Circuit Breaker

Stop processing when too many errors occur.

class CircuitBreaker {
  private errorCount = 0;
  private readonly threshold: number;
  
  constructor(threshold: number = 5) {
    this.threshold = threshold;
  }
  
  async process<T>(items: T[], processor: (item: T) => Promise<any>) {
    return await PromisePool
      .for(items)
      .handleError((error, item, pool) => {
        this.errorCount++;
        
        if (this.errorCount >= this.threshold) {
          console.error(`Circuit breaker triggered: ${this.errorCount} errors exceeded threshold of ${this.threshold}`);
          pool.stop();
          return;
        }
        
        console.warn(`Error ${this.errorCount}/${this.threshold}: ${error.message}`);
      })
      .process(processor);
  }
}

// Usage
const circuitBreaker = new CircuitBreaker(3);

const { results } = await circuitBreaker.process(
  items,
  async (item) => processItem(item)
);
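
The counting logic inside the error handler can also live in a small standalone object, which makes it easy to unit-test without running a pool. The following is a sketch only: `ErrorBudget` is a hypothetical name and is not provided by the library.

```typescript
// Hypothetical helper: counts errors and reports when a threshold is reached.
class ErrorBudget {
  private count = 0;
  private readonly threshold: number;

  constructor(threshold: number) {
    this.threshold = threshold;
  }

  // Record one error. Returns true once the threshold has been reached.
  record(): boolean {
    this.count++;
    return this.count >= this.threshold;
  }

  // Start counting from zero again, for example before a new run.
  reset(): void {
    this.count = 0;
  }

  errors(): number {
    return this.count;
  }
}

const budget = new ErrorBudget(3);
const tripped = [budget.record(), budget.record(), budget.record()];
```

Inside `handleError`, the check would then read `if (budget.record()) pool.stop();`.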

Pattern 2: Time-Based Stopping

Stop processing after a certain time limit.

async function processWithTimeout<T>(
  items: T[], 
  processor: (item: T) => Promise<any>,
  timeoutMs: number
) {
  const startTime = Date.now();
  
  return await PromisePool
    .for(items)
    .onTaskStarted((item, pool) => {
      const elapsed = Date.now() - startTime;
      
      if (elapsed > timeoutMs) {
        console.log(`Time limit of ${timeoutMs}ms exceeded, stopping pool`);
        pool.stop();
      }
    })
    .process(processor);
}

// Usage: Stop processing after 30 seconds
const { results } = await processWithTimeout(
  items,
  async (item) => processItem(item),
  30000
);
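
The elapsed-time check can be factored into a helper with an injectable clock, so the time limit can be tested without actually waiting. This is a sketch under that assumption: `createDeadline` and its `now` parameter are hypothetical names, not library features.

```typescript
// Hypothetical helper: returns a function that reports whether the time budget is spent.
// The clock is injectable so the check can be tested without real waiting.
function createDeadline(
  timeoutMs: number,
  now: () => number = Date.now
): () => boolean {
  const startTime = now();

  // Same comparison as in the pattern above: strictly more than timeoutMs elapsed.
  return () => now() - startTime > timeoutMs;
}

// Example with a fake clock that is advanced by hand.
let fakeTime = 1000;
const expired = createDeadline(50, () => fakeTime);

const atStart = expired(); // 0 ms elapsed
fakeTime = 1050;
const atLimit = expired(); // exactly 50 ms elapsed
fakeTime = 1051;
const pastLimit = expired(); // 51 ms elapsed
```

Inside `onTaskStarted`, the check would then read `if (expired()) pool.stop();`.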

Pattern 3: Resource-Based Stopping

Stop when heap memory usage exceeds a configured limit.

import { memoryUsage } from 'process';

async function processWithMemoryLimit<T>(
  items: T[],
  processor: (item: T) => Promise<any>,
  maxMemoryMB: number = 500
) {
  return await PromisePool
    .for(items)
    .onTaskStarted((item, pool) => {
      const memUsage = memoryUsage();
      const currentMemoryMB = memUsage.heapUsed / 1024 / 1024;
      
      if (currentMemoryMB > maxMemoryMB) {
        console.log(`Memory usage (${currentMemoryMB.toFixed(1)}MB) exceeded limit (${maxMemoryMB}MB), stopping pool`);
        pool.stop();
      }
    })
    .process(processor);
}

// Usage
const { results } = await processWithMemoryLimit(
  largeDataSet,
  async (item) => processLargeItem(item),
  1000 // 1000 MB (about 1 GB) limit
);

Pattern 4: Conditional Stopping with User Input

Stop processing based on external conditions or user input.

class InteractiveProcessor {
  private shouldStop = false;
  
  constructor() {
    // Listen for user input to stop processing
    process.stdin.on('data', (data) => {
      const input = data.toString().trim();
      if (input === 'stop') {
        console.log('User requested stop');
        this.shouldStop = true;
      }
    });
  }
  
  async process<T>(items: T[], processor: (item: T) => Promise<any>) {
    console.log('Processing started. Type "stop" to halt execution.');
    
    return await PromisePool
      .for(items)
      .onTaskStarted((item, pool) => {
        if (this.shouldStop) {
          console.log('Stopping due to user request');
          pool.stop();
        }
      })
      .process(processor);
  }
}

// Usage
const interactiveProcessor = new InteractiveProcessor();

const { results } = await interactiveProcessor.process(
  items,
  async (item) => {
    // Long-running processing
    await new Promise(resolve => setTimeout(resolve, 1000));
    return processItem(item);
  }
);

Pattern 5: Graceful Shutdown with Cleanup

Stop processing and perform cleanup operations.

class GracefulProcessor {
  private resources: any[] = [];
  private isShuttingDown = false;
  
  async processWithCleanup<T>(
    items: T[],
    processor: (item: T) => Promise<any>
  ) {
    // Set up a signal handler for graceful shutdown
    const onSigint = () => {
      console.log('Received SIGINT, initiating graceful shutdown...');
      this.isShuttingDown = true;
    };
    process.on('SIGINT', onSigint);
    
    try {
      const { results, errors } = await PromisePool
        .for(items)
        .onTaskStarted((item, pool) => {
          if (this.isShuttingDown && !pool.isStopped()) {
            console.log('Graceful shutdown initiated, stopping pool');
            pool.stop();
          }
        })
        .onTaskFinished((item, pool) => {
          // Track resources for cleanup
          this.resources.push(item);
        })
        .process(async (item, index, pool) => {
          if (pool.isStopped()) {
            return null;
          }
          
          return await processor(item);
        });
      
      return { results, errors };
    } finally {
      // Remove the handler so repeated calls do not accumulate listeners
      process.off('SIGINT', onSigint);
      
      // Always perform cleanup
      await this.cleanup();
    }
  }
  
  private async cleanup() {
    console.log(`Cleaning up ${this.resources.length} resources...`);
    
    for (const resource of this.resources) {
      try {
        await this.cleanupResource(resource);
      } catch (error) {
        console.error(`Error cleaning up resource:`, error);
      }
    }
    
    console.log('Cleanup completed');
  }
  
  private async cleanupResource(resource: any) {
    // Implement resource-specific cleanup
    console.log(`Cleaning up resource: ${resource}`);
  }
}

// Usage
const gracefulProcessor = new GracefulProcessor();

const { results, errors } = await gracefulProcessor.processWithCleanup(
  items,
  async (item) => processItem(item)
);

Dynamic Concurrency Control

Adjust concurrency during runtime based on system performance or conditions.

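Note: The `UsesConcurrency` interface above describes runtime adjustment through `useConcurrency()`. Whether the `pool` argument passed to your handlers exposes it depends on the installed version, so check the package typings before relying on it. The pattern below avoids that dependency by processing items in batches and choosing a new concurrency for each batch: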

// Pattern: Restart with different concurrency based on performance
async function adaptiveConcurrencyProcessing<T>(
  items: T[],
  processor: (item: T) => Promise<any>
) {
  let concurrency = 5; // Start with moderate concurrency
  let remainingItems = [...items];
  const allResults: any[] = [];
  
  while (remainingItems.length > 0) {
    const startTime = Date.now();
    const batchSize = Math.min(100, remainingItems.length);
    const currentBatch = remainingItems.splice(0, batchSize);
    
    console.log(`Processing batch of ${currentBatch.length} items with concurrency ${concurrency}`);
    
    const { results } = await PromisePool
      .withConcurrency(concurrency)
      .for(currentBatch)
      .process(processor);
    
    allResults.push(...results);
    
    // Adjust concurrency based on performance
    const elapsed = Math.max(Date.now() - startTime, 1); // Avoid dividing by zero for very fast batches
    const itemsPerSecond = batchSize / (elapsed / 1000);
    
    if (itemsPerSecond > 10 && concurrency < 20) {
      concurrency = Math.min(20, concurrency + 2); // Increase if processing fast, capped at 20
      console.log(`Increasing concurrency to ${concurrency}`);
    } else if (itemsPerSecond < 2 && concurrency > 1) {
      concurrency = Math.max(1, concurrency - 1); // Decrease if slow
      console.log(`Decreasing concurrency to ${concurrency}`);
    }
  }
  
  return allResults;
}

// Usage
const results = await adaptiveConcurrencyProcessing(
  items,
  async (item) => processItem(item)
);
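
The adjustment rule in the loop above can be isolated as a pure function, which keeps the thresholds in one place and makes them testable. This is a sketch: `nextConcurrency` and `ConcurrencyBounds` are hypothetical names, the thresholds (more than 10 items per second to speed up, fewer than 2 to slow down) are taken from the example, and the result is clamped so it never leaves the bounds.

```typescript
// Hypothetical bounds for the concurrency value.
interface ConcurrencyBounds {
  min: number;
  max: number;
}

// Hypothetical helper: decide the concurrency for the next batch from observed throughput.
function nextConcurrency(
  current: number,
  itemsPerSecond: number,
  bounds: ConcurrencyBounds = { min: 1, max: 20 }
): number {
  if (itemsPerSecond > 10 && current < bounds.max) {
    // Fast batch: step up by 2, but never above the upper bound
    return Math.min(bounds.max, current + 2);
  }

  if (itemsPerSecond < 2 && current > bounds.min) {
    // Slow batch: step down by 1, but never below the lower bound
    return Math.max(bounds.min, current - 1);
  }

  // Throughput is in the middle range, or a bound has been reached
  return current;
}

const spedUp = nextConcurrency(5, 12);
const slowedDown = nextConcurrency(5, 1);
const unchanged = nextConcurrency(5, 5);
```

Stepping up faster than stepping down is the choice made in the example loop. A more conservative policy could swap the step sizes.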

Install with Tessl CLI

npx tessl i tessl/npm-supercharge--promise-pool
