A fast, efficient Node.js Worker Thread Pool implementation
npx @tessl/cli install tessl/npm-piscina@5.1.00
# Piscina
1
2
## Overview
3
4
Piscina is a fast, efficient Node.js Worker Thread Pool implementation that enables parallel processing and CPU-intensive task distribution across multiple worker threads. It provides comprehensive features including:
5
6
- **Fast Inter-thread Communication**: Optimized message passing between main thread and workers
7
- **Flexible Pool Sizing**: Dynamic scaling with configurable min/max thread limits
8
- **Async Tracking Integration**: Proper async resource tracking for debugging and monitoring
9
- **Detailed Performance Statistics**: Built-in histograms with percentile calculations
10
- **Task Cancellation Support**: AbortController/AbortSignal integration for graceful cancellation
11
- **Memory Resource Limits**: Configurable memory and resource constraints per worker
12
- **Module System Support**: Compatible with CommonJS, ESM, and TypeScript modules
13
14
## Package Information
15
16
- **Package Name**: piscina
17
- **Package Type**: npm
18
- **Language**: TypeScript
19
- **Installation**: `npm install piscina`
20
- **Node.js Version**: 20.x and higher
21
22
## Core Imports
23
24
```typescript
25
import Piscina from "piscina";
26
// Or for named imports
27
import { Piscina, move, isWorkerThread, workerData } from "piscina";
28
```
29
30
For CommonJS:
31
32
```javascript
33
const Piscina = require("piscina");
34
// Or destructured
35
const { Piscina, move, isWorkerThread, workerData } = require("piscina");
36
```
37
38
## Basic Usage
39
40
```typescript
41
import Piscina from "piscina";
42
import { resolve } from "path";
43
44
// Create worker pool
45
const piscina = new Piscina({
46
filename: resolve(__dirname, "worker.js"),
47
maxThreads: 4
48
});
49
50
// Run tasks
51
const result = await piscina.run({ a: 4, b: 6 });
52
console.log(result); // Output from worker
53
54
// Graceful shutdown
55
await piscina.close();
56
```
57
58
Worker file (`worker.js`):
59
60
```javascript
61
module.exports = ({ a, b }) => {
62
return a + b;
63
};
64
```
65
66
## Architecture
67
68
Piscina is built around several key components:
69
70
- **Thread Pool Management**: Automatic worker thread creation, scaling, and lifecycle management
71
- **Task Queue System**: Configurable queuing strategies with built-in implementations (FIFO, fixed-size circular buffer)
72
- **Load Balancing**: Intelligent task distribution across available workers with customizable balancing strategies
73
- **Performance Monitoring**: Built-in histograms for runtime and wait time statistics with percentile calculations
74
- **Transferable Objects**: Efficient data transfer using structured cloning and transferable objects
75
- **Abort Support**: Task cancellation using AbortController/AbortSignal patterns
76
- **Resource Management**: Memory limits, file descriptor tracking, and graceful shutdown handling
77
78
## Capabilities
79
80
### Pool Management
81
82
Core worker thread pool functionality for creating, configuring, and managing worker pools with automatic scaling and resource management.
83
84
```typescript { .api }
85
import Piscina from "piscina";
86
import { EventEmitterAsyncResource } from "node:async_hooks";
87
import { Worker } from "node:worker_threads";
88
89
class Piscina<T = any, R = any> extends EventEmitterAsyncResource {
90
constructor(options?: Options);
91
92
// Core methods
93
run(task: T, options?: RunOptions): Promise<R>;
94
close(options?: CloseOptions): Promise<void>;
95
destroy(): Promise<void>;
96
97
// Properties
98
readonly maxThreads: number;
99
readonly minThreads: number;
100
readonly options: FilledOptions;
101
readonly threads: Worker[];
102
readonly queueSize: number;
103
readonly completed: number;
104
readonly histogram: PiscinaHistogram;
105
readonly utilization: number;
106
readonly duration: number;
107
readonly needsDrain: boolean;
108
}
109
110
interface FilledOptions extends Required<Options> {
111
filename: string | null;
112
name: string;
113
minThreads: number;
114
maxThreads: number;
115
idleTimeout: number;
116
maxQueue: number;
117
concurrentTasksPerWorker: number;
118
atomics: 'sync' | 'async' | 'disabled';
119
taskQueue: TaskQueue;
120
niceIncrement: number;
121
closeTimeout: number;
122
recordTiming: boolean;
123
workerHistogram: boolean;
124
}
125
126
interface Options {
127
filename?: string | null;
128
name?: string;
129
minThreads?: number;
130
maxThreads?: number;
131
idleTimeout?: number;
132
maxQueue?: number | 'auto';
133
concurrentTasksPerWorker?: number;
134
atomics?: 'sync' | 'async' | 'disabled';
135
resourceLimits?: ResourceLimits;
136
argv?: string[];
137
execArgv?: string[];
138
env?: EnvSpecifier;
139
workerData?: any;
140
taskQueue?: TaskQueue;
141
niceIncrement?: number;
142
trackUnmanagedFds?: boolean;
143
closeTimeout?: number;
144
recordTiming?: boolean;
145
loadBalancer?: PiscinaLoadBalancer;
146
workerHistogram?: boolean;
147
}
148
149
interface RunOptions {
150
transferList?: TransferList;
151
filename?: string | null;
152
signal?: AbortSignalAny | null;
153
name?: string | null;
154
}
155
156
interface CloseOptions {
157
force?: boolean;
158
}
159
```
160
161
[Pool Management](./pool-management.md)
162
163
### Task Queue Systems
164
165
Configurable task queuing strategies with built-in FIFO and fixed-size circular buffer implementations for optimal performance in different scenarios.
166
167
```typescript { .api }
168
import { TaskQueue, ArrayTaskQueue, FixedQueue, isTaskQueue } from "piscina";
169
170
interface Task {
171
readonly taskId: number;
172
readonly filename: string;
173
readonly name: string;
174
readonly created: number;
175
readonly isAbortable: boolean;
176
}
177
178
interface TaskQueue {
179
readonly size: number;
180
shift(): Task | null;
181
remove(task: Task): void;
182
push(task: Task): void;
183
}
184
185
class ArrayTaskQueue implements TaskQueue {
186
readonly size: number;
187
shift(): Task | null;
188
push(task: Task): void;
189
remove(task: Task): void;
190
}
191
192
class FixedQueue implements TaskQueue {
193
readonly size: number;
194
shift(): Task | null;
195
push(task: Task): void;
196
remove(task: Task): void;
197
}
198
199
function isTaskQueue(value: TaskQueue): boolean;
200
```
201
202
[Task Queues](./task-queues.md)
203
204
### Load Balancing
205
206
Intelligent task distribution across available workers with built-in least-busy balancer and support for custom balancing strategies.
207
208
```typescript { .api }
209
import { PiscinaLoadBalancer, LeastBusyBalancer, PiscinaTask, PiscinaWorker } from "piscina";
210
211
type PiscinaLoadBalancer = (
212
task: PiscinaTask,
213
workers: PiscinaWorker[]
214
) => PiscinaWorker | null;
215
216
function LeastBusyBalancer(
217
opts: LeastBusyBalancerOptions
218
): PiscinaLoadBalancer;
219
220
interface LeastBusyBalancerOptions {
221
maximumUsage: number;
222
}
223
224
interface PiscinaWorker {
225
readonly id: number;
226
readonly currentUsage: number;
227
readonly isRunningAbortableTask: boolean;
228
readonly histogram: PiscinaHistogramSummary | null;
229
readonly terminating: boolean;
230
readonly destroyed: boolean;
231
}
232
```
233
234
[Load Balancing](./load-balancing.md)
235
236
### Performance Monitoring
237
238
Built-in performance metrics collection with detailed histogram statistics for runtime and wait times, including percentile calculations.
239
240
```typescript { .api }
241
import { PiscinaHistogram, PiscinaHistogramSummary } from "piscina";
242
243
interface PiscinaHistogram {
244
readonly runTime: PiscinaHistogramSummary;
245
readonly waitTime: PiscinaHistogramSummary;
246
resetRunTime(): void;
247
resetWaitTime(): void;
248
}
249
250
interface PiscinaHistogramSummary {
251
readonly average: number;
252
readonly mean: number;
253
readonly stddev: number;
254
readonly min: number;
255
readonly max: number;
256
readonly p0_001: number;
257
readonly p0_01: number;
258
readonly p0_1: number;
259
readonly p1: number;
260
readonly p2_5: number;
261
readonly p10: number;
262
readonly p25: number;
263
readonly p50: number;
264
readonly p75: number;
265
readonly p90: number;
266
readonly p97_5: number;
267
readonly p99: number;
268
readonly p99_9: number;
269
readonly p99_99: number;
270
readonly p99_999: number;
271
}
272
```
273
274
[Performance Monitoring](./performance-monitoring.md)
275
276
### Transferable Objects
277
278
Efficient data transfer mechanisms using structured cloning and transferable objects for optimal performance with large data sets.
279
280
```typescript { .api }
281
import {
282
move,
283
Transferable,
284
transferableSymbol,
285
valueSymbol,
286
isTransferable,
287
isMovable
288
} from "piscina";
289
290
function move(
291
val: Transferable | TransferListItem | ArrayBufferView | ArrayBuffer | MessagePort
292
): any;
293
294
interface Transferable {
295
readonly [transferableSymbol]: object;
296
readonly [valueSymbol]: object;
297
}
298
299
type TransferList = MessagePort extends {
300
postMessage: (value: any, transferList: infer T) => any;
301
} ? T : never;
302
303
type TransferListItem = TransferList extends Array<infer T> ? T : never;
304
305
// Utility functions
306
function isTransferable(value: unknown): boolean;
307
function isMovable(value: any): boolean;
308
```
309
310
[Transferable Objects](./transferable-objects.md)
311
312
### Task Cancellation
313
314
Comprehensive task cancellation support using AbortController and AbortSignal patterns for graceful task termination and cleanup.
315
316
```typescript { .api }
317
import {
318
AbortError,
319
AbortSignalAny,
320
AbortSignalEventTarget,
321
AbortSignalEventEmitter,
322
onabort
323
} from "piscina";
324
325
class AbortError extends Error {
326
constructor(reason?: AbortSignalEventTarget['reason']);
327
readonly name: 'AbortError';
328
}
329
330
interface AbortSignalEventTarget {
331
addEventListener(
332
name: 'abort',
333
listener: () => void,
334
options?: { once: boolean }
335
): void;
336
removeEventListener(name: 'abort', listener: () => void): void;
337
readonly aborted?: boolean;
338
readonly reason?: unknown;
339
}
340
341
interface AbortSignalEventEmitter {
342
off(name: 'abort', listener: () => void): void;
343
once(name: 'abort', listener: () => void): void;
344
}
345
346
type AbortSignalAny = AbortSignalEventTarget | AbortSignalEventEmitter;
347
348
function onabort(abortSignal: AbortSignalAny, listener: () => void): void;
349
```
350
351
[Task Cancellation](./task-cancellation.md)
352
353
## Worker Thread Context
354
355
### Static Properties and Methods
356
357
Global properties and utilities available throughout the Piscina ecosystem.
358
359
```typescript { .api }
360
import Piscina, {
361
move,
362
isWorkerThread,
363
workerData,
364
version,
365
transferableSymbol,
366
valueSymbol,
367
queueOptionsSymbol,
368
FixedQueue,
369
ArrayTaskQueue
370
} from "piscina";
371
372
class Piscina {
373
// Static properties
374
static readonly isWorkerThread: boolean;
375
static readonly workerData: any;
376
static readonly version: string;
377
static readonly Piscina: typeof Piscina;
378
static readonly FixedQueue: typeof FixedQueue;
379
static readonly ArrayTaskQueue: typeof ArrayTaskQueue;
380
static readonly transferableSymbol: symbol;
381
static readonly valueSymbol: symbol;
382
static readonly queueOptionsSymbol: symbol;
383
384
// Static methods
385
static move(
386
val: Transferable | TransferListItem | ArrayBufferView | ArrayBuffer | MessagePort
387
): any;
388
}
389
390
// Named exports for convenience
391
const move: typeof Piscina.move;
392
const isWorkerThread: typeof Piscina.isWorkerThread;
393
const workerData: typeof Piscina.workerData;
394
const version: typeof Piscina.version;
395
const transferableSymbol: typeof Piscina.transferableSymbol;
396
const valueSymbol: typeof Piscina.valueSymbol;
397
const queueOptionsSymbol: typeof Piscina.queueOptionsSymbol;
398
```
399
400
## Error Handling
401
402
```typescript { .api }
403
const Errors: {
404
ThreadTermination(): Error;
405
FilenameNotProvided(): Error;
406
TaskQueueAtLimit(): Error;
407
NoTaskQueueAvailable(): Error;
408
CloseTimeout(): Error;
409
};
410
```
411
412
Common errors thrown by Piscina operations include:
413
- **ThreadTermination**: Worker thread was terminated unexpectedly
414
- **FilenameNotProvided**: No worker filename specified in options or run call
415
- **TaskQueueAtLimit**: Task queue has reached its maximum capacity
416
- **NoTaskQueueAvailable**: No task queue available and all workers are busy
417
- **CloseTimeout**: Pool close operation exceeded the configured timeout
418
- **AbortError**: Task was cancelled via AbortSignal
419
420
## Types
421
422
```typescript { .api }
423
import { PiscinaTask, ResourceLimits, EnvSpecifier } from "piscina";
424
import { Worker } from "node:worker_threads";
425
426
interface PiscinaTask {
427
readonly taskId: number;
428
readonly filename: string;
429
readonly name: string;
430
readonly created: number;
431
readonly isAbortable: boolean;
432
}
433
434
type ResourceLimits = Worker extends {
435
resourceLimits?: infer T;
436
} ? T : {};
437
438
type EnvSpecifier = typeof Worker extends {
439
new (filename: never, options?: { env: infer T }): Worker;
440
} ? T : never;
441
```