tessl/npm-piscina

A fast, efficient Node.js Worker Thread Pool implementation


Task Queues

Configurable task queuing strategies. Two FIFO implementations are built in, an array-based queue (ArrayTaskQueue) and a circular-buffer queue (FixedQueue), and custom queues can be supplied for other orderings.

Capabilities

TaskQueue Interface

Base interface that all task queue implementations must satisfy.

/**
 * Interface for task queue implementations
 */
interface TaskQueue {
  /** Current number of tasks in the queue */
  readonly size: number;
  
  /**
   * Remove and return the first task from the queue
   * @returns First task or null if queue is empty
   */
  shift(): Task | null;
  
  /**
   * Remove a specific task from the queue
   * @param task - Task to remove
   */
  remove(task: Task): void;
  
  /**
   * Add a task to the end of the queue
   * @param task - Task to add
   */
  push(task: Task): void;
}

/**
 * Base task interface
 */
interface Task {
  readonly [queueOptionsSymbol]: object | null;
}

ArrayTaskQueue

Simple array-based FIFO task queue implementation suitable for most use cases.

/**
 * Simple array-based FIFO task queue implementation
 * Best for: General purpose, variable queue sizes, simple scenarios
 * Performance: O(n) for shift operations due to array shifting
 */
class ArrayTaskQueue implements TaskQueue {
  readonly size: number;
  
  /**
   * Remove and return the first task (FIFO)
   * @returns First task or null if empty
   */
  shift(): Task | null;
  
  /**
   * Add task to end of queue
   * @param task - Task to enqueue
   */
  push(task: Task): void;
  
  /**
   * Remove specific task from queue
   * @param task - Task to remove
   */
  remove(task: Task): void;
}
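
As an illustration of where the O(n) cost comes from, here is a minimal sketch of an array-backed queue with the same four members. It is not the library's source: `DemoTask` and `SimpleArrayQueue` are hypothetical names used only for this example.

```typescript
// Hypothetical stand-in for the library's Task type.
interface DemoTask {
  readonly id: number;
}

// Illustrative array-backed FIFO queue (not Piscina's implementation).
class SimpleArrayQueue {
  private tasks: DemoTask[] = [];

  get size(): number {
    return this.tasks.length;
  }

  // Array.prototype.shift() generally has to re-index the remaining
  // elements, which is why the cost grows with the queue length.
  shift(): DemoTask | null {
    return this.tasks.shift() ?? null;
  }

  push(task: DemoTask): void {
    this.tasks.push(task);
  }

  // Linear search followed by splice; a no-op when the task is not queued.
  remove(task: DemoTask): void {
    const index = this.tasks.indexOf(task);
    if (index !== -1) {
      this.tasks.splice(index, 1);
    }
  }
}

const taskA: DemoTask = { id: 1 };
const taskB: DemoTask = { id: 2 };
const taskC: DemoTask = { id: 3 };

const arrayDemo = new SimpleArrayQueue();
arrayDemo.push(taskA);
arrayDemo.push(taskB);
arrayDemo.push(taskC);
arrayDemo.remove(taskB); // taskB is dropped before it reaches a worker

// Remaining tasks come out in insertion order, then null once empty.
const arrayDemoOrder = [arrayDemo.shift(), arrayDemo.shift(), arrayDemo.shift()];
```

Here `arrayDemoOrder` holds `taskA`, `taskC`, and `null`, in that order.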

Usage Examples:

import { Piscina, ArrayTaskQueue } from "piscina";

// Use ArrayTaskQueue explicitly
const pool = new Piscina({
  filename: "worker.js",
  taskQueue: new ArrayTaskQueue()
});

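// With no taskQueue option the pool uses its built-in default queue.
// The FixedQueue section of this document names FixedQueue as that default,
// so pass ArrayTaskQueue explicitly (as above) when array-based behaviour is wanted.
const defaultPool = new Piscina({
  filename: "worker.js"
});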

FixedQueue

High-performance FIFO queue built on fixed-size circular buffers, based on the Node.js internal implementation and optimized for the V8 engine.

/**
 * High-performance queue built on fixed-size circular buffers
 * Best for: High-throughput scenarios, predictable memory usage
 * Performance: O(1) for push and shift; remove() has to search for the task
 * Based on Node.js internal implementation, optimized for V8
 */
class FixedQueue implements TaskQueue {
  readonly size: number;
  
  /**
   * Remove and return the first task (FIFO)
   * @returns First task or null if empty
   */
  shift(): Task | null;
  
  /**
   * Add task to end of queue
   * @param task - Task to enqueue
   */
  push(task: Task): void;
  
  /**
   * Remove specific task from queue
   * @param task - Task to remove
   */
  remove(task: Task): void;
}
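
The constant-time push and shift come from the circular-buffer technique. The sketch below shows that technique with a single fixed-capacity buffer. It is an illustration under assumptions, not the FixedQueue source: `RingBuffer` is a hypothetical name, the power-of-two capacity is a choice made here so that a bit mask can replace the modulo, and this simplified version rejects pushes when full instead of growing.

```typescript
// Illustrative single-segment ring buffer (hypothetical, not Piscina's code).
class RingBuffer<T> {
  private readonly items: (T | undefined)[];
  private readonly mask: number;
  private head = 0; // index of the next slot to write
  private tail = 0; // index of the next slot to read
  private count = 0;

  constructor(capacity: number) {
    const isPowerOfTwo =
      Number.isInteger(capacity) && capacity >= 1 && (capacity & (capacity - 1)) === 0;
    if (!isPowerOfTwo) {
      throw new RangeError("capacity must be a power of two");
    }
    this.items = new Array<T | undefined>(capacity);
    this.mask = capacity - 1;
  }

  get size(): number {
    return this.count;
  }

  // O(1): write at head and advance it, wrapping via the mask.
  // Returns false when the buffer is full.
  push(item: T): boolean {
    if (this.count === this.items.length) return false;
    this.items[this.head] = item;
    this.head = (this.head + 1) & this.mask;
    this.count++;
    return true;
  }

  // O(1): read at tail and advance it; no elements are moved.
  shift(): T | null {
    if (this.count === 0) return null;
    const item = this.items[this.tail] as T;
    this.items[this.tail] = undefined; // drop the reference so it can be collected
    this.tail = (this.tail + 1) & this.mask;
    this.count--;
    return item;
  }
}

const ring = new RingBuffer<string>(4);
// The fifth push is rejected because the buffer holds four items.
const accepted = ["a", "b", "c", "d", "e"].map((item) => ring.push(item));
const firstOut = ring.shift();
// The slot freed by shift() is reused, so this push succeeds.
const wrapped = ring.push("e");
const rest = [ring.shift(), ring.shift(), ring.shift(), ring.shift(), ring.shift()];
```

After this runs, `firstOut` is `"a"` and `rest` is `["b", "c", "d", "e", null]`: FIFO order is preserved across the wrap-around.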

Usage Examples:

import { Piscina, FixedQueue } from "piscina";

// Use FixedQueue for high-performance scenarios
const highThroughputPool = new Piscina({
  filename: "worker.js",
  taskQueue: new FixedQueue(),
  maxThreads: 16,
  concurrentTasksPerWorker: 4
});

// FixedQueue is the default when not specified
const pool = new Piscina({
  filename: "worker.js"
  // Uses FixedQueue by default
});

Queue Validation

Utility function to validate task queue implementations.

/**
 * Validates that an object implements the TaskQueue interface
 * @param value - Object to validate
 * @returns True if object is a valid TaskQueue
 */
function isTaskQueue(value: TaskQueue): boolean;

Usage Examples:

import { isTaskQueue, ArrayTaskQueue, FixedQueue } from "piscina";

const arrayQueue = new ArrayTaskQueue();
const fixedQueue = new FixedQueue();
const invalidQueue = { size: 0 }; // Missing required methods

console.log(isTaskQueue(arrayQueue)); // true
console.log(isTaskQueue(fixedQueue)); // true
console.log(isTaskQueue(invalidQueue)); // false

// Custom queue validation
class CustomQueue implements TaskQueue {
  readonly size = 0;
  shift() { return null; }
  push(task: Task) { }
  remove(task: Task) { }
}

console.log(isTaskQueue(new CustomQueue())); // true
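
A shape check of this kind can be sketched as plain duck typing. The function below is an assumption about what such a check might look like, written so it runs without the library; `looksLikeTaskQueue` is a hypothetical name and the real `isTaskQueue` may test different things.

```typescript
// Hypothetical duck-typing check: verifies that the members exist,
// not that they behave correctly.
function looksLikeTaskQueue(value: unknown): boolean {
  if (typeof value !== "object" || value === null) {
    return false;
  }
  const candidate = value as Record<string, unknown>;
  return (
    "size" in candidate &&
    typeof candidate["shift"] === "function" &&
    typeof candidate["push"] === "function" &&
    typeof candidate["remove"] === "function"
  );
}

const completeShape = {
  size: 0,
  shift: () => null,
  push: (_task: unknown) => {},
  remove: (_task: unknown) => {},
};

const shapeResults = {
  complete: looksLikeTaskQueue(completeShape), // all four members present
  sizeOnly: looksLikeTaskQueue({ size: 0 }),   // methods missing
  nothing: looksLikeTaskQueue(null),           // not an object
};
```

A check like this only confirms the shape; a queue whose `remove()` silently does nothing would still pass.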

Custom Queue Implementation

You can implement custom task queues for specialized scenarios.

/**
 * Example custom priority queue implementation.
 * Assumes Piscina is imported from "piscina". The priority is read from the
 * options object attached under Piscina.queueOptionsSymbol; tasks without
 * one are treated as priority 0.
 */
class PriorityTaskQueue implements TaskQueue {
  private tasks: Task[] = [];
  
  get size(): number {
    return this.tasks.length;
  }
  
  private priorityOf(task: Task): number {
    const options = task[Piscina.queueOptionsSymbol] as { priority?: number } | null;
    return options?.priority ?? 0;
  }
  
  shift(): Task | null {
    if (this.tasks.length === 0) return null;
    
    // Find the highest-priority task; ties keep insertion order
    let highestIndex = 0;
    for (let i = 1; i < this.tasks.length; i++) {
      if (this.priorityOf(this.tasks[i]) > this.priorityOf(this.tasks[highestIndex])) {
        highestIndex = i;
      }
    }
    
    return this.tasks.splice(highestIndex, 1)[0];
  }
  
  push(task: Task): void {
    this.tasks.push(task);
  }
  
  remove(task: Task): void {
    const index = this.tasks.indexOf(task);
    if (index !== -1) {
      this.tasks.splice(index, 1);
    }
  }
}

Usage Examples:

import { Piscina } from "piscina";

// Custom priority queue usage
const priorityPool = new Piscina({
  filename: "worker.js",
  taskQueue: new PriorityTaskQueue()
});

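// Priority travels with the task under Piscina.queueOptionsSymbol.
// Ordering only affects tasks that are waiting in the queue, so submit
// them together; awaiting each run() in turn leaves nothing to reorder.
await Promise.all([
  priorityPool.run({ 
    data: "normal task", 
    [Piscina.queueOptionsSymbol]: { priority: 1 }
  }),
  priorityPool.run({ 
    data: "urgent task", 
    [Piscina.queueOptionsSymbol]: { priority: 10 }
  })
]);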
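
To see the resulting order without spawning workers, the queue logic can be exercised directly. The sketch below is self-contained and uses stand-ins: `demoQueueOptions` is a hypothetical local symbol playing the role of `Piscina.queueOptionsSymbol`, and `DemoPriorityTask` and `DemoPriorityQueue` are names invented for this example.

```typescript
// Hypothetical stand-in for Piscina.queueOptionsSymbol.
const demoQueueOptions = Symbol("queueOptions");

interface DemoPriorityTask {
  readonly label: string;
  readonly [demoQueueOptions]: { priority?: number } | null;
}

// Tasks without queue options are treated as priority 0.
function demoPriorityOf(task: DemoPriorityTask): number {
  return task[demoQueueOptions]?.priority ?? 0;
}

class DemoPriorityQueue {
  private tasks: DemoPriorityTask[] = [];

  get size(): number {
    return this.tasks.length;
  }

  push(task: DemoPriorityTask): void {
    this.tasks.push(task);
  }

  // Linear scan for the highest priority; the strict comparison means
  // the earliest-queued task wins a tie.
  shift(): DemoPriorityTask | null {
    if (this.tasks.length === 0) return null;
    let bestIndex = 0;
    let bestPriority = -Infinity;
    this.tasks.forEach((task, index) => {
      const priority = demoPriorityOf(task);
      if (index === 0 || priority > bestPriority) {
        bestIndex = index;
        bestPriority = priority;
      }
    });
    const [task] = this.tasks.splice(bestIndex, 1);
    return task ?? null;
  }

  remove(task: DemoPriorityTask): void {
    const index = this.tasks.indexOf(task);
    if (index !== -1) {
      this.tasks.splice(index, 1);
    }
  }
}

const priorityDemo = new DemoPriorityQueue();
priorityDemo.push({ label: "normal", [demoQueueOptions]: { priority: 1 } });
priorityDemo.push({ label: "urgent", [demoQueueOptions]: { priority: 10 } });
priorityDemo.push({ label: "unmarked", [demoQueueOptions]: null });
priorityDemo.push({ label: "urgent-2", [demoQueueOptions]: { priority: 10 } });

const priorityOrder: string[] = [];
for (let next = priorityDemo.shift(); next !== null; next = priorityDemo.shift()) {
  priorityOrder.push(next.label);
}
```

`priorityOrder` ends up as `["urgent", "urgent-2", "normal", "unmarked"]`. The linear scan keeps the example short; a heap or a sorted insert would be the usual choice if the queue is expected to grow large.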

PiscinaTask Interface

Public interface representing queued tasks with metadata.

/**
 * Public interface for queued tasks
 */
interface PiscinaTask extends Task {
  /** Unique task identifier */
  readonly taskId: number;
  /** Worker filename for this task */
  readonly filename: string;
  /** Task name identifier */
  readonly name: string;
  /** Task creation timestamp */
  readonly created: number;
  /** Whether task supports cancellation */
  readonly isAbortable: boolean;
}
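
One possible use of this metadata is queue diagnostics inside a custom queue. The sketch below groups tasks by `name` and tracks the smallest `created` value per group. `TaskMeta` is a hypothetical local mirror of the fields listed above so that the example runs without the library, and the sample values are invented.

```typescript
// Hypothetical local mirror of the PiscinaTask fields listed above.
interface TaskMeta {
  readonly taskId: number;
  readonly filename: string;
  readonly name: string;
  readonly created: number;
  readonly isAbortable: boolean;
}

interface NameSummary {
  count: number;
  oldestCreated: number;
}

// Counts tasks per name and records the earliest creation timestamp seen.
function summarizeByName(tasks: readonly TaskMeta[]): Map<string, NameSummary> {
  const summary = new Map<string, NameSummary>();
  for (const task of tasks) {
    const entry = summary.get(task.name);
    if (entry === undefined) {
      summary.set(task.name, { count: 1, oldestCreated: task.created });
    } else {
      entry.count += 1;
      entry.oldestCreated = Math.min(entry.oldestCreated, task.created);
    }
  }
  return summary;
}

// Invented sample data for illustration only.
const sampleTasks: TaskMeta[] = [
  { taskId: 1, filename: "worker.js", name: "resize", created: 120, isAbortable: false },
  { taskId: 2, filename: "worker.js", name: "encode", created: 130, isAbortable: true },
  { taskId: 3, filename: "worker.js", name: "resize", created: 110, isAbortable: false },
];

const metaSummary = summarizeByName(sampleTasks);
const resizeSummary = metaSummary.get("resize");
const encodeSummary = metaSummary.get("encode");
```

With the sample data, `resizeSummary` is `{ count: 2, oldestCreated: 110 }` and `encodeSummary` is `{ count: 1, oldestCreated: 130 }`.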

Queue Selection Guidelines

ArrayTaskQueue

  • Best for: General purpose applications, variable queue sizes
  • Pros: Simple implementation, dynamic sizing, familiar array operations
  • Cons: O(n) shift operations can impact performance with large queues
  • Use when: Queue sizes are typically small to medium, or simplicity is preferred

FixedQueue

  • Best for: High-throughput applications, performance-critical scenarios
  • Pros: O(1) push and shift, optimized memory usage, V8-optimized implementation
  • Cons: More complex internal structure
  • Use when: Performance is critical, predictable memory usage needed

Custom Queue

  • Best for: Specialized requirements like priority queues, delayed execution
  • Implementation: Must implement TaskQueue interface completely
  • Validation: Use isTaskQueue() to verify implementation
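
A shape check cannot tell whether a custom queue actually behaves like a queue, so a small behavioural check can be useful alongside it. The following is a sketch under assumptions: `QueueLike` and `checkQueueBehaviour` are hypothetical names, the queue passed in must start empty, the three samples must be distinct, and the check is deliberately order-agnostic so that priority queues pass too.

```typescript
// Hypothetical structural type mirroring the four TaskQueue members.
interface QueueLike<T> {
  readonly size: number;
  shift(): T | null;
  push(task: T): void;
  remove(task: T): void;
}

// Returns a list of problems; an empty list means every check passed.
function checkQueueBehaviour<T>(queue: QueueLike<T>, samples: [T, T, T]): string[] {
  const problems: string[] = [];
  const [a, b, c] = samples;

  if (queue.size !== 0) problems.push("queue should start empty");
  if (queue.shift() !== null) problems.push("shift() on an empty queue should return null");

  queue.push(a);
  queue.push(b);
  queue.push(c);
  if (queue.size !== 3) problems.push("size should be 3 after three pushes");

  queue.remove(b);
  if (queue.size !== 2) problems.push("size should be 2 after remove()");

  const drained = [queue.shift(), queue.shift()];
  if (drained.indexOf(b) !== -1) problems.push("removed task was returned by shift()");
  if (drained.indexOf(a) === -1 || drained.indexOf(c) === -1) {
    problems.push("remaining tasks were not both returned");
  }
  if (queue.size !== 0 || queue.shift() !== null) {
    problems.push("queue should be empty after draining");
  }
  return problems;
}

// A well-behaved queue and one whose remove() silently does nothing.
class GoodQueue implements QueueLike<string> {
  protected items: string[] = [];
  get size(): number { return this.items.length; }
  shift(): string | null { return this.items.shift() ?? null; }
  push(task: string): void { this.items.push(task); }
  remove(task: string): void {
    const index = this.items.indexOf(task);
    if (index !== -1) this.items.splice(index, 1);
  }
}

class BrokenQueue extends GoodQueue {
  remove(_task: string): void { /* bug: task stays queued */ }
}

const goodProblems = checkQueueBehaviour(new GoodQueue(), ["a", "b", "c"]);
const brokenProblems = checkQueueBehaviour(new BrokenQueue(), ["a", "b", "c"]);
```

`goodProblems` is empty, while `brokenProblems` lists the failures caused by the no-op `remove()`, even though both classes have the same shape.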

Performance Comparison:

import { Piscina, ArrayTaskQueue, FixedQueue } from "piscina";

// For high-throughput scenarios (recommended)
const performancePool = new Piscina({
  filename: "worker.js",
  taskQueue: new FixedQueue(),
  maxThreads: 8,
  concurrentTasksPerWorker: 2
});

// For general purpose (simpler but slower at scale)
const generalPool = new Piscina({
  filename: "worker.js",
  taskQueue: new ArrayTaskQueue(),
  maxThreads: 4
});

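// Rough end-to-end timing. The two pools above also differ in maxThreads and
// concurrentTasksPerWorker, so align those settings before attributing any
// difference to the queue implementation.
async function benchmarkQueues() {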
  const tasks = Array.from({ length: 1000 }, (_, i) => ({ id: i }));
  
  console.time('FixedQueue');
  await Promise.all(tasks.map(task => performancePool.run(task)));
  console.timeEnd('FixedQueue');
  
  console.time('ArrayTaskQueue');
  await Promise.all(tasks.map(task => generalPool.run(task)));
  console.timeEnd('ArrayTaskQueue');
}

Install with Tessl CLI

npx tessl i tessl/npm-piscina
