CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/npm-mafintosh--streamx

An iteration of the Node.js core streams with a series of improvements

Pending
Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

SecuritybySnyk

Pending

The risk profile of this skill

Overview
Eval results
Files

streamx

streamx is an iteration of the Node.js core streams with a series of improvements including proper lifecycle support, easy error handling, pipe error handling, simplified API without object mode, and backwards compatibility with Node.js streams.

Package Information

  • Package Name: @mafintosh/streamx
  • Package Type: npm
  • Language: JavaScript
  • Installation: npm install @mafintosh/streamx

Core Imports

const { Readable, Writable, Duplex, Transform, Stream, isStream, isStreamx } = require("@mafintosh/streamx");

For ES modules:

import { Readable, Writable, Duplex, Transform, Stream, isStream, isStreamx } from "@mafintosh/streamx";

Basic Usage

const { Readable, Writable } = require("@mafintosh/streamx");

// Create a readable stream
const readable = new Readable({
  read(cb) {
    this.push("Hello World");
    this.push(null); // End stream
    cb(null);
  }
});

// Create a writable stream
const writable = new Writable({
  write(data, cb) {
    console.log("Received:", data.toString());
    cb(null);
  }
});

// Pipe streams together
readable.pipe(writable, (err) => {
  if (err) console.error("Pipeline failed:", err);
  else console.log("Pipeline completed");
});

Architecture

streamx is built around several key improvements over Node.js core streams:

  • Lifecycle Management: _open() and _destroy() methods provide proper initialization and cleanup
  • Integrated Error Handling: .destroy() method with automatic cleanup and error propagation
  • Enhanced Pipe Operations: pipe() accepts callbacks and handles errors automatically
  • Simplified Data Handling: Uses map() and byteLength() functions instead of object mode
  • Single File Implementation: Complete rewrite contained in one file for simplicity
  • Backwards Compatibility: Compatible with Node.js streams where reasonable

Capabilities

Stream Base Class

Base class providing core stream functionality with lifecycle management and error handling.

class Stream extends EventEmitter {
  constructor(opts?: StreamOptions);
  destroy(error?: Error): void;
  get destroyed(): boolean;
  get destroying(): boolean;
  on(name: string, fn: Function): this;
  _open(cb: (error?: Error) => void): void;
  _destroy(cb: (error?: Error) => void): void;
}

interface StreamOptions {
  open?: (cb: (error?: Error) => void) => void;
  destroy?: (cb: (error?: Error) => void) => void;
}

Stream Base Class

Readable Streams

Readable stream implementation with buffer management and data flow control.

class Readable extends Stream {
  constructor(opts?: ReadableOptions);
  push(data: any): boolean;
  read(): any;
  unshift(data: any): void;
  pause(): void;
  resume(): void;
  pipe(destination: Writable, cb?: (error?: Error) => void): Writable;
  _read(cb: (error?: Error) => void): void;
}

interface ReadableOptions extends StreamOptions {
  highWaterMark?: number;
  map?: (data: any) => any;
  byteLength?: (data: any) => number;
  read?: (cb: (error?: Error) => void) => void;
}

Readable Streams

Writable Streams

Writable stream implementation with buffering and backpressure management.

class Writable extends Stream {
  constructor(opts?: WritableOptions);
  write(data: any): boolean;
  end(data?: any): void;
  _write(data: any, cb: (error?: Error) => void): void;
  _final(cb: (error?: Error) => void): void;
}

interface WritableOptions extends StreamOptions {
  highWaterMark?: number;
  map?: (data: any) => any;
  byteLength?: (data: any) => number;
  write?: (data: any, cb: (error?: Error) => void) => void;
  final?: (cb: (error?: Error) => void) => void;
}

Writable Streams

Duplex Streams

Duplex streams that are both readable and writable, inheriting from Readable with Writable functionality.

class Duplex extends Readable {
  constructor(opts?: DuplexOptions);
  write(data: any): boolean;
  end(data?: any): void;
  _write(data: any, cb: (error?: Error) => void): void;
  _final(cb: (error?: Error) => void): void;
}

interface DuplexOptions extends ReadableOptions {
  mapReadable?: (data: any) => any;
  byteLengthReadable?: (data: any) => number;
  mapWritable?: (data: any) => any;
  byteLengthWritable?: (data: any) => number;
  write?: (data: any, cb: (error?: Error) => void) => void;
  final?: (cb: (error?: Error) => void) => void;
}

Duplex Streams

Transform Streams

Transform streams for data transformation, extending Duplex with transformation capabilities.

class Transform extends Duplex {
  constructor(opts?: TransformOptions);
  _transform(data: any, cb: (error?: Error, result?: any) => void): void;
}

interface TransformOptions extends DuplexOptions {
  transform?: (data: any, cb: (error?: Error, result?: any) => void) => void;
  flush?: (cb: (error?: Error, result?: any) => void) => void;
}

Transform Streams

Utility Functions

Stream detection utilities for identifying stream types.

/**
 * Check if an object is a Node.js stream
 * @param stream - Object to test
 * @returns True if object is a stream
 */
function isStream(stream: any): boolean;

/**
 * Check if an object is a streamx stream
 * @param stream - Object to test  
 * @returns True if object is a streamx stream
 */
function isStreamx(stream: any): boolean;

Types

interface StreamOptions {
  open?: (cb: (error?: Error) => void) => void;
  destroy?: (cb: (error?: Error) => void) => void;
}

interface ReadableOptions extends StreamOptions {
  highWaterMark?: number;
  map?: (data: any) => any;
  byteLength?: (data: any) => number;
  read?: (cb: (error?: Error) => void) => void;
}

interface WritableOptions extends StreamOptions {
  highWaterMark?: number;
  map?: (data: any) => any;
  byteLength?: (data: any) => number;
  write?: (data: any, cb: (error?: Error) => void) => void;
  final?: (cb: (error?: Error) => void) => void;
}

interface DuplexOptions extends ReadableOptions {
  mapReadable?: (data: any) => any;
  byteLengthReadable?: (data: any) => number;
  mapWritable?: (data: any) => any;
  byteLengthWritable?: (data: any) => number;
  write?: (data: any, cb: (error?: Error) => void) => void;
  final?: (cb: (error?: Error) => void) => void;
}

interface TransformOptions extends DuplexOptions {
  transform?: (data: any, cb: (error?: Error, result?: any) => void) => void;
  flush?: (cb: (error?: Error, result?: any) => void) => void;
}
Workspace
tessl
Visibility
Public
Created
Last updated
Describes
npmpkg:npm/@mafintosh/streamx@1.0.x
Publish Source
CLI
Badge
tessl/npm-mafintosh--streamx badge