StreamX: an iteration of the Node.js core streams with a series of improvements.

Duplex streams are both readable and writable, while Transform streams provide data transformation capabilities. StreamX provides enhanced implementations of both, with proper lifecycle management and error handling.

A duplex stream is both readable and writable: it inherits from Readable and additionally implements the Writable API.
/**
 * A duplex stream: simultaneously readable and writable.
 * Inherits the Readable API and implements the Writable API on top,
 * sharing a single lifecycle (_open/_destroy) between both sides.
 * @param options - Configuration options for the duplex stream
 */
class Duplex extends Readable {
constructor(options?: DuplexOptions);
// Readable side (inherited from Readable)
_read(cb: () => void): void; // override to produce data; call cb when done
push(data: any): boolean; // queue data for readers; false typically signals backpressure
read(): any;
// Writable side
_write(data: any, cb: (err?: Error) => void): void; // override to consume one chunk
_writev(batch: any[], cb: (err?: Error) => void): void; // override to consume a whole batch at once
_final(cb: (err?: Error) => void): void; // final hook for the writable side (see `final` option shorthand)
write(data: any): boolean; // false typically signals backpressure; wait for 'drain'
end(): Duplex; // no more writes; returns the stream for chaining
// Lifecycle hooks shared by both sides
_open(cb: (err?: Error) => void): void; // one-time setup hook
_destroy(cb: (err?: Error) => void): void; // cleanup when the stream is torn down
_predestroy(): void; // early hook, e.g. to cancel in-flight work — confirm exact timing against streamx docs
destroy(err?: Error): void;
}
interface DuplexOptions extends ReadableOptions {
/** Map function applied on the readable side only */
mapReadable?: (data: any) => any;
/** ByteLength function used on the readable side only */
byteLengthReadable?: (data: any) => number;
/** Map function applied on the writable side only */
mapWritable?: (data: any) => any;
/** ByteLength function used on the writable side only */
byteLengthWritable?: (data: any) => number;
/** Shorthand for overriding the _write method */
write?: (data: any, cb: (err?: Error) => void) => void;
/** Shorthand for overriding the _writev method */
writev?: (batch: any[], cb: (err?: Error) => void) => void;
/** Shorthand for overriding the _final method */
final?: (cb: (err?: Error) => void) => void;
}

Usage Examples:
const { Duplex } = require('streamx');

// A minimal duplex: whatever is written comes back out the readable side.
const echo = new Duplex({
  write(data, cb) {
    this.push(data); // forward the chunk to the readable side
    cb();
  },
  read(cb) {
    cb(); // nothing to do here — data arrives via the write handler above
  }
});

echo.write('Hello');
echo.write('World');

// Consume the echoed chunks
echo.on('data', (chunk) => {
  console.log('Echoed:', chunk.toString());
});
// More complex duplex with separate read/write logic
const processor = new Duplex({
write(data, cb) {
console.log('Processing input:', data.toString());
// Process and push to readable side
this.push(`Processed: ${data}`);
cb();
},
read(cb) {
// Readable side is fed by write operations
cb();
},
final(cb) {
console.log('Processing complete');
this.push(null); // End readable side
cb();
}
});A transform stream is a duplex stream that transforms data from its writable side to its readable side.
/**
 * A transform stream: a duplex stream that rewrites data flowing from
 * its writable side to its readable side.
 * @param options - Configuration options for the transform stream
 */
class Transform extends Duplex {
constructor(options?: TransformOptions);
/** Override to transform one chunk; pass the result (or an error) to cb */
_transform(data: any, cb: (err?: Error, output?: any) => void): void;
/** Override to emit any final output once the writable side has ended */
_flush(cb: (err?: Error, output?: any) => void): void;
}
interface TransformOptions extends DuplexOptions {
/** Shorthand for overriding the _transform method */
transform?: (data: any, cb: (err?: Error, output?: any) => void) => void;
/** Shorthand for overriding the _flush method */
flush?: (cb: (err?: Error, output?: any) => void) => void;
}

Usage Examples:
const { Transform } = require('streamx');

// Uppercase every chunk that flows through.
const upperCase = new Transform({
  transform(data, cb) {
    cb(null, data.toString().toUpperCase());
  }
});

upperCase.write('hello');
upperCase.write('world');
upperCase.end();

