CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/npm-through2

A tiny wrapper around Node.js streams.Transform to avoid explicit subclassing noise

Pending
Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

SecuritybySnyk

Pending

The risk profile of this skill

Overview
Eval results
Files

through2

through2 is a tiny wrapper around Node.js streams.Transform that eliminates the complexity of explicit subclassing and prototype chain setup. It provides a functional approach to creating transform streams through simple function parameters, supporting both binary and object modes with optional transform and flush functions.

Package Information

  • Package Name: through2
  • Package Type: npm
  • Language: JavaScript
  • Installation: npm install through2

Core Imports

const through2 = require('through2');

For ESM environments:

import through2 from 'through2';

Basic Usage

const fs = require('fs');
const through2 = require('through2');

// Transform text data
fs.createReadStream('input.txt')
  .pipe(through2(function (chunk, enc, callback) {
    // Transform chunk (replace 'a' with 'z')
    for (let i = 0; i < chunk.length; i++) {
      if (chunk[i] == 97) chunk[i] = 122;
    }
    this.push(chunk);
    callback();
  }))
  .pipe(fs.createWriteStream('output.txt'));

// Object mode transformation
const objectStream = through2.obj(function (obj, enc, callback) {
  obj.processed = true;
  obj.timestamp = Date.now();
  this.push(obj);
  callback();
});

Capabilities

Transform Stream Creation

Creates a transform stream instance with optional transform and flush functions.

/**
 * Creates a transform stream with optional transformation logic
 * @param {Object} options - Stream options (optional)
 * @param {Function} transform - Transform function (optional)
 * @param {Function} flush - Flush function (optional)
 * @returns {Transform} Transform stream instance
 */
function through2(options, transform, flush);

/**
 * Transform function signature
 * @param {Buffer|String|any} chunk - Data chunk to transform
 * @param {String} encoding - Encoding (for string chunks)
 * @param {Function} callback - Completion callback
 */
function transform(chunk, encoding, callback);

/**
 * Flush function signature
 * @param {Function} callback - Completion callback
 */
function flush(callback);

Usage Examples:

// Basic transform with all parameters
const transform = through2(
  { objectMode: false },
  function (chunk, enc, callback) {
    // Process chunk
    this.push(chunk.toString().toUpperCase());
    callback();
  },
  function (callback) {
    // Final processing
    this.push('\n-- END --\n');
    callback();
  }
);

// Transform function only
const simpleTransform = through2(function (chunk, enc, callback) {
  callback(null, chunk); // Pass through unchanged
});

// No-op transform (pass-through)
const passThrough = through2();

Object Mode Transform Stream

Creates transform streams optimized for object processing with objectMode: true and highWaterMark: 16.

/**
 * Creates an object mode transform stream
 * @param {Object} options - Stream options (optional)
 * @param {Function} transform - Transform function (optional)
 * @param {Function} flush - Flush function (optional)
 * @returns {Transform} Object mode transform stream
 */
function through2.obj(options, transform, flush);

Usage Examples:

// Object transformation
const processor = through2.obj(function (obj, enc, callback) {
  if (obj.type === 'user') {
    obj.processedAt = new Date().toISOString();
    this.push(obj);
  }
  callback();
});

// With options override
const customProcessor = through2.obj(
  { highWaterMark: 32 },
  function (obj, enc, callback) {
    this.push({ ...obj, id: Math.random() });
    callback();
  }
);

Constructor Function Creation

Creates reusable constructor functions for transform streams, useful when the same transform logic needs to be used in multiple instances.

/**
 * Creates a reusable transform stream constructor
 * @param {Object} options - Default stream options (optional)
 * @param {Function} transform - Transform function
 * @param {Function} flush - Flush function (optional)
 * @returns {Function} Constructor function
 */
function through2.ctor(options, transform, flush);

/**
 * Constructor function signature
 * @param {Object} override - Options to override defaults (optional)
 * @returns {Transform} Transform stream instance
 */
