or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.mdload-balancing.mdperformance-monitoring.mdpool-management.mdtask-cancellation.mdtask-queues.mdtransferable-objects.md

task-cancellation.mddocs/

0

# Task Cancellation

1

2

Comprehensive task cancellation support using AbortController and AbortSignal patterns for graceful task termination and cleanup.

3

4

## Capabilities

5

6

### AbortSignal Integration

7

8

Piscina supports standard AbortController/AbortSignal patterns for task cancellation.

9

10

```typescript { .api }

11

interface RunOptions {

12

/** Abort signal for task cancellation */

13

signal?: AbortSignalAny | null;

14

}

15

16

/**

17

* Union type supporting both DOM and Node.js abort signal patterns

18

*/

19

type AbortSignalAny = AbortSignalEventTarget | AbortSignalEventEmitter;

20

```

21

22

**Usage Examples:**

23

24

```typescript

25

import Piscina from "piscina";

26

27

const pool = new Piscina({ filename: "worker.js" });

28

29

// Basic cancellation with AbortController

30

const controller = new AbortController();

31

const taskPromise = pool.run(

32

{ operation: "longRunning", duration: 10000 },

33

{ signal: controller.signal }

34

);

35

36

// Cancel after 2 seconds

37

setTimeout(() => {

38

controller.abort("Task timeout");

39

}, 2000);

40

41

try {

42

const result = await taskPromise;

43

console.log("Task completed:", result);

44

} catch (error) {

45

if (error.name === 'AbortError') {

46

console.log("Task was cancelled:", error.cause);

47

} else {

48

console.error("Task failed:", error);

49

}

50

}

51

```

52

53

### AbortError Class

54

55

Error thrown when tasks are cancelled via abort signals.

56

57

```typescript { .api }

58

/**

59

* Error thrown when a task is aborted

60

*/

61

class AbortError extends Error {

62

/**

63

* Create abort error with optional reason

64

* @param reason - Cancellation reason from AbortSignal

65

*/

66

constructor(reason?: AbortSignalEventTarget['reason']);

67

68

/** Error name is always 'AbortError' */

69

readonly name: 'AbortError';

70

}

71

```

72

73

**Usage Examples:**

74

75

```typescript

76

import Piscina from "piscina";

77

78

const pool = new Piscina({ filename: "worker.js" });

79

80

async function cancellableTask() {

81

const controller = new AbortController();

82

83

// Cancel with custom reason

84

setTimeout(() => {

85

controller.abort(new Error("Custom timeout"));

86

}, 1000);

87

88

try {

89

await pool.run({ task: "slow" }, { signal: controller.signal });

90

} catch (error) {

91

if (error instanceof AbortError) {

92

console.log("Cancelled:", error.message); // "The task has been aborted"

93

console.log("Reason:", error.cause); // Custom timeout error

94

}

95

}

96

}

97

```

98

99

### AbortSignal Types

100

101

Support for both DOM-style and Node.js-style abort signals.

102

103

```typescript { .api }

104

/**

105

* DOM-style abort signal (AbortController.signal)

106

*/

107

interface AbortSignalEventTarget {

108

addEventListener(

109

name: 'abort',

110

listener: () => void,

111

options?: { once: boolean }

112

): void;

113

removeEventListener(name: 'abort', listener: () => void): void;

114

readonly aborted?: boolean;

115

readonly reason?: unknown;

116

}

117

118

/**

119

* Node.js EventEmitter-style abort signal

120

*/

121

interface AbortSignalEventEmitter {

122

off(name: 'abort', listener: () => void): void;

123

once(name: 'abort', listener: () => void): void;

124

}

125

```

126

127

### Abort Signal Utilities

128

129

Utility functions for working with different abort signal types.

130

131

```typescript { .api }

132

/**

133

* Attach abort listener to any abort signal type

134

* @param abortSignal - AbortSignal to listen to

135

* @param listener - Function to call on abort

136

*/

137

function onabort(abortSignal: AbortSignalAny, listener: () => void): void;

138

```

