or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.mdload-balancing.mdperformance-monitoring.mdpool-management.mdtask-cancellation.mdtask-queues.mdtransferable-objects.md

index.mddocs/

0

# Piscina

1

2

## Overview

3

4

Piscina is a fast, efficient Node.js Worker Thread Pool implementation that enables parallel processing and CPU-intensive task distribution across multiple worker threads. It provides comprehensive features including:

5

6

- **Fast Inter-thread Communication**: Optimized message passing between main thread and workers

7

- **Flexible Pool Sizing**: Dynamic scaling with configurable min/max thread limits

8

- **Async Tracking Integration**: Proper async resource tracking for debugging and monitoring

9

- **Detailed Performance Statistics**: Built-in histograms with percentile calculations

10

- **Task Cancellation Support**: AbortController/AbortSignal integration for graceful cancellation

11

- **Memory Resource Limits**: Configurable memory and resource constraints per worker

12

- **Module System Support**: Compatible with CommonJS, ESM, and TypeScript modules

13

14

## Package Information

15

16

- **Package Name**: piscina

17

- **Package Type**: npm

18

- **Language**: TypeScript

19

- **Installation**: `npm install piscina`

20

- **Node.js Version**: 20.x and higher

21

22

## Core Imports

23

24

```typescript

25

import Piscina from "piscina";

26

// Or for named imports

27

import { Piscina, move, isWorkerThread, workerData } from "piscina";

28

```

29

30

For CommonJS:

31

32

```javascript

33

const Piscina = require("piscina");

34

// Or destructured

35

const { Piscina, move, isWorkerThread, workerData } = require("piscina");

36

```

37

38

## Basic Usage

39

40

```typescript

41

import Piscina from "piscina";

42

import { resolve } from "path";

43

44

// Create worker pool

45

const piscina = new Piscina({

46

filename: resolve(__dirname, "worker.js"),

47

maxThreads: 4

48

});

49

50

// Run tasks

51

const result = await piscina.run({ a: 4, b: 6 });

52

console.log(result); // Output from worker

53

54

// Graceful shutdown

55

await piscina.close();

56

```

57

58

Worker file (`worker.js`):

59

60

```javascript

61

module.exports = ({ a, b }) => {

62

return a + b;

63

};

64

```

65

66

## Architecture

67

68

Piscina is built around several key components:

69

70

- **Thread Pool Management**: Automatic worker thread creation, scaling, and lifecycle management

71

- **Task Queue System**: Configurable queuing strategies with built-in implementations (FIFO, fixed-size circular buffer)

72

- **Load Balancing**: Intelligent task distribution across available workers with customizable balancing strategies

73

- **Performance Monitoring**: Built-in histograms for runtime and wait time statistics with percentile calculations

74

- **Transferable Objects**: Efficient data transfer using structured cloning and transferable objects

75

- **Abort Support**: Task cancellation using AbortController/AbortSignal patterns

76

- **Resource Management**: Memory limits, file descriptor tracking, and graceful shutdown handling

77

78

## Capabilities

79

80

### Pool Management

81

82

Core worker thread pool functionality for creating, configuring, and managing worker pools with automatic scaling and resource management.

83

84

```typescript { .api }

85

import Piscina from "piscina";

86

import { EventEmitterAsyncResource } from "node:async_hooks";

87

import { Worker } from "node:worker_threads";

88

89

class Piscina<T = any, R = any> extends EventEmitterAsyncResource {

90

constructor(options?: Options);

91

92

// Core methods

93

run(task: T, options?: RunOptions): Promise<R>;

94

close(options?: CloseOptions): Promise<void>;

95

destroy(): Promise<void>;

96

97

// Properties

98

readonly maxThreads: number;

99

readonly minThreads: number;

100

readonly options: FilledOptions;

101

readonly threads: Worker[];

102

readonly queueSize: number;

103

readonly completed: number;

104

readonly histogram: PiscinaHistogram;

105

readonly utilization: number;

106

readonly duration: number;

107

readonly needsDrain: boolean;

108

}

109

110

interface FilledOptions extends Required<Options> {

111

filename: string | null;

112

name: string;

113

minThreads: number;

114

maxThreads: number;

115

idleTimeout: number;

116

maxQueue: number;

117

concurrentTasksPerWorker: number;

118

atomics: 'sync' | 'async' | 'disabled';

119

taskQueue: TaskQueue;

120

niceIncrement: number;

121

closeTimeout: number;

122

recordTiming: boolean;

123

workerHistogram: boolean;

124

}

125

126

interface Options {

127

filename?: string | null;

128

name?: string;

129

minThreads?: number;

130

maxThreads?: number;

131

idleTimeout?: number;

132

maxQueue?: number | 'auto';

133

concurrentTasksPerWorker?: number;

134

atomics?: 'sync' | 'async' | 'disabled';

135

resourceLimits?: ResourceLimits;

136

argv?: string[];

137

execArgv?: string[];

138

env?: EnvSpecifier;

139

workerData?: any;

140

taskQueue?: TaskQueue;

141

niceIncrement?: number;

142

trackUnmanagedFds?: boolean;

143

closeTimeout?: number;

144

recordTiming?: boolean;

145

loadBalancer?: PiscinaLoadBalancer;

146

workerHistogram?: boolean;

147

}

148

149

interface RunOptions {

150

transferList?: TransferList;

151

filename?: string | null;

152

signal?: AbortSignalAny | null;

153

name?: string | null;

154

}

155

156

interface CloseOptions {

157

force?: boolean;

158

}

159

```

