tessl/npm-mafintosh--streamx

An iteration of the Node.js core streams with a series of improvements

Readable Streams

The Readable class implements a readable stream with flow control, buffer management, and pipe operations that report errors to the caller.

Capabilities

Readable Constructor

Creates a new Readable stream with configuration options.

/**
 * Creates a new Readable stream
 * @param opts - Configuration options for the readable stream
 */
class Readable extends Stream {
  constructor(opts?: ReadableOptions);
}

interface ReadableOptions extends StreamOptions {
  /** Maximum buffer size in bytes (default: 16384) */
  highWaterMark?: number;
  /** Function to map input data */
  map?: (data: any) => any;
  /** Function to calculate byte size of data */
  byteLength?: (data: any) => number;
  /** Read function shorthand */
  read?: (cb: (error?: Error) => void) => void;
}

Usage Example:

const { Readable } = require("@mafintosh/streamx");

const readable = new Readable({
  highWaterMark: 8192,
  read(cb) {
    this.push("Hello World");
    cb(null);
  }
});
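The `map` and `byteLength` options are not shown in the example above. The following is a minimal sketch of how they might be combined for a stream of UTF-8 strings. `stringStreamOptions` is a hypothetical helper, not part of the library, and the comment about when `map` runs is an assumption.

```javascript
// Hypothetical helper: builds ReadableOptions for a stream of UTF-8 strings.
function stringStreamOptions(highWaterMark) {
  return {
    // Maximum buffer size in bytes, as measured by byteLength below.
    highWaterMark,
    // Coerce every pushed value to a string (assumed to run before buffering).
    map: (data) => String(data),
    // Count the UTF-8 encoded size rather than the number of characters.
    byteLength: (data) => Buffer.byteLength(data, "utf8")
  };
}

// Usage (assumes the package is installed):
// const { Readable } = require("@mafintosh/streamx");
// const readable = new Readable(stringStreamOptions(8192));
```

Measuring size in encoded bytes keeps `highWaterMark` meaningful for text that contains multi-byte characters.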

Data Production

_read Method

Called when the stream wants new data. Override to implement data reading logic.

/**
 * Called when stream wants new data
 * @param cb - Callback to call when read operation is complete
 */
_read(cb: (error?: Error) => void): void;

Usage Example:

const { Readable } = require("@mafintosh/streamx");

class CustomReadable extends Readable {
  constructor() {
    super();
    this.counter = 0;
  }

  _read(cb) {
    if (this.counter < 5) {
      this.push(`data-${this.counter++}`);
    } else {
      this.push(null); // End stream
    }
    cb(null);
  }
}
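The same push-then-callback pattern can be driven by any iterator. The sketch below builds a function that could be passed as the `read` option. `readFromIterator` is a hypothetical helper, not part of the library, and it assumes the iterator never yields `null` (which would end the stream early).

```javascript
// Hypothetical helper: returns a function usable as the `read` option.
function readFromIterator(iterator) {
  return function read(cb) {
    let step;
    try {
      step = iterator.next();
    } catch (err) {
      // Report a failing iterator through the read callback.
      return cb(err);
    }
    // push(null) ends the stream once the iterator is exhausted.
    this.push(step.done ? null : step.value);
    cb(null);
  };
}

// Usage (assumes the package is installed):
// const { Readable } = require("@mafintosh/streamx");
// const items = ["a", "b", "c"][Symbol.iterator]();
// const readable = new Readable({ read: readFromIterator(items) });
```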

push Method

Pushes data to the stream buffer.

/**
 * Push data to stream buffer
 * @param data - Data to push, or null to end stream
 * @returns True if buffer is not full and more data can be pushed
 */
push(data: any): boolean;

Usage Example:

const { Readable } = require("@mafintosh/streamx");

const readable = new Readable({
  read(cb) {
    const shouldContinue = this.push("some data");
    if (shouldContinue) {
      this.push("more data");
    }
    this.push(null); // End stream
    cb(null);
  }
});
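When several items are waiting, the return value of `push` can decide how many to hand over in one read. This is a minimal sketch under that assumption. `pushWhileRoom` and the `pending` array in the usage note are hypothetical names.

```javascript
// Hypothetical helper: push queued items until push() reports a full buffer.
// Returns the number of items that were pushed.
function pushWhileRoom(stream, queue) {
  let pushed = 0;
  while (queue.length > 0) {
    pushed++;
    // push() returns false once the buffer is full, so stop there.
    if (!stream.push(queue.shift())) break;
  }
  return pushed;
}

// Usage inside a read function (assumes `pending` is an array of chunks):
// read(cb) {
//   pushWhileRoom(this, pending);
//   if (pending.length === 0) this.push(null); // nothing left: end the stream
//   cb(null);
// }
```

The item that fills the buffer is still pushed. The helper only avoids pushing further items after `push` has returned false.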

Data Consumption

read Method

Reads data from the stream buffer.

/**
 * Read data from stream buffer
 * @returns Data from buffer, or null if buffer is empty or stream ended
 */
read(): any;

Usage Example:

const { Readable } = require("@mafintosh/streamx");

const readable = new Readable({
  read(cb) {
    this.push("Hello");
    this.push("World");
    this.push(null);
    cb(null);
  }
});

readable.on('readable', () => {
  let chunk;
  while ((chunk = readable.read()) !== null) {
    console.log('Read:', chunk);
  }
});
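The read loop above can be factored into a small function that empties whatever is currently buffered. `readAll` is a hypothetical helper. It relies only on `read()` returning `null` when nothing more is available, as described above.

```javascript
// Hypothetical helper: read every chunk that is currently buffered.
function readAll(readable) {
  const chunks = [];
  let chunk;
  // read() returns null when the buffer is empty or the stream has ended.
  while ((chunk = readable.read()) !== null) {
    chunks.push(chunk);
  }
  return chunks;
}

// Usage:
// readable.on('readable', () => {
//   for (const chunk of readAll(readable)) console.log('Read:', chunk);
// });
```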

unshift Method

Adds data to the front of the buffer (useful for putting back over-read data).

/**
 * Add data to front of buffer
 * @param data - Data to add to front of buffer
 */
unshift(data: any): void;

Usage Example:

const { Readable } = require("@mafintosh/streamx");

const readable = new Readable({
  read(cb) {
    this.push("Hello World");
    this.push(null);
    cb(null);
  }
});

readable.on('readable', () => {
  const data = readable.read();
  if (data === "Hello World") {
    // Put it back for later processing
    readable.unshift(data);
  }
});
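A more typical use of `unshift` is to return only the part of a chunk that was over-read. The sketch below reads one line and puts the remainder back. `readLine` is a hypothetical helper that assumes chunks are strings, and it does not handle a line that spans two chunks.

```javascript
// Hypothetical helper: read one line and put back the over-read remainder.
// Assumes every chunk is a string.
function readLine(readable) {
  const chunk = readable.read();
  if (chunk === null) return null; // nothing buffered
  const newline = chunk.indexOf("\n");
  if (newline === -1) return chunk; // no newline: the whole chunk is the line
  const rest = chunk.slice(newline + 1);
  // Return everything after the newline to the front of the buffer.
  if (rest.length > 0) readable.unshift(rest);
  return chunk.slice(0, newline);
}
```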

Flow Control

pause Method

Pauses the stream. You only need to call this to pause a stream that has previously been resumed.

/**
 * Pause the stream
 */
pause(): void;

resume Method

Starts or resumes consuming the stream as fast as possible.

/**
 * Resume consuming the stream
 */
resume(): void;

Usage Example:

const { Readable } = require("@mafintosh/streamx");

const readable = new Readable({
  read(cb) {
    this.push(`data-${Date.now()}`);
    cb(null);
  }
});

// Start consuming
readable.resume();

// Pause after 1 second
setTimeout(() => {
  readable.pause();
}, 1000);

// Resume after another second
setTimeout(() => {
  readable.resume();
}, 2000);
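Instead of timers, `pause` and `resume` can be tied to the consumer's own progress. This is a minimal sketch that pauses while each chunk is handled and resumes when the handler signals completion. `consumeOneAtATime` and the `handle(chunk, done)` signature are hypothetical, and whether chunks already in flight are still delivered after `pause()` is not covered here.

```javascript
// Hypothetical helper: handle one chunk at a time using pause() and resume().
function consumeOneAtATime(readable, handle) {
  // Attaching a 'data' listener starts the flow.
  readable.on("data", (chunk) => {
    readable.pause(); // stop the flow while this chunk is handled
    handle(chunk, () => readable.resume()); // continue once the handler is done
  });
}

// Usage:
// consumeOneAtATime(readable, (chunk, done) => {
//   setTimeout(() => { console.log("Handled:", chunk); done(); }, 50);
// });
```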

Pipe Operations

pipe Method

Pipes the readable stream into a writable stream. If a callback is provided, it is called when the pipeline completes, with an error if the pipeline failed.

/**
 * Pipe readable stream to writable stream
 * @param destination - Writable stream to pipe to
 * @param callback - Optional callback called when pipeline completes
 * @returns The destination stream
 */
pipe(destination: Writable, callback?: (error?: Error) => void): Writable;

Usage Example:

const { Readable, Writable } = require("@mafintosh/streamx");

const readable = new Readable({
  read(cb) {
    this.push("Hello World");
    this.push(null);
    cb(null);
  }
});

const writable = new Writable({
  write(data, cb) {
    console.log("Received:", data.toString());
    cb(null);
  }
});

readable.pipe(writable, (err) => {
  if (err) console.error("Pipeline failed:", err);
  else console.log("Pipeline completed successfully");
});
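In promise-based code, the pipe callback can be wrapped once and awaited. The sketch below assumes only the `pipe(destination, callback)` signature shown above. `pipeToPromise` is a hypothetical helper, not part of the library.

```javascript
// Hypothetical helper: wrap pipe()'s completion callback in a Promise.
function pipeToPromise(source, destination) {
  return new Promise((resolve, reject) => {
    source.pipe(destination, (err) => {
      if (err) reject(err); // the pipeline failed
      else resolve(destination); // the pipeline completed
    });
  });
}

// Usage:
// pipeToPromise(readable, writable)
//   .then(() => console.log("Pipeline completed successfully"))
//   .catch((err) => console.error("Pipeline failed:", err));
```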

Events

readable Event

Emitted when data becomes available in the buffer after the buffer was previously empty.

readable.on('readable', () => {
  // Data is available to read
});

data Event

Emitted for each chunk read from the stream. Attaching a listener for this event automatically resumes the stream.

readable.on('data', (chunk) => {
  console.log('Received chunk:', chunk);
});

end Event

Emitted when the stream has ended and no more data is available.

readable.on('end', () => {
  console.log('Stream ended');
});

close Event

Emitted when the stream has fully closed.

readable.on('close', () => {
  console.log('Stream closed');
});

error Event

Emitted when an error occurs.

readable.on('error', (err) => {
  console.error('Stream error:', err);
});

Complete Usage Example:

const { Readable } = require("@mafintosh/streamx");

const readable = new Readable({
  read(cb) {
    // Simulate reading data
    setTimeout(() => {
      if (Math.random() > 0.8) {
        this.push(null); // End stream
      } else {
        this.push(`data-${Date.now()}`);
      }
      cb(null);
    }, 100);
  }
});

readable.on('data', (chunk) => {
  console.log('Data:', chunk);
});

readable.on('end', () => {
  console.log('Stream ended');
});

readable.on('close', () => {
  console.log('Stream closed');
});

readable.on('error', (err) => {
  console.error('Error:', err);
});
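The four listeners above can be combined into one helper that gathers every chunk and reports the outcome once. This is a sketch, not a library feature: `collect` is a hypothetical name, and it assumes `close` is the last event emitted, after any `error`.

```javascript
// Hypothetical helper: gather all chunks and report once the stream closes.
// Assumes 'close' fires last, after 'end' or 'error'.
function collect(readable, done) {
  const chunks = [];
  let failure = null;
  readable.on("data", (chunk) => chunks.push(chunk));
  readable.on("error", (err) => { failure = err; });
  readable.on("close", () => done(failure, chunks));
}

// Usage:
// collect(readable, (err, chunks) => {
//   if (err) console.error("Error:", err);
//   else console.log("Collected", chunks.length, "chunks");
// });
```

Reporting from `close` rather than `end` means the callback also runs when the stream fails partway through, with whatever chunks arrived before the error.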
