Offload tasks to a pool of workers on node.js and in the browser
Overall
score
95%
Utilities for setting up worker processes, registering functions, and handling communication between the main thread and workers. This module provides the worker-side API for function registration, event emission, and cleanup handling.
Register functions to be available for execution from the main thread. This is used in dedicated worker scripts to expose methods that can be called via pool.exec() or worker proxies.
/**
* Register functions in a worker context
* @param {object} [methods] - Object containing functions to register
* @param {WorkerRegisterOptions} [options] - Registration configuration
*/
function worker(methods, options) -> voidUsage Examples:
// myWorker.js - dedicated worker script
const workerpool = require('workerpool');
// Register individual functions
workerpool.worker({
fibonacci: function(n) {
if (n < 2) return n;
return fibonacci(n - 2) + fibonacci(n - 1);
},
processData: function(data) {
return data.map(item => ({ ...item, processed: true }));
},
asyncTask: async function(config) {
await new Promise(resolve => setTimeout(resolve, config.delay));
return { status: 'completed', config };
}
});
// Register with cleanup handler
workerpool.worker({
longRunningTask: function(data) {
// Register cleanup for this specific task
this.addAbortListener(async () => {
console.log('Cleaning up long running task...');
// Perform cleanup operations
await cleanupResources();
});
return processLongRunningTask(data);
}
}, {
onTerminate: async function(code) {
console.log('Worker terminating with code:', code);
await cleanup();
},
abortListenerTimeout: 5000 // 5 second timeout for cleanup
});Emit events from workers to the main thread during task execution. This enables progress reporting, status updates, and real-time communication during long-running tasks.
/**
* Emit an event from worker to main thread
* @param {any} payload - Event data to send
*/
function workerEmit(payload) -> voidUsage Examples:
// In worker script
const workerpool = require('workerpool');
workerpool.worker({
processLargeDataset: function(dataset) {
const total = dataset.length;
const results = [];
for (let i = 0; i < total; i++) {
// Process item
const result = processItem(dataset[i]);
results.push(result);
// Emit progress updates
if (i % 100 === 0) {
workerpool.workerEmit({
type: 'progress',
completed: i,
total: total,
percentage: Math.round((i / total) * 100)
});
}
}
workerpool.workerEmit({
type: 'completed',
message: 'Processing finished successfully'
});
return results;
},
downloadAndProcess: async function(url) {
// Emit status updates
workerpool.workerEmit({ status: 'downloading', url });
const data = await fetch(url).then(r => r.json());
workerpool.workerEmit({
status: 'processing',
size: data.length
});
const processed = await processData(data);
workerpool.workerEmit({
status: 'complete',
resultSize: processed.length
});
return processed;
}
});
// In main thread
const result = await pool.exec('processLargeDataset', [data], {
on: function(event) {
if (event.type === 'progress') {
console.log(`Progress: ${event.percentage}%`);
} else if (event.type === 'completed') {
console.log(event.message);
}
}
});When functions are registered with workerpool.worker(), they receive access to a special this context containing worker utilities.
interface WorkerAPI {
/**
* Register cleanup listener for task cancellation/timeout
* @param {function} listener - Async function to handle cleanup
*/
addAbortListener(listener: () => Promise<void>) -> void
/**
* Emit event to main thread (same as workerEmit)
* @param {any} payload - Event data to send
*/
emit(payload: any) -> void
}Usage Examples:
workerpool.worker({
taskWithCleanup: function(config) {
let resources = null;
// Register cleanup handler
this.addAbortListener(async () => {
console.log('Task was cancelled, cleaning up...');
if (resources) {
await resources.close();
resources = null;
}
});
// Initialize resources
resources = initializeResources(config);
// Emit progress during execution
this.emit({ status: 'initialized', resourceCount: resources.length });
// Perform work
const result = performWork(resources);
// Cleanup on successful completion
await resources.close();
resources = null;
return result;
},
interruptibleTask: function(data) {
const processor = new DataProcessor(data);
// Handle graceful cancellation
this.addAbortListener(async () => {
await processor.gracefulStop();
});
// Process with periodic progress reports
return processor.process((progress) => {
this.emit({
type: 'progress',
current: progress.current,
total: progress.total,
eta: progress.estimatedTimeRemaining
});
});
}
});These methods are automatically available in any worker and can be called from the main thread.
/**
* Execute a stringified function (used for dynamic offloading)
* @param {string} fn - Stringified function code
* @param {array} args - Function arguments
* @returns {any} Function execution result
*/
function run(fn, args) -> any
/**
* Get list of available worker methods
* @returns {string[]} Array of method names
*/
function methods() -> string[]Usage Examples:
// These are called automatically by workerpool
// when using pool.exec() with functions or pool.proxy()
// Dynamic function execution (called internally)
// pool.exec(function(x) { return x * 2; }, [5])
// Internally calls: worker.run('function(x) { return x * 2; }', [5])
// Method discovery (called internally by proxy())
// const proxy = await pool.proxy();
// Internally calls: worker.methods() to discover available functionsinterface WorkerRegisterOptions {
/**
* Callback executed when worker is terminating
* Runs in worker context for cleanup
*/
onTerminate?: (code: number | undefined) => PromiseLike<void> | void
/**
* Timeout for abort listeners in milliseconds (default: 1000)
* If cleanup takes longer, worker will be forcefully terminated
*/
abortListenerTimeout?: number
}Configuration Examples:
// Database worker with connection cleanup
workerpool.worker({
queryDatabase: async function(query) {
const result = await db.query(query);
return result;
}
}, {
onTerminate: async function(code) {
console.log('Database worker terminating...');
if (db && db.connected) {
await db.close();
}
},
abortListenerTimeout: 10000 // Allow 10 seconds for cleanup
});
// File processing worker with temp file cleanup
workerpool.worker({
processFile: function(filePath) {
const tempFiles = [];
this.addAbortListener(async () => {
// Clean up temporary files
for (const tempFile of tempFiles) {
try {
await fs.unlink(tempFile);
} catch (err) {
console.warn('Failed to clean temp file:', err);
}
}
});
// Process file and create temp files
const result = processWithTempFiles(filePath, tempFiles);
// Clean up temp files on success
tempFiles.forEach(file => fs.unlinkSync(file));
return result;
}
}, {
abortListenerTimeout: 5000 // 5 second cleanup timeout
});// Workers should handle errors appropriately
workerpool.worker({
riskyOperation: function(data) {
try {
// Validate input
if (!data || typeof data !== 'object') {
throw new Error('Invalid input data');
}
// Perform operation
return performRiskyOperation(data);
} catch (error) {
// Error will be propagated to main thread
// and reject the Promise returned by pool.exec()
throw new Error(`Operation failed: ${error.message}`);
}
},
asyncRiskyOperation: async function(config) {
try {
const result = await performAsyncOperation(config);
return result;
} catch (error) {
// Async errors are also properly propagated
throw new Error(`Async operation failed: ${error.message}`);
}
}
});Install with Tessl CLI
npx tessl i tessl/npm-workerpoolevals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10