upperCase.on('data', (chunk) => {
  console.log('Uppercase:', chunk.toString());
});
// JSON parser transform: each chunk is expected to be one JSON document.
const jsonParser = new Transform({
  transform(data, cb) {
    let parsed;
    try {
      parsed = JSON.parse(data.toString());
    } catch (err) {
      return cb(err); // malformed JSON surfaces as a stream error
    }
    cb(null, parsed);
  }
});
// Line-by-line processor
const lineProcessor = new Transform({
constructor() {
super();
this.buffer = '';
},
transform(chunk, cb) {
this.buffer += chunk.toString();
const lines = this.buffer.split('\n');
this.buffer = lines.pop(); // Keep incomplete line
lines.forEach(line => {
if (line.trim()) {
this.push(`Processed: ${line}\n`);
}
});
cb();
},
flush(cb) {
if (this.buffer.trim()) {
this.push(`Processed: ${this.buffer}\n`);
}
cb();
}
});A PassThrough stream is a Transform stream that passes data through unchanged.
/**
 * A pass-through stream: an identity Transform that forwards data unchanged.
 * Useful for monitoring traffic or splicing hooks into a pipeline.
 * @param options - Configuration options for the pass-through stream
 */
class PassThrough extends Transform {
constructor(options?: TransformOptions);
// No transform override — data passes through without modification
}

Usage Examples:
const { PassThrough } = require('streamx');
// Basic pass-through
const passThrough = new PassThrough();
passThrough.write('data flows through');
passThrough.end();
passThrough.on('data', (chunk) => {
console.log('Passed through:', chunk.toString());
});
// Use as a proxy with monitoring
const monitor = new PassThrough();
monitor.on('data', (chunk) => {
console.log(`Data passing through: ${chunk.length} bytes`);
});
// Pipe data through the monitor
someReadable.pipe(monitor).pipe(someWritable);StreamX transforms support advanced patterns for complex data processing.
Buffering Transform:
const bufferingTransform = new Transform({
constructor() {
super();
this.chunks = [];
this.totalSize = 0;
},
transform(chunk, cb) {
this.chunks.push(chunk);
this.totalSize += chunk.length;
// Emit when we have enough data
if (this.totalSize >= 1024) {
const combined = Buffer.concat(this.chunks);
this.chunks = [];
this.totalSize = 0;
cb(null, combined);
} else {
cb();
}
},
flush(cb) {
if (this.chunks.length > 0) {
const combined = Buffer.concat(this.chunks);
cb(null, combined);
} else {
cb();
}
}
});Async Transform:
// Transform whose handler performs asynchronous work before responding.
const asyncTransform = new Transform({
  async transform(data, cb) {
    // Await the async work, then hand the result (or the failure) to cb.
    try {
      const processed = await processDataAsync(data.toString());
      cb(null, processed);
    } catch (err) {
      cb(err);
    }
  }
});
async function processDataAsync(data) {
return new Promise((resolve) => {
setTimeout(() => {
resolve(`Async processed: ${data}`);
}, 100);
});
}Multi-output Transform:
const multiOutput = new Transform({
transform(data, cb) {
const input = data.toString();
// Push multiple outputs for single input
this.push(`Original: ${input}`);
this.push(`Reversed: ${input.split('').reverse().join('')}`);
this.push(`Length: ${input.length}`);
cb(); // Don't pass data to cb, we used push instead
}
});Transform streams include comprehensive error handling with proper cleanup.
const errorHandlingTransform = new Transform({
transform(data, cb) {
try {
if (data.toString().includes('poison')) {
throw new Error('Poisoned data detected');
}
const result = data.toString().toUpperCase();
cb(null, result);
} catch (err) {
cb(err); // Pass error to callback
}
}
});
errorHandlingTransform.on('error', (err) => {
console.error('Transform error:', err.message);
});
errorHandlingTransform.on('close', () => {
console.log('Transform stream closed');
});
// This will cause an error
errorHandlingTransform.write('poison pill');Duplex and Transform streams emit events from both readable and writable sides.
/** Events emitted by Duplex and Transform streams (both sides). */
interface DuplexTransformEvents {
// Readable-side events
'readable': () => void;
'data': (chunk: any) => void; // a chunk has been produced
'end': () => void; // readable side is done
// Writable-side events
'drain': () => void; // safe to write again after backpressure
'finish': () => void; // writable side is done
// Shared events
'close': () => void; // stream fully torn down
'error': (err: Error) => void;
'pipe': (src: Readable) => void; // a readable was piped into this stream
'piping': (dest: Writable) => void; // this stream started piping to dest
}

Install with Tessl CLI
npx tessl i tessl/npm-streamx