An iteration of the Node.js core streams with a series of improvements
npx @tessl/cli install tessl/npm-mafintosh--streamx@1.0.0

streamx is an iteration of the Node.js core streams with a series of improvements, including proper lifecycle support, easy error handling, pipe error handling, a simplified API without object mode, and backwards compatibility with Node.js streams.
npm install @mafintosh/streamx

const { Readable, Writable, Duplex, Transform, Stream, isStream, isStreamx } = require("@mafintosh/streamx");

For ES modules:
import { Readable, Writable, Duplex, Transform, Stream, isStream, isStreamx } from "@mafintosh/streamx";

const { Readable, Writable } = require("@mafintosh/streamx");
// Create a readable stream
// The `read` option supplies the data: it pushes a single chunk, ends the
// stream, and then invokes the callback with no error.
const readable = new Readable({
read(cb) {
this.push("Hello World");
this.push(null); // End stream
// Passing null reports that this read completed without an error.
cb(null);
}
});
// Create a writable stream
// The `write` option receives each chunk together with a callback; here it
// logs the chunk as a string and then calls back with no error.
const writable = new Writable({
write(data, cb) {
console.log("Received:", data.toString());
// Passing null reports that this chunk was handled without an error.
cb(null);
}
});
// Pipe streams together
readable.pipe(writable, (err) => {
if (err) console.error("Pipeline failed:", err);
else console.log("Pipeline completed");
});

streamx is built around several key improvements over Node.js core streams:
- _open() and _destroy() methods provide proper initialization and cleanup.
- destroy() method with automatic cleanup and error propagation.
- pipe() accepts callbacks and handles errors automatically.
- map() and byteLength() functions instead of object mode.

Base class providing core stream functionality with lifecycle management and error handling.
/**
 * Base class providing core stream functionality with lifecycle management
 * and error handling. Extends EventEmitter.
 */
class Stream extends EventEmitter {
/** Creates a stream; `opts` may carry `open` and `destroy` hooks (see StreamOptions). */
constructor(opts?: StreamOptions);
/** Destroys the stream, optionally passing the error that caused the teardown. */
destroy(error?: Error): void;
/** NOTE(review): presumably true once the stream has been destroyed — confirm against the implementation. */
get destroyed(): boolean;
/** NOTE(review): presumably true while destruction is in progress — confirm against the implementation. */
get destroying(): boolean;
/**
 * Takes an event name and a handler and returns the stream itself.
 * NOTE(review): `Function` is a very loose handler type; a narrower
 * signature such as `(...args: any[]) => void` would type-check callers better.
 */
on(name: string, fn: Function): this;
/** Lifecycle hook for initialization; call `cb` when finished, passing an error on failure. */
_open(cb: (error?: Error) => void): void;
/** Lifecycle hook for cleanup; call `cb` when finished, passing an error on failure. */
_destroy(cb: (error?: Error) => void): void;
}
interface StreamOptions {
open?: (cb: (error?: Error) => void) => void;
destroy?: (cb: (error?: Error) => void) => void;
}

Readable stream implementation with buffer management and data flow control.
/**
 * Readable stream implementation with buffer management and data flow
 * control. Extends Stream.
 */
class Readable extends Stream {
/** Creates a readable stream configured by `opts` (see ReadableOptions). */
constructor(opts?: ReadableOptions);
/**
 * Pushes data into the stream; pushing `null` ends the stream.
 * NOTE(review): the boolean result presumably signals whether more data
 * can be buffered — confirm.
 */
push(data: any): boolean;
/** Reads data from the stream. NOTE(review): value returned when nothing is buffered is not shown here — confirm. */
read(): any;
/** NOTE(review): presumably puts `data` back at the front of the read buffer — confirm. */
unshift(data: any): void;
/** Pauses the stream. */
pause(): void;
/** Resumes the stream. */
resume(): void;
/**
 * Pipes this stream into `destination`. The optional `cb` is called with an
 * error if the pipeline failed, or without one when it completed.
 * NOTE(review): the returned Writable is presumably `destination` — confirm.
 */
pipe(destination: Writable, cb?: (error?: Error) => void): Writable;
/** Read hook with the same callback shape as the `read` option; call `cb` when done, passing an error on failure. */
_read(cb: (error?: Error) => void): void;
}
interface ReadableOptions extends StreamOptions {
highWaterMark?: number;
map?: (data: any) => any;
byteLength?: (data: any) => number;
read?: (cb: (error?: Error) => void) => void;
}

Writable stream implementation with buffering and backpressure management.
/**
 * Writable stream implementation with buffering and backpressure
 * management. Extends Stream.
 */
class Writable extends Stream {
/** Creates a writable stream configured by `opts` (see WritableOptions). */
constructor(opts?: WritableOptions);
/**
 * Writes `data` to the stream.
 * NOTE(review): the boolean result presumably signals backpressure
 * (whether the caller may keep writing) — confirm.
 */
write(data: any): boolean;
/** Ends the stream, optionally supplying a final piece of data. */
end(data?: any): void;
/** Write hook with the same shape as the `write` option; call `cb` when the data is handled, passing an error on failure. */
_write(data: any, cb: (error?: Error) => void): void;
/** Hook with the same shape as the `final` option. NOTE(review): presumably runs once after the last write, before the stream finishes — confirm. */
_final(cb: (error?: Error) => void): void;
}
interface WritableOptions extends StreamOptions {
highWaterMark?: number;
map?: (data: any) => any;
byteLength?: (data: any) => number;
write?: (data: any, cb: (error?: Error) => void) => void;
final?: (cb: (error?: Error) => void) => void;
}

