A fast, efficient Node.js Worker Thread Pool implementation
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Piscina provides comprehensive task cancellation support, using the AbortController and AbortSignal patterns for graceful task termination and cleanup.
Piscina supports standard AbortController/AbortSignal patterns for task cancellation.
/**
* Options passed as the second argument to `pool.run()`.
* Only the cancellation-related field is listed here.
*/
interface RunOptions {
/**
* Abort signal for task cancellation.
* Optional, and may also be `null`; accepts either signal shape covered by
* `AbortSignalAny`.
*/
signal?: AbortSignalAny | null;
}
/**
* Union type supporting both DOM and Node.js abort signal patterns:
* either an `AbortSignalEventTarget` (the shape of `AbortController.signal`)
* or an `AbortSignalEventEmitter` (an object exposing `once`/`off`).
*/
type AbortSignalAny = AbortSignalEventTarget | AbortSignalEventEmitter;
Usage Examples:
import Piscina from "piscina";
const pool = new Piscina({ filename: "worker.js" });
// Basic cancellation with AbortController
const controller = new AbortController();
const taskPromise = pool.run(
  { operation: "longRunning", duration: 10000 },
  { signal: controller.signal }
);

// Cancel after 2 seconds
const cancelTimer = setTimeout(() => {
  controller.abort("Task timeout");
}, 2000);

try {
  const result = await taskPromise;
  console.log("Task completed:", result);
} catch (error: unknown) {
  // Narrow before touching properties: a rejection value is not guaranteed
  // to be an Error (or even an object).
  if (error instanceof Error && error.name === 'AbortError') {
    // The value passed to controller.abort() is surfaced as the error's cause.
    console.log("Task was cancelled:", 'cause' in error ? error.cause : undefined);
  } else {
    console.error("Task failed:", error);
  }
} finally {
  // If the task settles before the 2 s mark, the pending timer would
  // otherwise keep the event loop alive for no reason.
  clearTimeout(cancelTimer);
}
// Error thrown when tasks are cancelled via abort signals.
/**
* Error thrown when a task is aborted.
*
* The usage example in this document shows `message` reading
* "The task has been aborted" and the abort reason being available as `cause`.
*/
class AbortError extends Error {
/**
* Create abort error with optional reason
* @param reason - Cancellation reason from AbortSignal (same type as
*                 `AbortSignalEventTarget['reason']`)
*/
constructor(reason?: AbortSignalEventTarget['reason']);
/** Error name is always 'AbortError', so callers can match on `error.name` */
readonly name: 'AbortError';
}
Usage Examples:
import Piscina from "piscina";
const pool = new Piscina({ filename: "worker.js" });
/**
 * Submits a slow task and cancels it after one second with a custom reason,
 * logging the cancellation details.
 *
 * Abort errors are recognised by `name` rather than `instanceof AbortError`:
 * this snippet only imports the default `Piscina` export, so referencing the
 * `AbortError` class here would throw a ReferenceError inside the catch block.
 *
 * @returns A promise that resolves once the task has settled; task failures
 *          are logged rather than rethrown.
 */
