or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.mdload-balancing.mdperformance-monitoring.mdpool-management.mdtask-cancellation.mdtask-queues.mdtransferable-objects.md

pool-management.mddocs/

0

# Pool Management

1

2

Core worker thread pool functionality for creating, configuring, and managing worker pools with automatic scaling and resource management.

3

4

## Capabilities

5

6

### Piscina Class

7

8

Main worker thread pool class that extends EventEmitterAsyncResource for full async tracking support.

9

10

```typescript { .api }

11

/**

12

* Main worker thread pool class

13

* @extends EventEmitterAsyncResource

14

* @template T - Task input type

15

* @template R - Task result type

16

*/

17

class Piscina<T = any, R = any> extends EventEmitterAsyncResource {

18

/**

19

* Create a new worker thread pool

20

* @param options - Pool configuration options

21

*/

22

constructor(options?: Options);

23

24

/**

25

* Execute a task in the worker pool

26

* @param task - Task data to pass to worker

27

* @param options - Task execution options

28

* @returns Promise resolving to task result

29

*/

30

run(task: T, options?: RunOptions): Promise<R>;

31

32

/**

33

* Gracefully close the pool, waiting for running tasks to complete

34

* @param options - Close operation options

35

* @returns Promise resolving when pool is closed

36

*/

37

close(options?: CloseOptions): Promise<void>;

38

39

/**

40

* Forcefully destroy the pool, terminating all workers immediately

41

* @returns Promise resolving when pool is destroyed

42

*/

43

destroy(): Promise<void>;

44

45

/**

46

* Synchronous disposal method for 'using' keyword support

47

*/

48

[Symbol.dispose](): void;

49

50

/**

51

* Asynchronous disposal method for 'await using' keyword support

52

*/

53

[Symbol.asyncDispose](): Promise<void>;

54

55

// Pool properties

56

readonly maxThreads: number;

57

readonly minThreads: number;

58

readonly options: FilledOptions;

59

readonly threads: Worker[];

60

readonly queueSize: number;

61

readonly completed: number;

62

readonly histogram: PiscinaHistogram;

63

readonly utilization: number;

64

readonly duration: number;

65

readonly needsDrain: boolean;

66

}

67

```

68

69

**Usage Examples:**

70

71

```typescript

72

import Piscina from "piscina";

73

import { resolve } from "path";

74

75

// Basic pool with default settings

76

const pool = new Piscina({

77

filename: resolve(__dirname, "worker.js")

78

});

79

80

// Advanced pool configuration

81

const advancedPool = new Piscina({

82

filename: resolve(__dirname, "worker.js"),

83

minThreads: 2,

84

maxThreads: 8,

85

idleTimeout: 5000,

86

maxQueue: 50,

87

concurrentTasksPerWorker: 2,

88

resourceLimits: {

89

maxOldGenerationSizeMb: 100,

90

maxYoungGenerationSizeMb: 50

91

}

92

});

93

94

// Execute tasks

95

const result = await pool.run({ operation: "add", values: [1, 2, 3] });

96

console.log("Task completed:", result);

97

98

// Graceful shutdown

99

await pool.close();

100

```

101

102

### Pool Configuration Options

103

104

Comprehensive configuration interface for customizing pool behavior.

105

106

