or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

index.md
tile.json

tessl/npm-through2

A tiny wrapper around Node.js streams.Transform to avoid explicit subclassing noise

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
npmpkg:npm/through2@4.0.x

To install, run

npx @tessl/cli install tessl/npm-through2@4.0.0

index.mddocs/

through2

through2 is a tiny wrapper around Node.js streams.Transform that eliminates the complexity of explicit subclassing and prototype chain setup. It provides a functional approach to creating transform streams through simple function parameters, supporting both binary and object modes with optional transform and flush functions.

Package Information

  • Package Name: through2
  • Package Type: npm
  • Language: JavaScript
  • Installation: npm install through2

Core Imports

const through2 = require('through2');

For ESM environments:

import through2 from 'through2';

Basic Usage

const fs = require('fs');
const through2 = require('through2');

// Transform text data
fs.createReadStream('input.txt')
  .pipe(through2(function (chunk, enc, callback) {
    // Transform chunk (replace 'a' with 'z')
    for (let i = 0; i < chunk.length; i++) {
      if (chunk[i] == 97) chunk[i] = 122;
    }
    this.push(chunk);
    callback();
  }))
  .pipe(fs.createWriteStream('output.txt'));

// Object mode transformation
const objectStream = through2.obj(function (obj, enc, callback) {
  obj.processed = true;
  obj.timestamp = Date.now();
  this.push(obj);
  callback();
});

Capabilities

Transform Stream Creation

Creates a transform stream instance with optional transform and flush functions.

/**
 * Creates a transform stream with optional transformation logic
 * @param {Object} options - Stream options (optional)
 * @param {Function} transform - Transform function (optional)
 * @param {Function} flush - Flush function (optional)
 * @returns {Transform} Transform stream instance
 */
function through2(options, transform, flush);

/**
 * Transform function signature
 * @param {Buffer|String|any} chunk - Data chunk to transform
 * @param {String} encoding - Encoding (for string chunks)
 * @param {Function} callback - Completion callback
 */
function transform(chunk, encoding, callback);

/**
 * Flush function signature
 * @param {Function} callback - Completion callback
 */
function flush(callback);

Usage Examples:

// Basic transform with all parameters
const transform = through2(
  { objectMode: false },
  function (chunk, enc, callback) {
    // Process chunk
    this.push(chunk.toString().toUpperCase());
    callback();
  },
  function (callback) {
    // Final processing
    this.push('\n-- END --\n');
    callback();
  }
);

// Transform function only
const simpleTransform = through2(function (chunk, enc, callback) {
  callback(null, chunk); // Pass through unchanged
});

// No-op transform (pass-through)
const passThrough = through2();

Object Mode Transform Stream

Creates transform streams optimized for object processing with objectMode: true and highWaterMark: 16.

/**
 * Creates an object mode transform stream
 * @param {Object} options - Stream options (optional)
 * @param {Function} transform - Transform function (optional)
 * @param {Function} flush - Flush function (optional)
 * @returns {Transform} Object mode transform stream
 */
function through2.obj(options, transform, flush);

Usage Examples:

// Object transformation
const processor = through2.obj(function (obj, enc, callback) {
  if (obj.type === 'user') {
    obj.processedAt = new Date().toISOString();
    this.push(obj);
  }
  callback();
});

// With options override
const customProcessor = through2.obj(
  { highWaterMark: 32 },
  function (obj, enc, callback) {
    this.push({ ...obj, id: Math.random() });
    callback();
  }
);

Constructor Function Creation

Creates reusable constructor functions for transform streams, useful when the same transform logic needs to be used in multiple instances.

/**
 * Creates a reusable transform stream constructor
 * @param {Object} options - Default stream options (optional)
 * @param {Function} transform - Transform function
 * @param {Function} flush - Flush function (optional)
 * @returns {Function} Constructor function
 */
function through2.ctor(options, transform, flush);

