A tiny wrapper around Node.js streams.Transform to avoid explicit subclassing noise
```sh
npx @tessl/cli install tessl/npm-through2@4.0.0
```

through2 is a tiny wrapper around Node.js streams.Transform that eliminates the complexity of explicit subclassing and prototype chain setup. It provides a functional approach to creating transform streams through simple function parameters, supporting both binary and object modes with optional transform and flush functions.
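To illustrate what the wrapper saves you, here is a rough sketch (not from the through2 docs; the class name is illustrative) of the same uppercase transform written first as a conventional streams.Transform subclass and then as a through2 call:

```js
const { Transform } = require('stream');
const through2 = require('through2');

// Conventional approach: subclass Transform and implement _transform
class UppercaseStream extends Transform {
  _transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  }
}

// through2: the same behaviour expressed as a single function call
const uppercase = through2(function (chunk, enc, callback) {
  callback(null, chunk.toString().toUpperCase());
});
```

Both streams behave identically when piped.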
Install with npm:

```sh
npm install through2
```

```js
const through2 = require('through2');
```

For ESM environments:

```js
import through2 from 'through2';
```

Basic usage:

```js
const fs = require('fs');
const through2 = require('through2');

// Transform text data
fs.createReadStream('input.txt')
  .pipe(through2(function (chunk, enc, callback) {
    // Transform chunk (replace 'a' with 'z')
    for (let i = 0; i < chunk.length; i++) {
      if (chunk[i] == 97) chunk[i] = 122;
    }
    this.push(chunk);
    callback();
  }))
  .pipe(fs.createWriteStream('output.txt'));

// Object mode transformation
const objectStream = through2.obj(function (obj, enc, callback) {
  obj.processed = true;
  obj.timestamp = Date.now();
  this.push(obj);
  callback();
});
```

Creates a transform stream instance with optional transform and flush functions.
```js
/**
 * Creates a transform stream with optional transformation logic
 * @param {Object} options - Stream options (optional)
 * @param {Function} transform - Transform function (optional)
 * @param {Function} flush - Flush function (optional)
 * @returns {Transform} Transform stream instance
 */
through2([options], [transform], [flush])

/**
 * Transform function signature
 * @param {Buffer|String|any} chunk - Data chunk to transform
 * @param {String} encoding - Encoding (for string chunks)
 * @param {Function} callback - Completion callback
 */
transform(chunk, encoding, callback)

/**
 * Flush function signature
 * @param {Function} callback - Completion callback
 */
flush(callback)
```

Usage Examples:
```js
// Basic transform with all parameters
const transform = through2(
  { objectMode: false },
  function (chunk, enc, callback) {
    // Process chunk
    this.push(chunk.toString().toUpperCase());
    callback();
  },
  function (callback) {
    // Final processing
    this.push('\n-- END --\n');
    callback();
  }
);

// Transform function only
const simpleTransform = through2(function (chunk, enc, callback) {
  callback(null, chunk); // Pass through unchanged
});

// No-op transform (pass-through)
const passThrough = through2();
```

Creates transform streams optimized for object processing with `objectMode: true` and `highWaterMark: 16`.
```js
/**
 * Creates an object mode transform stream
 * @param {Object} options - Stream options (optional)
 * @param {Function} transform - Transform function (optional)
 * @param {Function} flush - Flush function (optional)
 * @returns {Transform} Object mode transform stream
 */
through2.obj([options], [transform], [flush])
```

Usage Examples:
```js
// Object transformation
const processor = through2.obj(function (obj, enc, callback) {
  if (obj.type === 'user') {
    obj.processedAt = new Date().toISOString();
    this.push(obj);
  }
  callback();
});

// With options override
const customProcessor = through2.obj(
  { highWaterMark: 32 },
  function (obj, enc, callback) {
    this.push({ ...obj, id: Math.random() });
    callback();
  }
);
```
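As a sketch of how an object-mode transform typically sits in a pipeline (this uses Node's built-in stream.Readable.from, available since Node 12; the sample objects are illustrative only):

```js
const { Readable } = require('stream');
const through2 = require('through2');

// Feed plain objects through an object-mode transform and consume the results
Readable.from([{ type: 'user', name: 'ada' }, { type: 'bot', name: 'crawler' }])
  .pipe(through2.obj(function (obj, enc, callback) {
    if (obj.type === 'user') {
      this.push({ ...obj, processedAt: new Date().toISOString() });
    }
    callback(); // always signal completion, even for dropped objects
  }))
  .on('data', (obj) => console.log(obj));
```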
Creates reusable constructor functions for transform streams, useful when the same transform logic needs to be used in multiple instances.

```js
/**
 * Creates a reusable transform stream constructor
 * @param {Object} options - Default stream options (optional)
 * @param {Function} transform - Transform function
 * @param {Function} flush - Flush function (optional)
 * @returns {Function} Constructor function
 */
through2.ctor([options], transform, [flush])

/**
 * Constructor function signature
 * @param {Object} override - Options to override defaults (optional)
 * @returns {Transform} Transform stream instance
 */
Through2Constructor([override])
```

Usage Examples:
```js
// Create reusable constructor
const TextProcessor = through2.ctor(
  { encoding: 'utf8' },
  function (chunk, enc, callback) {
    this.push(chunk.toString().toLowerCase());
    callback();
  }
);

// Create instances
const processor1 = new TextProcessor();
const processor2 = TextProcessor(); // 'new' is optional
const processor3 = TextProcessor({ objectMode: true }); // Override options

// Object mode constructor
const ObjectProcessor = through2.ctor(
  { objectMode: true },
  function (record, enc, callback) {
    if (record.temp != null && record.unit === 'F') {
      record.temp = ((record.temp - 32) * 5) / 9;
      record.unit = 'C';
    }
    this.push(record);
    callback();
  }
);
```

Transform functions receive three parameters and must call the callback to signal completion:
```js
/**
 * Transform function implementation pattern
 * @param {Buffer|String|any} chunk - Input data chunk
 * @param {String} encoding - Character encoding for string chunks
 * @param {Function} callback - Completion callback
 */
function transformFunction(chunk, encoding, callback) {
  // Process the chunk, then finish with exactly ONE of the following:

  // Option 1: Push data and call callback
  this.push(processedData);
  callback();

  // Option 2: Use callback shorthand
  callback(null, processedData);

  // Option 3: Signal error
  callback(new Error('Processing failed'));

  // Option 4: Drop chunk (don't push anything)
  callback();
}
```
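The callback does not have to be invoked synchronously. The following sketch (not from the through2 docs) defers the work with setImmediate to show that the stream simply waits until the callback fires before processing the next chunk:

```js
const through2 = require('through2');

// Asynchronous transform: the next chunk is not handed over until callback() runs
const slowUpper = through2(function (chunk, enc, callback) {
  setImmediate(() => {
    // Any async work (I/O, lookups, etc.) could happen here instead
    callback(null, chunk.toString().toUpperCase());
  });
});

slowUpper.on('data', (data) => process.stdout.write(data));
slowUpper.write('hello ');
slowUpper.end('world');
```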
Flush functions are called when the stream is ending and no more data will be written:

```js
/**
 * Flush function implementation pattern
 * @param {Function} callback - Completion callback
 */
function flushFunction(callback) {
  // Perform final processing, then finish with exactly ONE of the following:

  // Option 1: Push any remaining data, then signal completion
  this.push(finalData);
  callback();

  // Option 2: Signal completion without data
  callback();

  // Option 3: Signal error
  callback(new Error('Flush failed'));
}
```
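As a concrete sketch of the flush hook (the byte-counting logic is illustrative, not part of through2), the stream below passes data through unchanged and appends a summary line once the input ends:

```js
const through2 = require('through2');

let byteCount = 0;

const withTrailer = through2(
  function (chunk, enc, callback) {
    byteCount += chunk.length;  // accumulate state across chunks
    callback(null, chunk);      // pass the chunk through unchanged
  },
  function (callback) {
    this.push(`\n-- ${byteCount} bytes processed --\n`); // emitted after the last chunk
    callback();
  }
);

withTrailer.pipe(process.stdout);
withTrailer.end('some input\n');
```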
All functions accept standard Node.js Transform stream options:

```ts
interface StreamOptions {
  /** Enable object mode for non-binary data */
  objectMode?: boolean;
  /** Internal buffer size */
  highWaterMark?: number;
  /** Keep readable side open when writable ends */
  allowHalfOpen?: boolean;
  /** Decode strings to Buffers before passing to _transform */
  decodeStrings?: boolean;
  /** Default string encoding */
  encoding?: string;
  /** Destroy stream if writable side ends */
  autoDestroy?: boolean;
  /** Emit 'close' after 'finish' and 'end' */
  emitClose?: boolean;
}
```
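Options are handed to the underlying Transform constructor. For example, based on the defaults described above, the following two streams are equivalent, since through2.obj simply pre-sets `objectMode: true` and `highWaterMark: 16`:

```js
const through2 = require('through2');

const handler = function (obj, enc, callback) {
  callback(null, obj);
};

// Explicit options...
const explicit = through2({ objectMode: true, highWaterMark: 16 }, handler);

// ...versus the object-mode shorthand
const shorthand = through2.obj(handler);
```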
```js
// Error handling in transform function
const errorTransform = through2(function (chunk, enc, callback) {
  try {
    const result = riskyOperation(chunk);
    callback(null, result);
  } catch (error) {
    callback(error); // Stream will emit 'error' event
  }
});

// Stream destruction
const stream = through2();
stream.destroy(); // Destroys stream and emits 'close'

// Stream can be destroyed multiple times safely
stream.destroy();
stream.destroy(); // No additional effects
```
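When a transform calls callback(err), the error is emitted on that stream, not on the whole pipe chain. A common way to handle this is Node's built-in stream.pipeline, sketched here with illustrative file names:

```js
const fs = require('fs');
const { pipeline } = require('stream');
const through2 = require('through2');

pipeline(
  fs.createReadStream('input.txt'),
  through2(function (chunk, enc, callback) {
    // calling callback(err) here aborts the whole pipeline
    callback(null, chunk);
  }),
  fs.createWriteStream('output.txt'),
  (err) => {
    if (err) console.error('Pipeline failed:', err);
    else console.log('Pipeline succeeded');
  }
);
```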
```js
// Stateful transformation
const counter = through2(function (chunk, enc, callback) {
  if (!this._count) this._count = 0;
  this._count++;
  this.push(`${this._count}: ${chunk}`);
  callback();
});

// Conditional processing
const filter = through2.obj(function (obj, enc, callback) {
  if (obj.include) {
    this.push(obj);
  }
  // Skip objects without 'include' property
  callback();
});

// Batch processing with flush
const batcher = through2.obj(
  function (obj, enc, callback) {
    if (!this._batch) this._batch = [];
    this._batch.push(obj);
    if (this._batch.length >= 10) {
      this.push(this._batch);
      this._batch = [];
    }
    callback();
  },
  function (callback) {
    // Push remaining items
    if (this._batch && this._batch.length > 0) {
      this.push(this._batch);
    }
    callback();
  }
);
```
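To see the batching pattern in action, the short driver below (illustrative only, reusing the batcher defined above) writes 25 objects and logs the emitted batches; the final partial batch of 5 is pushed by the flush function when the input ends:

```js
batcher.on('data', (batch) => console.log(`batch of ${batch.length}`));

for (let i = 0; i < 25; i++) {
  batcher.write({ n: i });
}
batcher.end();
// Logs: batch of 10, batch of 10, batch of 5
```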