Node.js Streams, a user-land copy of the stream library from Node.js
npx @tessl/cli install tessl/npm-readable-stream@4.7.0

Readable Stream is a userland implementation of Node.js core streams, providing a stable and consistent streaming API across different Node.js versions. It mirrors the streams implementation from Node.js 18.19.0, so developers get a stable streams base regardless of the Node.js version in use.
npm install readable-stream

const {
Readable,
Writable,
Transform,
Duplex,
PassThrough,
pipeline,
finished
} = require('readable-stream');

For ESM:
import {
Readable,
Writable,
Transform,
Duplex,
PassThrough,
pipeline,
finished
} from 'readable-stream';

If you only use the main classes and functions, you can replace require('stream') with require('readable-stream') without any other changes.
const { Readable, Transform, pipeline } = require('readable-stream');
// Source stream: supplies "hello " and "world", then signals end-of-stream.
const readableStream = new Readable({
  read() {
    // null must come last: pushing it ends the stream.
    for (const piece of ['hello ', 'world', null]) {
      this.push(piece);
    }
  }
});
// Transform stage: upper-cases the text of every chunk passing through.
const upperCaseTransform = new Transform({
  transform(chunk, encoding, callback) {
    // Handing the result to the callback emits it and completes this chunk in one step.
    const upperCased = chunk.toString().toUpperCase();
    callback(null, upperCased);
  }
});
// Pipe streams together using pipeline
pipeline(
readableStream,
upperCaseTransform,
process.stdout,
(err) => {
if (err) {
console.error('Pipeline failed:', err);
} else {
console.log('Pipeline succeeded');
}
}
);

Readable Stream is built around several key components:
- Stream classes: Readable, Writable, Duplex, Transform, and PassThrough, providing the foundation for all streaming operations
- Utility functions: pipeline, finished, and compose, for stream composition and lifecycle management
- Stream operators: map, filter, and reduce, for data transformation

Core stream classes that form the foundation of the streaming system. These provide the base functionality for creating readable, writable, and transform streams.
/**
 * Readable side of the stream API; one of the core stream classes.
 * Signature listing only: no method bodies are shown here.
 */
class Readable extends Stream {
/** @param options Optional construction settings (see ReadableOptions). */
constructor(options?: ReadableOptions);
/**
 * Reads from the stream.
 * @param size Optional amount to read; presumably a byte count as in
 *   Node.js `readable.read([size])` (confirm).
 */
read(size?: number): any;
/**
 * Supplies a chunk to the stream; pushing `null` ends the stream, as the
 * usage example does.
 * @returns A boolean; presumably whether further chunks may be pushed
 *   (backpressure). Confirm against the Node.js stream documentation.
 */
push(chunk: any, encoding?: BufferEncoding): boolean;
}
/**
 * Writable side of the stream API; one of the core stream classes.
 * Signature listing only: no method bodies are shown here.
 */
class Writable extends Stream {
/** @param options Optional construction settings (see WritableOptions). */
constructor(options?: WritableOptions);
/**
 * Writes a chunk to the stream.
 * @param cb Optional callback receiving an error, or null/undefined.
 * @returns A boolean; presumably `false` asks the caller to wait before
 *   writing more (backpressure). Confirm against the Node.js stream docs.
 */
write(chunk: any, encoding?: BufferEncoding, cb?: (error: Error | null | undefined) => void): boolean;
/**
 * Ends the stream; every argument is optional.
 * NOTE(review): presumably `chunk` is written as a final chunk before
 * ending, as in Node.js `writable.end()` (confirm).
 * @returns This instance, so the call can be chained.
 */
end(chunk?: any, encoding?: BufferEncoding, cb?: () => void): this;
}
/**
 * Stream that is both readable and writable; one of the core stream classes.
 * In this listing it is declared as extending Readable only; the Writable
 * members are not restated.
 */
