CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/npm-piscina

A fast, efficient Node.js Worker Thread Pool implementation

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

transferable-objects.mddocs/

Transferable Objects

Efficient data transfer mechanisms using structured cloning and transferable objects for optimal performance with large data sets.

Capabilities

Piscina.move Function

Primary utility for marking objects as transferable to optimize data transfer between main thread and workers.

/**
 * Mark objects for efficient transfer to worker threads
 * Transfers ownership instead of copying data
 * @param val - Object to transfer (ArrayBuffer, MessagePort, etc.)
 * @returns Wrapped transferable object
 */
function move(
  val: Transferable | TransferListItem | ArrayBufferView | ArrayBuffer | MessagePort
): any;

Usage Examples:

import Piscina from "piscina";

const pool = new Piscina({ filename: "worker.js" });

// Transfer ArrayBuffer efficiently
const buffer = new ArrayBuffer(1024 * 1024); // 1MB buffer
const transferableBuffer = Piscina.move(buffer);

await pool.run({
  data: transferableBuffer,
  operation: "processLargeData"
});

// buffer is now unusable in main thread (ownership transferred)
console.log(buffer.byteLength); // 0 - buffer is detached

// Transfer typed arrays
const uint8Array = new Uint8Array(1000);
uint8Array.fill(42);

await pool.run({
  data: Piscina.move(uint8Array), // Transfers underlying ArrayBuffer
  operation: "processTypedArray"
});

// Transfer multiple objects
const buffer1 = new ArrayBuffer(512);
const buffer2 = new ArrayBuffer(256);

await pool.run({
  buffers: [Piscina.move(buffer1), Piscina.move(buffer2)],
  operation: "processMultipleBuffers"
});

Transferable Interface

Interface for objects that can be efficiently transferred between threads.

/**
 * Interface for transferable objects
 */
interface Transferable {
  /** Internal transferable object reference */
  readonly [transferableSymbol]: object;
  
  /** Internal value object reference */
  readonly [valueSymbol]: object;
}

Transfer List Types

Type definitions for transferable object collections.

/**
 * Array of transferable objects
 * Extracted from MessagePort.postMessage signature
 */
type TransferList = MessagePort extends {
  postMessage: (value: any, transferList: infer T) => any;
} ? T : never;

/**
 * Individual transferable object type
 */
type TransferListItem = TransferList extends Array<infer T> ? T : never;

Manual Transfer Lists

Use transfer lists directly in run options for fine-grained control.

interface RunOptions {
  /** Objects to transfer ownership to worker (for performance) */
  transferList?: TransferList;
}

Usage Examples:

import Piscina from "piscina";

const pool = new Piscina({ filename: "worker.js" });

// Manual transfer list (without Piscina.move)
const buffer = new ArrayBuffer(2048);
const result = await pool.run(
  { data: buffer, operation: "process" },
  { transferList: [buffer] }
);

// Mixed approach: some moved, some in transfer list
const buffer1 = new ArrayBuffer(1024);
const buffer2 = new ArrayBuffer(1024);
const port = new MessageChannel().port1;

await pool.run(
  {
    moved: Piscina.move(buffer1),    // Automatic transfer
    manual: buffer2,                 // Manual transfer
    port: port                       // Manual transfer
  },
  { transferList: [buffer2, port] }  // Explicit transfer list
);

Transferable Object Detection

Utilities for working with transferable objects.

/**
 * Check if object implements Transferable interface
 * @param value - Object to check
 * @returns True if object is transferable
 */
function isTransferable(value: unknown): boolean;

/**
 * Check if object is marked as movable by Piscina.move()
 * @param value - Object to check
 * @returns True if object was processed by move()
 */
function isMovable(value: any): boolean;

Usage Examples:

import { isTransferable, isMovable, move } from "piscina";

const buffer = new ArrayBuffer(1024);
const movedBuffer = move(buffer);
const regularObject = { data: "test" };

console.log(isTransferable(buffer));      // false (not wrapped)
console.log(isTransferable(movedBuffer)); // true (wrapped by move)
console.log(isMovable(movedBuffer));      // true (marked as movable)
console.log(isTransferable(regularObject)); // false
console.log(isMovable(regularObject));      // false

// Use in conditional logic
function processData(data: any) {
  if (isMovable(data)) {
    console.log("Data will be transferred efficiently");
  } else {
    console.log("Data will be cloned (slower for large objects)");
  }
}

Supported Transferable Types

Objects that can be transferred efficiently between threads:

// Native transferable types (partial list)
type NativeTransferableTypes =
  | ArrayBuffer
  | MessagePort
  | ReadableStream
  | WritableStream
  | TransformStream
  | AudioData
  | ImageBitmap
  | OffscreenCanvas;

// Typed array views (transfer underlying ArrayBuffer)
type TypedArrayTypes =
  | Int8Array
  | Uint8Array
  | Uint8ClampedArray
  | Int16Array
  | Uint16Array
  | Int32Array
  | Uint32Array
  | Float32Array
  | Float64Array
  | BigInt64Array
  | BigUint64Array;

Usage Examples:

import Piscina from "piscina";

const pool = new Piscina({ filename: "worker.js" });

// ArrayBuffer transfer
const arrayBuffer = new ArrayBuffer(1024);
await pool.run({ buffer: Piscina.move(arrayBuffer) });

// Typed array transfer (transfers underlying ArrayBuffer)
const float32Array = new Float32Array(256);
await pool.run({ floats: Piscina.move(float32Array) });

// MessagePort transfer
const { port1, port2 } = new MessageChannel();
await pool.run({ 
  port: Piscina.move(port1),
  setupCommunication: true 
});