139

140

**Usage Examples:**

141

142

```typescript

143

import { onabort } from "piscina";

144

145

// Works with AbortController

146

const controller = new AbortController();

147

onabort(controller.signal, () => {

148

console.log("DOM-style signal aborted");

149

});

150

151

// Works with EventEmitter-style signals

152

import { EventEmitter } from "events";

153

const emitter = new EventEmitter();

154

onabort(emitter as any, () => {

155

console.log("EventEmitter-style signal aborted");

156

});

157

158

// Trigger abort

159

controller.abort();

160

emitter.emit('abort');

161

```

162

163

### Task Cancellation Scenarios

164

165

#### Timeout-Based Cancellation

166

167

```typescript

168

import Piscina from "piscina";

169

170

const pool = new Piscina({ filename: "worker.js" });

171

172

/**

173

* Run task with timeout

174

* @param task - Task data

175

* @param timeoutMs - Timeout in milliseconds

176

*/

177

async function runWithTimeout(task: any, timeoutMs: number) {

178

const controller = new AbortController();

179

180

const timeoutId = setTimeout(() => {

181

controller.abort(`Task timeout after ${timeoutMs}ms`);

182

}, timeoutMs);

183

184

try {

185

const result = await pool.run(task, { signal: controller.signal });

186

clearTimeout(timeoutId);

187

return result;

188

} catch (error) {

189

clearTimeout(timeoutId);

190

throw error;

191

}

192

}

193

194

// Usage

195

try {

196

const result = await runWithTimeout({ operation: "compute" }, 5000);

197

console.log("Completed within timeout:", result);

198

} catch (error) {

199

if (error.name === 'AbortError') {

200

console.log("Task timed out");

201

}

202

}

203

```

204

205

#### User-Initiated Cancellation

206

207

```typescript

208

import Piscina from "piscina";

209

210

const pool = new Piscina({ filename: "worker.js" });

211

212

class CancellableTaskManager {

213

private controllers = new Map<string, AbortController>();

214

215

async startTask(taskId: string, taskData: any): Promise<any> {

216

const controller = new AbortController();

217

this.controllers.set(taskId, controller);

218

219

try {

220

const result = await pool.run(taskData, { signal: controller.signal });

221

this.controllers.delete(taskId);

222

return result;

223

} catch (error) {

224

this.controllers.delete(taskId);

225

throw error;

226

}

227

}

228

229

cancelTask(taskId: string, reason = "User cancelled"): boolean {

230

const controller = this.controllers.get(taskId);

231

if (controller) {

232

controller.abort(reason);

233

return true;

234

}

235

return false;

236

}

237

238

cancelAllTasks(reason = "Shutdown"): void {

239

for (const [taskId, controller] of this.controllers) {

240

controller.abort(reason);

241

}

242

this.controllers.clear();

243

}

244

}

245

246

// Usage

247

const taskManager = new CancellableTaskManager();

248

249

// Start multiple tasks

250

const task1 = taskManager.startTask("task1", { operation: "compute1" });

251

const task2 = taskManager.startTask("task2", { operation: "compute2" });

252

253

// Cancel specific task

254

setTimeout(() => {

255

taskManager.cancelTask("task1", "No longer needed");

256

}, 2000);

257

258

// Handle results

259

Promise.allSettled([task1, task2]).then(results => {

260

results.forEach((result, index) => {

261

if (result.status === 'fulfilled') {

262

console.log(`Task ${index + 1} completed:`, result.value);

263

} else {

264

console.log(`Task ${index + 1} failed:`, result.reason.message);

265

}

266

});

267

});

268

```

269

270

#### Graceful Shutdown

271

272