/**
 * Constructor function signature
 * @param {Object} override - Options to override defaults (optional)
 * @returns {Transform} Transform stream instance
 */
function Through2Constructor(override);

Usage Examples:

// Create reusable constructor
const TextProcessor = through2.ctor(
  { encoding: 'utf8' },
  function (chunk, enc, callback) {
    this.push(chunk.toString().toLowerCase());
    callback();
  }
);

// Create instances
const processor1 = new TextProcessor();
const processor2 = TextProcessor(); // 'new' is optional
const processor3 = TextProcessor({ objectMode: true }); // Override options

// Object mode constructor
const ObjectProcessor = through2.ctor(
  { objectMode: true },
  function (record, enc, callback) {
    if (record.temp != null && record.unit === 'F') {
      record.temp = ((record.temp - 32) * 5) / 9;
      record.unit = 'C';
    }
    this.push(record);
    callback();
  }
);

Transform Function Details

Transform functions receive three parameters and must call the callback to signal completion:

/**
 * Transform function implementation pattern
 * @param {Buffer|String|any} chunk - Input data chunk
 * @param {String} encoding - Character encoding for string chunks
 * @param {Function} callback - Completion callback
 */
function transformFunction(chunk, encoding, callback) {
  // Process the chunk
  // Option 1: Push data and call callback
  this.push(processedData);
  callback();
  
  // Option 2: Use callback shorthand
  callback(null, processedData);
  
  // Option 3: Signal error
  callback(new Error('Processing failed'));
  
  // Option 4: Drop chunk (don't push anything)
  callback();
}

Flush Function Details

Flush functions are called when the stream is ending and no more data will be written:

/**
 * Flush function implementation pattern
 * @param {Function} callback - Completion callback
 */
function flushFunction(callback) {
  // Perform final processing
  // Push any remaining data
  this.push(finalData);
  callback();
  
  // Or signal completion without data
  callback();
  
  // Or signal error
  callback(new Error('Flush failed'));
}

Stream Options

All functions accept standard Node.js Transform stream options:

interface StreamOptions {
  /** Enable object mode for non-binary data */
  objectMode?: boolean;
  /** Internal buffer size */
  highWaterMark?: number;
  /** Keep readable side open when writable ends */
  allowHalfOpen?: boolean;
  /** Decode strings to Buffers before passing to _transform */
  decodeStrings?: boolean;
  /** Default string encoding */
  encoding?: string;
  /** Destroy stream if writable side ends */
  autoDestroy?: boolean;
  /** Emit 'close' after 'finish' and 'end' */
  emitClose?: boolean;
}

Error Handling and Stream Control

// Error handling in transform function
const errorTransform = through2(function (chunk, enc, callback) {
  try {
    const result = riskyOperation(chunk);
    callback(null, result);
  } catch (error) {
    callback(error); // Stream will emit 'error' event
  }
});

// Stream destruction
const stream = through2();
stream.destroy(); // Destroys stream and emits 'close'

// Stream can be destroyed multiple times safely
stream.destroy();
stream.destroy(); // No additional effects

Advanced Usage Patterns

// Stateful transformation
const counter = through2(function (chunk, enc, callback) {
  if (!this._count) this._count = 0;
  this._count++;
  
  this.push(`${this._count}: ${chunk}`);
  callback();
});

// Conditional processing
const filter = through2.obj(function (obj, enc, callback) {
  if (obj.include) {
    this.push(obj);
  }
  // Skip objects without 'include' property
  callback();
});

// Batch processing with flush
const batcher = through2.obj(
  function (obj, enc, callback) {
    if (!this._batch) this._batch = [];
    this._batch.push(obj);
    
    if (this._batch.length >= 10) {
      this.push(this._batch);
      this._batch = [];
    }
    callback();
  },
  function (callback) {
    // Push remaining items
    if (this._batch && this._batch.length > 0) {
      this.push(this._batch);
    }
    callback();
  }
);