CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/npm-workerpool

Offload tasks to a pool of workers on node.js and in the browser

Overall
score

95%

Overview
Eval results
Files

worker-context.mddocs/

Worker Context

Utilities for setting up worker processes, registering functions, and handling communication between the main thread and workers. This module provides the worker-side API for function registration, event emission, and cleanup handling.

Capabilities

Worker Function Registration

Register functions to be available for execution from the main thread. This is used in dedicated worker scripts to expose methods that can be called via pool.exec() or worker proxies.

/**
 * Register functions in a worker context so that they can be executed
 * from the main thread (via pool.exec() or a worker proxy).
 * Both arguments are optional.
 * @param {object} [methods] - Object containing functions to register
 * @param {WorkerRegisterOptions} [options] - Registration configuration
 * @returns {void}
 */
function worker(methods, options) -> void

Usage Examples:

// myWorker.js - dedicated worker script
const workerpool = require('workerpool');

// Register individual functions
workerpool.worker({
  /**
   * Recursively compute the n-th Fibonacci number.
   * The function expression is named so the recursive calls resolve to this
   * function; an anonymous property value has no `fibonacci` binding in scope
   * and would throw a ReferenceError on the first recursive call.
   * @param {number} n - Index in the Fibonacci sequence
   * @returns {number} The n-th Fibonacci number
   */
  fibonacci: function fibonacci(n) {
    if (n < 2) return n;
    return fibonacci(n - 2) + fibonacci(n - 1);
  },
  
  /**
   * Return a copy of every item with `processed: true` added.
   * @param {object[]} data - Items to mark as processed
   * @returns {object[]} New array of shallow-copied items
   */
  processData: function(data) {
    return data.map(item => ({ ...item, processed: true }));
  },
  
  /**
   * Wait for `config.delay` milliseconds, then report completion.
   * @param {{delay: number}} config - Task configuration
   * @returns {Promise<{status: string, config: object}>} Completion record
   */
  asyncTask: async function(config) {
    await new Promise(resolve => setTimeout(resolve, config.delay));
    return { status: 'completed', config };
  }
});

// Register a task together with worker-level termination options
const abortableTasks = {
  longRunningTask: function(data) {
    // Cleanup routine attached to this specific task invocation
    const releaseTaskResources = async () => {
      console.log('Cleaning up long running task...');
      await cleanupResources();
    };
    this.addAbortListener(releaseTaskResources);
    
    return processLongRunningTask(data);
  }
};

const terminationOptions = {
  // Invoked with the termination code when the worker shuts down
  onTerminate: async function(code) {
    console.log('Worker terminating with code:', code);
    await cleanup();
  },
  // Abort listeners get 5 seconds to finish their cleanup
  abortListenerTimeout: 5000
};

workerpool.worker(abortableTasks, terminationOptions);

Worker Event Emission

Emit events from workers to the main thread during task execution. This enables progress reporting, status updates, and real-time communication during long-running tasks.

/**
 * Emit an event from worker to main thread while a task is executing.
 * On the main thread the payload is delivered to the `on` handler passed
 * in the options of pool.exec().
 * @param {any} payload - Event data to send
 * @returns {void}
 */
function workerEmit(payload) -> void

Usage Examples:

// In worker script
const workerpool = require('workerpool');

workerpool.worker({
  /**
   * Process every item of the dataset, emitting a progress event on every
   * 100th index and a final 'completed' event before returning the results.
   */
  processLargeDataset: function(dataset) {
    const itemCount = dataset.length;
    const processedItems = [];
    
    let index = 0;
    while (index < itemCount) {
      // Handle the current item and keep its result
      processedItems.push(processItem(dataset[index]));
      
      // Report progress every 100 items (including the very first one)
      const isReportingStep = index % 100 === 0;
      if (isReportingStep) {
        const percentage = Math.round((index / itemCount) * 100);
        workerpool.workerEmit({
          type: 'progress',
          completed: index,
          total: itemCount,
          percentage: percentage
        });
      }
      
      index += 1;
    }
    
    const completionEvent = {
      type: 'completed',
      message: 'Processing finished successfully'
    };
    workerpool.workerEmit(completionEvent);
    
    return processedItems;
  },
  
  /**
   * Download JSON from the given URL, process it, and emit a status event
   * before the download, before processing, and after processing.
   */
  downloadAndProcess: async function(url) {
    // Announce the download
    workerpool.workerEmit({ status: 'downloading', url });
    
    const response = await fetch(url);
    const data = await response.json();
    
    // Announce the processing phase with the payload size
    workerpool.workerEmit({ status: 'processing', size: data.length });
    
    const processed = await processData(data);
    
    // Announce completion with the result size
    workerpool.workerEmit({ status: 'complete', resultSize: processed.length });
    
    return processed;
  }
});