```typescript

273

import Piscina from "piscina";

274

275

const pool = new Piscina({ filename: "worker.js" });

276

277

class GracefulTaskPool {

278

private shutdownController = new AbortController();

279

private activeTasks = new Set<Promise<any>>();

280

281

async run(task: any, options: any = {}): Promise<any> {

282

if (this.shutdownController.signal.aborted) {

283

throw new AbortError("Pool is shutting down");

284

}

285

286

// Combine user signal with shutdown signal

287

let combinedSignal = this.shutdownController.signal;

288

if (options.signal) {

289

combinedSignal = this.combineSignals(options.signal, this.shutdownController.signal);

290

}

291

292

const taskPromise = pool.run(task, { ...options, signal: combinedSignal });

293

this.activeTasks.add(taskPromise);

294

295

taskPromise.finally(() => {

296

this.activeTasks.delete(taskPromise);

297

});

298

299

return taskPromise;

300

}

301

302

async shutdown(timeoutMs = 30000): Promise<void> {

303

console.log(`Shutting down pool with ${this.activeTasks.size} active tasks`);

304

305

// Signal all tasks to abort

306

this.shutdownController.abort("Pool shutdown");

307

308

// Wait for tasks to complete or timeout

309

const timeoutPromise = new Promise((_, reject) => {

310

setTimeout(() => reject(new Error("Shutdown timeout")), timeoutMs);

311

});

312

313

try {

314

await Promise.race([

315

Promise.allSettled(Array.from(this.activeTasks)),

316

timeoutPromise

317

]);

318

} catch (error) {

319

console.warn("Forced shutdown due to timeout");

320

}

321

322

await pool.close({ force: true });

323

}

324

325

private combineSignals(signal1: AbortSignalAny, signal2: AbortSignalAny): AbortSignalAny {

326

const controller = new AbortController();

327

328

const abortHandler = () => controller.abort();

329

330

if ('addEventListener' in signal1) {

331

signal1.addEventListener('abort', abortHandler, { once: true });

332

} else {

333

signal1.once('abort', abortHandler);

334

}

335

336

if ('addEventListener' in signal2) {

337

signal2.addEventListener('abort', abortHandler, { once: true });

338

} else {

339

signal2.once('abort', abortHandler);

340

}

341

342

return controller.signal;

343

}

344

}

345

346

// Usage

347

const taskPool = new GracefulTaskPool();

348

349

// Handle shutdown signal

350

process.on('SIGINT', async () => {

351

console.log('Received SIGINT, shutting down gracefully...');

352

await taskPool.shutdown();

353

process.exit(0);

354

});

355

356

// Run tasks

357

taskPool.run({ operation: "longTask" }).catch(error => {

358

if (error.name === 'AbortError') {

359

console.log("Task cancelled during shutdown");

360

}

361

});

362

```

363

364

### Worker-Side Cancellation

365

366

Workers can check for cancellation and perform cleanup.

367

368

**Worker file (worker.js):**

369

370

```javascript

371

const { isMainThread, parentPort } = require('worker_threads');

372

373

// Worker function

374

module.exports = async function(data) {

375

const { operation, iterations = 1000000 } = data;

376

377

if (operation === 'cancellableCompute') {

378

let result = 0;

379

380

for (let i = 0; i < iterations; i++) {

381

// Perform computation

382

result += Math.random();

383

384

// Check for cancellation periodically (every 1000 iterations)

385

if (i % 1000 === 0) {

386

// In a real scenario, you might check a shared flag or message

387

// For demonstration, we'll just yield control

388

await new Promise(resolve => setImmediate(resolve));

389

}

390

}

391

392

return { result, iterations };

393

}

394

395

if (operation === 'longRunning') {

396

// Simulate long-running task with periodic checks

397

const startTime = Date.now();

398

const duration = data.duration || 5000;

399

400

while (Date.now() - startTime < duration) {

401

// Do work...

402

await new Promise(resolve => setTimeout(resolve, 100));

403

404

// Worker is terminated externally if task is cancelled

405

// No need for explicit cancellation checks in this case

406

}

407

408

return { completed: true, actualDuration: Date.now() - startTime };

409

}

410

411

return { error: "Unknown operation" };

412

};

413

```

414

415

