Node.js Streams, a user-land copy of the stream library from Node.js
—
Core stream classes that provide the foundation for all streaming operations in readable-stream. These classes mirror Node.js native streams with enhanced stability and cross-version compatibility.
Base class for readable streams that produce data for consumption.
/**
* Base class for readable streams that produce data for consumption.
* Declares the reading, buffering, flow-control, encoding and piping methods,
* plus the static wrap() and from() factories.
*/
class Readable extends Stream {
/**
* Creates a readable stream
* @param options - Configuration options for the readable stream
*/
constructor(options?: ReadableOptions);
/**
* Reads data from the stream
* @param size - Optional amount of data to read
* @returns Data chunk or null if no data available
*/
read(size?: number): any;
/**
* Pushes data into the stream buffer
* @param chunk - Data to push (null to end stream)
* @param encoding - Encoding for string chunks
* @returns true if additional chunks can continue to be pushed, false otherwise
*/
push(chunk: any, encoding?: BufferEncoding): boolean;
/**
* Puts data back at the front of the stream so it is read again first
* @param chunk - Data to unshift
* @param encoding - Encoding for string chunks
*/
unshift(chunk: any, encoding?: BufferEncoding): void;
/**
* Pauses the stream from emitting 'data' events
* @returns this stream instance
*/
pause(): this;
/**
* Resumes emitting 'data' events
* @returns this stream instance
*/
resume(): this;
/**
* Sets the encoding used for string data read from the stream
* @param encoding - The encoding to use
* @returns this stream instance
*/
setEncoding(encoding: BufferEncoding): this;
/**
* Pipes this readable to a writable stream
* @param destination - The writable stream to pipe to
* @param options - Piping options; `end` controls whether the destination is
* ended when this stream ends (see the Node.js stream docs for the default)
* @returns the destination stream, which allows chaining when it is also readable
*/
pipe<T extends NodeJS.WritableStream>(destination: T, options?: { end?: boolean }): T;
/**
* Removes a pipe destination
* @param destination - The destination to unpipe (optional; NOTE(review): the
* Node.js docs describe omitting it as detaching all destinations - confirm)
* @returns this stream instance
*/
unpipe(destination?: NodeJS.WritableStream): this;
/**
* Wraps an old-style stream in a Readable
* @param stream - Old-style readable stream
* @param options - Wrap options
* @returns Readable stream
*/
static wrap(stream: NodeJS.ReadableStream, options?: ReadableOptions): Readable;
/**
* Creates a Readable from a synchronous or asynchronous iterable
* @param iterable - Source iterable
* @param options - Stream options
* @returns Readable stream
*/
static from(iterable: Iterable<any> | AsyncIterable<any>, options?: ReadableOptions): Readable;
}

Usage Examples:
// Example: a Readable whose read() implementation is supplied through the
// constructor options, consumed via 'data' events.
const { Readable } = require('readable-stream');
// Create a simple readable stream
const readable = new Readable({
read() {
this.push('chunk 1');
this.push('chunk 2');
this.push(null); // End the stream
}
});
// Create a readable from an array
// (fromArray is only constructed here; this example never consumes it)
const fromArray = Readable.from(['a', 'b', 'c']);
// Handle data events; each chunk is converted with toString() before logging
readable.on('data', (chunk) => {
console.log('Received:', chunk.toString());
});
readable.on('end', () => {
console.log('Stream ended');
});

Base class for writable streams that consume data.
/**
* Base class for writable streams that consume data.
* Declares writing, ending, default-encoding, destruction and cork/uncork methods.
*/
class Writable extends Stream {
/**
* Creates a writable stream
* @param options - Configuration options for the writable stream
*/
constructor(options?: WritableOptions);
/**
* Writes data to the stream
* @param chunk - Data to write
* @param encoding - Encoding for string chunks
* @param callback - Callback when write completes; receives the error, if any
* @returns false if the buffer is full and the caller should wait for 'drain'
* before writing more, true otherwise
*/
write(chunk: any, encoding?: BufferEncoding, callback?: (error: Error | null | undefined) => void): boolean;
/**
* Ends the writable stream
* @param chunk - Optional final chunk to write
* @param encoding - Encoding for string chunks
* @param callback - Callback when stream ends
* @returns this stream instance
*/
end(chunk?: any, encoding?: BufferEncoding, callback?: () => void): this;
/**
* Sets default encoding for string writes
* @param encoding - The default encoding
* @returns this stream instance
*/
setDefaultEncoding(encoding: BufferEncoding): this;
/**
* Destroys the stream
* @param error - Optional error to emit
* @returns this stream instance
*/
destroy(error?: Error): this;
/**
* Corks the stream - writes made while corked are temporarily buffered
*/
cork(): void;
/**
* Uncorks the stream - flushes the writes buffered while corked
*/
uncork(): void;
}

