An iteration of the Node.js core streams with a series of improvements
—
Readable stream implementation providing data flow control, buffer management, and pipe operations with enhanced error handling.
Creates a new Readable stream with configuration options.
/**
* Creates a new Readable stream
* @param opts - Configuration options for the readable stream
*/
class Readable extends Stream {
constructor(opts?: ReadableOptions);
}
interface ReadableOptions extends StreamOptions {
/** Maximum buffer size in bytes (default: 16384) */
highWaterMark?: number;
/** Function to map input data */
map?: (data: any) => any;
/** Function to calculate byte size of data */
byteLength?: (data: any) => number;
/** Read function shorthand */
read?: (cb: (error?: Error) => void) => void;
}Usage Example:
const { Readable } = require("@mafintosh/streamx");
const readable = new Readable({
highWaterMark: 8192,
read(cb) {
this.push("Hello World");
cb(null);
}
});Called when the stream wants new data. Override to implement data reading logic.
/**
* Called when stream wants new data
* @param cb - Callback to call when read operation is complete
*/
_read(cb: (error?: Error) => void): void;Usage Example:
class CustomReadable extends Readable {
constructor() {
super();
this.counter = 0;
}
_read(cb) {
if (this.counter < 5) {
this.push(`data-${this.counter++}`);
} else {
this.push(null); // End stream
}
cb(null);
}
}Pushes data to the stream buffer.
/**
* Push data to stream buffer
* @param data - Data to push, or null to end stream
* @returns True if buffer is not full and more data can be pushed
*/
push(data: any): boolean;Usage Example:
const readable = new Readable({
read(cb) {
const shouldContinue = this.push("some data");
if (shouldContinue) {
this.push("more data");
}
this.push(null); // End stream
cb(null);
}
});Reads data from the stream buffer.
/**
* Read data from stream buffer
* @returns Data from buffer, or null if buffer is empty or stream ended
*/
read(): any;Usage Example:
const readable = new Readable({
read(cb) {
this.push("Hello");
this.push("World");
this.push(null);
cb(null);
}
});
readable.on('readable', () => {
let chunk;
while ((chunk = readable.read()) !== null) {
console.log('Read:', chunk);
}
});Adds data to the front of the buffer (useful for putting back over-read data).
/**
* Add data to front of buffer
* @param data - Data to add to front of buffer
*/
unshift(data: any): void;Usage Example:
const readable = new Readable({
read(cb) {
this.push("Hello World");
this.push(null);
cb(null);
}
});
readable.on('readable', () => {
const data = readable.read();
if (data === "Hello World") {
// Put it back for later processing
readable.unshift(data);
}
});Pauses the stream (only needed if stream is resumed).
/**
* Pause the stream
*/
pause(): void;Resumes/starts consuming the stream as fast as possible.
/**
* Resume consuming the stream
*/
resume(): void;Usage Example:
const readable = new Readable({
read(cb) {
this.push(`data-${Date.now()}`);
cb(null);
}
});
// Start consuming
readable.resume();
// Pause after 1 second
setTimeout(() => {
readable.pause();
}, 1000);
// Resume after another second
setTimeout(() => {
readable.resume();
}, 2000);Efficiently pipes the readable stream to a writable stream with error handling.
/**
* Pipe readable stream to writable stream
* @param destination - Writable stream to pipe to
* @param callback - Optional callback called when pipeline completes
* @returns The destination stream
*/
pipe(destination: Writable, callback?: (error?: Error) => void): Writable;Usage Example:
const { Readable, Writable } = require("@mafintosh/streamx");
const readable = new Readable({
read(cb) {
this.push("Hello World");
this.push(null);
cb(null);
}
});
const writable = new Writable({
write(data, cb) {
console.log("Received:", data.toString());
cb(null);
}
});
readable.pipe(writable, (err) => {
if (err) console.error("Pipeline failed:", err);
else console.log("Pipeline completed successfully");
});Emitted when data is available in the buffer and buffer was previously empty.
readable.on('readable', () => {
// Data is available to read
});Emitted when data is being read from the stream. Attaching this event automatically resumes the stream.
readable.on('data', (chunk) => {
console.log('Received chunk:', chunk);
});Emitted when the stream has ended and no more data is available.
readable.on('end', () => {
console.log('Stream ended');
});Emitted when the stream has fully closed.
readable.on('close', () => {
console.log('Stream closed');
});Emitted when an error occurs.
readable.on('error', (err) => {
console.error('Stream error:', err);
});Complete Usage Example:
const { Readable } = require("@mafintosh/streamx");
const readable = new Readable({
read(cb) {
// Simulate reading data
setTimeout(() => {
if (Math.random() > 0.8) {
this.push(null); // End stream
} else {
this.push(`data-${Date.now()}`);
}
cb(null);
}, 100);
}
});
readable.on('data', (chunk) => {
console.log('Data:', chunk);
});
readable.on('end', () => {
console.log('Stream ended');
});
readable.on('close', () => {
console.log('Stream closed');
});
readable.on('error', (err) => {
console.error('Error:', err);
});Install with Tessl CLI
npx tessl i tessl/npm-mafintosh--streamx