### Cancellation Best Practices

416

417

#### Error Handling

418

419

```typescript

420

import Piscina from "piscina";

421

422

const pool = new Piscina({ filename: "worker.js" });

423

424

async function robustTaskExecution(task: any, signal?: AbortSignalAny) {

425

try {

426

const result = await pool.run(task, { signal });

427

return { success: true, data: result };

428

} catch (error) {

429

if (error.name === 'AbortError') {

430

return { success: false, cancelled: true, reason: error.cause };

431

} else {

432

return { success: false, cancelled: false, error: error.message };

433

}

434

}

435

}

436

437

// Usage with different outcomes

438

const controller = new AbortController();

439

440

// Task that completes normally

441

const result1 = await robustTaskExecution({ operation: "fast" });

442

console.log(result1); // { success: true, data: { ... } }

443

444

// Task that gets cancelled

445

setTimeout(() => controller.abort("Timeout"), 100);

446

const result2 = await robustTaskExecution(

447

{ operation: "slow" },

448

controller.signal

449

);

450

console.log(result2); // { success: false, cancelled: true, reason: "Timeout" }

451

452

// Task that fails with error

453

const result3 = await robustTaskExecution({ operation: "invalid" });

454

console.log(result3); // { success: false, cancelled: false, error: "..." }

455

```

456

457

#### Resource Cleanup

458

459

```typescript

460

import Piscina from "piscina";

461

462

const pool = new Piscina({ filename: "worker.js" });

463

464

class ResourceManager {

465

private resources = new Map<string, any>();

466

467

async runWithResources(taskId: string, task: any, signal?: AbortSignalAny) {

468

// Allocate resources

469

const resource = await this.allocateResource(taskId);

470

471

try {

472

const result = await pool.run(

473

{ ...task, resourceId: taskId },

474

{ signal }

475

);

476

return result;

477

} catch (error) {

478

// Cleanup on any error (including cancellation)

479

await this.cleanupResource(taskId);

480

throw error;

481

} finally {

482

// Always cleanup

483

await this.cleanupResource(taskId);

484

}

485

}

486

487

private async allocateResource(id: string): Promise<any> {

488

const resource = { id, allocated: Date.now() };

489

this.resources.set(id, resource);

490

return resource;

491

}

492

493

private async cleanupResource(id: string): Promise<void> {

494

const resource = this.resources.get(id);

495

if (resource) {

496

console.log(`Cleaning up resource ${id}`);

497

this.resources.delete(id);

498

// Perform actual cleanup...

499

}

500

}

501

}

502

503

// Usage

504

const resourceManager = new ResourceManager();

505

const controller = new AbortController();

506

507

try {

508

await resourceManager.runWithResources(

509

"task1",

510

{ operation: "useResource" },

511

controller.signal

512

);

513

} catch (error) {

514

// Resources are cleaned up automatically

515

console.log("Task failed or cancelled, but resources cleaned up");

516

}

517

```

518

519

### Integration with Pool Events

520

521

Monitor cancellation-related events at the pool level.

522

523

```typescript

524

import Piscina from "piscina";

525

526

const pool = new Piscina({ filename: "worker.js" });

527

528

// Monitor worker destruction (may indicate cancellation)

529

pool.on('workerDestroy', (worker) => {

530

console.log(`Worker ${worker.id} destroyed (possibly due to cancellation)`);

531

});

532

533

// Monitor errors (may include abort errors)

534

pool.on('error', (error) => {

535

if (error.name === 'AbortError') {

536

console.log('Pool-level abort error:', error.message);

537

}

538

});

539

540

// Run cancellable tasks

541

const controller = new AbortController();

542

543

setTimeout(() => {

544

controller.abort("Demo cancellation");

545

}, 1000);

546

547

try {

548

await pool.run(

549

{ operation: "longRunning", duration: 5000 },

550

{ signal: controller.signal }

551

);

552

} catch (error) {

553

console.log("Expected cancellation:", error.name);

554

}

555

```