async function cancellableTask(): Promise<void> {
  const controller = new AbortController();

  // Cancel with custom reason
  const cancelTimer = setTimeout(() => {
    controller.abort(new Error("Custom timeout"));
  }, 1000);

  try {
    await pool.run({ task: "slow" }, { signal: controller.signal });
  } catch (error: unknown) {
    if (error instanceof Error && error.name === 'AbortError') {
      console.log("Cancelled:", error.message); // "The task has been aborted"
      console.log("Reason:", 'cause' in error ? error.cause : undefined); // Custom timeout error
    } else {
      // Anything that is not a cancellation is a real failure; report it
      // instead of dropping it silently.
      console.error("Task failed:", error);
    }
  } finally {
    // Do not leave the cancellation timer pending once the task has settled.
    clearTimeout(cancelTimer);
  }
}
// Support for both DOM-style and Node.js-style abort signals.
/**
* DOM-style abort signal (AbortController.signal)
*
* Declares only what is listed here: adding and removing an 'abort' listener,
* plus the optional `aborted` and `reason` properties.
*/
interface AbortSignalEventTarget {
/**
* Register `listener` for the 'abort' event.
* `options.once` is presumably the standard EventTarget one-shot flag — confirm.
*/
addEventListener(
name: 'abort',
listener: () => void,
options?: { once: boolean }
): void;
/** Counterpart of `addEventListener` for the 'abort' event. */
removeEventListener(name: 'abort', listener: () => void): void;
/** Optional flag; an example in this document reads it to detect a signal that has already fired. */
readonly aborted?: boolean;
/** Optional abort reason; typed `unknown`, so it must be narrowed before use. */
readonly reason?: unknown;
}
/**
* Node.js EventEmitter-style abort signal
*
* Only `once` and `off` for the 'abort' event are declared; unlike
* `AbortSignalEventTarget`, this shape has no `aborted` or `reason` members.
*/
interface AbortSignalEventEmitter {
/** Detach `listener` from the 'abort' event (presumably EventEmitter#off semantics — confirm). */
off(name: 'abort', listener: () => void): void;
/** Attach `listener` for the 'abort' event (presumably one-shot, as with EventEmitter#once — confirm). */
once(name: 'abort', listener: () => void): void;
}
Utility functions for working with different abort signal types.
/**
* Attach abort listener to any abort signal type
* @param abortSignal - AbortSignal to listen to; either member of the
*                      `AbortSignalAny` union (DOM-style or EventEmitter-style)
* @param listener - Function to call on abort; it receives no arguments
* @returns Nothing — no handle for detaching the listener is returned
*/
function onabort(abortSignal: AbortSignalAny, listener: () => void): void;
Usage Examples:
import { onabort } from "piscina";
import { EventEmitter } from "events";

// Works with AbortController
const controller = new AbortController();
onabort(controller.signal, () => {
  console.log("DOM-style signal aborted");
});

// Works with EventEmitter-style signals.
// An EventEmitter already provides the `once`/`off` pair required by
// AbortSignalEventEmitter, so it can be passed as-is; no `as any` cast needed.
const emitter = new EventEmitter();
onabort(emitter, () => {
  console.log("EventEmitter-style signal aborted");
});

// Trigger abort
controller.abort();
emitter.emit('abort');
import Piscina from "piscina";
const pool = new Piscina({ filename: "worker.js" });
/**
 * Execute a task on the pool and abort it if it has not settled in time.
 *
 * The abort timer is always cancelled once the task settles, whether it
 * resolved or rejected; a rejection is propagated to the caller unchanged.
 *
 * @param task - Task data
 * @param timeoutMs - Timeout in milliseconds
 * @returns The value the task resolved with
 */
async function runWithTimeout(task: any, timeoutMs: number) {
  const abortController = new AbortController();
  const timer = setTimeout(() => {
    abortController.abort(`Task timeout after ${timeoutMs}ms`);
  }, timeoutMs);

  try {
    return await pool.run(task, { signal: abortController.signal });
  } finally {
    // Single cleanup point for both the success and the failure path.
    clearTimeout(timer);
  }
}
// Usage
try {
  const result = await runWithTimeout({ operation: "compute" }, 5000);
  console.log("Completed within timeout:", result);
} catch (error: unknown) {
  // Narrow first: the rejection value is `unknown` and may not be an Error.
  if (error instanceof Error && error.name === 'AbortError') {
    console.log("Task timed out");
  } else {
    // A failure that is not a timeout must not disappear silently.
    console.error("Task failed:", error);
  }
}
import Piscina from "piscina";
const pool = new Piscina({ filename: "worker.js" });
/**
 * Tracks one AbortController per task id so that individual tasks, or all
 * tasks at once, can be cancelled from outside.
 */
class CancellableTaskManager {
  /** Controllers of tasks that are still in flight, keyed by task id. */
  private readonly controllers = new Map<string, AbortController>();

  /**
   * Run `taskData` on the pool under a fresh AbortController registered as `taskId`.
   *
   * If `taskId` is already in use, the new controller replaces the old entry;
   * the older task keeps running but can no longer be cancelled by id.
   *
   * @param taskId - Identifier later passed to `cancelTask`
   * @param taskData - Payload handed to `pool.run`
   * @returns The task's result; rejections from the pool are propagated
   */
  async startTask(taskId: string, taskData: any): Promise<any> {
    const controller = new AbortController();
    this.controllers.set(taskId, controller);
    try {
      return await pool.run(taskData, { signal: controller.signal });
    } finally {
      // Only unregister our own controller. If the id was reused while this
      // task was running, the entry belongs to the newer task and must stay,
      // otherwise that task would become impossible to cancel.
      if (this.controllers.get(taskId) === controller) {
        this.controllers.delete(taskId);
      }
    }
  }

