CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/npm-piscina

A fast, efficient Node.js Worker Thread Pool implementation

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/task-cancellation.md

Task Cancellation

Comprehensive task cancellation support using AbortController and AbortSignal patterns for graceful task termination and cleanup.

Capabilities

AbortSignal Integration

Piscina supports standard AbortController/AbortSignal patterns for task cancellation.

/**
 * Per-task options passed as the second argument to `pool.run()`.
 * Only the cancellation-related field is listed in this section.
 */
interface RunOptions {
  /**
   * Abort signal for task cancellation.
   * Accepts either member of `AbortSignalAny`; may be omitted or `null`.
   */
  signal?: AbortSignalAny | null;
}

/**
 * Union type supporting both DOM and Node.js abort signal patterns.
 *
 * - `AbortSignalEventTarget`: `addEventListener` / `removeEventListener`
 *   style, e.g. `AbortController.signal`.
 * - `AbortSignalEventEmitter`: `once` / `off` style.
 */
type AbortSignalAny = AbortSignalEventTarget | AbortSignalEventEmitter;

Usage Examples:

import Piscina from "piscina";

const pool = new Piscina({ filename: "worker.js" });

// Basic cancellation with AbortController
const controller = new AbortController();
const taskPromise = pool.run(
  { operation: "longRunning", duration: 10000 },
  { signal: controller.signal }
);

// Cancel after 2 seconds. The handle is kept so the timer can be cleared as
// soon as the task settles; a pending timer would otherwise keep the event
// loop alive and fire a pointless abort after completion.
const cancelTimer = setTimeout(() => {
  controller.abort("Task timeout");
}, 2000);

try {
  const result = await taskPromise;
  console.log("Task completed:", result);
} catch (error) {
  // Catch variables are `unknown` under strict mode, so narrow to Error
  // before reading `name`.
  if (error instanceof Error && error.name === 'AbortError') {
    // `cause` carries the reason that was passed to controller.abort().
    const { cause }: { cause?: unknown } = error;
    console.log("Task was cancelled:", cause);
  } else {
    console.error("Task failed:", error);
  }
} finally {
  clearTimeout(cancelTimer);
}

AbortError Class

Error thrown when tasks are cancelled via abort signals.

/**
 * Error thrown when a task is aborted.
 *
 * The usage examples in this document detect it through
 * `error.name === 'AbortError'` and read the abort reason from `error.cause`;
 * its message is shown there as "The task has been aborted".
 */
class AbortError extends Error {
  /**
   * Create abort error with optional reason
   * @param reason - Cancellation reason from AbortSignal (the value given to
   *   `controller.abort(reason)` in the examples)
   */
  constructor(reason?: AbortSignalEventTarget['reason']);
  
  /** Error name is always 'AbortError' */
  readonly name: 'AbortError';
}

Usage Examples:

import Piscina from "piscina";

const pool = new Piscina({ filename: "worker.js" });

/**
 * Runs a slow task and aborts it after one second, using a custom `Error`
 * as the abort reason.
 *
 * A cancellation is reported on the console together with its reason; any
 * other failure is logged as a task failure. The returned promise resolves
 * once the task has settled either way.
 */
async function cancellableTask() {
  const controller = new AbortController();

  // Cancel with custom reason. The handle is kept so the timer is cleared
  // when the task settles before the deadline.
  const cancelTimer = setTimeout(() => {
    controller.abort(new Error("Custom timeout"));
  }, 1000);

  try {
    await pool.run({ task: "slow" }, { signal: controller.signal });
  } catch (error) {
    // Identify the cancellation by its `name`: the AbortError class is not
    // imported in this example, so an `instanceof AbortError` check would
    // itself throw a ReferenceError here.
    if (error instanceof Error && error.name === 'AbortError') {
      const { message, cause }: { message: string; cause?: unknown } = error;
      console.log("Cancelled:", message); // "The task has been aborted"
      console.log("Reason:", cause);      // Custom timeout error
    } else {
      console.error("Task failed:", error);
    }
  } finally {
    clearTimeout(cancelTimer);
  }
}

AbortSignal Types

Support for both DOM-style and Node.js-style abort signals.

/**
 * DOM-style abort signal (AbortController.signal)
 *
 * Structural type: any object exposing these members qualifies.
 */
interface AbortSignalEventTarget {
  /**
   * Register `listener` for the 'abort' event.
   * `options.once` presumably follows the DOM meaning (listener removed
   * after its first call) — confirm against the implementation.
   */
  addEventListener(
    name: 'abort',
    listener: () => void,
    options?: { once: boolean }
  ): void;
  /** Remove a listener previously registered for the 'abort' event. */
  removeEventListener(name: 'abort', listener: () => void): void;
  /** Whether the signal is already aborted; optional, so it may be absent. */
  readonly aborted?: boolean;
  /** Reason supplied when the signal was aborted, if any. */
  readonly reason?: unknown;
}

/**
 * Node.js EventEmitter-style abort signal
 *
 * Unlike the DOM-style signal type, this shape declares no `aborted` or
 * `reason` members; it only exposes listener registration and removal.
 */
interface AbortSignalEventEmitter {
  /** Remove a listener previously registered for the 'abort' event. */
  off(name: 'abort', listener: () => void): void;
  /** Register a listener for the 'abort' event. */
  once(name: 'abort', listener: () => void): void;
}

Abort Signal Utilities

Utility functions for working with different abort signal types.

/**
 * Attach abort listener to any abort signal type
 *
 * Accepts either member of the `AbortSignalAny` union, so callers do not
 * need to know whether the signal is DOM-style or EventEmitter-style.
 * NOTE(review): whether the listener is registered one-shot is not visible
 * from this declaration — confirm against the implementation.
 *
 * @param abortSignal - AbortSignal to listen to
 * @param listener - Function to call on abort
 */
function onabort(abortSignal: AbortSignalAny, listener: () => void): void;

Usage Examples:

import { onabort } from "piscina";

// EventEmitter serves below as the Node.js-style abort source
import { EventEmitter } from "events";

// DOM-style source: the signal owned by an AbortController
const controller = new AbortController();
const logDomAbort = () => {
  console.log("DOM-style signal aborted");
};
onabort(controller.signal, logDomAbort);

// EventEmitter-style source: a plain emitter that will emit 'abort'
const emitter = new EventEmitter();
const logEmitterAbort = () => {
  console.log("EventEmitter-style signal aborted");
};
onabort(emitter as any, logEmitterAbort);

// Fire both sources so each listener runs
controller.abort();
emitter.emit('abort');

Task Cancellation Scenarios

Timeout-Based Cancellation

import Piscina from "piscina";

const pool = new Piscina({ filename: "worker.js" });

/**
 * Execute `task` on the pool and abort it if it has not settled within
 * `timeoutMs` milliseconds.
 *
 * The timer is always cleared once the task settles, whether it resolved
 * or rejected.
 *
 * @param task - Task data
 * @param timeoutMs - Timeout in milliseconds
 * @returns The value the pool task resolves with
 * @throws Whatever `pool.run` rejects with, including the abort error
 *   produced when the timeout fires
 */
async function runWithTimeout(task: any, timeoutMs: number) {
  const abortController = new AbortController();
  const abortOnTimeout = () => {
    abortController.abort(`Task timeout after ${timeoutMs}ms`);
  };
  const timer = setTimeout(abortOnTimeout, timeoutMs);

  try {
    return await pool.run(task, { signal: abortController.signal });
  } finally {
    clearTimeout(timer);
  }
}

// Usage
try {
  const result = await runWithTimeout({ operation: "compute" }, 5000);
  console.log("Completed within timeout:", result);
} catch (error) {
  // Catch variables are `unknown` under strict mode: narrow before reading.
  if (error instanceof Error && error.name === 'AbortError') {
    console.log("Task timed out");
  } else {
    // Anything else is a real task failure and must not be dropped silently.
    console.error("Task failed:", error);
  }
}

User-Initiated Cancellation

import Piscina from "piscina";

const pool = new Piscina({ filename: "worker.js" });

/**
 * Tracks one AbortController per in-flight task so that a single task, or
 * all of them, can be cancelled by ID.
 */
class CancellableTaskManager {
  /** Controllers of tasks that were started and have not settled yet. */
  private readonly controllers = new Map<string, AbortController>();

  /**
   * Run `taskData` on the pool and register it under `taskId`.
   *
   * @param taskId - Key used later by `cancelTask`
   * @param taskData - Payload handed to `pool.run`
   * @returns The value the pool task resolves with
   * @throws Whatever `pool.run` rejects with (including cancellation)
   */
  async startTask(taskId: string, taskData: any): Promise<any> {
    const controller = new AbortController();
    this.controllers.set(taskId, controller);

    try {
      return await pool.run(taskData, { signal: controller.signal });
    } finally {
      // Drop the entry only if it is still this task's controller. If the
      // same ID was reused while this task was running, the map now holds
      // the newer task's controller and deleting it would make that task
      // impossible to cancel.
      if (this.controllers.get(taskId) === controller) {
        this.controllers.delete(taskId);
      }
    }
  }

  /**
   * Abort the task registered under `taskId`.
   *
   * @param taskId - ID given to `startTask`
   * @param reason - Abort reason forwarded to the controller
   * @returns `true` if a controller was found and aborted, else `false`
   */
  cancelTask(taskId: string, reason = "User cancelled"): boolean {
    const controller = this.controllers.get(taskId);
    if (!controller) {
      return false;
    }
    controller.abort(reason);
    return true;
  }

  /**
   * Abort every registered task and forget all controllers.
   *
   * @param reason - Abort reason forwarded to each controller
   */
  cancelAllTasks(reason = "Shutdown"): void {
    for (const controller of this.controllers.values()) {
      controller.abort(reason);
    }
    this.controllers.clear();
  }
}

// Usage
const taskManager = new CancellableTaskManager();

// Launch two tasks side by side
const task1 = taskManager.startTask("task1", { operation: "compute1" });
const task2 = taskManager.startTask("task2", { operation: "compute2" });

// Two seconds in, cancel only the first task
const cancelFirstTask = () => {
  taskManager.cancelTask("task1", "No longer needed");
};
setTimeout(cancelFirstTask, 2000);

// Once both tasks have settled, report each outcome in order
Promise.allSettled([task1, task2]).then(outcomes => {
  for (const [position, outcome] of outcomes.entries()) {
    if (outcome.status !== 'fulfilled') {
      console.log(`Task ${position + 1} failed:`, outcome.reason.message);
      continue;
    }
    console.log(`Task ${position + 1} completed:`, outcome.value);
  }
});

Graceful Shutdown

import Piscina from "piscina";

const pool = new Piscina({ filename: "worker.js" });

/**
 * Wraps the shared `pool` so every task is bound to a pool-wide shutdown
 * signal in addition to any signal supplied by the caller.
 */
class GracefulTaskPool {
  /** Aborted once by `shutdown()`; every running task listens to it. */
  private readonly shutdownController = new AbortController();
  /** Promises of tasks that were submitted and have not settled yet. */
  private readonly activeTasks = new Set<Promise<any>>();

  /**
   * Submit a task to the pool.
   *
   * @param task - Payload handed to `pool.run`
   * @param options - Options forwarded to `pool.run`; `options.signal`, if
   *   present, is combined with the shutdown signal
   * @returns The value the pool task resolves with
   * @throws An error named 'AbortError' if shutdown has already begun, or
   *   whatever `pool.run` rejects with
   */
  async run(task: any, options: any = {}): Promise<any> {
    if (this.shutdownController.signal.aborted) {
      throw GracefulTaskPool.createAbortError("Pool is shutting down");
    }

    // Combine user signal with shutdown signal
    let signal: AbortSignalAny = this.shutdownController.signal;
    let detach = (): void => {};
    if (options.signal) {
      ({ signal, detach } = this.combineSignals(options.signal, signal));
    }

    const taskPromise = pool.run(task, { ...options, signal });
    this.activeTasks.add(taskPromise);

    // Use then(onSettled, onSettled) rather than finally(): finally()
    // returns a new promise that re-rejects with the task's error, and
    // nobody would handle that rejection.
    const onSettled = (): void => {
      detach();
      this.activeTasks.delete(taskPromise);
    };
    void taskPromise.then(onSettled, onSettled);

    return taskPromise;
  }

  /**
   * Abort all running tasks, wait for them to settle (up to `timeoutMs`),
   * then close the pool.
   *
   * @param timeoutMs - Maximum time to wait for tasks to settle
   */
  async shutdown(timeoutMs = 30000): Promise<void> {
    console.log(`Shutting down pool with ${this.activeTasks.size} active tasks`);

    // Signal all tasks to abort
    this.shutdownController.abort("Pool shutdown");

    // Wait for tasks to complete or timeout
    let timer: ReturnType<typeof setTimeout> | undefined;
    const timeoutPromise = new Promise<never>((_, reject) => {
      timer = setTimeout(() => reject(new Error("Shutdown timeout")), timeoutMs);
    });

    try {
      await Promise.race([
        Promise.allSettled(Array.from(this.activeTasks)),
        timeoutPromise
      ]);
    } catch {
      // allSettled never rejects, so only the timeout can land here.
      console.warn("Forced shutdown due to timeout");
    } finally {
      // Without this the timer stays pending for the full timeout even
      // when all tasks settled immediately.
      clearTimeout(timer);
    }

    await pool.close({ force: true });
  }

  /**
   * Build a signal that aborts as soon as either input signal aborts.
   *
   * @param signal1 - First source signal
   * @param signal2 - Second source signal
   * @returns The combined signal plus a `detach` function that removes the
   *   listeners installed on the sources; call it once the task settles so
   *   listeners do not pile up on the long-lived shutdown signal
   */
  private combineSignals(
    signal1: AbortSignalAny,
    signal2: AbortSignalAny
  ): { signal: AbortSignalAny; detach: () => void } {
    const controller = new AbortController();
    const detachers: Array<() => void> = [];

    for (const source of [signal1, signal2]) {
      if ('addEventListener' in source) {
        // An already-aborted signal never fires 'abort' again, so it has
        // to be checked up front.
        if (source.aborted) {
          controller.abort(source.reason);
          break;
        }
        const onAbort = () => controller.abort(source.reason);
        source.addEventListener('abort', onAbort, { once: true });
        detachers.push(() => source.removeEventListener('abort', onAbort));
      } else {
        // EventEmitter-style signals declare no `aborted`/`reason` members.
        const onAbort = () => controller.abort();
        source.once('abort', onAbort);
        detachers.push(() => source.off('abort', onAbort));
      }
    }

    const detach = (): void => {
      for (const remove of detachers.splice(0)) {
        remove();
      }
    };

    return { signal: controller.signal, detach };
  }

  /**
   * Create an error shaped like the pool's abort error (name 'AbortError',
   * reason in `cause`). The AbortError class itself is not imported in this
   * example, so it cannot be constructed directly.
   */
  private static createAbortError(reason: unknown): Error {
    return Object.assign(new Error("The task has been aborted"), {
      name: "AbortError",
      cause: reason
    });
  }
}

// Usage
const taskPool = new GracefulTaskPool();

// Handle shutdown signal
process.on('SIGINT', async () => {
  console.log('Received SIGINT, shutting down gracefully...');
  try {
    await taskPool.shutdown();
    process.exit(0);
  } catch (error) {
    // The event emitter ignores the promise returned by an async listener,
    // so a shutdown failure must be handled here or the process would never
    // reach an exit call.
    console.error('Graceful shutdown failed:', error);
    process.exit(1);
  }
});

// Run tasks
taskPool.run({ operation: "longTask" }).catch((error: unknown) => {
  if (error instanceof Error && error.name === 'AbortError') {
    console.log("Task cancelled during shutdown");
  } else {
    // Do not drop real task failures silently.
    console.error("Task failed:", error);
  }
});

Worker-Side Cancellation

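Workers can check for cancellation and perform cleanup, but they must bring their own mechanism to do so (for example a shared flag or a message): the abort signal itself is not visible inside the worker in the examples below. As the second example notes, a cancelled task's worker is terminated externally, so cleanup must not depend on the worker function running to completion.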

Worker file (worker.js):

const { isMainThread, parentPort } = require('worker_threads');

// Worker function
module.exports = async function(data) {
  const { operation, iterations = 1000000 } = data;
  
  if (operation === 'cancellableCompute') {
    let result = 0;
    
    for (let i = 0; i < iterations; i++) {
      // Perform computation
      result += Math.random();
      
      // Check for cancellation periodically (every 1000 iterations)
      if (i % 1000 === 0) {
        // In a real scenario, you might check a shared flag or message
        // For demonstration, we'll just yield control
        await new Promise(resolve => setImmediate(resolve));
      }
    }
    
    return { result, iterations };
  }
  
  if (operation === 'longRunning') {
    // Simulate long-running task with periodic checks
    const startTime = Date.now();
    const duration = data.duration || 5000;
    
    while (Date.now() - startTime < duration) {
      // Do work...
      await new Promise(resolve => setTimeout(resolve, 100));
      
      // Worker is terminated externally if task is cancelled
      // No need for explicit cancellation checks in this case
    }
    
    return { completed: true, actualDuration: Date.now() - startTime };
  }
  
  return { error: "Unknown operation" };
};

Cancellation Best Practices

Error Handling

import Piscina from "piscina";

const pool = new Piscina({ filename: "worker.js" });

/**
 * Run a task and convert every outcome into a plain result object instead
 * of throwing.
 *
 * @param task - Payload handed to `pool.run`
 * @param signal - Optional abort signal used to cancel the task
 * @returns One of:
 *   - `{ success: true, data }` when the task resolved,
 *   - `{ success: false, cancelled: true, reason }` when it was aborted
 *     (`reason` is the abort error's `cause`),
 *   - `{ success: false, cancelled: false, error }` for any other failure
 *     (`error` is the failure message).
 */
async function robustTaskExecution(task: any, signal?: AbortSignalAny) {
  try {
    const result = await pool.run(task, { signal });
    return { success: true, data: result };
  } catch (error) {
    // Catch variables are `unknown`: a rejection is not guaranteed to be an
    // Error (it may even be null), so narrow before reading any property.
    if (!(error instanceof Error)) {
      return { success: false, cancelled: false, error: String(error) };
    }
    if (error.name === 'AbortError') {
      const { cause }: { cause?: unknown } = error;
      return { success: false, cancelled: true, reason: cause };
    }
    return { success: false, cancelled: false, error: error.message };
  }
}

// Usage with different outcomes

// 1. A task that runs to completion
const result1 = await robustTaskExecution({ operation: "fast" });
console.log(result1); // { success: true, data: { ... } }

// 2. A task that is cancelled 100 ms after it is submitted
const controller = new AbortController();
const abortSlowTask = () => controller.abort("Timeout");
setTimeout(abortSlowTask, 100);
const result2 = await robustTaskExecution({ operation: "slow" }, controller.signal);
console.log(result2); // { success: false, cancelled: true, reason: "Timeout" }

// 3. A task that fails with an error
const result3 = await robustTaskExecution({ operation: "invalid" });
console.log(result3); // { success: false, cancelled: false, error: "..." }

Resource Cleanup

import Piscina from "piscina";

const pool = new Piscina({ filename: "worker.js" });

/**
 * Allocates a per-task resource before running a pool task and releases it
 * again however the task ends (success, failure or cancellation).
 */
class ResourceManager {
  /** Currently allocated resources, keyed by task ID. */
  private readonly resources = new Map<string, any>();

  /**
   * Run `task` on the pool with a resource allocated under `taskId`.
   *
   * @param taskId - Key of the resource; also sent to the worker as
   *   `resourceId`
   * @param task - Payload handed to `pool.run`
   * @param signal - Optional abort signal used to cancel the task
   * @returns The value the pool task resolves with
   * @throws Whatever `pool.run` rejects with
   */
  async runWithResources(taskId: string, task: any, signal?: AbortSignalAny) {
    // Allocate resources
    await this.allocateResource(taskId);

    try {
      return await pool.run(
        { ...task, resourceId: taskId },
        { signal }
      );
    } finally {
      // Single cleanup point: `finally` already covers the error and
      // cancellation paths, so an extra cleanup in a `catch` would only run
      // it twice and could replace the task's error with a cleanup error.
      await this.cleanupResource(taskId);
    }
  }

  /** Create a resource record for `id` and remember it. */
  private async allocateResource(id: string): Promise<any> {
    const resource = { id, allocated: Date.now() };
    this.resources.set(id, resource);
    return resource;
  }

  /** Release the resource stored under `id`; does nothing if none exists. */
  private async cleanupResource(id: string): Promise<void> {
    if (!this.resources.has(id)) {
      return;
    }
    console.log(`Cleaning up resource ${id}`);
    this.resources.delete(id);
    // Perform actual cleanup...
  }
}

// Usage
const resourceManager = new ResourceManager();
const controller = new AbortController();

// A rejection (task failure or cancellation) is handled inline; the manager
// has already released the resource by the time the handler runs.
await resourceManager
  .runWithResources("task1", { operation: "useResource" }, controller.signal)
  .catch(() => {
    // Resources are cleaned up automatically
    console.log("Task failed or cancelled, but resources cleaned up");
  });

Integration with Pool Events

Monitor cancellation-related events at the pool level.

import Piscina from "piscina";

const pool = new Piscina({ filename: "worker.js" });

// Monitor worker destruction (may indicate cancellation)
pool.on('workerDestroy', (worker) => {
  console.log(`Worker ${worker.id} destroyed (possibly due to cancellation)`);
});

// Monitor errors (may include abort errors)
pool.on('error', (error) => {
  if (error.name === 'AbortError') {
    console.log('Pool-level abort error:', error.message);
  }
});

// Run cancellable tasks
const controller = new AbortController();

// Keep the handle so the timer is cleared if the task settles first.
const cancelTimer = setTimeout(() => {
  controller.abort("Demo cancellation");
}, 1000);

try {
  await pool.run(
    { operation: "longRunning", duration: 5000 },
    { signal: controller.signal }
  );
} catch (error) {
  // Catch variables are `unknown` under strict mode: narrow before reading
  // `name`, and fall back to the raw value for non-Error rejections.
  console.log("Expected cancellation:", error instanceof Error ? error.name : error);
} finally {
  clearTimeout(cancelTimer);
}

Install with Tessl CLI

npx tessl i tessl/npm-piscina

docs

index.md

load-balancing.md

performance-monitoring.md

pool-management.md

task-cancellation.md

task-queues.md

transferable-objects.md

tile.json