160

161

[Pool Management](./pool-management.md)

162

163

### Task Queue Systems

164

165

Configurable task queuing strategies with built-in FIFO and fixed-size circular buffer implementations for optimal performance in different scenarios.

166

167

```typescript { .api }

168

import { TaskQueue, ArrayTaskQueue, FixedQueue, isTaskQueue } from "piscina";

169

170

interface Task {

171

readonly taskId: number;

172

readonly filename: string;

173

readonly name: string;

174

readonly created: number;

175

readonly isAbortable: boolean;

176

}

177

178

interface TaskQueue {

179

readonly size: number;

180

shift(): Task | null;

181

remove(task: Task): void;

182

push(task: Task): void;

183

}

184

185

class ArrayTaskQueue implements TaskQueue {

186

readonly size: number;

187

shift(): Task | null;

188

push(task: Task): void;

189

remove(task: Task): void;

190

}

191

192

class FixedQueue implements TaskQueue {

193

readonly size: number;

194

shift(): Task | null;

195

push(task: Task): void;

196

remove(task: Task): void;

197

}

198

199

function isTaskQueue(value: TaskQueue): boolean;

200

```

201

202

[Task Queues](./task-queues.md)

203

204

### Load Balancing

205

206

Intelligent task distribution across available workers with built-in least-busy balancer and support for custom balancing strategies.

207

208

```typescript { .api }

209

import { PiscinaLoadBalancer, LeastBusyBalancer, PiscinaTask, PiscinaWorker } from "piscina";

210

211

type PiscinaLoadBalancer = (

212

task: PiscinaTask,

213

workers: PiscinaWorker[]

214

) => PiscinaWorker | null;

215

216

function LeastBusyBalancer(

217

opts: LeastBusyBalancerOptions

218

): PiscinaLoadBalancer;

219

220

interface LeastBusyBalancerOptions {

221

maximumUsage: number;

222

}

223

224

interface PiscinaWorker {

225

readonly id: number;

226

readonly currentUsage: number;

227

readonly isRunningAbortableTask: boolean;

228

readonly histogram: PiscinaHistogramSummary | null;

229

readonly terminating: boolean;

230

readonly destroyed: boolean;

231

}

232

```

233

234

[Load Balancing](./load-balancing.md)

235

236

### Performance Monitoring

237

238

Built-in performance metrics collection with detailed histogram statistics for runtime and wait times, including percentile calculations.

239

240

```typescript { .api }

241

import { PiscinaHistogram, PiscinaHistogramSummary } from "piscina";

242

243

interface PiscinaHistogram {

244

readonly runTime: PiscinaHistogramSummary;

245

readonly waitTime: PiscinaHistogramSummary;

246

resetRunTime(): void;

247

resetWaitTime(): void;

248

}

249

250

interface PiscinaHistogramSummary {

251

readonly average: number;

252

readonly mean: number;

253

readonly stddev: number;

254

readonly min: number;

255

readonly max: number;

256

readonly p0_001: number;

257

readonly p0_01: number;

258

readonly p0_1: number;

259

readonly p1: number;

260

readonly p2_5: number;

261

readonly p10: number;

262

readonly p25: number;

263

readonly p50: number;

264

readonly p75: number;

265

readonly p90: number;

266

readonly p97_5: number;

267

readonly p99: number;

268

readonly p99_9: number;

269

readonly p99_99: number;

270

readonly p99_999: number;

271

}

272

```

273

274

[Performance Monitoring](./performance-monitoring.md)

275

276

### Transferable Objects

277

278

Efficient data transfer mechanisms using structured cloning and transferable objects for optimal performance with large data sets.

279

280

