A fast, efficient Node.js Worker Thread Pool implementation
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Configurable task queuing strategies with built-in FIFO and fixed-size circular buffer implementations for optimal performance in different scenarios.
Base interface that all task queue implementations must satisfy.
/**
* Interface for task queue implementations.
* The built-in ArrayTaskQueue and FixedQueue both declare `implements TaskQueue`,
* and a custom implementation is handed to a pool through the `taskQueue` option.
*/
interface TaskQueue {
/** Current number of tasks in the queue */
readonly size: number;
/**
* Remove and return the first task from the queue.
* Which task counts as "first" is up to the implementation: the FIFO queues
* return the oldest task, while the PriorityTaskQueue example on this page
* returns the task with the highest priority.
* @returns First task or null if queue is empty
*/
shift(): Task | null;
/**
* Remove a specific task from the queue
* @param task - Task to remove
*/
remove(task: Task): void;
/**
* Add a task to the end of the queue
* @param task - Task to add
*/
push(task: Task): void;
}
/**
 * Base task interface.
 * Each task carries a value under the `queueOptionsSymbol` key that is either
 * an options object or null; queue implementations may read it to make
 * scheduling decisions.
 */
interface Task {
  readonly [queueOptionsSymbol]: object | null;
}
// Simple array-based FIFO task queue implementation suitable for most use cases.
/**
 * Simple array-based FIFO task queue implementation
 * Best for: General purpose, variable queue sizes, simple scenarios
 * Performance: O(n) for shift operations due to array shifting
 *
 * Shown as an ambient (`declare`) class because only the public signatures
 * are listed here, not the implementation.
 */
declare class ArrayTaskQueue implements TaskQueue {
  /** Current number of tasks in the queue */
  readonly size: number;
  /**
   * Remove and return the first task (FIFO)
   * @returns First task or null if empty
   */
  shift(): Task | null;
  /**
   * Add task to end of queue
   * @param task - Task to enqueue
   */
  push(task: Task): void;
  /**
   * Remove specific task from queue
   * @param task - Task to remove
   */
  remove(task: Task): void;
}
// Usage Examples:
import { Piscina, ArrayTaskQueue } from "piscina";
// Use ArrayTaskQueue explicitly
const pool = new Piscina({
  filename: "worker.js",
  taskQueue: new ArrayTaskQueue()
});
// Omitting `taskQueue` leaves the choice of queue to Piscina's default.
// The FixedQueue example on this page states that FixedQueue is that default,
// so pass `new ArrayTaskQueue()` explicitly (as above) when you need this queue.
const defaultPool = new Piscina({
  filename: "worker.js"
  // No custom queue specified: the pool's default queue is used
});
// High-performance fixed-size circular buffer queue implementation optimized for Node.js V8 engine.
/**
 * High-performance fixed-size circular buffer queue implementation
 * Best for: High-throughput scenarios, predictable memory usage
 * Performance: O(1) for all operations
 * Based on Node.js internal implementation, optimized for V8
 *
 * NOTE(review): remove() has to locate the given task before it can drop it,
 * so the O(1) figure presumably applies to push()/shift() only — confirm
 * against the implementation.
 *
 * Shown as an ambient (`declare`) class because only the public signatures
 * are listed here, not the implementation.
 */
declare class FixedQueue implements TaskQueue {
  /** Current number of tasks in the queue */
  readonly size: number;
  /**
   * Remove and return the first task (FIFO)
   * @returns First task or null if empty
   */
  shift(): Task | null;
  /**
   * Add task to end of queue
   * @param task - Task to enqueue
   */
  push(task: Task): void;
  /**
   * Remove specific task from queue
   * @param task - Task to remove
   */
  remove(task: Task): void;
}
// Usage Examples:
import { Piscina, FixedQueue } from "piscina";
// Use FixedQueue for high-performance scenarios
const highThroughputPool = new Piscina({
  filename: "worker.js",
  taskQueue: new FixedQueue(),
  maxThreads: 16,
  concurrentTasksPerWorker: 4
});
// FixedQueue is the default when not specified
const pool = new Piscina({
  filename: "worker.js"
  // Uses FixedQueue by default
});
// Utility function to validate task queue implementations.
/**
 * Validates that an object implements the TaskQueue interface.
 * The examples on this page show it accepting objects that provide `size`,
 * `shift`, `push` and `remove`, and rejecting an object that only has `size`.
 * @param value - Object to validate
 * @returns True if object is a valid TaskQueue
 */
