tessl/npm-streamx

An iteration of the Node.js core streams with a series of improvements

Workspace: tessl
Visibility: Public
Describes: npmpkg:npm/streamx@2.22.x

To install, run

npx @tessl/cli install tessl/npm-streamx@2.22.0

StreamX

StreamX is an iteration of the Node.js core streams with enhanced lifecycle support, integrated error handling with automatic cleanup, a simplified API, and a significantly smaller browser footprint. Its pipe operations accept a completion callback and handle errors for you, a single stream type covers both binary and object modes, and AbortSignal is supported for cancellation. It remains backwards compatible with Node.js streams.

Package Information

  • Package Name: streamx
  • Package Type: npm
  • Language: JavaScript
  • Installation: npm install streamx

Core Imports

const { 
  Readable, 
  Writable, 
  Duplex, 
  Transform, 
  PassThrough, 
  Stream,
  pipeline,
  pipelinePromise
} = require('streamx');

For utility functions:

const { 
  isStream, 
  isStreamx, 
  isEnded, 
  isFinished, 
  isDisturbed, 
  getStreamError 
} = require('streamx');

Basic Usage

const { Readable, Writable, pipeline } = require('streamx');

// Create a readable stream
const readable = new Readable({
  read(cb) {
    this.push('Hello StreamX!');
    this.push(null); // End the stream
    cb();
  }
});

// Create a writable stream
const writable = new Writable({
  write(data, cb) {
    console.log('Received:', data.toString());
    cb();
  }
});

// Pipe streams together
readable.pipe(writable, (err) => {
  if (err) console.error('Pipeline failed:', err);
  else console.log('Pipeline completed successfully');
});

Architecture

StreamX is built around several key improvements over Node.js core streams:

  • Enhanced Lifecycle: Proper _open and _destroy hooks for resource management
  • Integrated Error Handling: Unified error handling with automatic cleanup and destroy logic
  • Unified Stream Mode: Single stream type handles both binary and object modes through map and byteLength functions
  • Pipeline Integration: Built-in pipeline functions with error handling and callback support
  • AbortSignal Support: Integration with AbortController for cancellation
  • Smaller Footprint: 8x smaller gzipped size compared to Node.js core streams

Capabilities

Readable Streams

Core readable stream functionality with enhanced lifecycle management and backpressure handling. Supports both flowing and non-flowing modes with proper pause/resume behavior.

class Readable extends Stream {
  constructor(options?: ReadableOptions);
  _read(cb: (err?: Error) => void): void;
  _open(cb: (err?: Error) => void): void;
  _destroy(cb: (err?: Error) => void): void;
  push(data: any): boolean;
  read(): any;
  pipe(destination: Writable, callback?: (err?: Error) => void): Writable;
}

interface ReadableOptions {
  highWaterMark?: number;
  map?: (data: any) => any;
  byteLength?: (data: any) => number;
  signal?: AbortSignal;
  eagerOpen?: boolean;
  read?: (cb: (err?: Error) => void) => void;
  open?: (cb: (err?: Error) => void) => void;
  destroy?: (cb: (err?: Error) => void) => void;
  predestroy?: () => void;
}

Writable Streams

Core writable stream functionality with enhanced drain handling and batch writing support. Includes proper finish/close lifecycle management.

class Writable extends Stream {
  constructor(options?: WritableOptions);
  _write(data: any, cb: (err?: Error) => void): void;
  _writev(batch: any[], cb: (err?: Error) => void): void;
  _final(cb: (err?: Error) => void): void;
  _open(cb: (err?: Error) => void): void;
  _destroy(cb: (err?: Error) => void): void;
  write(data: any): boolean;
  end(): void;
}

interface WritableOptions {
  highWaterMark?: number;
  map?: (data: any) => any;
  byteLength?: (data: any) => number;
  signal?: AbortSignal;
  write?: (data: any, cb: (err?: Error) => void) => void;
  writev?: (batch: any[], cb: (err?: Error) => void) => void;
  final?: (cb: (err?: Error) => void) => void;
  open?: (cb: (err?: Error) => void) => void;
  destroy?: (cb: (err?: Error) => void) => void;
  predestroy?: () => void;
}

Duplex and Transform Streams

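Duplex streams are both readable and writable. Transform streams are duplex streams that pass written data through a transformation before it becomes readable, and PassThrough is a Transform that forwards data unchanged.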

class Duplex extends Readable {
  constructor(options?: DuplexOptions);
  // Includes all Readable methods plus Writable methods
  write(data: any): boolean;
  end(): void;
}

class Transform extends Duplex {
  constructor(options?: TransformOptions);
  _transform(data: any, cb: (err?: Error, data?: any) => void): void;
  _flush(cb: (err?: Error, data?: any) => void): void;
}

class PassThrough extends Transform {
  constructor(options?: TransformOptions);
  // Pass-through implementation (no transformation)
}

Pipeline Functions

Pipeline utilities for connecting multiple streams with proper error handling and cleanup.

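// The optional callback is passed as the final argument, after the streams.
// pipeline returns the last stream in the chain.
function pipeline(...streams: Stream[], callback?: (err?: Error) => void): Stream;
// Resolves when the pipeline completes and rejects if any stream fails.
function pipelinePromise(...streams: Stream[]): Promise<void>;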

Utility Functions

Helper functions for stream inspection and compatibility checking.

function isStream(obj: any): boolean;
function isStreamx(obj: any): boolean;
function isEnded(stream: Readable): boolean;
function isFinished(stream: Writable): boolean;
function isDisturbed(stream: Readable): boolean;
function getStreamError(stream: Stream, opts?: object): Error | null;

Types

class Stream extends EventEmitter {
  destroyed: boolean;
  destroy(err?: Error): void;
}

interface CommonOptions {
  highWaterMark?: number;      // Buffer size in bytes (default: 16384)
  map?: (data: any) => any;    // Transform input data
  byteLength?: (data: any) => number;  // Calculate data size
  signal?: AbortSignal;        // AbortSignal for destruction
}