// In main thread: log the events emitted by the worker while the task runs
const handleWorkerEvent = (event) => {
  switch (event.type) {
    case 'progress':
      console.log(`Progress: ${event.percentage}%`);
      break;
    case 'completed':
      console.log(event.message);
      break;
    default:
      // Other event types are ignored
      break;
  }
};

const result = await pool.exec('processLargeDataset', [data], {
  on: handleWorkerEvent
});

Worker API (Available within worker functions)

When functions are registered with workerpool.worker(), they receive access to a special this context containing worker utilities.

// Utilities available to functions registered with workerpool.worker().
// NOTE(review): the upstream workerpool README appears to expose this API as
// `this.worker` inside a task (e.g. `this.worker.addAbortListener(...)`) —
// confirm which access path applies to the documented version.
interface WorkerAPI {
  /**
   * Register cleanup listener for task cancellation/timeout
   * @param {function} listener - Async function to handle cleanup
   * @returns {void}
   */
  addAbortListener(listener: () => Promise<void>) -> void
  
  /**
   * Emit event to main thread (same as workerEmit)
   * @param {any} payload - Event data to send
   * @returns {void}
   */
  emit(payload: any) -> void
}

Usage Examples:

workerpool.worker({
  /**
   * Acquire resources, report that they are initialized, perform the work
   * and release the resources again.
   *
   * Declared `async` because the body awaits `resources.close()`; using
   * `await` inside a non-async function is a SyntaxError.
   * @param {object} config - Configuration passed to initializeResources()
   * @returns {Promise<any>} Result of performWork()
   */
  taskWithCleanup: async function(config) {
    let resources = null;
    
    // Register cleanup handler; it only closes resources that are still open
    this.addAbortListener(async () => {
      console.log('Task was cancelled, cleaning up...');
      if (resources) {
        await resources.close();
        resources = null;
      }
    });
    
    // Initialize resources
    resources = initializeResources(config);
    
    // Emit progress during execution
    this.emit({ status: 'initialized', resourceCount: resources.length });
    
    // Perform work
    const result = performWork(resources);
    
    // Cleanup on successful completion; reset the reference so the abort
    // listener does not try to close the resources a second time
    await resources.close();
    resources = null;
    
    return result;
  },
  
  /**
   * Run a DataProcessor over the data, forwarding its progress callbacks to
   * the main thread and stopping it gracefully when the task is aborted.
   * @param {any} data - Input handed to the DataProcessor
   * @returns {any} Whatever processor.process() returns
   */
  interruptibleTask: function(data) {
    const processor = new DataProcessor(data);
    
    // Handle graceful cancellation
    this.addAbortListener(async () => {
      await processor.gracefulStop();
    });
    
    // Process with periodic progress reports; the arrow function keeps the
    // task's `this` so that this.emit is reachable inside the callback
    return processor.process((progress) => {
      this.emit({ 
        type: 'progress',
        current: progress.current,
        total: progress.total,
        eta: progress.estimatedTimeRemaining
      });
    });
  }
});

Built-in Worker Methods

These methods are automatically available in any worker and can be called from the main thread.

/**
 * Execute a stringified function (used for dynamic offloading).
 * Called internally by workerpool when pool.exec() is given a function
 * instead of a method name.
 * @param {string} fn - Stringified function code
 * @param {array} args - Function arguments
 * @returns {any} Function execution result
 */
function run(fn, args) -> any

/**
 * Get list of available worker methods.
 * Called internally by pool.proxy() to discover the available functions.
 * @returns {string[]} Array of method names
 */
function methods() -> string[]

