An iteration of the Node.js core streams with a series of improvements
npx @tessl/cli install tessl/npm-streamx@2.22.0StreamX is an improved iteration of Node.js core streams with enhanced lifecycle support, proper error handling, simplified API design, and a significantly smaller browser footprint. It provides better stream lifecycle management, integrated error handling with automatic cleanup, pipe operations with built-in error handling and callbacks, unified binary and object mode streams, backwards compatibility with Node.js streams, and AbortSignal support for async/await integration.
npm install streamxconst {
Readable,
Writable,
Duplex,
Transform,
PassThrough,
Stream,
pipeline,
pipelinePromise
} = require('streamx');For utility functions:
const {
isStream,
isStreamx,
isEnded,
isFinished,
isDisturbed,
getStreamError
} = require('streamx');const { Readable, Writable, pipeline } = require('streamx');
// Create a readable stream
const readable = new Readable({
read(cb) {
this.push('Hello StreamX!');
this.push(null); // End the stream
cb();
}
});
// Create a writable stream
const writable = new Writable({
write(data, cb) {
console.log('Received:', data.toString());
cb();
}
});
// Pipe streams together
readable.pipe(writable, (err) => {
if (err) console.error('Pipeline failed:', err);
else console.log('Pipeline completed successfully');
});StreamX is built around several key improvements over Node.js core streams:
_open and _destroy hooks for resource managementmap and byteLength functionsCore readable stream functionality with enhanced lifecycle management and backpressure handling. Supports both flowing and non-flowing modes with proper pause/resume behavior.
class Readable extends Stream {
constructor(options?: ReadableOptions);
_read(cb: () => void): void;
_open(cb: (err?: Error) => void): void;
_destroy(cb: (err?: Error) => void): void;
push(data: any): boolean;
read(): any;
pipe(destination: Writable, callback?: (err?: Error) => void): Writable;
}
interface ReadableOptions {
highWaterMark?: number;
map?: (data: any) => any;
byteLength?: (data: any) => number;
signal?: AbortSignal;
eagerOpen?: boolean;
read?: (cb: () => void) => void;
open?: (cb: (err?: Error) => void) => void;
destroy?: (cb: (err?: Error) => void) => void;
predestroy?: () => void;
}Core writable stream functionality with enhanced drain handling and batch writing support. Includes proper finish/close lifecycle management.
class Writable extends Stream {
constructor(options?: WritableOptions);
_write(data: any, cb: (err?: Error) => void): void;
_writev(batch: any[], cb: (err?: Error) => void): void;
_open(cb: (err?: Error) => void): void;
_destroy(cb: (err?: Error) => void): void;
write(data: any): boolean;
end(): void;
}
interface WritableOptions {
highWaterMark?: number;
map?: (data: any) => any;
byteLength?: (data: any) => number;
signal?: AbortSignal;
write?: (data: any, cb: (err?: Error) => void) => void;
writev?: (batch: any[], cb: (err?: Error) => void) => void;
final?: (cb: (err?: Error) => void) => void;
open?: (cb: (err?: Error) => void) => void;
destroy?: (cb: (err?: Error) => void) => void;
predestroy?: () => void;
}Duplex streams that are both readable and writable, and Transform streams that provide data transformation capabilities.
class Duplex extends Readable {
constructor(options?: DuplexOptions);
// Includes all Readable methods plus Writable methods
write(data: any): boolean;
end(): void;
}
class Transform extends Duplex {
constructor(options?: TransformOptions);
_transform(data: any, cb: (err?: Error, data?: any) => void): void;
}
class PassThrough extends Transform {
constructor(options?: TransformOptions);
// Pass-through implementation (no transformation)
}Pipeline utilities for connecting multiple streams with proper error handling and cleanup.
function pipeline(...streams: Stream[], callback?: (err?: Error) => void): Stream;
function pipelinePromise(...streams: Stream[]): Promise<void>;Helper functions for stream inspection and compatibility checking.
function isStream(obj: any): boolean;
function isStreamx(obj: any): boolean;
function isEnded(stream: Readable): boolean;
function isFinished(stream: Writable): boolean;
function isDisturbed(stream: Readable): boolean;
function getStreamError(stream: Stream, opts?: object): Error | null;class Stream extends EventEmitter {
destroyed: boolean;
destroy(err?: Error): void;
}
interface CommonOptions {
highWaterMark?: number; // Buffer size in bytes (default: 16384)
map?: (data: any) => any; // Transform input data
byteLength?: (data: any) => number; // Calculate data size
signal?: AbortSignal; // AbortSignal for destruction
}