declare function isTaskQueue(value: TaskQueue): boolean;
// Usage Examples:
import { isTaskQueue, ArrayTaskQueue, FixedQueue } from "piscina";
// Both built-in queue implementations are expected to pass validation.
const arrayQueue = new ArrayTaskQueue();
const fixedQueue = new FixedQueue();
// Only `size` is present here: shift(), push() and remove() are all absent.
const invalidQueue = { size: 0 }; // Missing required methods
console.log(isTaskQueue(arrayQueue)); // true
console.log(isTaskQueue(fixedQueue)); // true
// NOTE(review): isTaskQueue is declared as taking a TaskQueue, so a strict
// type-check would presumably reject this argument without a cast — confirm.
console.log(isTaskQueue(invalidQueue)); // false
// Custom queue validation
/**
 * Smallest possible TaskQueue: it always reports itself as empty, never
 * yields a task, and ignores every task handed to it. It exists only to show
 * the shape that isTaskQueue() accepts.
 */
class CustomQueue implements TaskQueue {
  /** Always zero, since nothing is ever stored. */
  readonly size = 0;

  /** Accepts a task and discards it. */
  push(task: Task): void {}

  /** Nothing is stored, so there is nothing to remove. */
  remove(task: Task): void {}

  /** There is never a task to hand out. */
  shift(): null {
    return null;
  }
}
console.log(isTaskQueue(new CustomQueue())); // true
// You can implement custom task queues for specialized scenarios.
/**
 * Example custom priority queue implementation.
 *
 * Tasks are kept in insertion order and shift() hands out the one with the
 * highest priority; tasks of equal priority leave in the order they arrived.
 *
 * A task's priority is resolved as follows:
 *   1. a numeric `priority` property directly on the task, otherwise
 *   2. a numeric `priority` inside the options attached to the task under
 *      `Piscina.queueOptionsSymbol` (the form used by `pool.run()` callers),
 *      otherwise
 *   3. 0.
 */
class PriorityTaskQueue implements TaskQueue {
  /** Pending tasks in insertion order; ordering by priority happens in shift(). */
  private tasks: Task[] = [];

  /** Current number of queued tasks. */
  get size(): number {
    return this.tasks.length;
  }

  /**
   * Remove and return the highest-priority task.
   * @returns The selected task, or null if the queue is empty
   */
  shift(): Task | null {
    if (this.tasks.length === 0) return null;
    // Find highest priority task; the strict `>` keeps the earliest task on ties.
    let highestIndex = 0;
    let highestPriority = PriorityTaskQueue.priorityOf(this.tasks[0]);
    for (let i = 1; i < this.tasks.length; i++) {
      const priority = PriorityTaskQueue.priorityOf(this.tasks[i]);
      if (priority > highestPriority) {
        highestIndex = i;
        highestPriority = priority;
      }
    }
    return this.tasks.splice(highestIndex, 1)[0] ?? null;
  }

  /**
   * Add a task to the queue.
   * @param task - Task to enqueue
   */
  push(task: Task): void {
    this.tasks.push(task);
  }

  /**
   * Remove a specific task from the queue; unknown tasks are ignored.
   * @param task - Task to remove
   */
  remove(task: Task): void {
    const index = this.tasks.indexOf(task);
    if (index !== -1) {
      this.tasks.splice(index, 1);
    }
  }