Usage Examples:

// These are called automatically by workerpool
// when using pool.exec() with functions or pool.proxy()

// Dynamic function execution (called internally)
// pool.exec(function(x) { return x * 2; }, [5])
// Internally calls: worker.run('function(x) { return x * 2; }', [5])

// Method discovery (called internally by proxy())
// const proxy = await pool.proxy();
// Internally calls: worker.methods() to discover available functions

Worker Registration Options

// Options accepted as the second argument of workerpool.worker().
// Both fields are optional.
interface WorkerRegisterOptions {
  /**
   * Callback executed when worker is terminating
   * Runs in worker context for cleanup
   * Receives the termination code (may be undefined) and may return a
   * promise-like value when the cleanup is asynchronous
   */
  onTerminate?: (code: number | undefined) => PromiseLike<void> | void
  
  /**
   * Timeout for abort listeners in milliseconds (default: 1000)
   * If cleanup takes longer, worker will be forcefully terminated
   */
  abortListenerTimeout?: number
}

Configuration Examples:

// Database worker: runs queries and closes the connection on termination
workerpool.worker({
  queryDatabase: async function(query) {
    // Resolve with whatever the database returns for this query
    return await db.query(query);
  }
}, {
  onTerminate: async function(code) {
    console.log('Database worker terminating...');
    // Nothing to close when there is no live connection
    if (!db || !db.connected) {
      return;
    }
    await db.close();
  },
  abortListenerTimeout: 10000  // Cleanup may take up to 10 seconds
});

// File processing worker with temp file cleanup
workerpool.worker({
  /**
   * Process a file, tracking the temporary files created along the way and
   * removing them both when the task is aborted and when it succeeds.
   * @param {string} filePath - Path of the file to process
   * @returns {any} Result of processWithTempFiles()
   */
  processFile: function(filePath) {
    const tempFiles = [];
    
    this.addAbortListener(async () => {
      // Clean up temporary files
      for (const tempFile of tempFiles) {
        try {
          // Use the promise-based API: the callback-style fs.unlink does not
          // return a promise and throws when called without a callback
          await fs.promises.unlink(tempFile);
        } catch (err) {
          // Best effort: keep removing the remaining files
          console.warn('Failed to clean temp file:', err);
        }
      }
    });
    
    // Process file and create temp files
    const result = processWithTempFiles(filePath, tempFiles);
    
    // Clean up temp files on success
    tempFiles.forEach(file => fs.unlinkSync(file));
    
    return result;
  }
}, {
  abortListenerTimeout: 5000  // 5 second cleanup timeout
});

Error Handling in Workers

// Workers should handle errors appropriately
workerpool.worker({
  /**
   * Validate the input and run the operation, wrapping any failure in an
   * error with a descriptive message.
   * @param {object} data - Input for performRiskyOperation()
   * @returns {any} Result of performRiskyOperation()
   * @throws {Error} 'Operation failed: …' when validation or the operation fails
   */
  riskyOperation: function(data) {
    try {
      // Validate input
      if (!data || typeof data !== 'object') {
        throw new Error('Invalid input data');
      }
      
      // Perform operation
      return performRiskyOperation(data);
      
    } catch (error) {
      // Error will be propagated to main thread
      // and reject the Promise returned by pool.exec().
      // The original error is attached as `cause` so its stack is not lost.
      throw new Error(`Operation failed: ${error.message}`, { cause: error });
    }
  },
  
  /**
   * Run the asynchronous operation, wrapping any failure in an error with a
   * descriptive message.
   * @param {object} config - Input for performAsyncOperation()
   * @returns {Promise<any>} Result of performAsyncOperation()
   * @throws {Error} 'Async operation failed: …' when the operation rejects
   */
  asyncRiskyOperation: async function(config) {
    try {
      const result = await performAsyncOperation(config);
      return result;
    } catch (error) {
      // Async errors are also properly propagated;
      // keep the original error reachable through `cause`
      throw new Error(`Async operation failed: ${error.message}`, { cause: error });
    }
  }
});

Install with Tessl CLI

npx tessl i tessl/npm-workerpool

docs

index.md

pool-management.md

promise-transfer.md

worker-context.md

tile.json