CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/npm-readable-stream

Node.js Streams, a user-land copy of the stream library from Node.js

Pending
Overview
Eval results
Files

docs/stream-classes.md

Stream Classes

Core stream classes that provide the foundation for all streaming operations in readable-stream. These classes mirror Node.js native streams with enhanced stability and cross-version compatibility.

Capabilities

Readable

Base class for readable streams that produce data for consumption.

/**
 * Base class for readable streams: a source that produces data for consumers.
 * An instance is created from an options object (which may supply a `read`
 * implementation) or through the static helpers `Readable.from()` and
 * `Readable.wrap()` declared below.
 * @param options - Configuration options for the readable stream
 */
class Readable extends Stream {
  constructor(options?: ReadableOptions);
  
  /**
   * Reads data out of the stream's buffer
   * @param size - Optional amount of data to read
   * @returns Data chunk, or null if no data is available
   */
  read(size?: number): any;
  
  /**
   * Pushes data into the stream buffer so that consumers can read it
   * @param chunk - Data to push (pass null to signal the end of the stream)
   * @param encoding - Encoding for string chunks
   * @returns true if more data can be pushed
   */
  push(chunk: any, encoding?: BufferEncoding): boolean;
  
  /**
   * Puts data back at the front of the stream so it is read again first
   * @param chunk - Data to unshift
   * @param encoding - Encoding for string chunks
   */
  unshift(chunk: any, encoding?: BufferEncoding): void;
  
  /**
   * Stops the stream from emitting 'data' events
   * @returns this stream instance, so calls can be chained
   */
  pause(): this;
  
  /**
   * Resumes emitting 'data' events
   * @returns this stream instance, so calls can be chained
   */
  resume(): this;
  
  /**
   * Sets the encoding used for string data
   * @param encoding - The encoding to use
   * @returns this stream instance, so calls can be chained
   */
  setEncoding(encoding: BufferEncoding): this;
  
  /**
   * Pipes this readable into a writable stream
   * @param destination - The writable stream to pipe to
   * @param options - Piping options; `end` is the only option declared here
   * @returns the destination stream, which allows pipe() calls to be chained
   */
  pipe<T extends NodeJS.WritableStream>(destination: T, options?: { end?: boolean }): T;
  
  /**
   * Removes a pipe destination
   * @param destination - The destination to unpipe.
   *   NOTE(review): the parameter is optional; omitting it presumably detaches
   *   every destination - confirm against the Node.js stream documentation.
   * @returns this stream instance
   */
  unpipe(destination?: NodeJS.WritableStream): this;
  
  /**
   * Wraps an old-style stream in a Readable
   * @param stream - Old-style readable stream
   * @param options - Options for the wrapping Readable
   * @returns Readable stream
   */
  static wrap(stream: NodeJS.ReadableStream, options?: ReadableOptions): Readable;
  
  /**
   * Creates a Readable from a synchronous or asynchronous iterable
   * @param iterable - Source iterable
   * @param options - Stream options
   * @returns Readable stream
   */
  static from(iterable: Iterable<any> | AsyncIterable<any>, options?: ReadableOptions): Readable;
}

Usage Examples:

const streamModule = require('readable-stream');
const Readable = streamModule.Readable;

// A minimal readable stream: read() queues two chunks and then pushes
// null, which ends the stream.
const readable = new Readable({
  read: function () {
    for (const piece of ['chunk 1', 'chunk 2', null]) {
      this.push(piece);
    }
  }
});

// Readable.from() builds a stream from an iterable, here an array
const fromArray = Readable.from(['a', 'b', 'c']);

// Logs every chunk delivered through the 'data' event
function reportChunk(chunk) {
  console.log('Received:', chunk.toString());
}

// Logs once the stream has ended
function reportEnd() {
  console.log('Stream ended');
}

// on() returns the stream, so the registrations can be chained
readable.on('data', reportChunk).on('end', reportEnd);

Writable

Base class for writable streams that consume data.

/**
 * Base class for writable streams: a destination that consumes data.
 * @param options - Configuration options for the writable stream
 */
class Writable extends Stream {
  constructor(options?: WritableOptions);
  
  /**
   * Writes data to the stream
   * @param chunk - Data to write
   * @param encoding - Encoding for string chunks
   * @param callback - Called when the write completes; receives the error, if any
   * @returns false if the buffer is full (the 'drain' event signals when it is
   *   safe to write again), true otherwise
   */
  write(chunk: any, encoding?: BufferEncoding, callback?: (error: Error | null | undefined) => void): boolean;
  
  /**
   * Ends the writable stream
   * @param chunk - Optional final chunk to write
   * @param encoding - Encoding for string chunks
   * @param callback - Called when the stream ends
   * @returns this stream instance
   */
  end(chunk?: any, encoding?: BufferEncoding, callback?: () => void): this;
  
