An iteration of the Node.js core streams with a series of improvements
—
Transform streams for data transformation, extending Duplex with specialized transformation capabilities that map written data to readable output.
Creates a new Transform stream with transformation configuration.
/**
 * Creates a new Transform stream: a Duplex whose written input is mapped
 * to readable output via _transform (and optionally flushed via _flush).
 * @param opts - Configuration options, including the transform/flush shorthands
 */
class Transform extends Duplex {
constructor(opts?: TransformOptions);
}
/**
 * Options for constructing a Transform stream.
 * The shorthands mirror the overridable _transform/_flush methods.
 */
interface TransformOptions extends DuplexOptions {
  /**
   * Transform function shorthand. Invoke cb(error) on failure or
   * cb(null, result) on success — the error slot accepts null, matching
   * every usage example below (cb(null, ...)), which `error?: Error`
   * would reject under strict null checks.
   */
  transform?: (data: any, cb: (error?: Error | null, result?: any) => void) => void;
  /** Flush function shorthand, called once before the stream ends. */
  flush?: (cb: (error?: Error | null, result?: any) => void) => void;
}

Usage Example:
const { Transform } = require("@mafintosh/streamx");
const upperCaseTransform = new Transform({
transform(data, cb) {
const result = data.toString().toUpperCase();
cb(null, result);
}
});Called to transform incoming data. Override to implement transformation logic.
/**
 * Transform incoming data. Either pass the result to the callback or call
 * push() one or more times and invoke the callback without a result.
 * @param data - Data to transform
 * @param callback - Callback taking an error (or null) and the transformed result
 */
_transform(data: any, callback: (error?: Error | null, result?: any) => void): void;

Usage Example:
class JSONTransform extends Transform {
_transform(data, cb) {
try {
const parsed = JSON.parse(data.toString());
const transformed = {
...parsed,
processed: true,
timestamp: Date.now()
};
cb(null, JSON.stringify(transformed));
} catch (err) {
cb(err);
}
}
}Called at the end of the transform to flush any remaining data.
/**
 * Called once, before the stream ends, to flush any remaining buffered data.
 * @param callback - Callback taking an error (or null) and optional final data
 */
_flush(callback: (error?: Error | null, result?: any) => void): void;

Called internally by the stream when ending. Automatically calls _flush.
/**
 * Called internally when the transform stream is ending; automatically
 * invokes _flush before signalling completion.
 * @param callback - Callback to call (with an error or null) when final
 * processing is complete
 */
_final(callback: (error?: Error | null) => void): void;

Transform data and pass result via callback:
const csvToJson = new Transform({
transform(data, cb) {
const lines = data.toString().split('\n');
const result = lines.map(line => {
const [name, age] = line.split(',');
return JSON.stringify({ name, age: parseInt(age) });
}).join('\n');
cb(null, result);
}
});Transform data and push result directly:
const splitTransform = new Transform({
transform(data, cb) {
const lines = data.toString().split('\n');
lines.forEach(line => {
if (line.trim()) {
this.push(line.trim());
}
});
cb(null); // No result via callback
}
});Generate multiple outputs for single input:
// Fan out three variants of every incoming chunk.
const duplicateTransform = new Transform({
  transform(data, cb) {
    const str = data.toString();
    const variants = [
      `Original: ${str}`,
      `Copy: ${str}`,
      `Uppercase: ${str.toUpperCase()}`
    ];
    for (const out of variants) {
      this.push(out);
    }
    cb(null);
  }
});

const { Transform } = require("@mafintosh/streamx");
// Line-by-line processor: buffers partial lines across chunks and emits each
// complete line prefixed with an ISO-8601 timestamp.
class LineProcessor extends Transform {
constructor() {
super();
// Carries the trailing partial line between _transform calls.
this.buffer = '';
}
// Append the chunk to the buffer, split on newlines, and push every
// complete line; the final (possibly incomplete) piece stays buffered.
_transform(chunk, cb) {
this.buffer += chunk.toString();
const lines = this.buffer.split('\n');
// Keep the last incomplete line in buffer
this.buffer = lines.pop();
// Process complete lines
lines.forEach(line => {
if (line.trim()) {
const processed = `[${new Date().toISOString()}] ${line}`;
this.push(processed);
}
});
cb(null);
}
// At end-of-stream, emit whatever is still buffered (if non-blank).
_flush(cb) {
// Process any remaining data in buffer
if (this.buffer.trim()) {
const processed = `[${new Date().toISOString()}] ${this.buffer}`;
this.push(processed);
}
cb(null);
}
}