Usage Examples:
// Example: a Writable whose write() implementation is supplied through the
// constructor options.
const { Writable } = require('readable-stream');
// Create a writable that logs data
const writable = new Writable({
write(chunk, encoding, callback) {
console.log('Writing:', chunk.toString());
// Invoke the callback with no error argument once the chunk has been handled
callback();
}
});
// Write data
writable.write('hello');
writable.write('world');
// No further writes follow; end() is called without a final chunk
writable.end();
// Handle events
writable.on('finish', () => {
console.log('All writes completed');
});

Stream that is both readable and writable, allowing bidirectional data flow.
/**
* Stream that is both readable and writable.
* Declared as extending Readable, with the Writable methods redeclared below.
*/
class Duplex extends Readable {
/**
* Creates a duplex stream
* @param options - Configuration options combining readable and writable options
*/
constructor(options?: DuplexOptions);
// Inherits all Readable methods
// Plus Writable interface:
/** Writes data to the writable side; same signature as Writable.write. */
write(chunk: any, encoding?: BufferEncoding, callback?: (error: Error | null | undefined) => void): boolean;
/** Ends the writable side; same signature as Writable.end. */
end(chunk?: any, encoding?: BufferEncoding, callback?: () => void): this;
/** Sets default encoding for string writes; same signature as Writable.setDefaultEncoding. */
setDefaultEncoding(encoding: BufferEncoding): this;
/** Corks the writable side; same signature as Writable.cork. */
cork(): void;
/** Uncorks the writable side; same signature as Writable.uncork. */
uncork(): void;
}

Usage Examples:
// Example: a Duplex with read() and write() implementations supplied through
// the constructor options. The two implementations do not share any data here.
const { Duplex } = require('readable-stream');
// Create a duplex stream
const duplex = new Duplex({
read() {
// Readable side: emit one chunk, then signal end of data with null
this.push('data from readable side');
this.push(null);
},
write(chunk, encoding, callback) {
// Writable side: log the incoming chunk, then report completion
console.log('Received on writable side:', chunk.toString());
callback();
}
});
// Use both sides
duplex.write('hello');
duplex.on('data', (chunk) => {
console.log('Read:', chunk.toString());
});

Duplex stream where the output is computed from the input, providing data transformation capabilities.
/**
* Duplex stream whose output is computed from its input.
* Subclasses (or constructor options) supply _transform and optionally _flush.
*/
class Transform extends Duplex {
/**
* Creates a transform stream
* @param options - Configuration options including transform function
*/
constructor(options?: TransformOptions);
/**
* Transform implementation - override this method
* @param chunk - Input data chunk
* @param encoding - Encoding of the chunk
* @param callback - Callback to call when transform is complete; accepts an
* optional error and optional output data (see TransformCallback)
*/
_transform(chunk: any, encoding: BufferEncoding, callback: TransformCallback): void;
/**
* Flush implementation - called before stream ends
* @param callback - Callback to call when flush is complete; accepts an
* optional error and optional output data (see TransformCallback)
*/
_flush(callback: TransformCallback): void;
}

Usage Examples:
// Example: a Transform whose transform() implementation is supplied through
// the constructor options.
const { Transform } = require('readable-stream');
// Create an uppercase transform
const upperCaseTransform = new Transform({
transform(chunk, encoding, callback) {
// Emit the upper-cased text, then report that this chunk is done
this.push(chunk.toString().toUpperCase());
callback();
}
});
// Use the transform
upperCaseTransform.write('hello');
upperCaseTransform.write('world');
upperCaseTransform.end();
upperCaseTransform.on('data', (chunk) => {
console.log('Transformed:', chunk.toString()); // "HELLO", "WORLD"
});

Transform stream that passes input through to output unchanged, useful for testing and stream composition.
/**
* Transform stream that passes input through to output unchanged.
* Declares no members beyond its constructor.
*/
class PassThrough extends Transform {
/**
* Creates a passthrough stream
* @param options - Configuration options
*/
constructor(options?: PassThroughOptions);
// Automatically passes all input to output unchanged
}

Usage Examples:
// Example: observing data with a PassThrough constructed with no options.
const { PassThrough } = require('readable-stream');
// Create a passthrough for monitoring
const monitor = new PassThrough();
// Log every chunk that comes out of the passthrough
monitor.on('data', (chunk) => {
console.log('Data passing through:', chunk.toString());
});
// Data flows through unchanged
monitor.write('test data');
monitor.end();

