A fast, efficient Node.js Worker Thread Pool implementation
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Efficient data transfer mechanisms using structured cloning and transferable objects for optimal performance with large data sets.
Primary utility for marking objects as transferable to optimize data transfer between main thread and workers.
/**
* Mark objects for efficient transfer to worker threads
* Transfers ownership instead of copying data
* @param val - Object to transfer (ArrayBuffer, MessagePort, etc.)
* @returns Wrapped transferable object
*/
function move(
val: Transferable | TransferListItem | ArrayBufferView | ArrayBuffer | MessagePort
): any;

Usage Examples:
import Piscina from "piscina";
const pool = new Piscina({ filename: "worker.js" });
// Transfer ArrayBuffer efficiently
const buffer = new ArrayBuffer(1024 * 1024); // 1MB buffer
const transferableBuffer = Piscina.move(buffer);
await pool.run({
data: transferableBuffer,
operation: "processLargeData"
});
// buffer is now unusable in main thread (ownership transferred)
console.log(buffer.byteLength); // 0 - buffer is detached
// Transfer typed arrays
const uint8Array = new Uint8Array(1000);
uint8Array.fill(42);
await pool.run({
data: Piscina.move(uint8Array), // Transfers underlying ArrayBuffer
operation: "processTypedArray"
});
// Transfer multiple objects
const buffer1 = new ArrayBuffer(512);
const buffer2 = new ArrayBuffer(256);
await pool.run({
buffers: [Piscina.move(buffer1), Piscina.move(buffer2)],
operation: "processMultipleBuffers"
});

Interface for objects that can be efficiently transferred between threads.
/**
* Interface for transferable objects
*/
interface Transferable {
/** Internal transferable object reference */
readonly [transferableSymbol]: object;
/** Internal value object reference */
readonly [valueSymbol]: object;
}

Type definitions for transferable object collections.
/**
* Array of transferable objects
* Extracted from MessagePort.postMessage signature
*/
type TransferList = MessagePort extends {
postMessage: (value: any, transferList: infer T) => any;
} ? T : never;
/**
* Individual transferable object type
*/
type TransferListItem = TransferList extends Array<infer T> ? T : never;

Use transfer lists directly in run options for fine-grained control.
interface RunOptions {
/** Objects to transfer ownership to worker (for performance) */
transferList?: TransferList;
}

Usage Examples:
import Piscina from "piscina";
const pool = new Piscina({ filename: "worker.js" });
// Manual transfer list (without Piscina.move)
const buffer = new ArrayBuffer(2048);
const result = await pool.run(
{ data: buffer, operation: "process" },
{ transferList: [buffer] }
);
// Mixed approach: some moved, some in transfer list
const buffer1 = new ArrayBuffer(1024);
const buffer2 = new ArrayBuffer(1024);
const port = new MessageChannel().port1;
await pool.run(
{
moved: Piscina.move(buffer1), // Automatic transfer
manual: buffer2, // Manual transfer
port: port // Manual transfer
},
{ transferList: [buffer2, port] } // Explicit transfer list
);

Utilities for working with transferable objects.
/**
* Check if object implements Transferable interface
* @param value - Object to check
* @returns True if object is transferable
*/
function isTransferable(value: unknown): boolean;
/**
* Check if object is marked as movable by Piscina.move()
* @param value - Object to check
* @returns True if object was processed by move()
*/
function isMovable(value: any): boolean;

Usage Examples:
import { isTransferable, isMovable, move } from "piscina";
const buffer = new ArrayBuffer(1024);
const movedBuffer = move(buffer);
const regularObject = { data: "test" };
console.log(isTransferable(buffer)); // false (not wrapped)
console.log(isTransferable(movedBuffer)); // true (wrapped by move)
console.log(isMovable(movedBuffer)); // true (marked as movable)
console.log(isTransferable(regularObject)); // false
console.log(isMovable(regularObject)); // false
// Use in conditional logic
function processData(data: any) {
if (isMovable(data)) {
console.log("Data will be transferred efficiently");
} else {
console.log("Data will be cloned (slower for large objects)");
}
}

