or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

index.mdpromise-api.mdstream-classes.mdstream-operators.mdutility-functions.md
tile.json

tessl/npm-readable-stream

Node.js Streams, a user-land copy of the stream library from Node.js

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
npmpkg:npm/readable-stream@4.7.x

To install, run

npx @tessl/cli install tessl/npm-readable-stream@4.7.0

index.mddocs/

Readable Stream

Readable Stream is a userland implementation of Node.js core streams, providing a stable and consistent streaming API across different Node.js versions. It serves as a mirror of the streams implementations from Node.js 18.19.0, allowing developers to guarantee a stable streams base regardless of the Node.js version being used.

Package Information

  • Package Name: readable-stream
  • Package Type: npm
  • Language: JavaScript
  • Installation: npm install readable-stream

Core Imports

const {
  Readable,
  Writable,
  Transform,
  Duplex,
  PassThrough,
  pipeline,
  finished
} = require('readable-stream');

For ESM:

import {
  Readable,
  Writable,
  Transform,
  Duplex,
  PassThrough,
  pipeline,
  finished
} from 'readable-stream';

Basic Usage

You can swap your require('stream') with require('readable-stream') without any changes, if you are just using one of the main classes and functions.

const { Readable, Transform, pipeline } = require('readable-stream');

// Create a readable stream
const readableStream = new Readable({
  read() {
    this.push('hello ');
    this.push('world');
    this.push(null); // End the stream
  }
});

// Create a transform stream
const upperCaseTransform = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});

// Pipe streams together using pipeline
pipeline(
  readableStream,
  upperCaseTransform,
  process.stdout,
  (err) => {
    if (err) {
      console.error('Pipeline failed:', err);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);

Architecture

Readable Stream is built around several key components:

  • Core Stream Classes: Readable, Writable, Duplex, Transform, and PassThrough providing the foundation for all streaming operations
  • Utility Functions: pipeline, finished, compose for stream composition and lifecycle management
  • Stream Operators: Functional programming methods like map, filter, reduce for data transformation
  • Promise API: Promise-based versions of utility functions for modern async/await patterns
  • Cross-Platform Support: Works in both Node.js and browser environments with appropriate polyfills

Capabilities

Stream Classes

Core stream classes that form the foundation of the streaming system. These provide the base functionality for creating readable, writable, and transform streams.

class Readable extends Stream {
  constructor(options?: ReadableOptions);
  read(size?: number): any;
  push(chunk: any, encoding?: BufferEncoding): boolean;
}

class Writable extends Stream {
  constructor(options?: WritableOptions);
  write(chunk: any, encoding?: BufferEncoding, cb?: (error: Error | null | undefined) => void): boolean;
  end(chunk?: any, encoding?: BufferEncoding, cb?: () => void): this;
}

class Duplex extends Readable {
  constructor(options?: DuplexOptions);
  // Implements both Readable and Writable interfaces
}

class Transform extends Duplex {
  constructor(options?: TransformOptions);
  _transform(chunk: any, encoding: BufferEncoding, callback: TransformCallback): void;
}

class PassThrough extends Transform {
  constructor(options?: PassThroughOptions);
}

Stream Classes

Utility Functions

Essential utilities for stream composition, error handling, and lifecycle management. These functions provide robust patterns for working with multiple streams.

function pipeline(...streams: Array<NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream>, callback: (err: NodeJS.ErrnoException | null) => void): NodeJS.ReadableStream;

function finished(stream: NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream, options: FinishedOptions, callback: (err?: NodeJS.ErrnoException | null) => void): () => void;

function compose(...streams: Array<NodeJS.ReadWriteStream>): NodeJS.ReadWriteStream;

// Stream state utilities
function isDisturbed(stream: NodeJS.ReadableStream): boolean;
function isErrored(stream: NodeJS.ReadableStream | NodeJS.WritableStream): boolean;

// Internal utilities (exported for compatibility)
function _uint8ArrayToBuffer(chunk: Uint8Array): Buffer;
function _isUint8Array(value: any): boolean;

// Backwards compatibility
const Stream: typeof import('readable-stream');

Utility Functions

Stream Operators

Functional programming methods available on Readable streams for data transformation and processing. These operators provide a chainable API for stream manipulation.

// Stream-returning operators (return streams)
map(fn: (chunk: any, options?: any) => any, options?: any): Readable;
filter(fn: (chunk: any, options?: any) => boolean, options?: any): Readable;
drop(number: number, options?: any): Readable;
take(number: number, options?: any): Readable;

// Promise-returning operators (return promises)
reduce(fn: (previous: any, current: any, options?: any) => any, initial?: any, options?: any): Promise<any>;
toArray(options?: any): Promise<any[]>;
forEach(fn: (chunk: any, options?: any) => void, options?: any): Promise<void>;

Stream Operators

Promise API

Promise-based versions of utility functions that integrate seamlessly with modern async/await patterns.

const promises = {
  pipeline: (...streams: Array<NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream>) => Promise<void>;
  finished: (stream: NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream, options?: FinishedOptions) => Promise<void>;
};

Promise API

Types

interface ReadableOptions {
  highWaterMark?: number;
  encoding?: BufferEncoding;
  objectMode?: boolean;
  emitClose?: boolean;
  read?(this: Readable, size: number): void;
  destroy?(this: Readable, error: Error | null, callback: (error: Error | null) => void): void;
  construct?(this: Readable, callback: (error?: Error | null) => void): void;
  autoDestroy?: boolean;
  signal?: AbortSignal;
}

interface WritableOptions {
  highWaterMark?: number;
  decodeStrings?: boolean;
  defaultEncoding?: BufferEncoding;
  objectMode?: boolean;
  emitClose?: boolean;
  write?(this: Writable, chunk: any, encoding: BufferEncoding, callback: (error?: Error | null) => void): void;
  writev?(this: Writable, chunks: Array<{ chunk: any, encoding: BufferEncoding }>, callback: (error?: Error | null) => void): void;
  destroy?(this: Writable, error: Error | null, callback: (error: Error | null) => void): void;
  final?(this: Writable, callback: (error?: Error | null) => void): void;
  construct?(this: Writable, callback: (error?: Error | null) => void): void;
  autoDestroy?: boolean;
  signal?: AbortSignal;
}

interface DuplexOptions extends ReadableOptions, WritableOptions {
  allowHalfOpen?: boolean;
  readableObjectMode?: boolean;
  writableObjectMode?: boolean;
  readableHighWaterMark?: number;
  writableHighWaterMark?: number;
}

interface TransformOptions extends DuplexOptions {
  transform?(this: Transform, chunk: any, encoding: BufferEncoding, callback: TransformCallback): void;
  flush?(this: Transform, callback: TransformCallback): void;
}

interface PassThroughOptions extends TransformOptions {}

interface FinishedOptions {
  error?: boolean;
  readable?: boolean;
  writable?: boolean;
  signal?: AbortSignal;
}

type TransformCallback = (error?: Error | null, data?: any) => void;