tessl/npm-piscina

A fast, efficient Node.js Worker Thread Pool implementation

Workspace: tessl
Visibility: Public
Describes: npmpkg:npm/piscina@5.1.x

To install, run

npx @tessl/cli install tessl/npm-piscina@5.1.0


Piscina

Overview

Piscina is a fast, efficient worker thread pool for Node.js that distributes CPU-intensive tasks across multiple worker threads so they run in parallel. Its features include:

  • Fast Inter-thread Communication: Optimized message passing between main thread and workers
  • Flexible Pool Sizing: Dynamic scaling with configurable min/max thread limits
  • Async Tracking Integration: Proper async resource tracking for debugging and monitoring
  • Detailed Performance Statistics: Built-in histograms with percentile calculations
  • Task Cancellation Support: AbortController/AbortSignal integration for graceful cancellation
  • Memory Resource Limits: Configurable memory and resource constraints per worker
  • Module System Support: Compatible with CommonJS, ESM, and TypeScript modules

Package Information

  • Package Name: piscina
  • Package Type: npm
  • Language: TypeScript
  • Installation: npm install piscina
  • Node.js Version: 20.x and higher

Core Imports

import Piscina from "piscina";
// Or for named imports
import { Piscina, move, isWorkerThread, workerData } from "piscina";

For CommonJS:

const Piscina = require("piscina");
// Or destructured
const { Piscina, move, isWorkerThread, workerData } = require("piscina");

Basic Usage

import Piscina from "piscina";

// Create worker pool. ES modules have no __dirname, so the worker
// path is resolved relative to this file's URL instead.
const piscina = new Piscina({
  filename: new URL("./worker.js", import.meta.url).href,
  maxThreads: 4
});

// Run tasks
const result = await piscina.run({ a: 4, b: 6 });
console.log(result); // Output from worker

// Graceful shutdown
await piscina.close();

Worker file (worker.js):

module.exports = ({ a, b }) => {
  return a + b;
};

Architecture

Piscina is built around several key components:

  • Thread Pool Management: Automatic worker thread creation, scaling, and lifecycle management
  • Task Queue System: Configurable queuing strategies with built-in implementations (FIFO, fixed-size circular buffer)
  • Load Balancing: Intelligent task distribution across available workers with customizable balancing strategies
  • Performance Monitoring: Built-in histograms for runtime and wait time statistics with percentile calculations
  • Transferable Objects: Efficient data transfer using structured cloning and transferable objects
  • Abort Support: Task cancellation using AbortController/AbortSignal patterns
  • Resource Management: Memory limits, file descriptor tracking, and graceful shutdown handling

Capabilities

Pool Management

Core worker thread pool functionality for creating, configuring, and managing worker pools with automatic scaling and resource management.

import Piscina from "piscina";
import { EventEmitterAsyncResource } from "node:async_hooks";
import { Worker } from "node:worker_threads";

class Piscina<T = any, R = any> extends EventEmitterAsyncResource {
  constructor(options?: Options);
  
  // Core methods
  run(task: T, options?: RunOptions): Promise<R>;
  close(options?: CloseOptions): Promise<void>;
  destroy(): Promise<void>;
  
  // Properties
  readonly maxThreads: number;
  readonly minThreads: number;
  readonly options: FilledOptions;
  readonly threads: Worker[];
  readonly queueSize: number;
  readonly completed: number;
  readonly histogram: PiscinaHistogram;
  readonly utilization: number;
  readonly duration: number;
  readonly needsDrain: boolean;
}

interface FilledOptions extends Required<Options> {
  filename: string | null;
  name: string;
  minThreads: number;
  maxThreads: number;
  idleTimeout: number;
  maxQueue: number;
  concurrentTasksPerWorker: number;
  atomics: 'sync' | 'async' | 'disabled';
  taskQueue: TaskQueue;
  niceIncrement: number;
  closeTimeout: number;
  recordTiming: boolean;
  workerHistogram: boolean;
}

interface Options {
  filename?: string | null;
  name?: string;
  minThreads?: number;
  maxThreads?: number;
  idleTimeout?: number;
  maxQueue?: number | 'auto';
  concurrentTasksPerWorker?: number;
  atomics?: 'sync' | 'async' | 'disabled';
  resourceLimits?: ResourceLimits;
  argv?: string[];
  execArgv?: string[];
  env?: EnvSpecifier;
  workerData?: any;
  taskQueue?: TaskQueue;
  niceIncrement?: number;
  trackUnmanagedFds?: boolean;
  closeTimeout?: number;
  recordTiming?: boolean;
  loadBalancer?: PiscinaLoadBalancer;
  workerHistogram?: boolean;
}

interface RunOptions {
  transferList?: TransferList;
  filename?: string | null;
  signal?: AbortSignalAny | null;
  name?: string | null;
}

interface CloseOptions {
  force?: boolean;
}

Details: pool-management.md
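The `maxQueue` option accepts either a number or `'auto'`, while `FilledOptions.maxQueue` is always a number. The sketch below shows one way that resolution can be pictured, assuming the behaviour described in Piscina's own documentation: no limit by default, and `'auto'` meaning the square of `maxThreads`. `resolveMaxQueue` is a hypothetical helper written for illustration, not a Piscina export.

```typescript
// Hypothetical helper: mirrors the documented meaning of maxQueue,
// it is not taken from Piscina's source.
function resolveMaxQueue(
  maxQueue: number | "auto" | undefined,
  maxThreads: number
): number {
  if (maxQueue === undefined) return Infinity; // assumed default: unbounded queue
  if (maxQueue === "auto") return maxThreads ** 2; // 'auto': square of maxThreads
  return maxQueue; // explicit numeric limit is used as given
}

const autoLimit = resolveMaxQueue("auto", 4);
const explicitLimit = resolveMaxQueue(10, 4);
const defaultLimit = resolveMaxQueue(undefined, 4);

console.log(autoLimit, explicitLimit, defaultLimit); // 16 10 Infinity
```

A bounded queue pairs naturally with the `needsDrain` property listed above, which lets a producer pause submissions while the queue is full.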

Task Queue Systems

Configurable task queuing strategies with built-in FIFO and fixed-size circular buffer implementations for optimal performance in different scenarios.

import { TaskQueue, ArrayTaskQueue, FixedQueue, isTaskQueue } from "piscina";

interface Task {
  readonly taskId: number;
  readonly filename: string;
  readonly name: string;
  readonly created: number;
  readonly isAbortable: boolean;
}

interface TaskQueue {
  readonly size: number;
  shift(): Task | null;
  remove(task: Task): void;
  push(task: Task): void;
}

class ArrayTaskQueue implements TaskQueue {
  readonly size: number;
  shift(): Task | null;
  push(task: Task): void;
  remove(task: Task): void;
}

class FixedQueue implements TaskQueue {
  readonly size: number;
  shift(): Task | null;
  push(task: Task): void;
  remove(task: Task): void;
}

function isTaskQueue(value: TaskQueue): boolean;

Details: task-queues.md
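Any object with the `size`, `shift`, `remove`, and `push` members of `TaskQueue` can in principle serve as a custom queue passed through the `taskQueue` option. The following is a minimal sketch of a last-in, first-out queue. The `Task` and `TaskQueue` interfaces are local copies of the shapes listed above so the example runs without Piscina installed, and `LifoTaskQueue` and `makeTask` are hypothetical names.

```typescript
// Local copies of the interfaces shown above (assumption: same shape as Piscina's).
interface Task {
  readonly taskId: number;
  readonly filename: string;
  readonly name: string;
  readonly created: number;
  readonly isAbortable: boolean;
}

interface TaskQueue {
  readonly size: number;
  shift(): Task | null;
  remove(task: Task): void;
  push(task: Task): void;
}

// Hypothetical queue that hands out the most recently pushed task first.
class LifoTaskQueue implements TaskQueue {
  private readonly tasks: Task[] = [];

  get size(): number {
    return this.tasks.length;
  }

  push(task: Task): void {
    this.tasks.push(task);
  }

  shift(): Task | null {
    // pop() returns undefined on an empty array; the interface expects null
    return this.tasks.pop() ?? null;
  }

  remove(task: Task): void {
    const index = this.tasks.indexOf(task);
    if (index !== -1) this.tasks.splice(index, 1);
  }
}

// Hypothetical factory for test data.
function makeTask(taskId: number): Task {
  return { taskId, filename: "worker.js", name: "default", created: 0, isAbortable: false };
}

const queue = new LifoTaskQueue();
const [first, second, third] = [makeTask(1), makeTask(2), makeTask(3)];
queue.push(first);
queue.push(second);
queue.push(third);
queue.remove(second); // e.g. a task cancelled while still queued

const order = [queue.shift()?.taskId, queue.shift()?.taskId];
const emptyResult = queue.shift();

console.log(order, emptyResult); // [ 3, 1 ] null
```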

Load Balancing

Intelligent task distribution across available workers with built-in least-busy balancer and support for custom balancing strategies.

import { PiscinaLoadBalancer, LeastBusyBalancer, PiscinaTask, PiscinaWorker } from "piscina";

type PiscinaLoadBalancer = (
  task: PiscinaTask,
  workers: PiscinaWorker[]
) => PiscinaWorker | null;

function LeastBusyBalancer(
  opts: LeastBusyBalancerOptions
): PiscinaLoadBalancer;

interface LeastBusyBalancerOptions {
  maximumUsage: number;
}

interface PiscinaWorker {
  readonly id: number;
  readonly currentUsage: number;
  readonly isRunningAbortableTask: boolean;
  readonly histogram: PiscinaHistogramSummary | null;
  readonly terminating: boolean;
  readonly destroyed: boolean;
}

Details: load-balancing.md
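Since `PiscinaLoadBalancer` is a plain function type, a custom strategy is just a function with that signature. As an illustration, here is a sketch of a balancer that picks the worker with the lowest `currentUsage` and skips workers that are terminating or destroyed. The types are trimmed local copies of the ones above (the `histogram` field is omitted), and `pickLeastUsed` is a hypothetical name, not the built-in `LeastBusyBalancer`.

```typescript
// Trimmed local copies of the types shown above (assumption: same shape as Piscina's).
interface PiscinaTask {
  readonly taskId: number;
  readonly filename: string;
  readonly name: string;
  readonly created: number;
  readonly isAbortable: boolean;
}

interface PiscinaWorker {
  readonly id: number;
  readonly currentUsage: number;
  readonly isRunningAbortableTask: boolean;
  readonly terminating: boolean;
  readonly destroyed: boolean;
}

type PiscinaLoadBalancer = (
  task: PiscinaTask,
  workers: PiscinaWorker[]
) => PiscinaWorker | null;

// Hypothetical strategy: lowest currentUsage among healthy workers.
const pickLeastUsed: PiscinaLoadBalancer = (_task, workers) => {
  let candidate: PiscinaWorker | null = null;
  for (const worker of workers) {
    if (worker.terminating || worker.destroyed) continue; // never pick a dying worker
    if (candidate === null || worker.currentUsage < candidate.currentUsage) {
      candidate = worker;
    }
  }
  return candidate; // null when no worker qualifies
};

const task: PiscinaTask = {
  taskId: 1, filename: "worker.js", name: "default", created: 0, isAbortable: false
};
const workers: PiscinaWorker[] = [
  { id: 1, currentUsage: 3, isRunningAbortableTask: false, terminating: false, destroyed: false },
  { id: 2, currentUsage: 1, isRunningAbortableTask: false, terminating: false, destroyed: false },
  { id: 3, currentUsage: 0, isRunningAbortableTask: false, terminating: true, destroyed: false }
];

const chosen = pickLeastUsed(task, workers);
const noneAvailable = pickLeastUsed(task, []);

console.log(chosen?.id, noneAvailable); // 2 null
```

Worker 3 has the lowest usage but is skipped because it is terminating, so worker 2 is selected.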

Performance Monitoring

Built-in performance metrics collection with detailed histogram statistics for runtime and wait times, including percentile calculations.

import { PiscinaHistogram, PiscinaHistogramSummary } from "piscina";

interface PiscinaHistogram {
  readonly runTime: PiscinaHistogramSummary;
  readonly waitTime: PiscinaHistogramSummary;
  resetRunTime(): void;
  resetWaitTime(): void;
}

interface PiscinaHistogramSummary {
  readonly average: number;
  readonly mean: number;
  readonly stddev: number;
  readonly min: number;
  readonly max: number;
  readonly p0_001: number;
  readonly p0_01: number;
  readonly p0_1: number;
  readonly p1: number;
  readonly p2_5: number;
  readonly p10: number;
  readonly p25: number;
  readonly p50: number;
  readonly p75: number;
  readonly p90: number;
  readonly p97_5: number;
  readonly p99: number;
  readonly p99_9: number;
  readonly p99_99: number;
  readonly p99_999: number;
}

Details: performance-monitoring.md
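A summary object like `PiscinaHistogramSummary` is easiest to consume by reducing it to a few headline numbers. The sketch below formats a one-line report and computes a p99-to-p50 ratio as a rough indicator of tail latency. `LatencySummary` is a hypothetical subset of the fields listed above, `describeLatency` is an illustrative helper, and the sample values are made up rather than measured.

```typescript
// Hypothetical subset of the summary fields shown above.
interface LatencySummary {
  readonly mean: number;
  readonly p50: number;
  readonly p99: number;
  readonly max: number;
}

// Illustrative helper: condenses a summary into a single log line.
function describeLatency(label: string, s: LatencySummary): string {
  // Ratio of tail to median latency; guarded against division by zero.
  const tailRatio = s.p50 > 0 ? s.p99 / s.p50 : 0;
  return (
    `${label}: mean=${s.mean.toFixed(1)} p50=${s.p50} ` +
    `p99=${s.p99} max=${s.max} tail=${tailRatio.toFixed(1)}x`
  );
}

// Made-up sample values standing in for pool.histogram.runTime.
const sample: LatencySummary = { mean: 12.5, p50: 10, p99: 40, max: 55 };
const report = describeLatency("runTime", sample);

console.log(report); // runTime: mean=12.5 p50=10 p99=40 max=55 tail=4.0x
```

In a real pool the same helper could be applied to both `histogram.runTime` and `histogram.waitTime`, since the interface above gives them the same summary type.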

Transferable Objects

Efficient data transfer mechanisms using structured cloning and transferable objects for optimal performance with large data sets.

import { 
  move, 
  Transferable, 
  transferableSymbol, 
  valueSymbol,
  isTransferable,
  isMovable
} from "piscina";

function move(
  val: Transferable | TransferListItem | ArrayBufferView | ArrayBuffer | MessagePort
): any;

interface Transferable {
  readonly [transferableSymbol]: object;
  readonly [valueSymbol]: object;
}

type TransferList = MessagePort extends {
  postMessage: (value: any, transferList: infer T) => any;
} ? T : never;

type TransferListItem = TransferList extends Array<infer T> ? T : never;

// Utility functions
function isTransferable(value: unknown): boolean;
function isMovable(value: any): boolean;

Details: transferable-objects.md
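The difference between cloning and transferring can be seen without a worker pool. The sketch below uses the platform's `structuredClone` with a transfer list (available in Node.js 17 and later) rather than Piscina's `move()`, so it only illustrates the underlying semantics: after a transfer, the original `ArrayBuffer` is detached and the receiver owns the memory.

```typescript
// A 1 KiB buffer standing in for a large payload.
const source = new ArrayBuffer(1024);
const sizeBefore = source.byteLength;

// Transfer instead of copy: ownership of the memory moves to `received`.
const received = structuredClone(source, { transfer: [source] });

// The original buffer is now detached and reports a length of zero.
const sizeAfter = source.byteLength;
const receivedSize = received.byteLength;

console.log(sizeBefore, sizeAfter, receivedSize); // 1024 0 1024
```

The practical consequence is that a value transferred to a worker must not be read on the sending side afterwards.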

Task Cancellation

Comprehensive task cancellation support using AbortController and AbortSignal patterns for graceful task termination and cleanup.

import { 
  AbortError, 
  AbortSignalAny, 
  AbortSignalEventTarget,
  AbortSignalEventEmitter,
  onabort 
} from "piscina";

class AbortError extends Error {
  constructor(reason?: AbortSignalEventTarget['reason']);
  readonly name: 'AbortError';
}

interface AbortSignalEventTarget {
  addEventListener(
    name: 'abort',
    listener: () => void,
    options?: { once: boolean }
  ): void;
  removeEventListener(name: 'abort', listener: () => void): void;
  readonly aborted?: boolean;
  readonly reason?: unknown;
}

interface AbortSignalEventEmitter {
  off(name: 'abort', listener: () => void): void;
  once(name: 'abort', listener: () => void): void;
}

type AbortSignalAny = AbortSignalEventTarget | AbortSignalEventEmitter;

function onabort(abortSignal: AbortSignalAny, listener: () => void): void;

Details: task-cancellation.md
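`AbortSignalAny` covers two listener styles: `addEventListener` (as on a standard `AbortSignal`) and `once` (as on a Node.js `EventEmitter`). The sketch below shows how a single helper can attach a one-shot listener to either style. The types are local copies of the shapes above, and `attachAbortListener` is a hypothetical stand-in written for illustration, not Piscina's `onabort` implementation.

```typescript
import { EventEmitter } from "node:events";

type Listener = () => void;

// Local copies of the two signal shapes shown above.
interface EventTargetSignal {
  addEventListener(name: "abort", listener: Listener, options?: { once: boolean }): void;
}
interface EventEmitterSignal {
  once(name: "abort", listener: Listener): void;
}
type AnySignal = EventTargetSignal | EventEmitterSignal;

// Hypothetical helper: attaches a one-shot abort listener to either style.
function attachAbortListener(signal: AnySignal, listener: Listener): void {
  if ("addEventListener" in signal) {
    signal.addEventListener("abort", listener, { once: true });
  } else {
    signal.once("abort", listener);
  }
}

const fired: string[] = [];

// Style 1: a standard AbortController signal.
const controller = new AbortController();
attachAbortListener(controller.signal, () => fired.push("event-target"));
controller.abort();

// Style 2: a plain EventEmitter that emits 'abort'.
const emitter = new EventEmitter();
attachAbortListener(emitter, () => fired.push("event-emitter"));
emitter.emit("abort");

console.log(fired); // [ 'event-target', 'event-emitter' ]
```

With Piscina itself, the signal is passed through the `signal` field of `RunOptions` shown earlier.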

Worker Thread Context

Static Properties and Methods

Global properties and utilities available throughout the Piscina ecosystem.

import Piscina, {
  move,
  isWorkerThread,
  workerData,
  version,
  transferableSymbol,
  valueSymbol,
  queueOptionsSymbol,
  FixedQueue,
  ArrayTaskQueue
} from "piscina";

class Piscina {
  // Static properties
  static readonly isWorkerThread: boolean;
  static readonly workerData: any;
  static readonly version: string;
  static readonly Piscina: typeof Piscina;
  static readonly FixedQueue: typeof FixedQueue;
  static readonly ArrayTaskQueue: typeof ArrayTaskQueue;
  static readonly transferableSymbol: symbol;
  static readonly valueSymbol: symbol;
  static readonly queueOptionsSymbol: symbol;
  
  // Static methods
  static move(
    val: Transferable | TransferListItem | ArrayBufferView | ArrayBuffer | MessagePort
  ): any;
}

// Named exports for convenience
const move: typeof Piscina.move;
const isWorkerThread: typeof Piscina.isWorkerThread;
const workerData: typeof Piscina.workerData;
const version: typeof Piscina.version;
const transferableSymbol: typeof Piscina.transferableSymbol;
const valueSymbol: typeof Piscina.valueSymbol;
const queueOptionsSymbol: typeof Piscina.queueOptionsSymbol;

Error Handling

const Errors: {
  ThreadTermination(): Error;
  FilenameNotProvided(): Error;
  TaskQueueAtLimit(): Error;
  NoTaskQueueAvailable(): Error;
  CloseTimeout(): Error;
};

Common errors thrown by Piscina operations include:

  • ThreadTermination: Worker thread was terminated unexpectedly
  • FilenameNotProvided: No worker filename specified in options or run call
  • TaskQueueAtLimit: Task queue has reached its maximum capacity
  • NoTaskQueueAvailable: No task queue available and all workers are busy
  • CloseTimeout: Pool close operation exceeded the configured timeout
  • AbortError: Task was cancelled via AbortSignal
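Cancellation is usually an expected outcome rather than a failure, so callers often want to separate it from the other errors in this list. Because `AbortError` declares `name: 'AbortError'`, a name check is one way to do that. The sketch below uses simulated errors with made-up messages in place of real rejections, and `isAbortError` is a hypothetical helper.

```typescript
// Hypothetical helper: true only for Error instances named 'AbortError'.
function isAbortError(err: unknown): boolean {
  return err instanceof Error && err.name === "AbortError";
}

// Simulated rejection standing in for a cancelled task (message is made up).
const cancelled = new Error("task was cancelled");
cancelled.name = "AbortError";

// Simulated rejection standing in for any other pool error (message is made up).
const otherFailure = new Error("some other pool failure");

const checks = [
  isAbortError(cancelled),
  isAbortError(otherFailure),
  isAbortError("not an error object")
];

console.log(checks); // [ true, false, false ]
```

In application code the same check would typically sit in the `catch` block around `await piscina.run(...)`.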

Types

import { PiscinaTask, ResourceLimits, EnvSpecifier } from "piscina";
import { Worker } from "node:worker_threads";

interface PiscinaTask {
  readonly taskId: number;
  readonly filename: string;
  readonly name: string;
  readonly created: number;
  readonly isAbortable: boolean;
}

type ResourceLimits = Worker extends {
  resourceLimits?: infer T;
} ? T : {};

type EnvSpecifier = typeof Worker extends {
  new (filename: never, options?: { env: infer T }): Worker;
} ? T : never;