  /**
   * Abort the task registered under `taskId`.
   *
   * @param taskId - Identifier given to `startTask`
   * @param reason - Abort reason forwarded to the controller
   * @returns `true` if a tracked task was found and aborted, `false` otherwise
   */
  cancelTask(taskId: string, reason = "User cancelled"): boolean {
    const controller = this.controllers.get(taskId);
    if (!controller) {
      return false;
    }
    controller.abort(reason);
    return true;
  }

  /**
   * Abort every tracked task and forget all controllers.
   *
   * @param reason - Abort reason forwarded to each controller
   */
  cancelAllTasks(reason = "Shutdown"): void {
    for (const controller of this.controllers.values()) {
      controller.abort(reason);
    }
    this.controllers.clear();
  }
}
// Usage
const taskManager = new CancellableTaskManager();

// Kick off two independent tasks
const task1 = taskManager.startTask("task1", { operation: "compute1" });
const task2 = taskManager.startTask("task2", { operation: "compute2" });

// After two seconds, cancel only the first one
setTimeout(() => {
  taskManager.cancelTask("task1", "No longer needed");
}, 2000);

// Report the outcome of each task once both have settled
Promise.allSettled([task1, task2]).then((outcomes) => {
  for (const [index, outcome] of outcomes.entries()) {
    if (outcome.status === 'fulfilled') {
      console.log(`Task ${index + 1} completed:`, outcome.value);
      continue;
    }
    console.log(`Task ${index + 1} failed:`, outcome.reason.message);
  }
});
import Piscina from "piscina";
const pool = new Piscina({ filename: "worker.js" });
/**
 * Wrapper around the shared `pool` that tracks in-flight tasks and can cancel
 * all of them at once when the application shuts down.
 */
class GracefulTaskPool {
  /** Aborted exactly once, when `shutdown()` is called. */
  private readonly shutdownController = new AbortController();
  /** Promises of tasks that have been submitted and have not settled yet. */
  private readonly activeTasks = new Set<Promise<any>>();

  /**
   * Submit a task that is cancelled by the caller's signal (if any) or by
   * pool shutdown, whichever fires first.
   *
   * @param task - Payload handed to `pool.run`
   * @param options - Options forwarded to `pool.run`; `options.signal` is
   *                  combined with the internal shutdown signal
   * @returns The task's result
   * @throws AbortError when shutdown has already started
   */
  async run(task: any, options: any = {}): Promise<any> {
    const shutdownSignal = this.shutdownController.signal;
    if (shutdownSignal.aborted) {
      throw new AbortError("Pool is shutting down");
    }

    // Combine user signal with shutdown signal
    let signal: AbortSignalAny = shutdownSignal;
    let release: () => void = () => {};
    if (options.signal) {
      const combined = this.combineSignals(options.signal, shutdownSignal);
      signal = combined.signal;
      release = combined.release;
    }

    const taskPromise: Promise<any> = pool.run(task, { ...options, signal });
    this.activeTasks.add(taskPromise);

    const onSettled = () => {
      this.activeTasks.delete(taskPromise);
      // Detach the forwarding listeners so they do not pile up on the
      // long-lived shutdown signal.
      release();
    };
    // then(f, f) instead of finally(f): finally() returns a promise that
    // re-rejects with the task's error, and nothing would ever handle it,
    // producing an unhandled rejection for every failed or cancelled task.
    void taskPromise.then(onSettled, onSettled);

    return taskPromise;
  }

  /**
   * Abort all in-flight tasks, wait for them to settle (bounded by
   * `timeoutMs`), then force-close the underlying pool.
   *
   * @param timeoutMs - Maximum time to wait for tasks to settle
   */
  async shutdown(timeoutMs = 30000): Promise<void> {
    console.log(`Shutting down pool with ${this.activeTasks.size} active tasks`);

    // Signal all tasks to abort
    this.shutdownController.abort("Pool shutdown");

    // Wait for tasks to complete or timeout
    let timer: ReturnType<typeof setTimeout> | undefined;
    const timeoutPromise = new Promise<never>((_, reject) => {
      timer = setTimeout(() => reject(new Error("Shutdown timeout")), timeoutMs);
    });

    try {
      await Promise.race([
        Promise.allSettled(Array.from(this.activeTasks)),
        timeoutPromise
      ]);
    } catch {
      console.warn("Forced shutdown due to timeout");
    } finally {
      // Without this the timer keeps the event loop alive for up to
      // `timeoutMs` after a clean shutdown.
      clearTimeout(timer);
    }

    await pool.close({ force: true });
  }