function Through2Constructor(override);

Usage Examples:

// Create reusable constructor
const TextProcessor = through2.ctor(
  { encoding: 'utf8' },
  function (chunk, enc, callback) {
    this.push(chunk.toString().toLowerCase());
    callback();
  }
);

// Create instances
const processor1 = new TextProcessor();
const processor2 = TextProcessor(); // 'new' is optional
const processor3 = TextProcessor({ objectMode: true }); // Override options

// Object mode constructor
const ObjectProcessor = through2.ctor(
  { objectMode: true },
  function (record, enc, callback) {
    if (record.temp != null && record.unit === 'F') {
      record.temp = ((record.temp - 32) * 5) / 9;
      record.unit = 'C';
    }
    this.push(record);
    callback();
  }
);

Transform Function Details

Transform functions receive three parameters and must call the callback to signal completion:

/**
 * Transform function implementation pattern
 * @param {Buffer|String|any} chunk - Input data chunk
 * @param {String} encoding - Character encoding for string chunks
 * @param {Function} callback - Completion callback
 */
function transformFunction(chunk, encoding, callback) {
  // Process the chunk
  // Option 1: Push data and call callback
  this.push(processedData);
  callback();
  
  // Option 2: Use callback shorthand
  callback(null, processedData);
  
  // Option 3: Signal error
  callback(new Error('Processing failed'));
  
  // Option 4: Drop chunk (don't push anything)
  callback();
}

Flush Function Details

Flush functions are called when the stream is ending and no more data will be written:

/**
 * Flush function implementation pattern
 * @param {Function} callback - Completion callback
 */
function flushFunction(callback) {
  // Perform final processing
  // Push any remaining data
  this.push(finalData);
  callback();
  
  // Or signal completion without data
  callback();
  
  // Or signal error
  callback(new Error('Flush failed'));
}

Stream Options

All functions accept standard Node.js Transform stream options:

interface StreamOptions {
  /** Enable object mode for non-binary data */
  objectMode?: boolean;
  /** Internal buffer size */
  highWaterMark?: number;
  /** Keep readable side open when writable ends */
  allowHalfOpen?: boolean;
  /** Decode strings to Buffers before passing to _transform */
  decodeStrings?: boolean;
  /** Default string encoding */
  encoding?: string;
  /** Destroy stream if writable side ends */
  autoDestroy?: boolean;
  /** Emit 'close' after 'finish' and 'end' */
  emitClose?: boolean;
}

Error Handling and Stream Control

// Error handling in transform function
const errorTransform = through2(function (chunk, enc, callback) {
  try {
    const result = riskyOperation(chunk);
    callback(null, result);
  } catch (error) {
    callback(error); // Stream will emit 'error' event
  }
});

// Stream destruction
const stream = through2();
stream.destroy(); // Destroys stream and emits 'close'

// Stream can be destroyed multiple times safely
stream.destroy();
stream.destroy(); // No additional effects

Advanced Usage Patterns

// Stateful transformation
const counter = through2(function (chunk, enc, callback) {
  if (!this._count) this._count = 0;
  this._count++;
  
  this.push(`${this._count}: ${chunk}`);
  callback();
});

// Conditional processing
const filter = through2.obj(function (obj, enc, callback) {
  if (obj.include) {
    this.push(obj);
  }
  // Skip objects without 'include' property
  callback();
});

// Batch processing with flush
const batcher = through2.obj(
  function (obj, enc, callback) {
    if (!this._batch) this._batch = [];
    this._batch.push(obj);
    
    if (this._batch.length >= 10) {
      this.push(this._batch);
      this._batch = [];
    }
    callback();
  },
  function (callback) {
    // Push remaining items
    if (this._batch && this._batch.length > 0) {
      this.push(this._batch);
    }
    callback();
  }
);
Workspace
tessl
Visibility
Public
Created
Last updated
Describes
npmpkg:npm/through2@4.0.x
Publish Source
CLI
Badge
tessl/npm-through2 badge