An iteration of the Node.js core streams with a series of improvements
—
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Pending
The risk profile of this skill
StreamX is an improved iteration of Node.js core streams with enhanced lifecycle support, proper error handling, simplified API design, and a significantly smaller browser footprint. It provides better stream lifecycle management, integrated error handling with automatic cleanup, pipe operations with built-in error handling and callbacks, unified binary and object mode streams, backwards compatibility with Node.js streams, and AbortSignal support for async/await integration.
npm install streamx

const {
Readable,
Writable,
Duplex,
Transform,
PassThrough,
Stream,
pipeline,
pipelinePromise
} = require('streamx');

For utility functions:
const {
isStream,
isStreamx,
isEnded,
isFinished,
isDisturbed,
getStreamError
} = require('streamx');

const { Readable, Writable, pipeline } = require('streamx');
// Create a readable stream
const readable = new Readable({
read(cb) {
this.push('Hello StreamX!');
this.push(null); // End the stream
cb();
}
});
// Create a writable stream
const writable = new Writable({
write(data, cb) {
console.log('Received:', data.toString());
cb();
}
});
// Pipe streams together
readable.pipe(writable, (err) => {
if (err) console.error('Pipeline failed:', err);
else console.log('Pipeline completed successfully');
});

StreamX is built around several key improvements over Node.js core streams:
- `_open` and `_destroy` hooks for resource management
- `map` and `byteLength` functions

Core readable stream functionality with enhanced lifecycle management and backpressure handling. Supports both flowing and non-flowing modes with proper pause/resume behavior.
class Readable extends Stream {
constructor(options?: ReadableOptions);
_read(cb: () => void): void;
_open(cb: (err?: Error) => void): void;
_destroy(cb: (err?: Error) => void): void;
push(data: any): boolean;
read(): any;
pipe(destination: Writable, callback?: (err?: Error) => void): Writable;
}
interface ReadableOptions {
highWaterMark?: number;
map?: (data: any) => any;
byteLength?: (data: any) => number;
signal?: AbortSignal;
eagerOpen?: boolean;
read?: (cb: () => void) => void;
open?: (cb: (err?: Error) => void) => void;
destroy?: (cb: (err?: Error) => void) => void;
predestroy?: () => void;
}

Core writable stream functionality with enhanced drain handling and batch writing support. Includes proper finish/close lifecycle management.
class Writable extends Stream {
constructor(options?: WritableOptions);
_write(data: any, cb: (err?: Error) => void): void;
_writev(batch: any[], cb: (err?: Error) => void): void;
_open(cb: (err?: Error) => void): void;
_destroy(cb: (err?: Error) => void): void;
write(data: any): boolean;
end(): void;
}
interface WritableOptions {
highWaterMark?: number;
map?: (data: any) => any;
byteLength?: (data: any) => number;
signal?: AbortSignal;
write?: (data: any, cb: (err?: Error) => void) => void;
writev?: (batch: any[], cb: (err?: Error) => void) => void;
final?: (cb: (err?: Error) => void) => void;
open?: (cb: (err?: Error) => void) => void;
destroy?: (cb: (err?: Error) => void) => void;
predestroy?: () => void;
}

Duplex streams that are both readable and writable, and Transform streams that provide data transformation capabilities.
class Duplex extends Readable {
constructor(options?: DuplexOptions);
// Includes all Readable methods plus Writable methods
write(data: any): boolean;
end(): void;
}
class Transform extends Duplex {
constructor(options?: TransformOptions);
_transform(data: any, cb: (err?: Error, data?: any) => void): void;
}
class PassThrough extends Transform {
constructor(options?: TransformOptions);
// Pass-through implementation (no transformation)
}

Pipeline utilities for connecting multiple streams with proper error handling and cleanup.
function pipeline(...streams: Stream[], callback?: (err?: Error) => void): Stream;
function pipelinePromise(...streams: Stream[]): Promise<void>;

Helper functions for stream inspection and compatibility checking.
function isStream(obj: any): boolean;
function isStreamx(obj: any): boolean;
function isEnded(stream: Readable): boolean;
function isFinished(stream: Writable): boolean;
function isDisturbed(stream: Readable): boolean;
function getStreamError(stream: Stream, opts?: object): Error | null;

class Stream extends EventEmitter {
destroyed: boolean;
destroy(err?: Error): void;
}
interface CommonOptions {
highWaterMark?: number; // Buffer size in bytes (default: 16384)
map?: (data: any) => any; // Transform input data
byteLength?: (data: any) => number; // Calculate data size
signal?: AbortSignal; // AbortSignal for destruction
}