A fast, efficient Node.js Worker Thread Pool implementation
npx @tessl/cli install tessl/npm-piscina@5.1.0Piscina is a fast, efficient Node.js Worker Thread Pool implementation that enables parallel processing and CPU-intensive task distribution across multiple worker threads. It provides comprehensive features including:
npm install piscinaimport Piscina from "piscina";
// Or for named imports
import { Piscina, move, isWorkerThread, workerData } from "piscina";For CommonJS:
const Piscina = require("piscina");
// Or destructured
const { Piscina, move, isWorkerThread, workerData } = require("piscina");import Piscina from "piscina";
import { resolve } from "path";
// Create worker pool
const piscina = new Piscina({
filename: resolve(__dirname, "worker.js"),
maxThreads: 4
});
// Run tasks
const result = await piscina.run({ a: 4, b: 6 });
console.log(result); // Output from worker
// Graceful shutdown
await piscina.close();Worker file (worker.js):
module.exports = ({ a, b }) => {
return a + b;
};Piscina is built around several key components:
Core worker thread pool functionality for creating, configuring, and managing worker pools with automatic scaling and resource management.
import Piscina from "piscina";
import { EventEmitterAsyncResource } from "node:async_hooks";
import { Worker } from "node:worker_threads";
class Piscina<T = any, R = any> extends EventEmitterAsyncResource {
constructor(options?: Options);
// Core methods
run(task: T, options?: RunOptions): Promise<R>;
close(options?: CloseOptions): Promise<void>;
destroy(): Promise<void>;
// Properties
readonly maxThreads: number;
readonly minThreads: number;
readonly options: FilledOptions;
readonly threads: Worker[];
readonly queueSize: number;
readonly completed: number;
readonly histogram: PiscinaHistogram;
readonly utilization: number;
readonly duration: number;
readonly needsDrain: boolean;
}
interface FilledOptions extends Required<Options> {
filename: string | null;
name: string;
minThreads: number;
maxThreads: number;
idleTimeout: number;
maxQueue: number;
concurrentTasksPerWorker: number;
atomics: 'sync' | 'async' | 'disabled';
taskQueue: TaskQueue;
niceIncrement: number;
closeTimeout: number;
recordTiming: boolean;
workerHistogram: boolean;
}
interface Options {
filename?: string | null;
name?: string;
minThreads?: number;
maxThreads?: number;
idleTimeout?: number;
maxQueue?: number | 'auto';
concurrentTasksPerWorker?: number;
atomics?: 'sync' | 'async' | 'disabled';
resourceLimits?: ResourceLimits;
argv?: string[];
execArgv?: string[];
env?: EnvSpecifier;
workerData?: any;
taskQueue?: TaskQueue;
niceIncrement?: number;
trackUnmanagedFds?: boolean;
closeTimeout?: number;
recordTiming?: boolean;
loadBalancer?: PiscinaLoadBalancer;
workerHistogram?: boolean;
}
interface RunOptions {
transferList?: TransferList;
filename?: string | null;
signal?: AbortSignalAny | null;
name?: string | null;
}
interface CloseOptions {
force?: boolean;
}Configurable task queuing strategies with built-in FIFO and fixed-size circular buffer implementations for optimal performance in different scenarios.
import { TaskQueue, ArrayTaskQueue, FixedQueue, isTaskQueue } from "piscina";
interface Task {
readonly taskId: number;
readonly filename: string;
readonly name: string;
readonly created: number;
readonly isAbortable: boolean;
}
interface TaskQueue {
readonly size: number;
shift(): Task | null;
remove(task: Task): void;
push(task: Task): void;
}
class ArrayTaskQueue implements TaskQueue {
readonly size: number;
shift(): Task | null;
push(task: Task): void;
remove(task: Task): void;
}
class FixedQueue implements TaskQueue {
readonly size: number;
shift(): Task | null;
push(task: Task): void;
remove(task: Task): void;
}
function isTaskQueue(value: TaskQueue): boolean;Intelligent task distribution across available workers with built-in least-busy balancer and support for custom balancing strategies.
import { PiscinaLoadBalancer, LeastBusyBalancer, PiscinaTask, PiscinaWorker } from "piscina";
type PiscinaLoadBalancer = (
task: PiscinaTask,
workers: PiscinaWorker[]
) => PiscinaWorker | null;
function LeastBusyBalancer(
opts: LeastBusyBalancerOptions
): PiscinaLoadBalancer;
interface LeastBusyBalancerOptions {
maximumUsage: number;
}
interface PiscinaWorker {
readonly id: number;
readonly currentUsage: number;
readonly isRunningAbortableTask: boolean;
readonly histogram: PiscinaHistogramSummary | null;
readonly terminating: boolean;
readonly destroyed: boolean;
}Built-in performance metrics collection with detailed histogram statistics for runtime and wait times, including percentile calculations.
import { PiscinaHistogram, PiscinaHistogramSummary } from "piscina";
interface PiscinaHistogram {
readonly runTime: PiscinaHistogramSummary;
readonly waitTime: PiscinaHistogramSummary;
resetRunTime(): void;
resetWaitTime(): void;
}
interface PiscinaHistogramSummary {
readonly average: number;
readonly mean: number;
readonly stddev: number;
readonly min: number;
readonly max: number;
readonly p0_001: number;
readonly p0_01: number;
readonly p0_1: number;
readonly p1: number;
readonly p2_5: number;
readonly p10: number;
readonly p25: number;
readonly p50: number;
readonly p75: number;
readonly p90: number;
readonly p97_5: number;
readonly p99: number;
readonly p99_9: number;
readonly p99_99: number;
readonly p99_999: number;
}Efficient data transfer mechanisms using structured cloning and transferable objects for optimal performance with large data sets.
import {
move,
Transferable,
transferableSymbol,
valueSymbol,
isTransferable,
isMovable
} from "piscina";
function move(
val: Transferable | TransferListItem | ArrayBufferView | ArrayBuffer | MessagePort
): any;
interface Transferable {
readonly [transferableSymbol]: object;
readonly [valueSymbol]: object;
}
type TransferList = MessagePort extends {
postMessage: (value: any, transferList: infer T) => any;
} ? T : never;
type TransferListItem = TransferList extends Array<infer T> ? T : never;
// Utility functions
function isTransferable(value: unknown): boolean;
function isMovable(value: any): boolean;Comprehensive task cancellation support using AbortController and AbortSignal patterns for graceful task termination and cleanup.
import {
AbortError,
AbortSignalAny,
AbortSignalEventTarget,
AbortSignalEventEmitter,
onabort
} from "piscina";
class AbortError extends Error {
constructor(reason?: AbortSignalEventTarget['reason']);
readonly name: 'AbortError';
}
interface AbortSignalEventTarget {
addEventListener(
name: 'abort',
listener: () => void,
options?: { once: boolean }
): void;
removeEventListener(name: 'abort', listener: () => void): void;
readonly aborted?: boolean;
readonly reason?: unknown;
}
interface AbortSignalEventEmitter {
off(name: 'abort', listener: () => void): void;
once(name: 'abort', listener: () => void): void;
}
type AbortSignalAny = AbortSignalEventTarget | AbortSignalEventEmitter;
function onabort(abortSignal: AbortSignalAny, listener: () => void): void;Global properties and utilities available throughout the Piscina ecosystem.
import Piscina, {
move,
isWorkerThread,
workerData,
version,
transferableSymbol,
valueSymbol,
queueOptionsSymbol,
FixedQueue,
ArrayTaskQueue
} from "piscina";
class Piscina {
// Static properties
static readonly isWorkerThread: boolean;
static readonly workerData: any;
static readonly version: string;
static readonly Piscina: typeof Piscina;
static readonly FixedQueue: typeof FixedQueue;
static readonly ArrayTaskQueue: typeof ArrayTaskQueue;
static readonly transferableSymbol: symbol;
static readonly valueSymbol: symbol;
static readonly queueOptionsSymbol: symbol;
// Static methods
static move(
val: Transferable | TransferListItem | ArrayBufferView | ArrayBuffer | MessagePort
): any;
}
// Named exports for convenience
const move: typeof Piscina.move;
const isWorkerThread: typeof Piscina.isWorkerThread;
const workerData: typeof Piscina.workerData;
const version: typeof Piscina.version;
const transferableSymbol: typeof Piscina.transferableSymbol;
const valueSymbol: typeof Piscina.valueSymbol;
const queueOptionsSymbol: typeof Piscina.queueOptionsSymbol;const Errors: {
ThreadTermination(): Error;
FilenameNotProvided(): Error;
TaskQueueAtLimit(): Error;
NoTaskQueueAvailable(): Error;
CloseTimeout(): Error;
};Common errors thrown by Piscina operations include:
import { PiscinaTask, ResourceLimits, EnvSpecifier } from "piscina";
import { Worker } from "node:worker_threads";
interface PiscinaTask {
readonly taskId: number;
readonly filename: string;
readonly name: string;
readonly created: number;
readonly isAbortable: boolean;
}
type ResourceLimits = Worker extends {
resourceLimits?: infer T;
} ? T : {};
type EnvSpecifier = typeof Worker extends {
new (filename: never, options?: { env: infer T }): Worker;
} ? T : never;