All streams emit various events during their lifecycle:
Readable stream events:
- 'data' - Emitted each time a chunk of data is delivered to the consumer
- 'end' - Emitted when no more data will be provided
- 'readable' - Emitted when data is available to be read
- 'close' - Emitted when the stream is closed
- 'error' - Emitted when an error occurs

Writable stream events:
- 'drain' - Emitted when it's safe to write again after the buffer was full
- 'finish' - Emitted after end() is called and all data has been processed
- 'pipe' - Emitted when a readable is piped to this writable
- 'unpipe' - Emitted when a readable is unpiped from this writable
- 'close' - Emitted when the stream is closed
- 'error' - Emitted when an error occurs

interface ReadableOptions {
// Member descriptions follow the Node.js stream documentation; all members are optional.
/** Buffer threshold for the stream. */
highWaterMark?: number;
/** Encoding used to decode data into strings. */
encoding?: BufferEncoding;
/** Whether the stream operates on arbitrary values instead of strings/Buffers. */
objectMode?: boolean;
/** Implementation of the stream's read logic; called with `this` bound to the Readable. */
read?(this: Readable, size: number): void;
/** Implementation of the stream's destroy logic; must invoke `callback`, passing an error or null. */
destroy?(this: Readable, error: Error | null, callback: (error: Error | null) => void): void;
/** Whether the stream destroys itself automatically once it has ended. */
autoDestroy?: boolean;
/** AbortSignal associated with the stream (NOTE(review): presumably aborting destroys the stream - confirm). */
signal?: AbortSignal;
}
/**
* Configuration options accepted by the Writable constructor.
* Member descriptions follow the Node.js stream documentation; all members are optional.
*/
interface WritableOptions {
/** Buffer threshold at which write() starts returning false. */
highWaterMark?: number;
/** Whether strings passed to write() are converted to Buffers before reaching the write implementation. */
decodeStrings?: boolean;
/** Encoding assumed when write() is given a string without an explicit encoding. */
defaultEncoding?: BufferEncoding;
/** Whether the stream accepts arbitrary values instead of strings/Buffers. */
objectMode?: boolean;
/** Whether the stream emits 'close' after it has been destroyed. */
emitClose?: boolean;
/** Implementation of the write logic for a single chunk; call `callback` when done, with an error on failure. */
write?(this: Writable, chunk: any, encoding: BufferEncoding, callback: (error?: Error | null) => void): void;
/** Implementation of the write logic for a batch of buffered chunks, each paired with its encoding. */
writev?(this: Writable, chunks: Array<{ chunk: any, encoding: BufferEncoding }>, callback: (error?: Error | null) => void): void;
/** Implementation of the destroy logic; must invoke `callback`, passing an error or null. */
destroy?(this: Writable, error: Error | null, callback: (error: Error | null) => void): void;
/** Implementation of the final step run before the stream finishes; call `callback` when done. */
final?(this: Writable, callback: (error?: Error | null) => void): void;
/** Whether the stream destroys itself automatically once it has finished. */
autoDestroy?: boolean;
/** AbortSignal associated with the stream (NOTE(review): presumably aborting destroys the stream - confirm). */
signal?: AbortSignal;
}
/**
* Configuration options accepted by the Duplex constructor.
* Combines ReadableOptions and WritableOptions and adds per-side settings.
* Member descriptions follow the Node.js stream documentation.
*/
interface DuplexOptions extends ReadableOptions, WritableOptions {
/** Whether the writable side may stay open after the readable side has ended. */
allowHalfOpen?: boolean;
/** objectMode setting applied to the readable side only. */
readableObjectMode?: boolean;
/** objectMode setting applied to the writable side only. */
writableObjectMode?: boolean;
/** highWaterMark applied to the readable side only. */
readableHighWaterMark?: number;
/** highWaterMark applied to the writable side only. */
writableHighWaterMark?: number;
}
/**
* Configuration options accepted by the Transform constructor.
* Extends DuplexOptions with optional transform and flush implementations.
*/
interface TransformOptions extends DuplexOptions {
/** Implementation of the per-chunk transform; call `callback` when the chunk has been processed. */
transform?(this: Transform, chunk: any, encoding: BufferEncoding, callback: TransformCallback): void;
/** Implementation of the flush step; call `callback` when flushing is complete. */
flush?(this: Transform, callback: TransformCallback): void;
}
/** Configuration options accepted by the PassThrough constructor; adds nothing to TransformOptions. */
interface PassThroughOptions extends TransformOptions {}
type TransformCallback = (error?: Error | null, data?: any) => void;

Install with Tessl CLI
npx tessl i tessl/npm-readable-stream