  /**
   * Sets the default encoding for string writes
   * @param encoding - The default encoding
   * @returns this stream instance
   */
  setDefaultEncoding(encoding: BufferEncoding): this;
  
  /**
   * Destroys the stream
   * @param error - Optional error to emit
   * @returns this stream instance
   */
  destroy(error?: Error): this;
  
  /**
   * Corks the stream - writes are temporarily buffered until uncork() is called
   */
  cork(): void;
  
  /**
   * Uncorks the stream - flushes the writes buffered since cork()
   */
  uncork(): void;
}

Usage Examples:

const Writable = require('readable-stream').Writable;

// Write implementation: log the chunk, then report completion
function logChunk(chunk, _encoding, done) {
  console.log('Writing:', chunk.toString());
  done();
}

// A writable whose only job is to log what it receives
const writable = new Writable({ write: logChunk });

// Feed the stream, then close it
for (const text of ['hello', 'world']) {
  writable.write(text);
}
writable.end();

// 'finish' fires once every write has been processed
writable.on('finish', function onFinish() {
  console.log('All writes completed');
});

Duplex

Stream that is both readable and writable, allowing bidirectional data flow.

/**
 * Stream that is both readable and writable, allowing bidirectional data flow.
 * @param options - Configuration options combining readable and writable options
 */
class Duplex extends Readable {
  constructor(options?: DuplexOptions);
  
  // Inherits all Readable methods
  // Plus Writable interface:
  
  /** Writes a chunk to the writable side; the boolean result mirrors Writable.write(). */
  write(chunk: any, encoding?: BufferEncoding, callback?: (error: Error | null | undefined) => void): boolean;
  /** Ends the writable side, optionally writing a final chunk first. */
  end(chunk?: any, encoding?: BufferEncoding, callback?: () => void): this;
  /** Sets the default encoding for string writes. */
  setDefaultEncoding(encoding: BufferEncoding): this;
  /** Temporarily buffers writes until uncork() is called. */
  cork(): void;
  /** Flushes the writes buffered since cork(). */
  uncork(): void;
}

Usage Examples:

const Duplex = require('readable-stream').Duplex;

// The two halves of the duplex are implemented independently:
// read() feeds the readable side, write() consumes the writable side.
const duplexHooks = {
  read: function () {
    const payload = 'data from readable side';
    this.push(payload);
    this.push(null);
  },
  write: function (chunk, _encoding, done) {
    console.log('Received on writable side:', chunk.toString());
    done();
  }
};

const duplex = new Duplex(duplexHooks);

// Logs whatever the readable side produces
function logRead(chunk) {
  console.log('Read:', chunk.toString());
}

// Exercise the writable side first, then subscribe to the readable side
duplex.write('hello');
duplex.on('data', logRead);

Transform

Duplex stream where the output is computed from the input, providing data transformation capabilities.

/**
 * Duplex stream whose output is computed from its input.
 * @param options - Configuration options, including the transform function
 */
class Transform extends Duplex {
  constructor(options?: TransformOptions);
  
  /**
   * Transform implementation - override this method (or pass `transform` in the options)
   * @param chunk - Input data chunk
   * @param encoding - Encoding of the chunk
   * @param callback - Call when the chunk has been handled; takes an optional
   *   error and optional output data (see TransformCallback)
   */
  _transform(chunk: any, encoding: BufferEncoding, callback: TransformCallback): void;
  
  /**
   * Flush implementation - called before the stream ends
   * @param callback - Call when the flush is complete
   */
  _flush(callback: TransformCallback): void;
}

Usage Examples:

const Transform = require('readable-stream').Transform;

// Transform implementation: emit the upper-cased text of each chunk
function toUpperCase(chunk, _encoding, done) {
  const shouted = chunk.toString().toUpperCase();
  this.push(shouted);
  done();
}

const upperCaseTransform = new Transform({ transform: toUpperCase });

// Feed two chunks through the transform, then close its writable side
for (const word of ['hello', 'world']) {
  upperCaseTransform.write(word);
}
upperCaseTransform.end();

// Prints "HELLO" and then "WORLD"
upperCaseTransform.on('data', function logTransformed(chunk) {
  console.log('Transformed:', chunk.toString());
});

PassThrough

Transform stream that passes input through to output unchanged, useful for testing and stream composition.

/**
 * Transform stream that forwards its input to its output unchanged.
 * Useful for testing and for composing streams.
 * @param options - Configuration options
 */
class PassThrough extends Transform {
  constructor(options?: PassThroughOptions);
  // Automatically passes all input to output unchanged
}

Usage Examples:

const PassThrough = require('readable-stream').PassThrough;