// Multiple transfers
const data = {
  buffer1: Piscina.move(new ArrayBuffer(512)),
  buffer2: Piscina.move(new ArrayBuffer(256)),
  array: Piscina.move(new Uint8Array(128))
};
await pool.run(data);

Performance Considerations

When to Use Transferable Objects

import Piscina from "piscina";

const pool = new Piscina({ filename: "worker.js" });

// ✅ Good: Large data structures
const largeBuffer = new ArrayBuffer(10 * 1024 * 1024); // 10MB
await pool.run({
  data: Piscina.move(largeBuffer), // Efficient transfer
  operation: "processLargeData"
});

// ✅ Good: Frequent transfers of medium-sized data
const imageData = new Uint8Array(1920 * 1080 * 4); // 8MB image
await pool.run({
  pixels: Piscina.move(imageData),
  operation: "applyFilter"
});

// ❌ Avoid: Small objects (overhead not worth it)
const smallArray = new Uint8Array(10);
await pool.run({
  data: smallArray, // Just clone, don't transfer
  operation: "processSmall"
});

// ❌ Avoid: Objects you need to keep using
const buffer = new ArrayBuffer(1024);
// Don't move if you need buffer later
await pool.run({
  data: { ...buffer }, // Clone instead
  operation: "process"
});
// buffer is still usable

Transfer vs Clone Performance

import Piscina from "piscina";

const pool = new Piscina({ filename: "worker.js" });

async function performanceComparison() {
  const largeBuffer = new ArrayBuffer(50 * 1024 * 1024); // 50MB
  
  // Measure cloning performance
  console.time('Clone Transfer');
  await pool.run({
    data: largeBuffer, // Will be cloned
    operation: "benchmark"
  });
  console.timeEnd('Clone Transfer');
  
  // Measure transfer performance  
  const anotherBuffer = new ArrayBuffer(50 * 1024 * 1024);
  console.time('Move Transfer');
  await pool.run({
    data: Piscina.move(anotherBuffer), // Will be transferred
    operation: "benchmark"
  });
  console.timeEnd('Move Transfer'); // Should be much faster
}

Worker-Side Handling

How workers receive and process transferred objects:

Worker file (worker.js):

// Worker receives transferred objects normally
module.exports = function(data) {
  // data.buffer is now owned by this worker thread
  const { buffer, operation } = data;
  
  if (operation === "processLargeData") {
    // Work with transferred ArrayBuffer
    const view = new Uint8Array(buffer);
    
    // Process data...
    for (let i = 0; i < view.length; i++) {
      view[i] = view[i] * 2;
    }
    
    // Can transfer back to main thread
    return { 
      result: "processed",
      // Transfer modified buffer back
      processedData: buffer // Will be transferred back
    };
  }
  
  return { result: "complete" };
};

Symbols and Constants

Access transferable-related symbols for advanced usage.

class Piscina {
  /** Symbol used to mark transferable objects */
  static readonly transferableSymbol: symbol;
  
  /** Symbol used to access transferable values */
  static readonly valueSymbol: symbol;
  
  /** Symbol used for queue options */
  static readonly queueOptionsSymbol: symbol;
}

// Named exports for convenience
const transferableSymbol: typeof Piscina.transferableSymbol;
const valueSymbol: typeof Piscina.valueSymbol;
const queueOptionsSymbol: typeof Piscina.queueOptionsSymbol;

Usage Examples:

import { transferableSymbol, valueSymbol } from "piscina";

// Create custom transferable wrapper
class CustomTransferable {
  constructor(private data: ArrayBuffer) {}
  
  get [transferableSymbol]() {
    return this.data;
  }
  
  get [valueSymbol]() {
    return this.data;
  }
}

// Use custom transferable
const customData = new CustomTransferable(new ArrayBuffer(1024));
await pool.run({ data: customData });

Best Practices

Memory Management

import Piscina from "piscina";

const pool = new Piscina({ filename: "worker.js" });

// ✅ Good: Clear transfer intent
async function processLargeFile(fileBuffer: ArrayBuffer) {
  try {
    const result = await pool.run({
      data: Piscina.move(fileBuffer),
      operation: "parseFile"
    });
    
    // fileBuffer is now detached, don't use it
    return result;
  } catch (error) {
    // Handle errors - buffer may still be detached
    console.error("Processing failed:", error);
    throw error;
  }
}

// ✅ Good: Keep reference if needed later
async function processWithBackup(originalBuffer: ArrayBuffer) {
  // Clone for transfer, keep original
  const bufferCopy = originalBuffer.slice();
  
  const result = await pool.run({
    data: Piscina.move(bufferCopy),
    operation: "process"
  });
  
  // originalBuffer is still usable
  return { result, originalSize: originalBuffer.byteLength };
}

Error Handling

import Piscina from "piscina";

const pool = new Piscina({ filename: "worker.js" });

async function safeTransfer(buffer: ArrayBuffer) {
  // Check if buffer is already detached
  if (buffer.byteLength === 0) {
    throw new Error("Buffer is already detached");
  }
  
  try {
    const result = await pool.run({
      data: Piscina.move(buffer),
      operation: "process"
    });
    return result;
  } catch (error) {
    // Buffer is detached even if task failed
    console.warn("Task failed, buffer is detached");
    throw error;
  }
}

Install with Tessl CLI

npx tessl i tessl/npm-piscina

docs

index.md

load-balancing.md

performance-monitoring.md

pool-management.md

task-cancellation.md

task-queues.md

transferable-objects.md

tile.json