  /**
   * Build a signal that aborts as soon as either input aborts.
   *
   * Inputs that are already aborted are honoured immediately, and the abort
   * reason of a DOM-style input is carried over to the combined signal.
   *
   * @returns The combined signal plus a `release` function that removes the
   *          listeners installed on both inputs
   */
  private combineSignals(
    signal1: AbortSignalAny,
    signal2: AbortSignalAny
  ): { signal: AbortSignal; release: () => void } {
    const controller = new AbortController();
    const detachers: Array<() => void> = [];
    const release = () => {
      for (const detach of detachers.splice(0)) {
        detach();
      }
    };

    for (const source of [signal1, signal2]) {
      if ('addEventListener' in source) {
        if (source.aborted) {
          // A listener added now would never fire; abort right away.
          controller.abort(source.reason);
          break;
        }
        const forward = () => {
          controller.abort(source.reason);
          release();
        };
        source.addEventListener('abort', forward, { once: true });
        detachers.push(() => source.removeEventListener('abort', forward));
      } else {
        // Emitter-style signals expose neither `aborted` nor `reason`.
        const forward = () => {
          controller.abort();
          release();
        };
        source.once('abort', forward);
        detachers.push(() => source.off('abort', forward));
      }
    }

    if (controller.signal.aborted) {
      release();
    }
    return { signal: controller.signal, release };
  }
}
// Usage
const taskPool = new GracefulTaskPool();

// Handle shutdown signal
process.on('SIGINT', async () => {
  console.log('Received SIGINT, shutting down gracefully...');
  try {
    await taskPool.shutdown();
    process.exit(0);
  } catch (error: unknown) {
    // A failed shutdown must not be reported as success, and must not
    // surface as an unhandled rejection from this async listener.
    console.error('Graceful shutdown failed:', error);
    process.exit(1);
  }
});

// Run tasks
taskPool.run({ operation: "longTask" }).catch((error: unknown) => {
  if (error instanceof Error && error.name === 'AbortError') {
    console.log("Task cancelled during shutdown");
  } else {
    console.error("Task failed:", error);
  }
});
// Workers can check for cancellation and perform cleanup.
Worker file (worker.js):
const { isMainThread, parentPort } = require('worker_threads');
// Worker function
module.exports = async function(data) {
const { operation, iterations = 1000000 } = data;
if (operation === 'cancellableCompute') {
let result = 0;
for (let i = 0; i < iterations; i++) {
// Perform computation
result += Math.random();
// Check for cancellation periodically (every 1000 iterations)
if (i % 1000 === 0) {
// In a real scenario, you might check a shared flag or message
// For demonstration, we'll just yield control
await new Promise(resolve => setImmediate(resolve));
}
}
return { result, iterations };
}
if (operation === 'longRunning') {
// Simulate long-running task with periodic checks
const startTime = Date.now();
const duration = data.duration || 5000;
while (Date.now() - startTime < duration) {
// Do work...
await new Promise(resolve => setTimeout(resolve, 100));
// Worker is terminated externally if task is cancelled
// No need for explicit cancellation checks in this case
}
return { completed: true, actualDuration: Date.now() - startTime };
}
return { error: "Unknown operation" };
};import Piscina from "piscina";
const pool = new Piscina({ filename: "worker.js" });
/**
 * Run a task and convert every outcome into a plain result object instead of
 * letting the rejection propagate.
 *
 * @param task - Payload handed to `pool.run`
 * @param signal - Optional abort signal used to cancel the task
 * @returns `{ success: true, data }` on completion,
 *          `{ success: false, cancelled: true, reason }` on cancellation, or
 *          `{ success: false, cancelled: false, error }` for any other failure,
 *          where `error` is the failure message as a string
 */