// A passthrough used as a tap: everything written to it comes out unchanged
const monitor = new PassThrough();

// Logs each chunk as it flows through the tap
function logPassing(chunk) {
  console.log('Data passing through:', chunk.toString());
}

monitor.on('data', logPassing);

// Send one chunk through, then close the stream
monitor.write('test data');
monitor.end();

Stream Events

All streams emit various events during their lifecycle:

Readable Events

  • 'data' - Emitted each time the stream hands a chunk of data to a consumer
  • 'end' - Emitted when no more data will be provided
  • 'readable' - Emitted when data is available to be read by calling read()
  • 'close' - Emitted when the stream is closed
  • 'error' - Emitted when an error occurs

Writable Events

  • 'drain' - Emitted when it's safe to write again after buffer was full
  • 'finish' - Emitted after end() is called and all data has been processed
  • 'pipe' - Emitted when a readable is piped to this writable
  • 'unpipe' - Emitted when a readable is unpiped from this writable
  • 'close' - Emitted when the stream is closed
  • 'error' - Emitted when an error occurs

Types

/**
 * Options accepted by the Readable constructor.
 * The function-valued members (`read`, `destroy`) are implementation hooks
 * typed with `this` bound to the Readable instance.
 */
interface ReadableOptions {
  // NOTE(review): presumably the internal buffer threshold - confirm units and default
  highWaterMark?: number;
  encoding?: BufferEncoding;
  objectMode?: boolean;
  /** Read implementation hook; it supplies data by calling this.push(). */
  read?(this: Readable, size: number): void;
  /** Destroy hook; receives the error (or null) and a completion callback. */
  destroy?(this: Readable, error: Error | null, callback: (error: Error | null) => void): void;
  autoDestroy?: boolean;
  // NOTE(review): presumably lets the caller abort the stream - confirm
  signal?: AbortSignal;
}

/**
 * Options accepted by the Writable constructor.
 * The function-valued members (`write`, `writev`, `destroy`, `final`) are
 * implementation hooks typed with `this` bound to the Writable instance.
 */
interface WritableOptions {
  // NOTE(review): presumably the internal buffer threshold - confirm units and default
  highWaterMark?: number;
  decodeStrings?: boolean;
  defaultEncoding?: BufferEncoding;
  objectMode?: boolean;
  emitClose?: boolean;
  /** Write implementation hook: handle one chunk, then invoke callback (optionally with an error). */
  write?(this: Writable, chunk: any, encoding: BufferEncoding, callback: (error?: Error | null) => void): void;
  /** Batch write hook: receives an array of { chunk, encoding } entries and one callback. */
  writev?(this: Writable, chunks: Array<{ chunk: any, encoding: BufferEncoding }>, callback: (error?: Error | null) => void): void;
  /** Destroy hook; receives the error (or null) and a completion callback. */
  destroy?(this: Writable, error: Error | null, callback: (error: Error | null) => void): void;
  /** Finalization hook with a completion callback. NOTE(review): presumably runs before 'finish' - confirm. */
  final?(this: Writable, callback: (error?: Error | null) => void): void;
  autoDestroy?: boolean;
  // NOTE(review): presumably lets the caller abort the stream - confirm
  signal?: AbortSignal;
}

/**
 * Options accepted by the Duplex constructor: everything from ReadableOptions
 * and WritableOptions plus the members below.
 * NOTE(review): the readable- and writable-prefixed members presumably set
 * objectMode / highWaterMark for one side only - confirm against the Node.js docs.
 */
interface DuplexOptions extends ReadableOptions, WritableOptions {
  allowHalfOpen?: boolean;
  readableObjectMode?: boolean;
  writableObjectMode?: boolean;
  readableHighWaterMark?: number;
  writableHighWaterMark?: number;
}

/**
 * Options accepted by the Transform constructor: DuplexOptions plus the
 * `transform` and `flush` hooks, both typed with `this` bound to the Transform.
 */
interface TransformOptions extends DuplexOptions {
  /** Per-chunk transform hook; same parameters as Transform._transform. */
  transform?(this: Transform, chunk: any, encoding: BufferEncoding, callback: TransformCallback): void;
  /** Flush hook; same parameters as Transform._flush. */
  flush?(this: Transform, callback: TransformCallback): void;
}

/** Options accepted by the PassThrough constructor; adds nothing to TransformOptions. */
interface PassThroughOptions extends TransformOptions {}

/** Completion callback for the transform and flush hooks: optional error first, optional output data second. */
type TransformCallback = (error?: Error | null, data?: any) => void;

Install with Tessl CLI

npx tessl i tessl/npm-readable-stream

docs

index.md

promise-api.md

stream-classes.md

stream-operators.md

utility-functions.md

tile.json