Duplex streams that are both readable and writable, inheriting from Readable with Writable functionality.
/**
 * Duplex stream: both readable and writable. Inherits from Readable and
 * adds the Writable-side methods declared below.
 */
class Duplex extends Readable {
/** Creates a duplex stream configured by `opts` (see DuplexOptions). */
constructor(opts?: DuplexOptions);
/** Writes `data` to the writable side. NOTE(review): boolean result presumably signals backpressure — confirm. */
write(data: any): boolean;
/** Ends the writable side, optionally supplying a final piece of data. */
end(data?: any): void;
/** Write hook with the same shape as the `write` option; call `cb` when the data is handled, passing an error on failure. */
_write(data: any, cb: (error?: Error) => void): void;
/** Hook with the same shape as the `final` option. NOTE(review): presumably runs once after the last write — confirm. */
_final(cb: (error?: Error) => void): void;
}
interface DuplexOptions extends ReadableOptions {
mapReadable?: (data: any) => any;
byteLengthReadable?: (data: any) => number;
mapWritable?: (data: any) => any;
byteLengthWritable?: (data: any) => number;
write?: (data: any, cb: (error?: Error) => void) => void;
final?: (cb: (error?: Error) => void) => void;
}

Transform streams for data transformation, extending Duplex with transformation capabilities.
/**
 * Transform stream for data transformation. Extends Duplex with a
 * transformation hook.
 */
class Transform extends Duplex {
/** Creates a transform stream configured by `opts` (see TransformOptions). */
constructor(opts?: TransformOptions);
/**
 * Transformation hook with the same shape as the `transform` option: receives
 * incoming `data` and reports back through `cb` with either an error or the
 * transformed result.
 */
_transform(data: any, cb: (error?: Error, result?: any) => void): void;
}
interface TransformOptions extends DuplexOptions {
transform?: (data: any, cb: (error?: Error, result?: any) => void) => void;
flush?: (cb: (error?: Error, result?: any) => void) => void;
}

Stream detection utilities for identifying stream types.
/**
 * Check whether an object is a Node.js stream.
 * @param stream - Object to test
 * @returns `true` if the object is a stream, otherwise `false`
 */
function isStream(stream: any): boolean;
/**
* Check if an object is a streamx stream
* @param stream - Object to test
* @returns True if object is a streamx stream
*/
function isStreamx(stream: any): boolean;

interface StreamOptions {
open?: (cb: (error?: Error) => void) => void;
destroy?: (cb: (error?: Error) => void) => void;
}
/**
 * Options accepted by the Readable constructor. Inherits the `open` and
 * `destroy` hooks from StreamOptions.
 */
interface ReadableOptions extends StreamOptions {
/** NOTE(review): presumably the buffering threshold for the stream — unit and default are not shown here; confirm. */
highWaterMark?: number;
/** Mapping function applied to stream data; together with `byteLength` it is used instead of an object mode. */
map?: (data: any) => any;
/** Function returning the size of a piece of data; together with `map` it is used instead of an object mode. */
byteLength?: (data: any) => number;
/** Read implementation supplied inline; call `cb` when done, passing an error on failure. */
read?: (cb: (error?: Error) => void) => void;
}
/**
 * Options accepted by the Writable constructor. Inherits the `open` and
 * `destroy` hooks from StreamOptions.
 */
interface WritableOptions extends StreamOptions {
/** NOTE(review): presumably the buffering threshold for the stream — unit and default are not shown here; confirm. */
highWaterMark?: number;
/** Mapping function applied to stream data; together with `byteLength` it is used instead of an object mode. */
map?: (data: any) => any;
/** Function returning the size of a piece of data; together with `map` it is used instead of an object mode. */
byteLength?: (data: any) => number;
/** Write implementation supplied inline; call `cb` when the data is handled, passing an error on failure. */
write?: (data: any, cb: (error?: Error) => void) => void;
/** Inline `final` implementation. NOTE(review): presumably runs once after the last write — confirm. */
final?: (cb: (error?: Error) => void) => void;
}
/**
 * Options accepted by the Duplex constructor. Extends ReadableOptions with
 * the writable-side hooks and side-specific mapping functions.
 */
interface DuplexOptions extends ReadableOptions {
/** NOTE(review): judging by the name, a `map` that applies only to the readable side — confirm. */
mapReadable?: (data: any) => any;
/** NOTE(review): judging by the name, a `byteLength` that applies only to the readable side — confirm. */
byteLengthReadable?: (data: any) => number;
/** NOTE(review): judging by the name, a `map` that applies only to the writable side — confirm. */
mapWritable?: (data: any) => any;
/** NOTE(review): judging by the name, a `byteLength` that applies only to the writable side — confirm. */
byteLengthWritable?: (data: any) => number;
/** Write implementation supplied inline; call `cb` when the data is handled, passing an error on failure. */
write?: (data: any, cb: (error?: Error) => void) => void;
/** Inline `final` implementation. NOTE(review): presumably runs once after the last write — confirm. */
final?: (cb: (error?: Error) => void) => void;
}
/**
 * Options accepted by the Transform constructor. Extends DuplexOptions with
 * the transformation hooks.
 */
interface TransformOptions extends DuplexOptions {
/** Transformation supplied inline: receives incoming `data` and reports back through `cb` with either an error or the transformed result. */
transform?: (data: any, cb: (error?: Error, result?: any) => void) => void;
/** NOTE(review): presumably called once at the end of input so remaining output can be emitted through `cb` — confirm. */
flush?: (cb: (error?: Error, result?: any) => void) => void;
}