async function robustTaskExecution(task: any, signal?: AbortSignalAny) {
  try {
    const result = await pool.run(task, { signal });
    return { success: true, data: result };
  } catch (error: unknown) {
    // Narrow before reading properties: a rejection value may be anything,
    // and reading `.name` on null/undefined would make this handler throw.
    if (error instanceof Error && error.name === 'AbortError') {
      return {
        success: false,
        cancelled: true,
        reason: 'cause' in error ? error.cause : undefined
      };
    }
    const message = error instanceof Error ? error.message : String(error);
    return { success: false, cancelled: false, error: message };
  }
}
// Usage with different outcomes
const controller = new AbortController();

// 1. Normal completion
const result1 = await robustTaskExecution({ operation: "fast" });
console.log(result1); // { success: true, data: { ... } }

// 2. Cancellation: the abort is scheduled before the task is submitted and
//    fires 100 ms later
setTimeout(() => {
  controller.abort("Timeout");
}, 100);
const result2 = await robustTaskExecution({ operation: "slow" }, controller.signal);
console.log(result2); // { success: false, cancelled: true, reason: "Timeout" }

// 3. Failure reported as a result object
const result3 = await robustTaskExecution({ operation: "invalid" });
console.log(result3); // { success: false, cancelled: false, error: "..." }
import Piscina from "piscina";
const pool = new Piscina({ filename: "worker.js" });
/**
 * Couples a pool task with a per-task resource that is allocated before the
 * task starts and released once it settles.
 */
class ResourceManager {
  /** Currently allocated resources, keyed by task id. */
  private readonly resources = new Map<string, { id: string; allocated: number }>();

  /**
   * Allocate a resource for `taskId`, run the task, and always release the
   * resource afterwards.
   *
   * @param taskId - Key of the resource; also sent to the worker as `resourceId`
   * @param task - Task payload; its fields are spread into the worker input
   * @param signal - Optional abort signal used to cancel the task
   * @returns The task's result; rejections (including cancellation) propagate
   */
  async runWithResources(taskId: string, task: any, signal?: AbortSignalAny) {
    // Allocate resources
    await this.allocateResource(taskId);

    try {
      return await pool.run(
        { ...task, resourceId: taskId },
        { signal }
      );
    } finally {
      // `finally` covers success, failure and cancellation alike, so cleanup
      // runs exactly once; a separate catch-and-cleanup would run it twice.
      await this.cleanupResource(taskId);
    }
  }

  /** Create the bookkeeping entry for `id` and return it. */
  private async allocateResource(id: string): Promise<{ id: string; allocated: number }> {
    const resource = { id, allocated: Date.now() };
    this.resources.set(id, resource);
    return resource;
  }

  /** Release the resource for `id`; does nothing if it is not allocated. */
  private async cleanupResource(id: string): Promise<void> {
    if (!this.resources.has(id)) {
      return;
    }
    console.log(`Cleaning up resource ${id}`);
    this.resources.delete(id);
    // Perform actual cleanup...
  }
}
// Usage
const resourceManager = new ResourceManager();
const controller = new AbortController();

try {
  await resourceManager.runWithResources("task1", { operation: "useResource" }, controller.signal);
} catch {
  // The manager has already released the resource by the time we get here.
  console.log("Task failed or cancelled, but resources cleaned up");
}
// Monitor cancellation-related events at the pool level.
import Piscina from "piscina";
const pool = new Piscina({ filename: "worker.js" });
// Monitor worker destruction (may indicate cancellation)
pool.on('workerDestroy', (worker) => {
  console.log(`Worker ${worker.id} destroyed (possibly due to cancellation)`);
});

// Monitor errors (may include abort errors)
pool.on('error', (error) => {
  if (error instanceof Error && error.name === 'AbortError') {
    console.log('Pool-level abort error:', error.message);
  }
});

// Run cancellable tasks
const controller = new AbortController();
const cancelTimer = setTimeout(() => {
  controller.abort("Demo cancellation");
}, 1000);

try {
  await pool.run(
    { operation: "longRunning", duration: 5000 },
    { signal: controller.signal }
  );
} catch (error: unknown) {
  // Narrow before reading `.name`; fall back to the raw value otherwise.
  console.log("Expected cancellation:", error instanceof Error ? error.name : error);
} finally {
  // Nothing left to cancel once the task has settled.
  clearTimeout(cancelTimer);
}
// Install with Tessl CLI
npx tessl i tessl/npm-piscina