class Duplex extends Readable {
/** @param options Optional construction settings (see DuplexOptions). */
constructor(options?: DuplexOptions);
// Implements both Readable and Writable interfaces
}
/**
 * Duplex stream whose output is derived from its input via `_transform`;
 * one of the core stream classes.
 */
class Transform extends Duplex {
/** @param options Optional construction settings (see TransformOptions). */
constructor(options?: TransformOptions);
/**
 * Per-chunk transformation hook.
 * @param chunk The incoming chunk.
 * @param encoding Encoding associated with `chunk`.
 * @param callback Invoked when the chunk has been handled; takes an optional
 *   error and optional output data (see TransformCallback).
 */
_transform(chunk: any, encoding: BufferEncoding, callback: TransformCallback): void;
}
class PassThrough extends Transform {
constructor(options?: PassThroughOptions);
}

Essential utilities for stream composition, error handling, and lifecycle management. These functions provide robust patterns for working with multiple streams.
/**
 * Chains the given streams together. The trailing `callback` receives the
 * error when the pipeline fails and `null` when it succeeds.
 *
 * Declared with a variadic tuple because TypeScript does not allow any
 * parameter to follow a rest parameter.
 */
function pipeline(
  ...args: [
    ...streams: Array<NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream>,
    callback: (err: NodeJS.ErrnoException | null) => void
  ]
): NodeJS.ReadableStream;

/**
 * Registers `callback` to be notified about `stream`'s completion or failure.
 * `options` may be omitted: Node.js documents the call shape as
 * `finished(stream[, options], callback)`.
 *
 * @returns A function taking no arguments; presumably it removes the
 *   registered listeners (confirm against the Node.js stream docs).
 */
function finished(
  stream: NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream,
  callback: (err?: NodeJS.ErrnoException | null) => void
): () => void;
function finished(
  stream: NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream,
  options: FinishedOptions,
  callback: (err?: NodeJS.ErrnoException | null) => void
): () => void;

/** Combines several read/write streams into a single read/write stream. */
function compose(...streams: Array<NodeJS.ReadWriteStream>): NodeJS.ReadWriteStream;

// Stream state utilities
/** Returns a boolean state flag for a readable stream. */
function isDisturbed(stream: NodeJS.ReadableStream): boolean;
/** Returns a boolean state flag for a readable or writable stream. */
function isErrored(stream: NodeJS.ReadableStream | NodeJS.WritableStream): boolean;

// Internal utilities (exported for compatibility)
/** Converts a Uint8Array chunk to a Buffer. */
function _uint8ArrayToBuffer(chunk: Uint8Array): Buffer;
/** Reports whether `value` is a Uint8Array. */
function _isUint8Array(value: any): boolean;
// Backwards compatibility
const Stream: typeof import('readable-stream');

Functional programming methods available on Readable streams for data transformation and processing. These operators provide a chainable API for stream manipulation.
// Operator signatures are listed as bare members (no `function` keyword).
// Each takes an optional trailing `options` argument, typed `any` here.

// Stream-returning operators (return streams)
// Each of these returns a Readable, so calls can be chained.
map(fn: (chunk: any, options?: any) => any, options?: any): Readable;
filter(fn: (chunk: any, options?: any) => boolean, options?: any): Readable;
drop(number: number, options?: any): Readable;
take(number: number, options?: any): Readable;
// Promise-returning operators (return promises)
// These end a chain: the result is a Promise rather than another stream.
reduce(fn: (previous: any, current: any, options?: any) => any, initial?: any, options?: any): Promise<any>;
toArray(options?: any): Promise<any[]>;
forEach(fn: (chunk: any, options?: any) => void, options?: any): Promise<void>;

Promise-based versions of utility functions that integrate seamlessly with modern async/await patterns.
const promises = {
pipeline: (...streams: Array<NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream>) => Promise<void>;
finished: (stream: NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream, options?: FinishedOptions) => Promise<void>;
};

