An iteration of the Node.js core streams with a series of improvements
—
Duplex streams that are both readable and writable, inheriting from Readable while implementing the complete Writable interface.
Creates a new Duplex stream with configuration options for both readable and writable sides.
/**
* Creates a new Duplex stream
* @param opts - Configuration options for both readable and writable functionality
*/
class Duplex extends Readable {
constructor(opts?: DuplexOptions);
}
interface DuplexOptions extends ReadableOptions {
/** Map function for readable side only */
mapReadable?: (data: any) => any;
/** ByteLength function for readable side only */
byteLengthReadable?: (data: any) => number;
/** Map function for writable side only */
mapWritable?: (data: any) => any;
/** ByteLength function for writable side only */
byteLengthWritable?: (data: any) => number;
/** Write function shorthand */
write?: (data: any, cb: (error?: Error) => void) => void;
/** Final function shorthand */
final?: (cb: (error?: Error) => void) => void;
}Usage Example:
const { Duplex } = require("@mafintosh/streamx");
const duplex = new Duplex({
read(cb) {
this.push(`read-${Date.now()}`);
cb(null);
},
write(data, cb) {
console.log("Writing:", data.toString());
cb(null);
}
});All methods and events from Readable streams are available:
/**
* Called when stream wants new data for reading
* @param cb - Callback to call when read operation is complete
*/
_read(cb: (error?: Error) => void): void;
/**
* Push data to the readable buffer
* @param data - Data to push, or null to end readable side
* @returns True if buffer is not full
*/
push(data: any): boolean;
/**
* Read data from the readable buffer
* @returns Data from buffer, or null if empty
*/
read(): any;All methods and events from Writable streams are available:
/**
* Called when stream wants to write data
* @param data - Data to write
* @param callback - Callback to call when write is complete
*/
_write(data: any, callback: (error?: Error) => void): void;
/**
* Write data to the stream
* @param data - Data to write
* @returns True if buffer is not full
*/
write(data: any): boolean;
/**
* End the writable side gracefully
* @param data - Optional final data to write before ending
*/
end(data?: any): void;
/**
* Called before finish event for cleanup
* @param callback - Callback to call when final is complete
*/
_final(callback: (error?: Error) => void): void;Echo Server Example:
const { Duplex } = require("@mafintosh/streamx");
class EchoStream extends Duplex {
constructor() {
super();
this.buffer = [];
}
_read(cb) {
if (this.buffer.length > 0) {
this.push(this.buffer.shift());
}
cb(null);
}
_write(data, cb) {
// Echo written data back to readable side
this.buffer.push(`Echo: ${data}`);
this.push(`Echo: ${data}`);
cb(null);
}
}
const echo = new EchoStream();
// Write data
echo.write("Hello");
echo.write("World");
// Read echoed data
echo.on('data', (chunk) => {
console.log('Received:', chunk.toString());
});
echo.end();Proxy Stream Example:
const { Duplex } = require("@mafintosh/streamx");
class ProxyStream extends Duplex {
constructor(target) {
super();
this.target = target;
// Forward data from target to our readable side
this.target.on('data', (chunk) => {
this.push(chunk);
});
this.target.on('end', () => {
this.push(null);
});
}
_write(data, cb) {
// Forward written data to target
this.target.write(data);
cb(null);
}
_final(cb) {
this.target.end();
cb(null);
}
}Bidirectional Transform Example:
const { Duplex } = require("@mafintosh/streamx");
class BidirectionalTransform extends Duplex {
constructor() {
super();
this.readCounter = 0;
this.writeCounter = 0;
}
_read(cb) {
// Generate data for reading
if (this.readCounter < 5) {
this.push(`generated-${this.readCounter++}`);
} else {
this.push(null);
}
cb(null);
}
_write(data, cb) {
// Process written data
const processed = data.toString().toUpperCase();
console.log(`Processed write #${this.writeCounter++}:`, processed);
cb(null);
}
}
const transform = new BidirectionalTransform();
// Read generated data
transform.on('data', (chunk) => {
console.log('Read:', chunk.toString());
});
// Write data to be processed
transform.write("hello");
transform.write("world");
transform.end();Duplex streams emit events from both Readable and Writable:
readable - Data available to readdata - Data chunk read (auto-resumes stream)end - Readable side endeddrain - Buffer drained, safe to write morefinish - All writes completedclose - Stream fully closederror - Error occurredComplete Event Handling Example:
const { Duplex } = require("@mafintosh/streamx");
const duplex = new Duplex({
read(cb) {
this.push(`data-${Date.now()}`);
setTimeout(() => cb(null), 100);
},
write(data, cb) {
console.log("Writing:", data.toString());
setTimeout(() => cb(null), 50);
}
});
// Readable events
duplex.on('readable', () => {
console.log('Readable event');
});
duplex.on('data', (chunk) => {
console.log('Data:', chunk.toString());
});
duplex.on('end', () => {
console.log('Readable end');
});
// Writable events
duplex.on('drain', () => {
console.log('Drain event');
});
duplex.on('finish', () => {
console.log('Writable finish');
});
// Shared events
duplex.on('close', () => {
console.log('Stream closed');
});
duplex.on('error', (err) => {
console.error('Error:', err);
});
// Use both sides
duplex.write("test data");
duplex.end();Separate Readable/Writable Configuration:
const duplex = new Duplex({
// Readable configuration
highWaterMark: 8192,
mapReadable: (data) => data.toString().toUpperCase(),
byteLengthReadable: (data) => Buffer.byteLength(data),
// Writable configuration
mapWritable: (data) => Buffer.from(data),
byteLengthWritable: (data) => data.length,
// Implementation
read(cb) {
this.push("readable data");
cb(null);
},
write(data, cb) {
console.log("Wrote:", data);
cb(null);
}
});Install with Tessl CLI
npx tessl i tessl/npm-mafintosh--streamx