tessl/npm-mafintosh--streamx

An iteration of the Node.js core streams with a series of improvements

Workspace: tessl
Visibility: Public
Describes: npmpkg:npm/@mafintosh/streamx@1.0.x

To install, run

npx @tessl/cli install tessl/npm-mafintosh--streamx@1.0.0


streamx

streamx is an iteration of the Node.js core streams with a series of improvements: proper lifecycle support, easy error handling, error handling in pipe(), a simplified API without object mode, and backwards compatibility with Node.js streams.

Package Information

  • Package Name: @mafintosh/streamx
  • Package Type: npm
  • Language: JavaScript
  • Installation: npm install @mafintosh/streamx

Core Imports

const { Readable, Writable, Duplex, Transform, Stream, isStream, isStreamx } = require("@mafintosh/streamx");

For ES modules:

import { Readable, Writable, Duplex, Transform, Stream, isStream, isStreamx } from "@mafintosh/streamx";

Basic Usage

const { Readable, Writable } = require("@mafintosh/streamx");

// Create a readable stream
const readable = new Readable({
  read(cb) {
    this.push("Hello World");
    this.push(null); // End stream
    cb(null);
  }
});

// Create a writable stream
const writable = new Writable({
  write(data, cb) {
    console.log("Received:", data.toString());
    cb(null);
  }
});

// Pipe streams together
readable.pipe(writable, (err) => {
  if (err) console.error("Pipeline failed:", err);
  else console.log("Pipeline completed");
});

Architecture

streamx is built around several key improvements over Node.js core streams:

  • Lifecycle Management: _open() and _destroy() methods provide proper initialization and cleanup
  • Integrated Error Handling: .destroy() method with automatic cleanup and error propagation
  • Enhanced Pipe Operations: pipe() accepts callbacks and handles errors automatically
  • Simplified Data Handling: Uses map() and byteLength() functions instead of object mode
  • Single File Implementation: Complete rewrite contained in one file for simplicity
  • Backwards Compatibility: Compatible with Node.js streams where reasonable
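
The "Simplified Data Handling" point can be sketched as follows. This is a minimal sketch, assuming map() runs on each value before it is buffered and byteLength() reports the size counted against highWaterMark; the names recordOptions and createRecordStream are hypothetical and not part of the package.

```javascript
// Hypothetical option bundle: treat every pushed value as one record.
const recordOptions = {
  // Combined with byteLength below, this is a limit of 16 records, not 16 bytes.
  highWaterMark: 16,
  // Assumption: applied to each pushed value before it is buffered.
  map(data) {
    return typeof data === "string" ? JSON.parse(data) : data;
  },
  // Counting every record as size 1 plays the role that object mode
  // plays in Node.js core streams.
  byteLength(data) {
    return 1;
  }
};

// Requires @mafintosh/streamx to be installed. The package is loaded lazily
// so the option functions above can be exercised on their own.
function createRecordStream() {
  const { Readable } = require("@mafintosh/streamx");
  return new Readable(recordOptions);
}
```

Keeping the option functions in a plain object makes them easy to unit test without constructing a stream.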

Capabilities

Stream Base Class

Base class providing core stream functionality with lifecycle management and error handling.

class Stream extends EventEmitter {
  constructor(opts?: StreamOptions);
  destroy(error?: Error): void;
  get destroyed(): boolean;
  get destroying(): boolean;
  on(name: string, fn: Function): this;
  _open(cb: (error?: Error) => void): void;
  _destroy(cb: (error?: Error) => void): void;
}

interface StreamOptions {
  open?: (cb: (error?: Error) => void) => void;
  destroy?: (cb: (error?: Error) => void) => void;
}
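
As an illustration of the open and destroy options, here is a minimal sketch that acquires a resource when the stream opens and releases it when the stream is destroyed. The resource object and the createManagedStream helper are hypothetical, and the comments describe the assumed behavior rather than a verified contract.

```javascript
// Hypothetical resource that must be acquired before use and released afterwards.
const resource = { opened: false, closed: false };

const lifecycleOptions = {
  // Assumption: runs once before the stream starts doing work.
  open(cb) {
    resource.opened = true;
    cb(null); // pass an Error instead of null to report a failed open
  },
  // Assumption: runs once when the stream is torn down, after an error or a normal end.
  destroy(cb) {
    resource.closed = true;
    cb(null);
  }
};

// Requires @mafintosh/streamx to be installed; loaded lazily.
function createManagedStream() {
  const { Readable } = require("@mafintosh/streamx");
  return new Readable(lifecycleOptions);
}
```

With a stream built this way, calling stream.destroy(new Error("stop")) would be expected to invoke the destroy handler, which is where cleanup belongs.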

Readable Streams

Readable stream implementation with buffer management and data flow control.

class Readable extends Stream {
  constructor(opts?: ReadableOptions);
  push(data: any): boolean;
  read(): any;
  unshift(data: any): void;
  pause(): void;
  resume(): void;
  pipe(destination: Writable, cb?: (error?: Error) => void): Writable;
  _read(cb: (error?: Error) => void): void;
}

interface ReadableOptions extends StreamOptions {
  highWaterMark?: number;
  map?: (data: any) => any;
  byteLength?: (data: any) => number;
  read?: (cb: (error?: Error) => void) => void;
}
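
A short sketch of the read option, following the same pattern as the Basic Usage example: each call pushes one value and then signals completion through cb. The helpers countingOptions and createCounter are hypothetical names introduced for illustration.

```javascript
// Build read options that emit the integers 1..limit as strings, then end the stream.
function countingOptions(limit) {
  let next = 1;
  return {
    read(cb) {
      if (next > limit) {
        this.push(null); // null marks the end of the stream
      } else {
        this.push(String(next++));
      }
      cb(null); // this read finished without error
    }
  };
}

// Requires @mafintosh/streamx to be installed; loaded lazily.
function createCounter(limit) {
  const { Readable } = require("@mafintosh/streamx");
  return new Readable(countingOptions(limit));
}
```

A consumer could then pipe createCounter(3) into a Writable, as in the Basic Usage example.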

Writable Streams

Writable stream implementation with buffering and backpressure management.

class Writable extends Stream {
  constructor(opts?: WritableOptions);
  write(data: any): boolean;
  end(data?: any): void;
  _write(data: any, cb: (error?: Error) => void): void;
  _final(cb: (error?: Error) => void): void;
}

interface WritableOptions extends StreamOptions {
  highWaterMark?: number;
  map?: (data: any) => any;
  byteLength?: (data: any) => number;
  write?: (data: any, cb: (error?: Error) => void) => void;
  final?: (cb: (error?: Error) => void) => void;
}
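
The write and final options can be combined as in the following sketch, which collects everything written into an in-memory sink. The names collectingOptions, createCollector, and the sink shape are hypothetical; final is assumed to run after end() once pending writes have completed.

```javascript
// Build options that append each written chunk to sink.chunks
// and mark the sink as done when the stream is finalized.
function collectingOptions(sink) {
  return {
    write(data, cb) {
      sink.chunks.push(data.toString());
      cb(null); // acknowledge the chunk so the next one can be written
    },
    final(cb) {
      sink.done = true;
      cb(null);
    }
  };
}

// Requires @mafintosh/streamx to be installed; loaded lazily.
function createCollector(sink) {
  const { Writable } = require("@mafintosh/streamx");
  return new Writable(collectingOptions(sink));
}
```

A caller would create the sink as { chunks: [], done: false }, pass it to createCollector, and then call write() and end() on the returned stream.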

Duplex Streams

Streams that are both readable and writable. Duplex inherits from Readable and adds the Writable methods.

class Duplex extends Readable {
  constructor(opts?: DuplexOptions);
  write(data: any): boolean;
  end(data?: any): void;
  _write(data: any, cb: (error?: Error) => void): void;
  _final(cb: (error?: Error) => void): void;
}

interface DuplexOptions extends ReadableOptions {
  mapReadable?: (data: any) => any;
  byteLengthReadable?: (data: any) => number;
  mapWritable?: (data: any) => any;
  byteLengthWritable?: (data: any) => number;
  write?: (data: any, cb: (error?: Error) => void) => void;
  final?: (cb: (error?: Error) => void) => void;
}
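
The sketch below shows how the separate readable-side and writable-side options might be used in an echo-style Duplex. It assumes mapWritable is applied to values passed to write() and mapReadable to values passed to push(); that split is inferred from the option names, and echoOptions and createEcho are hypothetical.

```javascript
const echoOptions = {
  // Assumption: applied only to data entering through write().
  mapWritable(data) {
    return String(data).trim();
  },
  // Assumption: applied only to data leaving through push().
  mapReadable(data) {
    return data + "\n";
  },
  write(data, cb) {
    this.push(data); // hand each written value to the readable side
    cb(null);
  },
  final(cb) {
    this.push(null); // end the readable side once writing has finished
    cb(null);
  }
};

// Requires @mafintosh/streamx to be installed; loaded lazily.
function createEcho() {
  const { Duplex } = require("@mafintosh/streamx");
  return new Duplex(echoOptions);
}
```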

Transform Streams

Streams that transform data as it passes through. Transform extends Duplex and adds a _transform() hook.

class Transform extends Duplex {
  constructor(opts?: TransformOptions);
  _transform(data: any, cb: (error?: Error, result?: any) => void): void;
}

interface TransformOptions extends DuplexOptions {
  transform?: (data: any, cb: (error?: Error, result?: any) => void) => void;
  flush?: (cb: (error?: Error, result?: any) => void) => void;
}
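
A minimal sketch of the transform and flush options, using the callback signatures listed above: transform passes the converted chunk as the second callback argument, and flush passes one final value. The names upperCaseOptions and createUpperCase are hypothetical, and emitting a trailing newline from flush is only an illustration.

```javascript
const upperCaseOptions = {
  // Convert each chunk and pass the result as the second callback argument.
  transform(data, cb) {
    cb(null, data.toString().toUpperCase());
  },
  // Assumption: runs once after the last chunk, and the value passed
  // to cb is emitted before the stream ends.
  flush(cb) {
    cb(null, "\n");
  }
};

// Requires @mafintosh/streamx to be installed; loaded lazily.
function createUpperCase() {
  const { Transform } = require("@mafintosh/streamx");
  return new Transform(upperCaseOptions);
}
```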

Utility Functions

Stream detection utilities for identifying stream types.

/**
 * Check if an object is a Node.js stream
 * @param stream - Object to test
 * @returns True if object is a stream
 */
function isStream(stream: any): boolean;

/**
 * Check if an object is a streamx stream
 * @param stream - Object to test
 * @returns True if object is a streamx stream
 */
function isStreamx(stream: any): boolean;

Types

interface StreamOptions {
  open?: (cb: (error?: Error) => void) => void;
  destroy?: (cb: (error?: Error) => void) => void;
}

interface ReadableOptions extends StreamOptions {
  highWaterMark?: number;
  map?: (data: any) => any;
  byteLength?: (data: any) => number;
  read?: (cb: (error?: Error) => void) => void;
}

interface WritableOptions extends StreamOptions {
  highWaterMark?: number;
  map?: (data: any) => any;
  byteLength?: (data: any) => number;
  write?: (data: any, cb: (error?: Error) => void) => void;
  final?: (cb: (error?: Error) => void) => void;
}

interface DuplexOptions extends ReadableOptions {
  mapReadable?: (data: any) => any;
  byteLengthReadable?: (data: any) => number;
  mapWritable?: (data: any) => any;
  byteLengthWritable?: (data: any) => number;
  write?: (data: any, cb: (error?: Error) => void) => void;
  final?: (cb: (error?: Error) => void) => void;
}

interface TransformOptions extends DuplexOptions {
  transform?: (data: any, cb: (error?: Error, result?: any) => void) => void;
  flush?: (cb: (error?: Error, result?: any) => void) => void;
}