CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/npm-streamx

An iteration of the Node.js core streams with a series of improvements

Pending
Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

SecuritybySnyk

Pending

The risk profile of this skill

Overview
Eval results
Files

index.mddocs/

StreamX

StreamX is an improved iteration of Node.js core streams with enhanced lifecycle support, proper error handling, simplified API design, and a significantly smaller browser footprint. It provides better stream lifecycle management, integrated error handling with automatic cleanup, pipe operations with built-in error handling and callbacks, unified binary and object mode streams, backwards compatibility with Node.js streams, and AbortSignal support for async/await integration.

Package Information

  • Package Name: streamx
  • Package Type: npm
  • Language: JavaScript
  • Installation: npm install streamx

Core Imports

const { 
  Readable, 
  Writable, 
  Duplex, 
  Transform, 
  PassThrough, 
  Stream,
  pipeline,
  pipelinePromise
} = require('streamx');

For utility functions:

const { 
  isStream, 
  isStreamx, 
  isEnded, 
  isFinished, 
  isDisturbed, 
  getStreamError 
} = require('streamx');

Basic Usage

const { Readable, Writable, pipeline } = require('streamx');

// Create a readable stream
const readable = new Readable({
  read(cb) {
    this.push('Hello StreamX!');
    this.push(null); // End the stream
    cb();
  }
});

// Create a writable stream
const writable = new Writable({
  write(data, cb) {
    console.log('Received:', data.toString());
    cb();
  }
});

// Pipe streams together
readable.pipe(writable, (err) => {
  if (err) console.error('Pipeline failed:', err);
  else console.log('Pipeline completed successfully');
});

Architecture

StreamX is built around several key improvements over Node.js core streams:

  • Enhanced Lifecycle: Proper _open and _destroy hooks for resource management
  • Integrated Error Handling: Unified error handling with automatic cleanup and destroy logic
  • Unified Stream Mode: Single stream type handles both binary and object modes through map and byteLength functions
  • Pipeline Integration: Built-in pipeline functions with error handling and callback support
  • AbortSignal Support: Integration with AbortController for cancellation
  • Smaller Footprint: 8x smaller gzipped size compared to Node.js core streams

Capabilities

Readable Streams

Core readable stream functionality with enhanced lifecycle management and backpressure handling. Supports both flowing and non-flowing modes with proper pause/resume behavior.

class Readable extends Stream {
  constructor(options?: ReadableOptions);
  _read(cb: () => void): void;
  _open(cb: (err?: Error) => void): void;
  _destroy(cb: (err?: Error) => void): void;
  push(data: any): boolean;
  read(): any;
  pipe(destination: Writable, callback?: (err?: Error) => void): Writable;
}

interface ReadableOptions {
  highWaterMark?: number;
  map?: (data: any) => any;
  byteLength?: (data: any) => number;
  signal?: AbortSignal;
  eagerOpen?: boolean;
  read?: (cb: () => void) => void;
  open?: (cb: (err?: Error) => void) => void;
  destroy?: (cb: (err?: Error) => void) => void;
  predestroy?: () => void;
}

Readable Streams

Writable Streams

Core writable stream functionality with enhanced drain handling and batch writing support. Includes proper finish/close lifecycle management.

class Writable extends Stream {
  constructor(options?: WritableOptions);
  _write(data: any, cb: (err?: Error) => void): void;
  _writev(batch: any[], cb: (err?: Error) => void): void;
  _open(cb: (err?: Error) => void): void;
  _destroy(cb: (err?: Error) => void): void;
  write(data: any): boolean;
  end(): void;
}

interface WritableOptions {
  highWaterMark?: number;
  map?: (data: any) => any;
  byteLength?: (data: any) => number;
  signal?: AbortSignal;
  write?: (data: any, cb: (err?: Error) => void) => void;
  writev?: (batch: any[], cb: (err?: Error) => void) => void;
  final?: (cb: (err?: Error) => void) => void;
  open?: (cb: (err?: Error) => void) => void;
  destroy?: (cb: (err?: Error) => void) => void;
  predestroy?: () => void;
}

Writable Streams

Duplex and Transform Streams

Duplex streams that are both readable and writable, and Transform streams that provide data transformation capabilities.

class Duplex extends Readable {
  constructor(options?: DuplexOptions);
  // Includes all Readable methods plus Writable methods
  write(data: any): boolean;
  end(): void;
}

class Transform extends Duplex {
  constructor(options?: TransformOptions);
  _transform(data: any, cb: (err?: Error, data?: any) => void): void;
}

class PassThrough extends Transform {
  constructor(options?: TransformOptions);
  // Pass-through implementation (no transformation)
}

Duplex and Transform Streams

Pipeline Functions

Pipeline utilities for connecting multiple streams with proper error handling and cleanup.

function pipeline(...streams: Stream[], callback?: (err?: Error) => void): Stream;
function pipelinePromise(...streams: Stream[]): Promise<void>;

Pipeline Functions

Utility Functions

Helper functions for stream inspection and compatibility checking.

function isStream(obj: any): boolean;
function isStreamx(obj: any): boolean;
function isEnded(stream: Readable): boolean;
function isFinished(stream: Writable): boolean;
function isDisturbed(stream: Readable): boolean;
function getStreamError(stream: Stream, opts?: object): Error | null;

Types

class Stream extends EventEmitter {
  destroyed: boolean;
  destroy(err?: Error): void;
}

interface CommonOptions {
  highWaterMark?: number;      // Buffer size in bytes (default: 16384)
  map?: (data: any) => any;    // Transform input data
  byteLength?: (data: any) => number;  // Calculate data size
  signal?: AbortSignal;        // AbortSignal for destruction
}

docs

duplex-transform.md

index.md

pipeline.md

readable.md

writable.md

tile.json