interface ReadableOptions {
highWaterMark?: number;
encoding?: BufferEncoding;
objectMode?: boolean;
emitClose?: boolean;
read?(this: Readable, size: number): void;
destroy?(this: Readable, error: Error | null, callback: (error: Error | null) => void): void;
construct?(this: Readable, callback: (error?: Error | null) => void): void;
autoDestroy?: boolean;
signal?: AbortSignal;
}
/**
 * Construction options accepted by `Writable`; every member is optional.
 * NOTE(review): the option names match the Node.js `stream.Writable`
 * constructor options; consult the Node.js stream documentation for the
 * exact semantics and defaults of each one.
 */
interface WritableOptions {
highWaterMark?: number;
decodeStrings?: boolean;
defaultEncoding?: BufferEncoding;
objectMode?: boolean;
emitClose?: boolean;
/** Single-chunk write hook, called with `this` bound to the Writable; report the outcome through `callback`. */
write?(this: Writable, chunk: any, encoding: BufferEncoding, callback: (error?: Error | null) => void): void;
/** Multi-chunk write hook: receives an array of `{ chunk, encoding }` entries; report the outcome through `callback`. */
writev?(this: Writable, chunks: Array<{ chunk: any, encoding: BufferEncoding }>, callback: (error?: Error | null) => void): void;
/** Destroy hook: receives the error (or null) and a callback that takes an error or null. */
destroy?(this: Writable, error: Error | null, callback: (error: Error | null) => void): void;
/** Hook taking only a completion callback; presumably runs before the stream finishes (confirm). */
final?(this: Writable, callback: (error?: Error | null) => void): void;
/** Hook taking only a completion callback; presumably runs during construction (confirm). */
construct?(this: Writable, callback: (error?: Error | null) => void): void;
autoDestroy?: boolean;
/** AbortSignal associated with the stream; presumably aborting it destroys the stream (confirm). */
signal?: AbortSignal;
}
/**
 * Construction options accepted by `Duplex`: everything from
 * ReadableOptions and WritableOptions plus the duplex-specific members below.
 *
 * NOTE(review): `destroy` and `construct` are declared in both base
 * interfaces with different `this` types (Readable vs. Writable), and
 * `highWaterMark`/`objectMode` appear in both; verify this declaration
 * type-checks as written.
 */
interface DuplexOptions extends ReadableOptions, WritableOptions {
allowHalfOpen?: boolean;
// Per-side variants of objectMode / highWaterMark; presumably each
// configures only the named side of the duplex (confirm).
readableObjectMode?: boolean;
writableObjectMode?: boolean;
readableHighWaterMark?: number;
writableHighWaterMark?: number;
}
/**
 * Construction options accepted by `Transform`: all DuplexOptions plus the
 * two optional hooks below.
 */
interface TransformOptions extends DuplexOptions {
/** Per-chunk hook, called with `this` bound to the Transform; the usage example passes one that upper-cases each chunk. */
transform?(this: Transform, chunk: any, encoding: BufferEncoding, callback: TransformCallback): void;
/** Hook receiving only a TransformCallback; presumably runs once after the last chunk, before the stream ends (confirm). */
flush?(this: Transform, callback: TransformCallback): void;
}
/** Construction options accepted by `PassThrough`; adds nothing to TransformOptions. */
interface PassThroughOptions extends TransformOptions {}
/**
 * Options accepted by `finished`; every member is optional.
 * NOTE(review): `error`, `readable` and `writable` presumably select which
 * stream conditions `finished` reacts to; confirm against the Node.js
 * `stream.finished` documentation.
 */
interface FinishedOptions {
error?: boolean;
readable?: boolean;
writable?: boolean;
/** AbortSignal; presumably lets the caller abort the wait (confirm). */
signal?: AbortSignal;
}
/** Callback handed to the `transform` and `flush` hooks: optional error first, optional output data second. */
type TransformCallback = (error?: Error | null, data?: any) => void;