0
# Task Cancellation
1
2
Comprehensive task cancellation support using AbortController and AbortSignal patterns for graceful task termination and cleanup.
3
4
## Capabilities
5
6
### AbortSignal Integration
7
8
Piscina supports standard AbortController/AbortSignal patterns for task cancellation.
9
10
```typescript { .api }
11
interface RunOptions {
12
/** Abort signal for task cancellation */
13
signal?: AbortSignalAny | null;
14
}
15
16
/**
17
* Union type supporting both DOM and Node.js abort signal patterns
18
*/
19
type AbortSignalAny = AbortSignalEventTarget | AbortSignalEventEmitter;
20
```
21
22
**Usage Examples:**
23
24
```typescript
25
import Piscina from "piscina";
26
27
const pool = new Piscina({ filename: "worker.js" });
28
29
// Basic cancellation with AbortController
30
const controller = new AbortController();
31
const taskPromise = pool.run(
32
{ operation: "longRunning", duration: 10000 },
33
{ signal: controller.signal }
34
);
35
36
// Cancel after 2 seconds
37
setTimeout(() => {
38
controller.abort("Task timeout");
39
}, 2000);
40
41
try {
42
const result = await taskPromise;
43
console.log("Task completed:", result);
44
} catch (error) {
45
if (error.name === 'AbortError') {
46
console.log("Task was cancelled:", error.cause);
47
} else {
48
console.error("Task failed:", error);
49
}
50
}
51
```
52
53
### AbortError Class
54
55
Error thrown when tasks are cancelled via abort signals.
56
57
```typescript { .api }
58
/**
59
* Error thrown when a task is aborted
60
*/
61
class AbortError extends Error {
62
/**
63
* Create abort error with optional reason
64
* @param reason - Cancellation reason from AbortSignal
65
*/
66
constructor(reason?: AbortSignalEventTarget['reason']);
67
68
/** Error name is always 'AbortError' */
69
readonly name: 'AbortError';
70
}
71
```
72
73
**Usage Examples:**
74
75
```typescript
76
import Piscina from "piscina";
77
78
const pool = new Piscina({ filename: "worker.js" });
79
80
async function cancellableTask() {
81
const controller = new AbortController();
82
83
// Cancel with custom reason
84
setTimeout(() => {
85
controller.abort(new Error("Custom timeout"));
86
}, 1000);
87
88
try {
89
await pool.run({ task: "slow" }, { signal: controller.signal });
90
} catch (error) {
91
if (error instanceof AbortError) {
92
console.log("Cancelled:", error.message); // "The task has been aborted"
93
console.log("Reason:", error.cause); // Custom timeout error
94
}
95
}
96
}
97
```
98
99
### AbortSignal Types
100
101
Support for both DOM-style and Node.js-style abort signals.
102
103
```typescript { .api }
104
/**
105
* DOM-style abort signal (AbortController.signal)
106
*/
107
interface AbortSignalEventTarget {
108
addEventListener(
109
name: 'abort',
110
listener: () => void,
111
options?: { once: boolean }
112
): void;
113
removeEventListener(name: 'abort', listener: () => void): void;
114
readonly aborted?: boolean;
115
readonly reason?: unknown;
116
}
117
118
/**
119
* Node.js EventEmitter-style abort signal
120
*/
121
interface AbortSignalEventEmitter {
122
off(name: 'abort', listener: () => void): void;
123
once(name: 'abort', listener: () => void): void;
124
}
125
```
126
127
### Abort Signal Utilities
128
129
Utility functions for working with different abort signal types.
130
131
```typescript { .api }
132
/**
133
* Attach abort listener to any abort signal type
134
* @param abortSignal - AbortSignal to listen to
135
* @param listener - Function to call on abort
136
*/
137
function onabort(abortSignal: AbortSignalAny, listener: () => void): void;
138
```
139
140
**Usage Examples:**
141
142
```typescript
143
import { onabort } from "piscina";
144
145
// Works with AbortController
146
const controller = new AbortController();
147
onabort(controller.signal, () => {
148
console.log("DOM-style signal aborted");
149
});
150
151
// Works with EventEmitter-style signals
152
import { EventEmitter } from "events";
153
const emitter = new EventEmitter();
154
onabort(emitter as any, () => {
155
console.log("EventEmitter-style signal aborted");
156
});
157
158
// Trigger abort
159
controller.abort();
160
emitter.emit('abort');
161
```
162
163
### Task Cancellation Scenarios
164
165
#### Timeout-Based Cancellation
166
167
```typescript
168
import Piscina from "piscina";
169
170
const pool = new Piscina({ filename: "worker.js" });
171
172
/**
173
* Run task with timeout
174
* @param task - Task data
175
* @param timeoutMs - Timeout in milliseconds
176
*/
177
async function runWithTimeout(task: any, timeoutMs: number) {
178
const controller = new AbortController();
179
180
const timeoutId = setTimeout(() => {
181
controller.abort(`Task timeout after ${timeoutMs}ms`);
182
}, timeoutMs);
183
184
try {
185
const result = await pool.run(task, { signal: controller.signal });
186
clearTimeout(timeoutId);
187
return result;
188
} catch (error) {
189
clearTimeout(timeoutId);
190
throw error;
191
}
192
}
193
194
// Usage
195
try {
196
const result = await runWithTimeout({ operation: "compute" }, 5000);
197
console.log("Completed within timeout:", result);
198
} catch (error) {
199
if (error.name === 'AbortError') {
200
console.log("Task timed out");
201
}
202
}
203
```
204
205
#### User-Initiated Cancellation
206
207
```typescript
208
import Piscina from "piscina";
209
210
const pool = new Piscina({ filename: "worker.js" });
211
212
class CancellableTaskManager {
213
private controllers = new Map<string, AbortController>();
214
215
async startTask(taskId: string, taskData: any): Promise<any> {
216
const controller = new AbortController();
217
this.controllers.set(taskId, controller);
218
219
try {
220
const result = await pool.run(taskData, { signal: controller.signal });
221
this.controllers.delete(taskId);
222
return result;
223
} catch (error) {
224
this.controllers.delete(taskId);
225
throw error;
226
}
227
}
228
229
cancelTask(taskId: string, reason = "User cancelled"): boolean {
230
const controller = this.controllers.get(taskId);
231
if (controller) {
232
controller.abort(reason);
233
return true;
234
}
235
return false;
236
}
237
238
cancelAllTasks(reason = "Shutdown"): void {
239
for (const [taskId, controller] of this.controllers) {
240
controller.abort(reason);
241
}
242
this.controllers.clear();
243
}
244
}
245
246
// Usage
247
const taskManager = new CancellableTaskManager();
248
249
// Start multiple tasks
250
const task1 = taskManager.startTask("task1", { operation: "compute1" });
251
const task2 = taskManager.startTask("task2", { operation: "compute2" });
252
253
// Cancel specific task
254
setTimeout(() => {
255
taskManager.cancelTask("task1", "No longer needed");
256
}, 2000);
257
258
// Handle results
259
Promise.allSettled([task1, task2]).then(results => {
260
results.forEach((result, index) => {
261
if (result.status === 'fulfilled') {
262
console.log(`Task ${index + 1} completed:`, result.value);
263
} else {
264
console.log(`Task ${index + 1} failed:`, result.reason.message);
265
}
266
});
267
});
268
```
269
270
#### Graceful Shutdown
271
272
```typescript
273
import Piscina from "piscina";
274
275
const pool = new Piscina({ filename: "worker.js" });
276
277
class GracefulTaskPool {
278
private shutdownController = new AbortController();
279
private activeTasks = new Set<Promise<any>>();
280
281
async run(task: any, options: any = {}): Promise<any> {
282
if (this.shutdownController.signal.aborted) {
283
throw new AbortError("Pool is shutting down");
284
}
285
286
// Combine user signal with shutdown signal
287
let combinedSignal = this.shutdownController.signal;
288
if (options.signal) {
289
combinedSignal = this.combineSignals(options.signal, this.shutdownController.signal);
290
}
291
292
const taskPromise = pool.run(task, { ...options, signal: combinedSignal });
293
this.activeTasks.add(taskPromise);
294
295
taskPromise.finally(() => {
296
this.activeTasks.delete(taskPromise);
297
});
298
299
return taskPromise;
300
}
301
302
async shutdown(timeoutMs = 30000): Promise<void> {
303
console.log(`Shutting down pool with ${this.activeTasks.size} active tasks`);
304
305
// Signal all tasks to abort
306
this.shutdownController.abort("Pool shutdown");
307
308
// Wait for tasks to complete or timeout
309
const timeoutPromise = new Promise((_, reject) => {
310
setTimeout(() => reject(new Error("Shutdown timeout")), timeoutMs);
311
});
312
313
try {
314
await Promise.race([
315
Promise.allSettled(Array.from(this.activeTasks)),
316
timeoutPromise
317
]);
318
} catch (error) {
319
console.warn("Forced shutdown due to timeout");
320
}
321
322
await pool.close({ force: true });
323
}
324
325
private combineSignals(signal1: AbortSignalAny, signal2: AbortSignalAny): AbortSignalAny {
326
const controller = new AbortController();
327
328
const abortHandler = () => controller.abort();
329
330
if ('addEventListener' in signal1) {
331
signal1.addEventListener('abort', abortHandler, { once: true });
332
} else {
333
signal1.once('abort', abortHandler);
334
}
335
336
if ('addEventListener' in signal2) {
337
signal2.addEventListener('abort', abortHandler, { once: true });
338
} else {
339
signal2.once('abort', abortHandler);
340
}
341
342
return controller.signal;
343
}
344
}
345
346
// Usage
347
const taskPool = new GracefulTaskPool();
348
349
// Handle shutdown signal
350
process.on('SIGINT', async () => {
351
console.log('Received SIGINT, shutting down gracefully...');
352
await taskPool.shutdown();
353
process.exit(0);
354
});
355
356
// Run tasks
357
taskPool.run({ operation: "longTask" }).catch(error => {
358
if (error.name === 'AbortError') {
359
console.log("Task cancelled during shutdown");
360
}
361
});
362
```
363
364
### Worker-Side Cancellation
365
366
Workers can check for cancellation and perform cleanup.
367
368
**Worker file (worker.js):**
369
370
```javascript
371
const { isMainThread, parentPort } = require('worker_threads');
372
373
// Worker function
374
module.exports = async function(data) {
375
const { operation, iterations = 1000000 } = data;
376
377
if (operation === 'cancellableCompute') {
378
let result = 0;
379
380
for (let i = 0; i < iterations; i++) {
381
// Perform computation
382
result += Math.random();
383
384
// Check for cancellation periodically (every 1000 iterations)
385
if (i % 1000 === 0) {
386
// In a real scenario, you might check a shared flag or message
387
// For demonstration, we'll just yield control
388
await new Promise(resolve => setImmediate(resolve));
389
}
390
}
391
392
return { result, iterations };
393
}
394
395
if (operation === 'longRunning') {
396
// Simulate long-running task with periodic checks
397
const startTime = Date.now();
398
const duration = data.duration || 5000;
399
400
while (Date.now() - startTime < duration) {
401
// Do work...
402
await new Promise(resolve => setTimeout(resolve, 100));
403
404
// Worker is terminated externally if task is cancelled
405
// No need for explicit cancellation checks in this case
406
}
407
408
return { completed: true, actualDuration: Date.now() - startTime };
409
}
410
411
return { error: "Unknown operation" };
412
};
413
```
414
415
### Cancellation Best Practices
416
417
#### Error Handling
418
419
```typescript
420
import Piscina from "piscina";
421
422
const pool = new Piscina({ filename: "worker.js" });
423
424
async function robustTaskExecution(task: any, signal?: AbortSignalAny) {
425
try {
426
const result = await pool.run(task, { signal });
427
return { success: true, data: result };
428
} catch (error) {
429
if (error.name === 'AbortError') {
430
return { success: false, cancelled: true, reason: error.cause };
431
} else {
432
return { success: false, cancelled: false, error: error.message };
433
}
434
}
435
}
436
437
// Usage with different outcomes
438
const controller = new AbortController();
439
440
// Task that completes normally
441
const result1 = await robustTaskExecution({ operation: "fast" });
442
console.log(result1); // { success: true, data: { ... } }
443
444
// Task that gets cancelled
445
setTimeout(() => controller.abort("Timeout"), 100);
446
const result2 = await robustTaskExecution(
447
{ operation: "slow" },
448
controller.signal
449
);
450
console.log(result2); // { success: false, cancelled: true, reason: "Timeout" }
451
452
// Task that fails with error
453
const result3 = await robustTaskExecution({ operation: "invalid" });
454
console.log(result3); // { success: false, cancelled: false, error: "..." }
455
```
456
457
#### Resource Cleanup
458
459
```typescript
460
import Piscina from "piscina";
461
462
const pool = new Piscina({ filename: "worker.js" });
463
464
class ResourceManager {
465
private resources = new Map<string, any>();
466
467
async runWithResources(taskId: string, task: any, signal?: AbortSignalAny) {
468
// Allocate resources
469
const resource = await this.allocateResource(taskId);
470
471
try {
472
const result = await pool.run(
473
{ ...task, resourceId: taskId },
474
{ signal }
475
);
476
return result;
477
} catch (error) {
478
// Cleanup on any error (including cancellation)
479
await this.cleanupResource(taskId);
480
throw error;
481
} finally {
482
// Always cleanup
483
await this.cleanupResource(taskId);
484
}
485
}
486
487
private async allocateResource(id: string): Promise<any> {
488
const resource = { id, allocated: Date.now() };
489
this.resources.set(id, resource);
490
return resource;
491
}
492
493
private async cleanupResource(id: string): Promise<void> {
494
const resource = this.resources.get(id);
495
if (resource) {
496
console.log(`Cleaning up resource ${id}`);
497
this.resources.delete(id);
498
// Perform actual cleanup...
499
}
500
}
501
}
502
503
// Usage
504
const resourceManager = new ResourceManager();
505
const controller = new AbortController();
506
507
try {
508
await resourceManager.runWithResources(
509
"task1",
510
{ operation: "useResource" },
511
controller.signal
512
);
513
} catch (error) {
514
// Resources are cleaned up automatically
515
console.log("Task failed or cancelled, but resources cleaned up");
516
}
517
```
518
519
### Integration with Pool Events
520
521
Monitor cancellation-related events at the pool level.
522
523
```typescript
524
import Piscina from "piscina";
525
526
const pool = new Piscina({ filename: "worker.js" });
527
528
// Monitor worker destruction (may indicate cancellation)
529
pool.on('workerDestroy', (worker) => {
530
console.log(`Worker ${worker.id} destroyed (possibly due to cancellation)`);
531
});
532
533
// Monitor errors (may include abort errors)
534
pool.on('error', (error) => {
535
if (error.name === 'AbortError') {
536
console.log('Pool-level abort error:', error.message);
537
}
538
});
539
540
// Run cancellable tasks
541
const controller = new AbortController();
542
543
setTimeout(() => {
544
controller.abort("Demo cancellation");
545
}, 1000);
546
547
try {
548
await pool.run(
549
{ operation: "longRunning", duration: 5000 },
550
{ signal: controller.signal }
551
);
552
} catch (error) {
553
console.log("Expected cancellation:", error.name);
554
}
555
```