```typescript { .api }

107

/**

108

* Pool configuration options

109

*/

110

interface Options {

111

/** Worker script filename - can be JavaScript, TypeScript, or ESM */

112

filename?: string | null;

113

/** Pool name for identification and debugging */

114

name?: string;

115

/** Minimum number of worker threads to maintain */

116

minThreads?: number;

117

/** Maximum number of worker threads allowed */

118

maxThreads?: number;

119

/** Milliseconds before idle workers are terminated (0 = never, Infinity = never) */

120

idleTimeout?: number;

121

/** Maximum queued tasks ('auto' = maxThreads^2, number, or Infinity) */

122

maxQueue?: number | 'auto';

123

/** Number of concurrent tasks each worker can handle */

124

concurrentTasksPerWorker?: number;

125

/** Shared memory communication mode */

126

atomics?: 'sync' | 'async' | 'disabled';

127

/** Resource limits for worker threads */

128

resourceLimits?: ResourceLimits;

129

/** Command line arguments passed to worker */

130

argv?: string[];

131

/** Node.js execution arguments passed to worker */

132

execArgv?: string[];

133

/** Environment variables for workers */

134

env?: EnvSpecifier;

135

/** Data passed to worker on startup */

136

workerData?: any;

137

/** Custom task queue implementation */

138

taskQueue?: TaskQueue;

139

/** Process priority adjustment (Unix only) */

140

niceIncrement?: number;

141

/** Track unmanaged file descriptors */

142

trackUnmanagedFds?: boolean;

143

/** Timeout for close operations in milliseconds */

144

closeTimeout?: number;

145

/** Enable performance timing collection */

146

recordTiming?: boolean;

147

/** Custom load balancer function */

148

loadBalancer?: PiscinaLoadBalancer;

149

/** Enable per-worker histograms */

150

workerHistogram?: boolean;

151

}

152

153

/**

154

* Filled options with all defaults applied

155

*/

156

interface FilledOptions extends Options {

157

filename: string | null;

158

name: string;

159

minThreads: number;

160

maxThreads: number;

161

idleTimeout: number;

162

maxQueue: number;

163

concurrentTasksPerWorker: number;

164

atomics: 'sync' | 'async' | 'disabled';

165

taskQueue: TaskQueue;

166

niceIncrement: number;

167

closeTimeout: number;

168

recordTiming: boolean;

169

workerHistogram: boolean;

170

}

171

```

172

173

### Task Execution Options

174

175

Options for individual task execution with support for transfers and cancellation.

176

177

```typescript { .api }

178

/**

179

* Options for individual task execution

180

*/

181

interface RunOptions {

182

/** Objects to transfer ownership to worker (for performance) */

183

transferList?: TransferList;

184

/** Override worker filename for this task */

185

filename?: string | null;

186

/** Abort signal for task cancellation */

187

signal?: AbortSignalAny | null;

188

/** Override task name for this execution */

189

name?: string | null;

190

}

191

```

192

193

**Usage Examples:**

194

195

```typescript

196

import Piscina from "piscina";

197

198

const pool = new Piscina({

199

filename: resolve(__dirname, "worker.js")

200

});

201

202

// Task with transferable objects

203

const buffer = new ArrayBuffer(1024);

204

const result = await pool.run(

205

{ data: buffer, operation: "process" },

206

{ transferList: [buffer] }

207

);

208

209

// Task with cancellation

210

const controller = new AbortController();

211

const taskPromise = pool.run(

212

{ longRunningTask: true },

213

{ signal: controller.signal }

214

);

215

216

// Cancel after 5 seconds

217

setTimeout(() => controller.abort(), 5000);

218

219

try {

220

const result = await taskPromise;

221

} catch (error) {

222

if (error.name === 'AbortError') {

223

console.log('Task was cancelled');

224

}

225

}

226

```

227

228

### Pool Closing Options

229

230

Configuration for graceful or forced pool shutdown.

231

232

```typescript { .api }

233

/**

234

* Options for pool close operations

235

*/

236

interface CloseOptions {

237

/** Force immediate termination without waiting for tasks */

238

force?: boolean;

239

}

240

```

241

242

**Usage Examples:**

243

244

```typescript

245

// Graceful shutdown - wait for running tasks

246

await pool.close();

247

248

// Forced shutdown - terminate immediately

249

await pool.close({ force: true });

250

251

// Using disposal syntax (Node.js 20+)

252

{

253

using pool = new Piscina({ filename: "worker.js" });

254

await pool.run(task);

255

// Automatically closed when leaving scope

256

}

257

258

// Async disposal

259

{

260

await using pool = new Piscina({ filename: "worker.js" });

261

await pool.run(task);

262

// Automatically closed when leaving scope

263

}

264

```