  /**
   * Resolve the numeric priority of a task, defaulting to 0 so that tasks
   * without a priority never block higher-priority tasks queued behind them.
   */
  private static priorityOf(task: Task): number {
    const direct: unknown = Reflect.get(task, "priority");
    if (typeof direct === "number" && !Number.isNaN(direct)) return direct;

    // NOTE(review): Piscina.queueOptionsSymbol is expected to be the globally
    // registered symbol Symbol.for("Piscina.queueOptions") — confirm against
    // the installed Piscina version.
    const options: unknown = Reflect.get(task, Symbol.for("Piscina.queueOptions"));
    if (typeof options === "object" && options !== null) {
      const fromOptions: unknown = Reflect.get(options, "priority");
      if (typeof fromOptions === "number" && !Number.isNaN(fromOptions)) {
        return fromOptions;
      }
    }
    return 0;
  }
}
// Usage Examples:
import { Piscina } from "piscina";
// Custom priority queue usage
const priorityPool = new Piscina({
  filename: "worker.js",
  taskQueue: new PriorityTaskQueue()
});
// Tasks with priority (requires custom task wrapper).
// Submit the tasks together: awaiting one run() before issuing the next means
// the two tasks are never in the queue at the same time, so their relative
// priority could never influence the order in which they are picked up.
await Promise.all([
  priorityPool.run({
    data: "urgent task",
    [Piscina.queueOptionsSymbol]: { priority: 10 }
  }),
  priorityPool.run({
    data: "normal task",
    [Piscina.queueOptionsSymbol]: { priority: 1 }
  })
]);
// Public interface representing queued tasks with metadata.
/**
 * Public interface for queued tasks.
 * Extends the base Task with read-only metadata describing the queued work.
 */
interface PiscinaTask extends Task {
  /** Unique task identifier */
  readonly taskId: number;
  /** Worker filename for this task */
  readonly filename: string;
  /** Task name identifier */
  readonly name: string;
  /** Task creation timestamp */
  readonly created: number;
  /** Whether task supports cancellation */
  readonly isAbortable: boolean;
}
// Use isTaskQueue() to verify implementation
// Performance Comparison:
import { Piscina, ArrayTaskQueue, FixedQueue } from "piscina";
// For high-throughput scenarios (recommended)
// Configured with a FixedQueue, maxThreads 8 and 2 concurrent tasks per worker.
const performancePool = new Piscina({
filename: "worker.js",
taskQueue: new FixedQueue(),
maxThreads: 8,
concurrentTasksPerWorker: 2
});
// For general purpose (simpler but slower at scale)
// NOTE: this pool differs from performancePool in more than the queue
// (maxThreads 4 instead of 8, no concurrentTasksPerWorker setting), so timing
// the two pools against each other does not isolate the cost of the queue.
const generalPool = new Piscina({
filename: "worker.js",
taskQueue: new ArrayTaskQueue(),
maxThreads: 4
});
// Benchmark queues
/**
 * Pushes the same batch of 1000 small tasks through `performancePool` and
 * then through `generalPool`, printing the elapsed time of each batch with
 * console.time()/console.timeEnd() under the labels 'FixedQueue' and
 * 'ArrayTaskQueue'.
 *
 * The batches run one after the other, and each batch is submitted all at
 * once so that tasks actually accumulate in the pool's queue.
 */
async function benchmarkQueues() {
  const tasks = Array.from({ length: 1000 }, (_, i) => ({ id: i }));

  console.time('FixedQueue');
  await Promise.all(tasks.map(task => performancePool.run(task)));
  console.timeEnd('FixedQueue');

  console.time('ArrayTaskQueue');
  await Promise.all(tasks.map(task => generalPool.run(task)));
  console.timeEnd('ArrayTaskQueue');
}
// Install with Tessl CLI
npx tessl i tessl/npm-piscina