```typescript { .api }

281

import {

282

move,

283

Transferable,

284

transferableSymbol,

285

valueSymbol,

286

isTransferable,

287

isMovable

288

} from "piscina";

289

290

function move(

291

val: Transferable | TransferListItem | ArrayBufferView | ArrayBuffer | MessagePort

292

): any;

293

294

interface Transferable {

295

readonly [transferableSymbol]: object;

296

readonly [valueSymbol]: object;

297

}

298

299

type TransferList = MessagePort extends {

300

postMessage: (value: any, transferList: infer T) => any;

301

} ? T : never;

302

303

type TransferListItem = TransferList extends Array<infer T> ? T : never;

304

305

// Utility functions

306

function isTransferable(value: unknown): boolean;

307

function isMovable(value: any): boolean;

308

```

309

310

[Transferable Objects](./transferable-objects.md)

311

312

### Task Cancellation

313

314

Comprehensive task cancellation support using AbortController and AbortSignal patterns for graceful task termination and cleanup.

315

316

```typescript { .api }

317

import {

318

AbortError,

319

AbortSignalAny,

320

AbortSignalEventTarget,

321

AbortSignalEventEmitter,

322

onabort

323

} from "piscina";

324

325

class AbortError extends Error {

326

constructor(reason?: AbortSignalEventTarget['reason']);

327

readonly name: 'AbortError';

328

}

329

330

interface AbortSignalEventTarget {

331

addEventListener(

332

name: 'abort',

333

listener: () => void,

334

options?: { once: boolean }

335

): void;

336

removeEventListener(name: 'abort', listener: () => void): void;

337

readonly aborted?: boolean;

338

readonly reason?: unknown;

339

}

340

341

interface AbortSignalEventEmitter {

342

off(name: 'abort', listener: () => void): void;

343

once(name: 'abort', listener: () => void): void;

344

}

345

346

type AbortSignalAny = AbortSignalEventTarget | AbortSignalEventEmitter;

347

348

function onabort(abortSignal: AbortSignalAny, listener: () => void): void;

349

```

350

351

[Task Cancellation](./task-cancellation.md)

352

353

## Worker Thread Context

354

355

### Static Properties and Methods

356

357

Global properties and utilities available throughout the Piscina ecosystem.

358

359

```typescript { .api }

360

import Piscina, {

361

move,

362

isWorkerThread,

363

workerData,

364

version,

365

transferableSymbol,

366

valueSymbol,

367

queueOptionsSymbol,

368

FixedQueue,

369

ArrayTaskQueue

370

} from "piscina";

371

372

class Piscina {

373

// Static properties

374

static readonly isWorkerThread: boolean;

375

static readonly workerData: any;

376

static readonly version: string;

377

static readonly Piscina: typeof Piscina;

378

static readonly FixedQueue: typeof FixedQueue;

379

static readonly ArrayTaskQueue: typeof ArrayTaskQueue;

380

static readonly transferableSymbol: symbol;

381

static readonly valueSymbol: symbol;

382

static readonly queueOptionsSymbol: symbol;

383

384

// Static methods

385

static move(

386

val: Transferable | TransferListItem | ArrayBufferView | ArrayBuffer | MessagePort

387

): any;

388

}

389

390

// Named exports for convenience

391

const move: typeof Piscina.move;

392

const isWorkerThread: typeof Piscina.isWorkerThread;

393

const workerData: typeof Piscina.workerData;

394

const version: typeof Piscina.version;

395

const transferableSymbol: typeof Piscina.transferableSymbol;

396

const valueSymbol: typeof Piscina.valueSymbol;

397

const queueOptionsSymbol: typeof Piscina.queueOptionsSymbol;

398

```

399

400

## Error Handling

401

402

```typescript { .api }

403

const Errors: {

404

ThreadTermination(): Error;

405

FilenameNotProvided(): Error;

406

TaskQueueAtLimit(): Error;

407

NoTaskQueueAvailable(): Error;

408

CloseTimeout(): Error;

409

};

410

```

411

412

Common errors thrown by Piscina operations include:

413

- **ThreadTermination**: Worker thread was terminated unexpectedly

414

- **FilenameNotProvided**: No worker filename specified in options or run call

415

- **TaskQueueAtLimit**: Task queue has reached its maximum capacity

416

- **NoTaskQueueAvailable**: No task queue available and all workers are busy

417

- **CloseTimeout**: Pool close operation exceeded the configured timeout

418

- **AbortError**: Task was cancelled via AbortSignal

419

420

## Types

421

422

```typescript { .api }

423

import { PiscinaTask, ResourceLimits, EnvSpecifier } from "piscina";

424

import { Worker } from "node:worker_threads";

425

426

interface PiscinaTask {

427

readonly taskId: number;

428

readonly filename: string;

429

readonly name: string;

430

readonly created: number;

431

readonly isAbortable: boolean;

432

}

433

434

type ResourceLimits = Worker extends {

435

resourceLimits?: infer T;

436

} ? T : {};

437

438

type EnvSpecifier = typeof Worker extends {

439

new (filename: never, options?: { env: infer T }): Worker;

440

} ? T : never;

441

```