Objects that can be transferred efficiently between threads:
// Native transferable types (partial list)
type NativeTransferableTypes =
| ArrayBuffer
| MessagePort
| ReadableStream
| WritableStream
| TransformStream
| AudioData
| ImageBitmap
| OffscreenCanvas;
// Typed array views (transfer underlying ArrayBuffer)
type TypedArrayTypes =
| Int8Array
| Uint8Array
| Uint8ClampedArray
| Int16Array
| Uint16Array
| Int32Array
| Uint32Array
| Float32Array
| Float64Array
| BigInt64Array
| BigUint64Array;

Usage Examples:
import Piscina from "piscina";
const pool = new Piscina({ filename: "worker.js" });
// ArrayBuffer transfer
const arrayBuffer = new ArrayBuffer(1024);
await pool.run({ buffer: Piscina.move(arrayBuffer) });
// Typed array transfer (transfers underlying ArrayBuffer)
const float32Array = new Float32Array(256);
await pool.run({ floats: Piscina.move(float32Array) });
// MessagePort transfer
const { port1, port2 } = new MessageChannel();
await pool.run({
port: Piscina.move(port1),
setupCommunication: true
});
// Multiple transfers
const data = {
buffer1: Piscina.move(new ArrayBuffer(512)),
buffer2: Piscina.move(new ArrayBuffer(256)),
array: Piscina.move(new Uint8Array(128))
};
await pool.run(data);

import Piscina from "piscina";
const pool = new Piscina({ filename: "worker.js" });
// ✅ Good: Large data structures
const largeBuffer = new ArrayBuffer(10 * 1024 * 1024); // 10MB
await pool.run({
data: Piscina.move(largeBuffer), // Efficient transfer
operation: "processLargeData"
});
// ✅ Good: Frequent transfers of medium-sized data
const imageData = new Uint8Array(1920 * 1080 * 4); // 8MB image
await pool.run({
pixels: Piscina.move(imageData),
operation: "applyFilter"
});
// ❌ Avoid Piscina.move() for small objects (transfer overhead is not worth it) - let them be cloned
const smallArray = new Uint8Array(10);
await pool.run({
data: smallArray, // Just clone, don't transfer
operation: "processSmall"
});
// ❌ Avoid moving objects you still need on this thread
const buffer = new ArrayBuffer(1024);
// Pass the buffer as-is (no Piscina.move, no transferList): it is then
// cloned for the worker and this thread's buffer stays intact.
// Spreading (`{ ...buffer }`) is not a copy - an ArrayBuffer has no own
// enumerable properties, so the worker would receive an empty object.
await pool.run({
  data: buffer, // Cloned, not transferred
  operation: "process"
});
// buffer is still usable

import Piscina from "piscina";
const pool = new Piscina({ filename: "worker.js" });
async function performanceComparison() {
const largeBuffer = new ArrayBuffer(50 * 1024 * 1024); // 50MB
// Measure cloning performance
console.time('Clone Transfer');
await pool.run({
data: largeBuffer, // Will be cloned
operation: "benchmark"
});
console.timeEnd('Clone Transfer');
// Measure transfer performance
const anotherBuffer = new ArrayBuffer(50 * 1024 * 1024);
console.time('Move Transfer');
await pool.run({
data: Piscina.move(anotherBuffer), // Will be transferred
operation: "benchmark"
});
console.timeEnd('Move Transfer'); // Should be much faster
}

How workers receive and process transferred objects:
Worker file (worker.js):
// Worker receives transferred objects normally
module.exports = function(data) {
// data.buffer is now owned by this worker thread
const { buffer, operation } = data;
if (operation === "processLargeData") {
// Work with transferred ArrayBuffer
const view = new Uint8Array(buffer);
// Process data...
for (let i = 0; i < view.length; i++) {
view[i] = view[i] * 2;
}
// Can transfer back to main thread
return {
result: "processed",
// Transfer modified buffer back
processedData: buffer // Will be transferred back
};
}
return { result: "complete" };
};Access transferable-related symbols for advanced usage.
class Piscina {
/** Symbol used to mark transferable objects */
static readonly transferableSymbol: symbol;
/** Symbol used to access transferable values */
static readonly valueSymbol: symbol;
/** Symbol used for queue options */
static readonly queueOptionsSymbol: symbol;
}
// Named exports for convenience
const transferableSymbol: typeof Piscina.transferableSymbol;
const valueSymbol: typeof Piscina.valueSymbol;
const queueOptionsSymbol: typeof Piscina.queueOptionsSymbol;