class ValidatorTransform extends Transform {
// Filters a stream of JSON items through a caller-supplied predicate,
// forwarding valid items and counting (and logging) the rest.
constructor(validator) {
super();
// validator: predicate (item) => boolean supplied by the caller.
this.validate = validator;
this.validCount = 0;
this.invalidCount = 0;
}
// Parse the chunk as JSON; push it through only if the predicate passes.
// Parse failures and predicate rejections are counted, not fatal.
_transform(data, cb) {
try {
const item = JSON.parse(data.toString());
if (this.validate(item)) {
this.validCount++;
this.push(JSON.stringify(item));
} else {
this.invalidCount++;
console.warn('Invalid item:', item);
}
} catch (err) {
this.invalidCount++;
console.error('Parse error:', err.message);
}
cb(null);
}
// Report the final valid/invalid tallies when the stream ends.
_flush(cb) {
console.log(`Processed: ${this.validCount} valid, ${this.invalidCount} invalid`);
cb(null);
}
}
// Usage: accept only items with a string name and a numeric age.
const validator = new ValidatorTransform(
  item => typeof item.name === 'string' && typeof item.age === 'number'
);

const crypto = require('crypto');
// Re-encode every chunk as a base64 string.
class Base64Transform extends Transform {
  _transform(data, cb) {
    const encoded = Buffer.from(data).toString('base64');
    cb(null, encoded);
  }
}
class HashTransform extends Transform {
constructor(algorithm = 'sha256') {
super();
this.algorithm = algorithm;
}
_transform(data, cb) {
const hash = crypto.createHash(this.algorithm);
hash.update(data);
const result = hash.digest('hex');
cb(null, result);
}
}By default, Transform acts as a pass-through stream if no _transform is implemented:
const passThrough = new Transform(); // Acts as pass-through
// Data written to it is emitted as readable data unchanged
passThrough.write("Hello");
passThrough.end();
passThrough.on('data', (chunk) => {
console.log('Passed through:', chunk.toString()); // "Hello"
});Transform streams work excellently in pipelines:
const { Readable, Writable, Transform } = require("@mafintosh/streamx");
// Create source data: two lines followed by the end-of-stream marker (null).
const source = new Readable({
  read(cb) {
    for (const chunk of ['hello world\n', 'foo bar\n', null]) {
      this.push(chunk);
    }
    cb(null);
  }
});
// Create transforms
const upperCase = new Transform({
  transform(chunk, next) {
    next(null, chunk.toString().toUpperCase());
  }
});
const addPrefix = new Transform({
  transform(chunk, next) {
    next(null, `>> ${chunk}`);
  }
});
// Create destination
const destination = new Writable({
  write(chunk, next) {
    console.log('Final:', chunk.toString());
    next(null);
  }
});
// Pipeline
source.pipe(upperCase).pipe(addPrefix).pipe(destination, (err) => {
if (err) console.error('Pipeline error:', err);
else console.log('Pipeline completed');
});class CounterTransform extends Transform {
// Prefixes each non-blank line with a running line number that persists
// across chunks.
constructor() {
super();
this.lineNumber = 0;
}
// Number every non-blank line; blank lines pass through unchanged.
_transform(data, cb) {
const lines = data.toString().split('\n');
const result = lines.map(line => {
if (line.trim()) {
return `${++this.lineNumber}: ${line}`;
}
return line;
}).join('\n');
cb(null, result);
}
}

class AsyncTransform extends Transform {
// Awaits an asynchronous operation per chunk, then reports the result (or
// error) through the callback.
// NOTE(review): this mixes an async method with the callback API — the
// returned promise is presumably ignored by streamx; verify against the
// library's _transform contract.
async _transform(data, cb) {
try {
// Simulate async operation
const result = await this.processAsync(data.toString());
cb(null, result);
} catch (err) {
cb(err);
}
}
// Resolves with the uppercased input after a 100 ms simulated delay.
async processAsync(data) {
return new Promise(resolve => {
setTimeout(() => {
resolve(data.toUpperCase());
}, 100);
});
}
}

Transform streams inherit all events from Duplex streams:
const transform = new Transform({
  transform(chunk, done) {
    done(null, chunk.toString().toUpperCase());
  }
});
transform.on('data', (chunk) => {
console.log('Transformed:', chunk.toString());
});
transform.on('finish', () => {
console.log('All transforms completed');
});
transform.on('close', () => {
console.log('Transform stream closed');
});
transform.write('hello');
transform.write('world');
transform.end();Install with Tessl CLI
npx tessl i tessl/npm-mafintosh--streamx