An iteration of the Node.js core streams with a series of improvements
—
Readable streams in StreamX provide enhanced lifecycle management, proper backpressure handling, and improved error handling compared to Node.js core streams. They support both flowing and non-flowing modes with consistent behavior.
Creates a readable stream with enhanced lifecycle support and proper resource management.
/**
* Creates a new readable stream with enhanced lifecycle support
* @param options - Configuration options for the readable stream
*/
class Readable extends Stream {
constructor(options?: ReadableOptions);
/** Override this method to implement custom read logic */
_read(cb: () => void): void;
/** Lifecycle hook called before the first read operation */
_open(cb: (err?: Error) => void): void;
/** Cleanup hook called when the stream is destroyed */
_destroy(cb: (err?: Error) => void): void;
/** Hook called immediately when destroy() is first invoked */
_predestroy(): void;
/** Push data to the stream buffer */
push(data: any): boolean;
/** Read data from the stream buffer */
read(): any;
/** Add data to the front of the buffer */
unshift(data: any): void;
/** Pause the stream */
pause(): Readable;
/** Resume the stream */
resume(): Readable;
/** Pipe to a writable stream with callback support */
pipe(destination: Writable, callback?: (err?: Error) => void): Writable;
/** Set text encoding for automatic string decoding */
setEncoding(encoding: string): Readable;
/** Forcefully destroy the stream */
destroy(err?: Error): void;
}
interface ReadableOptions {
/** Maximum buffer size in bytes (default: 16384) */
highWaterMark?: number;
/** Optional function to map input data */
map?: (data: any) => any;
/** Optional function to calculate byte size of data */
byteLength?: (data: any) => number;
/** AbortSignal that triggers destroy when aborted */
signal?: AbortSignal;
/** Eagerly open the stream (default: false) */
eagerOpen?: boolean;
/** Shorthand for _read method */
read?: (cb: () => void) => void;
/** Shorthand for _open method */
open?: (cb: (err?: Error) => void) => void;
/** Shorthand for _destroy method */
destroy?: (cb: (err?: Error) => void) => void;
/** Shorthand for _predestroy method */
predestroy?: () => void;
/** Text encoding for automatic string decoding */
encoding?: string;
}Usage Examples:
const { Readable } = require('streamx');
// Basic readable stream
const readable = new Readable({
read(cb) {
// Push some data
this.push('Hello, ');
this.push('World!');
this.push(null); // End the stream
cb();
}
});
// Listen for data
readable.on('data', (chunk) => {
console.log('Received:', chunk.toString());
});
readable.on('end', () => {
console.log('Stream ended');
});
// Readable with lifecycle hooks
const fileReader = new Readable({
open(cb) {
console.log('Opening file...');
// Open file or resource
cb();
},
read(cb) {
// Read from file
this.push('File content...');
this.push(null);
cb();
},
destroy(cb) {
console.log('Closing file...');
// Clean up resources
cb();
}
});StreamX provides static utility methods for readable stream inspection and manipulation.
/**
* Check if a readable stream is currently paused
* @param stream - The readable stream to check
* @returns True if the stream is paused
*/
static isPaused(stream: Readable): boolean;
/**
* Check if a readable stream is under backpressure
* @param stream - The readable stream to check
* @returns True if the stream is backpressured
*/
static isBackpressured(stream: Readable): boolean;
/**
* Create a readable stream from various data sources
* @param source - Array, buffer, string, or async iterator
* @returns New readable stream
*/
static from(source: any[] | Buffer | string | AsyncIterable<any>): Readable;Static Method Examples:
const { Readable } = require('streamx');
// Create from array
const arrayStream = Readable.from([1, 2, 3, 4, 5]);
// Create from string
const stringStream = Readable.from('Hello World');
// Create from async iterator
async function* generator() {
yield 'first';
yield 'second';
yield 'third';
}
const iteratorStream = Readable.from(generator());
// Check stream state
if (Readable.isPaused(arrayStream)) {
console.log('Stream is paused');
}
if (Readable.isBackpressured(arrayStream)) {
console.log('Stream is under backpressure');
}Readable streams emit various events during their lifecycle.
interface ReadableEvents {
/** Emitted when data is available to read */
'readable': () => void;
/** Emitted when data is being read from the stream */
'data': (chunk: any) => void;
/** Emitted when the stream has ended and no more data is available */
'end': () => void;
/** Emitted when the stream has been fully closed */
'close': () => void;
/** Emitted when an error occurs */
'error': (err: Error) => void;
/** Emitted when the stream is piped to a destination */
'piping': (dest: Writable) => void;
}interface ReadableProperties {
/** Boolean indicating whether the stream has been destroyed */
destroyed: boolean;
}StreamX readable streams support advanced configuration for specialized use cases.
Map Function: Transform data as it's pushed to the stream:
const readable = new Readable({
map: (data) => {
// Transform strings to uppercase
return typeof data === 'string' ? data.toUpperCase() : data;
},
read(cb) {
this.push('hello');
this.push('world');
this.push(null);
cb();
}
});ByteLength Function: Custom byte length calculation for backpressure:
const readable = new Readable({
byteLength: (data) => {
// Custom size calculation
if (typeof data === 'string') return data.length;
if (Buffer.isBuffer(data)) return data.length;
return 1024; // Default size for objects
},
highWaterMark: 8192, // 8KB buffer
read(cb) {
this.push({ large: 'object' });
cb();
}
});AbortSignal Integration:
const controller = new AbortController();
const readable = new Readable({
signal: controller.signal,
read(cb) {
// This will be cancelled when signal is aborted
setTimeout(() => {
if (!controller.signal.aborted) {
this.push('data');
cb();
}
}, 1000);
}
});
// Cancel the stream after 500ms
setTimeout(() => controller.abort(), 500);Install with Tessl CLI
npx tessl i tessl/npm-streamx