265

266

### Pool Properties

267

268

Real-time pool status and configuration access.

269

270

```typescript { .api }

271

class Piscina {

272

/** Maximum number of worker threads */

273

readonly maxThreads: number;

274

275

/** Minimum number of worker threads */

276

readonly minThreads: number;

277

278

/** Complete pool configuration with defaults applied */

279

readonly options: FilledOptions;

280

281

/** Array of all worker thread instances */

282

readonly threads: Worker[];

283

284

/** Current number of queued tasks */

285

readonly queueSize: number;

286

287

/** Total number of completed tasks */

288

readonly completed: number;

289

290

/** Performance histogram data */

291

readonly histogram: PiscinaHistogram;

292

293

/** Pool utilization as percentage (0-1) */

294

readonly utilization: number;

295

296

/** Pool runtime in milliseconds */

297

readonly duration: number;

298

299

/** Whether pool needs draining (at capacity) */

300

readonly needsDrain: boolean;

301

}

302

```

303

304

**Usage Examples:**

305

306

```typescript

307

const pool = new Piscina({

308

filename: "worker.js",

309

minThreads: 2,

310

maxThreads: 8

311

});

312

313

// Monitor pool status

314

console.log(`Pool has ${pool.threads.length} active threads`);

315

console.log(`Queue size: ${pool.queueSize}`);

316

console.log(`Completed tasks: ${pool.completed}`);

317

console.log(`Utilization: ${(pool.utilization * 100).toFixed(2)}%`);

318

console.log(`Running for: ${pool.duration}ms`);

319

320

// Check if pool needs draining

321

if (pool.needsDrain) {

322

console.log("Pool is at capacity, consider adding more threads");

323

}

324

325

// Access configuration

326

console.log(`Max threads: ${pool.maxThreads}`);

327

console.log(`Idle timeout: ${pool.options.idleTimeout}`);

328

```

329

330

## Events

331

332

The Piscina class extends EventEmitterAsyncResource and emits the following events:

333

334

```typescript { .api }

335

class Piscina extends EventEmitterAsyncResource {

336

// Event: 'workerCreate' - Emitted when a new worker is created

337

on(event: 'workerCreate', listener: (worker: PiscinaWorker) => void): this;

338

339

// Event: 'workerDestroy' - Emitted when a worker is destroyed

340

on(event: 'workerDestroy', listener: (worker: PiscinaWorker) => void): this;

341

342

// Event: 'needsDrain' - Emitted when pool is at capacity

343

on(event: 'needsDrain', listener: () => void): this;

344

345

// Event: 'drain' - Emitted when pool capacity is available again

346

on(event: 'drain', listener: () => void): this;

347

348

// Event: 'message' - Emitted for messages from workers

349

on(event: 'message', listener: (message: any) => void): this;

350

351

// Event: 'error' - Emitted for pool-level errors

352

on(event: 'error', listener: (error: Error) => void): this;

353

354

// Event: 'close' - Emitted when pool is closed

355

on(event: 'close', listener: () => void): this;

356

}

357

```

358

359

**Usage Examples:**

360

361

```typescript

362

const pool = new Piscina({ filename: "worker.js" });

363

364

// Monitor worker lifecycle

365

pool.on('workerCreate', (worker) => {

366

console.log(`Worker ${worker.id} created`);

367

});

368

369

pool.on('workerDestroy', (worker) => {

370

console.log(`Worker ${worker.id} destroyed`);

371

});

372

373

// Monitor pool capacity

374

pool.on('needsDrain', () => {

375

console.log('Pool at capacity - consider draining');

376

});

377

378

pool.on('drain', () => {

379

console.log('Pool capacity available');

380

});

381

382

// Handle errors

383

pool.on('error', (error) => {

384

console.error('Pool error:', error);

385

});

386

```