Usage Examples:
import { transferableSymbol, valueSymbol } from "piscina";
// Create custom transferable wrapper
/**
 * Example wrapper that exposes an ArrayBuffer through the two symbol-keyed
 * accessors; both accessors hand back the same wrapped buffer.
 */
class CustomTransferable {
  private data: ArrayBuffer;

  constructor(data: ArrayBuffer) {
    this.data = data;
  }

  /** Object exposed under `transferableSymbol`. */
  get [transferableSymbol]() {
    const { data } = this;
    return data;
  }

  /** Object exposed under `valueSymbol`. */
  get [valueSymbol]() {
    const { data } = this;
    return data;
  }
}
// Use custom transferable
const customData = new CustomTransferable(new ArrayBuffer(1024));
await pool.run({ data: customData });

import Piscina from "piscina";
const pool = new Piscina({ filename: "worker.js" });
// ✅ Good: Clear transfer intent
/**
 * Sends `fileBuffer` to the pool for the "parseFile" operation, wrapped with
 * Piscina.move(). Treat `fileBuffer` as unusable once this has been called.
 * @param fileBuffer - File contents handed to the worker
 * @returns Whatever the worker task resolves with
 * @throws Rethrows any failure after logging it
 */
async function processLargeFile(fileBuffer: ArrayBuffer) {
  try {
    // Build the task inside the try so a failure while wrapping is logged too
    const task = {
      data: Piscina.move(fileBuffer),
      operation: "parseFile"
    };
    // fileBuffer must not be used past this point
    return await pool.run(task);
  } catch (failure) {
    // The buffer may already be detached even though the task failed
    console.error("Processing failed:", failure);
    throw failure;
  }
}
// ✅ Good: Keep reference if needed later
/**
 * Processes a copy of `originalBuffer` so the caller's buffer stays usable.
 * @param originalBuffer - Buffer to process; only a copy of it is moved
 * @returns The worker result plus the byte length of the untouched original
 */
async function processWithBackup(originalBuffer: ArrayBuffer) {
  // Copy for transfer, keep the original. The explicit start index keeps this
  // compiling on TypeScript lib typings that declare `begin` as required.
  const bufferCopy = originalBuffer.slice(0);
  const result = await pool.run({
    data: Piscina.move(bufferCopy),
    operation: "process"
  });
  // originalBuffer is still usable
  return { result, originalSize: originalBuffer.byteLength };
}

import Piscina from "piscina";
const pool = new Piscina({ filename: "worker.js" });
/**
 * Sends `buffer` to the pool for the "process" operation, refusing buffers
 * that have already been detached.
 * @param buffer - Buffer to hand over; do not reuse it afterwards
 * @returns Whatever the worker task resolves with
 * @throws Error if the buffer is already detached; rethrows task failures
 */
async function safeTransfer(buffer: ArrayBuffer) {
  // A detached buffer reports byteLength 0, but so does a valid empty one.
  // Prefer the `detached` flag where the runtime provides it and fall back
  // to the length heuristic elsewhere.
  const hasZeroLength = buffer.byteLength === 0;
  const isDetached =
    "detached" in buffer ? buffer.detached === true : hasZeroLength;
  if (isDetached) {
    throw new Error("Buffer is already detached");
  }
  try {
    const result = await pool.run({
      data: Piscina.move(buffer),
      operation: "process"
    });
    return result;
  } catch (error) {
    // The buffer may already be detached even though the task failed
    console.warn("Task failed, buffer is detached");
    throw error;
  }
}

Install with Tessl CLI
npx tessl i tessl/npm-piscina