# Pool Management

Core worker thread pool functionality for creating, configuring, and managing worker pools with automatic scaling and resource management.

## Capabilities

### Piscina Class

Main worker thread pool class that extends EventEmitterAsyncResource for full async tracking support.

```typescript { .api }
/**
 * Main worker thread pool class
 * @extends EventEmitterAsyncResource
 * @template T - Task input type
 * @template R - Task result type
 */
class Piscina<T = any, R = any> extends EventEmitterAsyncResource {
  /**
   * Create a new worker thread pool
   * @param options - Pool configuration options
   */
  constructor(options?: Options);

  /**
   * Execute a task in the worker pool
   * @param task - Task data to pass to worker
   * @param options - Task execution options
   * @returns Promise resolving to task result
   */
  run(task: T, options?: RunOptions): Promise<R>;

  /**
   * Gracefully close the pool, waiting for running tasks to complete
   * @param options - Close operation options
   * @returns Promise resolving when pool is closed
   */
  close(options?: CloseOptions): Promise<void>;

  /**
   * Forcefully destroy the pool, terminating all workers immediately
   * @returns Promise resolving when pool is destroyed
   */
  destroy(): Promise<void>;

  /**
   * Synchronous disposal method for 'using' keyword support
   */
  [Symbol.dispose](): void;

  /**
   * Asynchronous disposal method for 'await using' keyword support
   */
  [Symbol.asyncDispose](): Promise<void>;

  // Pool properties
  readonly maxThreads: number;
  readonly minThreads: number;
  readonly options: FilledOptions;
  readonly threads: Worker[];
  readonly queueSize: number;
  readonly completed: number;
  readonly histogram: PiscinaHistogram;
  readonly utilization: number;
  readonly duration: number;
  readonly needsDrain: boolean;
}
```

**Usage Examples:**

```typescript
import Piscina from "piscina";
import { resolve } from "path";

// Basic pool with default settings
const pool = new Piscina({
  filename: resolve(__dirname, "worker.js")
});

// Advanced pool configuration
const advancedPool = new Piscina({
  filename: resolve(__dirname, "worker.js"),
  minThreads: 2,
  maxThreads: 8,
  idleTimeout: 5000,
  maxQueue: 50,
  concurrentTasksPerWorker: 2,
  resourceLimits: {
    maxOldGenerationSizeMb: 100,
    maxYoungGenerationSizeMb: 50
  }
});

// Execute tasks
const result = await pool.run({ operation: "add", values: [1, 2, 3] });
console.log("Task completed:", result);

// Graceful shutdown
await pool.close();
```
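
The examples above load a `worker.js` file that is not shown. As a rough sketch, assuming the worker is written in TypeScript, compiled to `worker.js`, and exposes its handler as the default export, a module that handles the `{ operation, values }` task might look like the following. The `MathTask` type, the `handleTask` name, and the `multiply` operation are illustrative assumptions, not part of the Piscina API.

```typescript
// worker.ts (hypothetical): compiled to worker.js before the pool loads it.

// Shape of the task object passed to pool.run() in the example above.
// The "multiply" variant is an assumption added for illustration.
interface MathTask {
  operation: "add" | "multiply";
  values: number[];
}

// The pool calls the exported function once per task and resolves
// the promise returned by pool.run() with its return value.
export default function handleTask({ operation, values }: MathTask): number {
  switch (operation) {
    case "add":
      return values.reduce((sum, value) => sum + value, 0);
    case "multiply":
      return values.reduce((product, value) => product * value, 1);
    default:
      // Reached only if the caller bypasses the type checker.
      throw new Error(`Unsupported operation: ${String(operation)}`);
  }
}
```

With this worker, `pool.run({ operation: "add", values: [1, 2, 3] })` would resolve to `6`.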

### Pool Configuration Options

Configuration interface for customizing pool behavior. Every field in `Options` is optional; `FilledOptions` describes the resolved configuration after defaults are applied.

```typescript { .api }
/**
 * Pool configuration options
 */
interface Options {
  /** Worker script filename - can be JavaScript, TypeScript, or ESM */
  filename?: string | null;
  /** Pool name for identification and debugging */
  name?: string;
  /** Minimum number of worker threads to maintain */
  minThreads?: number;
  /** Maximum number of worker threads allowed */
  maxThreads?: number;
  /** Milliseconds before idle workers are terminated (0 = terminate immediately, Infinity = never) */
  idleTimeout?: number;
  /** Maximum queued tasks ('auto' = maxThreads^2, number, or Infinity) */
  maxQueue?: number | 'auto';
  /** Number of concurrent tasks each worker can handle */
  concurrentTasksPerWorker?: number;
  /** Shared memory communication mode */
  atomics?: 'sync' | 'async' | 'disabled';
  /** Resource limits for worker threads */
  resourceLimits?: ResourceLimits;
  /** Command line arguments passed to worker */
  argv?: string[];
  /** Node.js execution arguments passed to worker */
  execArgv?: string[];
  /** Environment variables for workers */
  env?: EnvSpecifier;
  /** Data passed to worker on startup */
  workerData?: any;
  /** Custom task queue implementation */
  taskQueue?: TaskQueue;
  /** Process priority adjustment (Unix only) */
  niceIncrement?: number;
  /** Track unmanaged file descriptors */
  trackUnmanagedFds?: boolean;
  /** Timeout for close operations in milliseconds */
  closeTimeout?: number;
  /** Enable performance timing collection */
  recordTiming?: boolean;
  /** Custom load balancer function */
  loadBalancer?: PiscinaLoadBalancer;
  /** Enable per-worker histograms */
  workerHistogram?: boolean;
}

/**
 * Filled options with all defaults applied
 */
interface FilledOptions extends Options {
  filename: string | null;
  name: string;
  minThreads: number;
  maxThreads: number;
  idleTimeout: number;
  maxQueue: number;
  concurrentTasksPerWorker: number;
  atomics: 'sync' | 'async' | 'disabled';
  taskQueue: TaskQueue;
  niceIncrement: number;
  closeTimeout: number;
  recordTiming: boolean;
  workerHistogram: boolean;
}
```
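
The `maxQueue` option accepts `'auto'`, which the comment above describes as `maxThreads^2`, while `FilledOptions.maxQueue` is always a number. The sketch below shows that arithmetic in isolation. `resolveMaxQueue` is a hypothetical helper written for illustration; it is not Piscina's internal implementation.

```typescript
// Hypothetical helper: turn the user-facing maxQueue setting into a number,
// mirroring the "'auto' = maxThreads^2" rule documented above.
function resolveMaxQueue(maxQueue: number | "auto", maxThreads: number): number {
  if (maxQueue === "auto") {
    // Queue capacity grows with the square of the thread limit.
    return maxThreads ** 2;
  }
  // Explicit numbers (including Infinity) pass through unchanged.
  return maxQueue;
}

console.log(resolveMaxQueue("auto", 8));   // 64
console.log(resolveMaxQueue(50, 8));       // 50
console.log(resolveMaxQueue(Infinity, 8)); // Infinity
```

Under this rule, a pool with `maxThreads: 8` and `maxQueue: 'auto'` would hold at most 64 queued tasks.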

### Task Execution Options

Options for individual task execution with support for transfers and cancellation.

```typescript { .api }
/**
 * Options for individual task execution
 */
interface RunOptions {
  /** Objects to transfer ownership to worker (for performance) */
  transferList?: TransferList;
  /** Override worker filename for this task */
  filename?: string | null;
  /** Abort signal for task cancellation */
  signal?: AbortSignalAny | null;
  /** Override task name for this execution */
  name?: string | null;
}
```

**Usage Examples:**

```typescript
import Piscina from "piscina";
import { resolve } from "path";

const pool = new Piscina({
  filename: resolve(__dirname, "worker.js")
});

// Task with transferable objects
const buffer = new ArrayBuffer(1024);
const result = await pool.run(
  { data: buffer, operation: "process" },
  { transferList: [buffer] }
);

// Task with cancellation
const controller = new AbortController();
const taskPromise = pool.run(
  { longRunningTask: true },
  { signal: controller.signal }
);

// Cancel after 5 seconds
setTimeout(() => controller.abort(), 5000);

try {
  const result = await taskPromise;
} catch (error) {
  // `error` is `unknown` in strict TypeScript, so narrow it before reading `name`
  if (error instanceof Error && error.name === 'AbortError') {
    console.log('Task was cancelled');
  }
}
```
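
The cancellation example above leaves its 5-second timer running even when the task finishes early. One way to package the pattern, sketched here under stated assumptions, is a small helper that aborts on a deadline and always clears the timer. `AbortableRunner`, `isAbortError`, and `runWithTimeout` are hypothetical names introduced for illustration; the helper only assumes an object whose `run` method accepts a `signal` option, as `RunOptions` describes.

```typescript
// Minimal structural view of a pool: only the part this sketch needs.
interface AbortableRunner<T, R> {
  run(task: T, options?: { signal?: AbortSignal }): Promise<R>;
}

// Narrow an unknown rejection to "the task was aborted".
function isAbortError(error: unknown): boolean {
  return error instanceof Error && error.name === "AbortError";
}

// Run a task, aborting it if it has not settled within `timeoutMs`.
// Resolves to `undefined` when the deadline is hit; other errors propagate.
async function runWithTimeout<T, R>(
  pool: AbortableRunner<T, R>,
  task: T,
  timeoutMs: number
): Promise<R | undefined> {
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), timeoutMs);
  try {
    return await pool.run(task, { signal: controller.signal });
  } catch (error) {
    if (isAbortError(error)) {
      return undefined;
    }
    throw error;
  } finally {
    // Stop the timer so it neither fires late nor keeps the process alive.
    clearTimeout(timer);
  }
}
```

Returning `undefined` on timeout is a design choice made for brevity. If a task can legitimately resolve to `undefined`, rethrowing the abort error or returning a tagged result would be less ambiguous.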

### Pool Closing Options

Configuration for graceful or forced pool shutdown.

```typescript { .api }
/**
 * Options for pool close operations
 */
interface CloseOptions {
  /** Force immediate termination without waiting for tasks */
  force?: boolean;
}
```

**Usage Examples:**

```typescript
// Graceful shutdown - wait for running tasks
await pool.close();

// Forced shutdown - terminate immediately
await pool.close({ force: true });

// Using disposal syntax (Node.js 20+)
{
  using pool = new Piscina({ filename: "worker.js" });
  await pool.run(task);
  // Automatically closed when leaving scope
}

// Async disposal
{
  await using pool = new Piscina({ filename: "worker.js" });
  await pool.run(task);
  // Automatically closed when leaving scope
}
```

### Pool Properties

Real-time pool status and configuration access.

```typescript { .api }
class Piscina {
  /** Maximum number of worker threads */
  readonly maxThreads: number;

  /** Minimum number of worker threads */
  readonly minThreads: number;

  /** Complete pool configuration with defaults applied */
  readonly options: FilledOptions;

  /** Array of all worker thread instances */
  readonly threads: Worker[];

  /** Current number of queued tasks */
  readonly queueSize: number;

  /** Total number of completed tasks */
  readonly completed: number;

  /** Performance histogram data */
  readonly histogram: PiscinaHistogram;

  /** Pool utilization as a ratio between 0 and 1 */
  readonly utilization: number;

  /** Pool runtime in milliseconds */
  readonly duration: number;

  /** Whether pool needs draining (at capacity) */
  readonly needsDrain: boolean;
}
```

**Usage Examples:**

```typescript
const pool = new Piscina({
  filename: "worker.js",
  minThreads: 2,
  maxThreads: 8
});

// Monitor pool status
console.log(`Pool has ${pool.threads.length} active threads`);
console.log(`Queue size: ${pool.queueSize}`);
console.log(`Completed tasks: ${pool.completed}`);
console.log(`Utilization: ${(pool.utilization * 100).toFixed(2)}%`);
console.log(`Running for: ${pool.duration}ms`);

// Check if pool needs draining
if (pool.needsDrain) {
  console.log("Pool is at capacity, consider adding more threads");
}

// Access configuration
console.log(`Max threads: ${pool.maxThreads}`);
console.log(`Idle timeout: ${pool.options.idleTimeout}`);
```
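
For periodic logging, the individual reads above can be folded into a single status line. The sketch below is one possible formatter; `PoolStats` and `describePool` are hypothetical names, and the interface lists only the properties documented in this section so that any object exposing them can be passed in.

```typescript
// Structural subset of the pool properties documented above.
interface PoolStats {
  readonly threads: readonly unknown[];
  readonly queueSize: number;
  readonly completed: number;
  readonly utilization: number; // ratio between 0 and 1
  readonly duration: number;    // milliseconds
  readonly needsDrain: boolean;
}

// Build a compact, single-line status summary.
function describePool(pool: PoolStats): string {
  const utilizationPct = (pool.utilization * 100).toFixed(1);
  return [
    `threads=${pool.threads.length}`,
    `queued=${pool.queueSize}`,
    `completed=${pool.completed}`,
    `utilization=${utilizationPct}%`,
    `uptime=${Math.round(pool.duration)}ms`,
    pool.needsDrain ? "at capacity" : "accepting work",
  ].join(" ");
}
```

A call such as `setInterval(() => console.log(describePool(pool)), 1000)` would then emit one line per second while the pool is running.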

## Events

The Piscina class extends EventEmitterAsyncResource and emits the following events:

```typescript { .api }
class Piscina extends EventEmitterAsyncResource {
  // Event: 'workerCreate' - Emitted when a new worker is created
  on(event: 'workerCreate', listener: (worker: PiscinaWorker) => void): this;

  // Event: 'workerDestroy' - Emitted when a worker is destroyed
  on(event: 'workerDestroy', listener: (worker: PiscinaWorker) => void): this;

  // Event: 'needsDrain' - Emitted when pool is at capacity
  on(event: 'needsDrain', listener: () => void): this;

  // Event: 'drain' - Emitted when pool capacity is available again
  on(event: 'drain', listener: () => void): this;

  // Event: 'message' - Emitted for messages from workers
  on(event: 'message', listener: (message: any) => void): this;

  // Event: 'error' - Emitted for pool-level errors
  on(event: 'error', listener: (error: Error) => void): this;

  // Event: 'close' - Emitted when pool is closed
  on(event: 'close', listener: () => void): this;
}
```

**Usage Examples:**

```typescript
const pool = new Piscina({ filename: "worker.js" });

// Monitor worker lifecycle
pool.on('workerCreate', (worker) => {
  console.log(`Worker ${worker.id} created`);
});

pool.on('workerDestroy', (worker) => {
  console.log(`Worker ${worker.id} destroyed`);
});

// Monitor pool capacity
pool.on('needsDrain', () => {
  console.log('Pool at capacity - consider draining');
});

pool.on('drain', () => {
  console.log('Pool capacity available');
});

// Handle errors
pool.on('error', (error) => {
  console.error('Pool error:', error);
});
```
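
The `needsDrain` property and the `'drain'` event can be combined into simple backpressure: stop submitting while the pool reports it is at capacity, and resume once capacity is available again. The following is a minimal sketch of that idea, not a prescribed pattern. `DrainablePool` and `submitAll` are hypothetical names, and the interface captures only the members the sketch relies on.

```typescript
// Structural view of the pool members used for backpressure.
interface DrainablePool<T, R> {
  readonly needsDrain: boolean;
  run(task: T): Promise<R>;
  once(event: "drain", listener: () => void): unknown;
}

// Submit tasks in order, pausing whenever the pool is at capacity.
async function submitAll<T, R>(
  pool: DrainablePool<T, R>,
  tasks: Iterable<T>
): Promise<R[]> {
  const pending: Promise<R>[] = [];
  for (const task of tasks) {
    if (pool.needsDrain) {
      // Wait for the next 'drain' event before queueing more work.
      await new Promise<void>((resolve) => pool.once("drain", () => resolve()));
    }
    const running = pool.run(task);
    // Avoid unhandled-rejection warnings while paused;
    // Promise.all below still rejects if any task fails.
    running.catch(() => {});
    pending.push(running);
  }
  // Results come back in submission order.
  return Promise.all(pending);
}
```

This keeps the number of queued tasks bounded by the pool's own capacity signal instead of buffering the whole input up front.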