tessl/npm-streamx

An iteration of the Node.js core streams with a series of improvements

Readable Streams

Readable streams in StreamX extend the Node.js core stream model with explicit lifecycle hooks (open, destroy, predestroy), backpressure handling, and error handling built around destroy(). They support both flowing mode ('data' events) and non-flowing mode ('readable' events combined with read()), with consistent behavior in each.

Capabilities

Readable Class

Creates a readable stream with enhanced lifecycle support and proper resource management.

/**
 * Creates a new readable stream with enhanced lifecycle support
 * @param options - Configuration options for the readable stream
 */
class Readable extends Stream {
  constructor(options?: ReadableOptions);
  
  /** Override this method to implement custom read logic */
  _read(cb: () => void): void;
  
  /** Lifecycle hook called before the first read operation */
  _open(cb: (err?: Error) => void): void;
  
  /** Cleanup hook called when the stream is destroyed */
  _destroy(cb: (err?: Error) => void): void;
  
  /** Hook called immediately when destroy() is first invoked */
  _predestroy(): void;
  
  /** Push data to the stream buffer */
  push(data: any): boolean;
  
  /** Read data from the stream buffer */
  read(): any;
  
  /** Add data to the front of the buffer */
  unshift(data: any): void;
  
  /** Pause the stream */
  pause(): Readable;
  
  /** Resume the stream */
  resume(): Readable;
  
  /** Pipe to a writable stream with callback support */
  pipe(destination: Writable, callback?: (err?: Error) => void): Writable;
  
  /** Set text encoding for automatic string decoding */
  setEncoding(encoding: string): Readable;
  
  /** Forcefully destroy the stream */
  destroy(err?: Error): void;
}

interface ReadableOptions {
  /** Maximum buffer size in bytes (default: 16384) */
  highWaterMark?: number;
  
  /** Optional function to map input data */
  map?: (data: any) => any;
  
  /** Optional function to calculate byte size of data */
  byteLength?: (data: any) => number;
  
  /** AbortSignal that triggers destroy when aborted */
  signal?: AbortSignal;
  
  /** Eagerly open the stream (default: false) */
  eagerOpen?: boolean;
  
  /** Shorthand for _read method */
  read?: (cb: () => void) => void;
  
  /** Shorthand for _open method */
  open?: (cb: (err?: Error) => void) => void;
  
  /** Shorthand for _destroy method */
  destroy?: (cb: (err?: Error) => void) => void;
  
  /** Shorthand for _predestroy method */
  predestroy?: () => void;
  
  /** Text encoding for automatic string decoding */
  encoding?: string;
}

Usage Examples:

const { Readable } = require('streamx');

// Basic readable stream
const readable = new Readable({
  read(cb) {
    // Push some data
    this.push('Hello, ');
    this.push('World!');
    this.push(null); // End the stream
    cb();
  }
});

// Listen for data
readable.on('data', (chunk) => {
  console.log('Received:', chunk.toString());
});

readable.on('end', () => {
  console.log('Stream ended');
});

// Readable with lifecycle hooks
const fileReader = new Readable({
  open(cb) {
    console.log('Opening file...');
    // Open file or resource
    cb();
  },
  
  read(cb) {
    // Read from file
    this.push('File content...');
    this.push(null);
    cb();
  },
  
  destroy(cb) {
    console.log('Closing file...');
    // Clean up resources
    cb();
  }
});

Static Methods

StreamX provides static utility methods for readable stream inspection and manipulation.

/**
 * Check if a readable stream is currently paused
 * @param stream - The readable stream to check
 * @returns True if the stream is paused
 */
static isPaused(stream: Readable): boolean;

/**
 * Check if a readable stream is under backpressure
 * @param stream - The readable stream to check
 * @returns True if the stream is backpressured
 */
static isBackpressured(stream: Readable): boolean;

/**
 * Create a readable stream from various data sources
 * @param source - Array, buffer, string, or async iterator
 * @returns New readable stream
 */
static from(source: any[] | Buffer | string | AsyncIterable<any>): Readable;

Static Method Examples:

const { Readable } = require('streamx');

// Create from array
const arrayStream = Readable.from([1, 2, 3, 4, 5]);

// Create from string
const stringStream = Readable.from('Hello World');

// Create from async iterator
async function* generator() {
  yield 'first';
  yield 'second';
  yield 'third';
}
const iteratorStream = Readable.from(generator());

// Check stream state
if (Readable.isPaused(arrayStream)) {
  console.log('Stream is paused');
}

if (Readable.isBackpressured(arrayStream)) {
  console.log('Stream is under backpressure');
}

Events

Readable streams emit various events during their lifecycle.

interface ReadableEvents {
  /** Emitted when data is available to read */
  'readable': () => void;
  
  /** Emitted when data is being read from the stream */
  'data': (chunk: any) => void;
  
  /** Emitted when the stream has ended and no more data is available */
  'end': () => void;
  
  /** Emitted when the stream has been fully closed */
  'close': () => void;
  
  /** Emitted when an error occurs */
  'error': (err: Error) => void;
  
  /** Emitted when the stream is piped to a destination */
  'piping': (dest: Writable) => void;
}

Properties

interface ReadableProperties {
  /** Boolean indicating whether the stream has been destroyed */
  destroyed: boolean;
}

Advanced Configuration

StreamX readable streams support advanced configuration for specialized use cases.

Map Function: Transform data as it's pushed to the stream:

const readable = new Readable({
  map: (data) => {
    // Transform strings to uppercase
    return typeof data === 'string' ? data.toUpperCase() : data;
  },
  
  read(cb) {
    this.push('hello');
    this.push('world');
    this.push(null);
    cb();
  }
});

ByteLength Function: Custom byte length calculation for backpressure:

const readable = new Readable({
  byteLength: (data) => {
    // Custom size calculation
    if (typeof data === 'string') return Buffer.byteLength(data); // UTF-8 bytes, not characters
    if (Buffer.isBuffer(data)) return data.length;
    return 1024; // Fallback size for non-binary values such as objects
  },
  
  highWaterMark: 8192, // 8KB buffer
  
  read(cb) {
    this.push({ large: 'object' });
    cb();
  }
});

AbortSignal Integration:

const controller = new AbortController();

const readable = new Readable({
  signal: controller.signal,
  
  read(cb) {
    // Simulate a slow source; skip the push if the signal has already fired
    setTimeout(() => {
      if (!controller.signal.aborted) {
        this.push('data');
      }
      // Always call cb: destroy waits for a pending read to finish
      cb();
    }, 1000);
  }
});

// Cancel the stream after 500ms
setTimeout(() => controller.abort(), 500);
