A fast, efficient Node.js Worker Thread Pool implementation
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Core worker thread pool functionality for creating, configuring, and managing worker pools with automatic scaling and resource management.
Main worker thread pool class that extends EventEmitterAsyncResource for full async tracking support.
/**
* Main worker thread pool class
* @extends EventEmitterAsyncResource
* @template T - Task input type
* @template R - Task result type
*/
class Piscina<T = any, R = any> extends EventEmitterAsyncResource {
/**
* Create a new worker thread pool
* @param options - Pool configuration options
*/
constructor(options?: Options);
/**
* Execute a task in the worker pool
* @param task - Task data to pass to worker
* @param options - Task execution options
* @returns Promise resolving to task result
*/
run(task: T, options?: RunOptions): Promise<R>;
/**
* Gracefully close the pool, waiting for running tasks to complete
* @param options - Close operation options
* @returns Promise resolving when pool is closed
*/
close(options?: CloseOptions): Promise<void>;
/**
* Forcefully destroy the pool, terminating all workers immediately
* @returns Promise resolving when pool is destroyed
*/
destroy(): Promise<void>;
/**
* Synchronous disposal method for 'using' keyword support
*/
[Symbol.dispose](): void;
/**
* Asynchronous disposal method for 'await using' keyword support
*/
[Symbol.asyncDispose](): Promise<void>;
// Pool properties
readonly maxThreads: number;
readonly minThreads: number;
readonly options: FilledOptions;
readonly threads: Worker[];
readonly queueSize: number;
readonly completed: number;
readonly histogram: PiscinaHistogram;
readonly utilization: number;
readonly duration: number;
readonly needsDrain: boolean;
}Usage Examples:
import Piscina from "piscina";
import { resolve } from "path";
// Basic pool with default settings
const pool = new Piscina({
filename: resolve(__dirname, "worker.js")
});
// Advanced pool configuration
const advancedPool = new Piscina({
filename: resolve(__dirname, "worker.js"),
minThreads: 2,
maxThreads: 8,
idleTimeout: 5000,
maxQueue: 50,
concurrentTasksPerWorker: 2,
resourceLimits: {
maxOldGenerationSizeMb: 100,
maxYoungGenerationSizeMb: 50
}
});
// Execute tasks
const result = await pool.run({ operation: "add", values: [1, 2, 3] });
console.log("Task completed:", result);
// Graceful shutdown
await pool.close();Comprehensive configuration interface for customizing pool behavior.
/**
* Pool configuration options
*/
interface Options {
/** Worker script filename - can be JavaScript, TypeScript, or ESM */
filename?: string | null;
/** Pool name for identification and debugging */
name?: string;
/** Minimum number of worker threads to maintain */
minThreads?: number;
/** Maximum number of worker threads allowed */
maxThreads?: number;
/** Milliseconds before idle workers are terminated (0 = never, Infinity = never) */
idleTimeout?: number;
/** Maximum queued tasks ('auto' = maxThreads^2, number, or Infinity) */
maxQueue?: number | 'auto';
/** Number of concurrent tasks each worker can handle */
concurrentTasksPerWorker?: number;
/** Shared memory communication mode */
atomics?: 'sync' | 'async' | 'disabled';
/** Resource limits for worker threads */
resourceLimits?: ResourceLimits;
/** Command line arguments passed to worker */
argv?: string[];
/** Node.js execution arguments passed to worker */
execArgv?: string[];
/** Environment variables for workers */
env?: EnvSpecifier;
/** Data passed to worker on startup */
workerData?: any;
/** Custom task queue implementation */
taskQueue?: TaskQueue;
/** Process priority adjustment (Unix only) */
niceIncrement?: number;
/** Track unmanaged file descriptors */
trackUnmanagedFds?: boolean;
/** Timeout for close operations in milliseconds */
closeTimeout?: number;
/** Enable performance timing collection */
recordTiming?: boolean;
/** Custom load balancer function */
loadBalancer?: PiscinaLoadBalancer;
/** Enable per-worker histograms */
workerHistogram?: boolean;
}
/**
* Filled options with all defaults applied
*/
interface FilledOptions extends Options {
filename: string | null;
name: string;
minThreads: number;
maxThreads: number;
idleTimeout: number;
maxQueue: number;
concurrentTasksPerWorker: number;
atomics: 'sync' | 'async' | 'disabled';
taskQueue: TaskQueue;
niceIncrement: number;
closeTimeout: number;
recordTiming: boolean;
workerHistogram: boolean;
}Options for individual task execution with support for transfers and cancellation.
/**
* Options for individual task execution
*/
interface RunOptions {
/** Objects to transfer ownership to worker (for performance) */
transferList?: TransferList;
/** Override worker filename for this task */
filename?: string | null;
/** Abort signal for task cancellation */
signal?: AbortSignalAny | null;
/** Override task name for this execution */
name?: string | null;
}Usage Examples:
import Piscina from "piscina";
const pool = new Piscina({
filename: resolve(__dirname, "worker.js")
});
// Task with transferable objects
const buffer = new ArrayBuffer(1024);
const result = await pool.run(
{ data: buffer, operation: "process" },
{ transferList: [buffer] }
);
// Task with cancellation
const controller = new AbortController();
const taskPromise = pool.run(
{ longRunningTask: true },
{ signal: controller.signal }
);
// Cancel after 5 seconds
setTimeout(() => controller.abort(), 5000);
try {
const result = await taskPromise;
} catch (error) {
if (error.name === 'AbortError') {
console.log('Task was cancelled');
}
}Configuration for graceful or forced pool shutdown.
/**
* Options for pool close operations
*/
interface CloseOptions {
/** Force immediate termination without waiting for tasks */
force?: boolean;
}Usage Examples:
// Graceful shutdown - wait for running tasks
await pool.close();
// Forced shutdown - terminate immediately
await pool.close({ force: true });
// Using disposal syntax (Node.js 20+)
{
using pool = new Piscina({ filename: "worker.js" });
await pool.run(task);
// Automatically closed when leaving scope
}
// Async disposal
{
await using pool = new Piscina({ filename: "worker.js" });
await pool.run(task);
// Automatically closed when leaving scope
}Real-time pool status and configuration access.
class Piscina {
/** Maximum number of worker threads */
readonly maxThreads: number;
/** Minimum number of worker threads */
readonly minThreads: number;
/** Complete pool configuration with defaults applied */
readonly options: FilledOptions;
/** Array of all worker thread instances */
readonly threads: Worker[];
/** Current number of queued tasks */
readonly queueSize: number;
/** Total number of completed tasks */
readonly completed: number;
/** Performance histogram data */
readonly histogram: PiscinaHistogram;
/** Pool utilization as percentage (0-1) */
readonly utilization: number;
/** Pool runtime in milliseconds */
readonly duration: number;
/** Whether pool needs draining (at capacity) */
readonly needsDrain: boolean;
}Usage Examples:
const pool = new Piscina({
filename: "worker.js",
minThreads: 2,
maxThreads: 8
});
// Monitor pool status
console.log(`Pool has ${pool.threads.length} active threads`);
console.log(`Queue size: ${pool.queueSize}`);
console.log(`Completed tasks: ${pool.completed}`);
console.log(`Utilization: ${(pool.utilization * 100).toFixed(2)}%`);
console.log(`Running for: ${pool.duration}ms`);
// Check if pool needs draining
if (pool.needsDrain) {
console.log("Pool is at capacity, consider adding more threads");
}
// Access configuration
console.log(`Max threads: ${pool.maxThreads}`);
console.log(`Idle timeout: ${pool.options.idleTimeout}`);The Piscina class extends EventEmitterAsyncResource and emits the following events:
class Piscina extends EventEmitterAsyncResource {
// Event: 'workerCreate' - Emitted when a new worker is created
on(event: 'workerCreate', listener: (worker: PiscinaWorker) => void): this;
// Event: 'workerDestroy' - Emitted when a worker is destroyed
on(event: 'workerDestroy', listener: (worker: PiscinaWorker) => void): this;
// Event: 'needsDrain' - Emitted when pool is at capacity
on(event: 'needsDrain', listener: () => void): this;
// Event: 'drain' - Emitted when pool capacity is available again
on(event: 'drain', listener: () => void): this;
// Event: 'message' - Emitted for messages from workers
on(event: 'message', listener: (message: any) => void): this;
// Event: 'error' - Emitted for pool-level errors
on(event: 'error', listener: (error: Error) => void): this;
// Event: 'close' - Emitted when pool is closed
on(event: 'close', listener: () => void): this;
}Usage Examples:
const pool = new Piscina({ filename: "worker.js" });
// Monitor worker lifecycle
pool.on('workerCreate', (worker) => {
console.log(`Worker ${worker.id} created`);
});
pool.on('workerDestroy', (worker) => {
console.log(`Worker ${worker.id} destroyed`);
});
// Monitor pool capacity
pool.on('needsDrain', () => {
console.log('Pool at capacity - consider draining');
});
pool.on('drain', () => {
console.log('Pool capacity available');
});
// Handle errors
pool.on('error', (error) => {
console.error('Pool error:', error);
});Install with Tessl CLI
npx tessl i tessl/npm-piscina