An iteration on the Node.js core streams API with a series of improvements.
—
StreamX provides powerful pipeline utilities for connecting multiple streams with automatic error handling, cleanup, and callback support. Pipelines handle stream lifecycle management and ensure proper resource cleanup.
Connects multiple streams together with comprehensive error handling and automatic cleanup.
/**
 * Pipe multiple streams together with error handling and cleanup.
 * If any stream in the chain errors, all streams are cleaned up automatically.
 * @param streams - Streams to connect, with an optional completion callback
 *   `(err?) => void` accepted as the last argument
 * @returns The last stream in the pipeline
 */
function pipeline(...streams: (Stream | ((err?: Error) => void))[]): Stream;
Usage Examples:
// Pull the pipeline helper and the stream classes from streamx.
const { pipeline, Readable, Transform, Writable } = require('streamx');
// Basic pipeline with callback
// pipeline() returns its last stream, so the result can be monitored directly.
const lastStream = pipeline(
Readable.from(['hello', 'world', 'from', 'streamx']),
new Transform({
transform(data, cb) {
// Uppercase each chunk before passing it downstream.
cb(null, data.toString().toUpperCase());
}
}),
new Writable({
write(data, cb) {
console.log('Output:', data.toString());
cb();
}
}),
// Final argument: completion callback, invoked once with an error or nothing.
(err) => {
if (err) {
console.error('Pipeline failed:', err);
} else {
console.log('Pipeline completed successfully');
}
}
);
// Pipeline without callback
// NOTE(review): `source`, `transformer1`, `transformer2`, and `destination`
// are placeholders assumed to be defined elsewhere in a real program.
const result = pipeline(
source,
transformer1,
transformer2,
destination
);
// Monitor the last stream
result.on('finish', () => {
console.log('Pipeline finished');
});
Promise-based version of pipeline for async/await workflows.
/**
 * Promise-based pipeline that resolves when complete or rejects on error
 * @param streams - Streams to connect in pipeline
 * @returns Promise that resolves when pipeline completes
 */
function pipelinePromise(...streams: Stream[]): Promise<void>;
Usage Examples:
const { pipelinePromise, Readable, Transform, Writable } = require('streamx');
// Using async/await
// A rejected pipeline promise surfaces as a thrown error inside try/catch.
async function processData() {
try {
await pipelinePromise(
Readable.from(['data1', 'data2', 'data3']),
new Transform({
transform(data, cb) {
const processed = `Processed: ${data}`;
cb(null, processed);
}
}),
new Writable({
write(data, cb) {
console.log(data.toString());
cb();
}
})
);
console.log('Pipeline completed successfully');
} catch (err) {
console.error('Pipeline failed:', err);
}
}
// Multiple pipelines in sequence
// Each pipeline runs to completion before the next one starts.
async function sequentialProcessing() {
await pipelinePromise(sourceA, transformA, destinationA);
await pipelinePromise(sourceB, transformB, destinationB);
console.log('All pipelines completed');
}
// Parallel pipelines
// Promise.all fails fast: the first rejection rejects the whole group.
async function parallelProcessing() {
await Promise.all([
pipelinePromise(source1, transform1, dest1),
pipelinePromise(source2, transform2, dest2),
pipelinePromise(source3, transform3, dest3)
]);
console.log('All parallel pipelines completed');
}
Helper functions for stream inspection and state checking.
/**
 * Check if an object is a stream (Node.js or StreamX)
 * @param obj - Object to check
 * @returns True if object is a stream
 */
function isStream(obj: any): boolean;
/**
 * Check if an object is specifically a StreamX stream
 * (returns false for plain Node.js core streams)
 * @param obj - Object to check
 * @returns True if object is a StreamX stream
 */
function isStreamx(obj: any): boolean;
/**
 * Check if a readable stream has ended
 * @param stream - Readable stream to check
 * @returns True if stream has ended
 */
function isEnded(stream: Readable): boolean;
/**
 * Check if a writable stream has finished
 * @param stream - Writable stream to check
 * @returns True if stream has finished
 */
function isFinished(stream: Writable): boolean;
/**
 * Check if a readable stream has been disturbed (data read from it)
 * @param stream - Readable stream to check
 * @returns True if stream has been disturbed
 */
function isDisturbed(stream: Readable): boolean;
/**
 * Get error from a stream if any exists
 * @param stream - Stream to check for errors
 * @param opts - Optional configuration
 * @returns Error object or null if no error
 */
function getStreamError(stream: Stream, opts?: object): Error | null;
Utility Function Examples:
const {
isStream,
isStreamx,
isEnded,
isFinished,
isDisturbed,
getStreamError,
Readable,
Writable
} = require('streamx');
const fs = require('fs');
// Check stream types
// isStream() accepts both streamx and Node.js core streams, while
// isStreamx() is true only for streamx instances (see expected output below).
const streamxReadable = new Readable();
const nodeReadable = fs.createReadStream('file.txt');
console.log(isStream(streamxReadable)); // true
console.log(isStream(nodeReadable)); // true
console.log(isStreamx(streamxReadable)); // true
console.log(isStreamx(nodeReadable)); // false
// Check stream states
const readable = Readable.from(['data1', 'data2']);
console.log(isEnded(readable)); // false
console.log(isDisturbed(readable)); // false
readable.read(); // Disturb the stream
console.log(isDisturbed(readable)); // true
// Monitor writable stream
const writable = new Writable({
write(data, cb) {
console.log('Written:', data.toString());
cb();
}
});
writable.write('test');
writable.end();
writable.on('finish', () => {
console.log(isFinished(writable)); // true
});
// Error checking
const errorStream = new Readable({
read(cb) {
cb(new Error('Read failed'));
}
});
// NOTE(review): the stream errors asynchronously once reading starts, so this
// example polls after a fixed 100 ms delay — an 'error'/'close' listener
// would be more robust than a timeout.
setTimeout(() => {
const error = getStreamError(errorStream);
if (error) {
console.log('Stream has error:', error.message);
}
}, 100);
StreamX pipelines support advanced patterns for complex stream processing workflows.
Conditional Pipeline:
async function conditionalPipeline(data, shouldTransform) {
const streams = [
Readable.from(data)
];
if (shouldTransform) {
streams.push(new Transform({
transform(chunk, cb) {
cb(null, chunk.toString().toUpperCase());
}
}));
}
streams.push(new Writable({
write(chunk, cb) {
console.log('Result:', chunk.toString());
cb();
}
}));
await pipelinePromise(...streams);
}Pipeline with Error Recovery:
async function resilientPipeline() {
const maxRetries = 3;
let attempt = 0;
while (attempt < maxRetries) {
try {
await pipelinePromise(
createSource(),
createTransform(),
createDestination()
);
console.log('Pipeline succeeded');
break;
} catch (err) {
attempt++;
console.log(`Attempt ${attempt} failed:`, err.message);
if (attempt >= maxRetries) {
throw new Error(`Pipeline failed after ${maxRetries} attempts`);
}
// Wait before retry
await new Promise(resolve => setTimeout(resolve, 1000 * attempt));
}
}
}Pipeline with Monitoring:
function createMonitoredPipeline() {
const source = Readable.from(['data1', 'data2', 'data3']);
const transform = new Transform({
transform(data, cb) {
console.log('Transforming:', data.toString());
cb(null, data.toString().toUpperCase());
}
});
const destination = new Writable({
write(data, cb) {
console.log('Writing:', data.toString());
cb();
}
});
// Monitor each stream
[source, transform, destination].forEach((stream, index) => {
stream.on('error', (err) => {
console.error(`Stream ${index} error:`, err.message);
});
stream.on('close', () => {
console.log(`Stream ${index} closed`);
});
});
return pipeline(source, transform, destination, (err) => {
if (err) {
console.error('Pipeline error:', err.message);
} else {
console.log('Pipeline completed successfully');
}
});
}Dynamic Pipeline Construction:
function createDynamicPipeline(config) {
const streams = [
Readable.from(config.data)
];
// Add transforms based on configuration
if (config.uppercase) {
streams.push(new Transform({
transform(data, cb) {
cb(null, data.toString().toUpperCase());
}
}));
}
if (config.prefix) {
streams.push(new Transform({
transform(data, cb) {
cb(null, `${config.prefix}: ${data}`);
}
}));
}
if (config.filter) {
streams.push(new Transform({
transform(data, cb) {
if (data.toString().includes(config.filter)) {
cb(null, data);
} else {
cb(); // Skip this data
}
}
}));
}
// Add destination
streams.push(new Writable({
write(data, cb) {
console.log('Final output:', data.toString());
cb();
}
}));
return pipelinePromise(...streams);
}
// Usage
createDynamicPipeline({
data: ['hello', 'world', 'test'],
uppercase: true,
prefix: 'OUTPUT',
filter: 'world'
});Pipelines provide comprehensive error handling with automatic cleanup of all streams.
const { pipeline } = require('streamx');
// Pipeline with error-prone transform
pipeline(
Readable.from(['good', 'error', 'data']),
new Transform({
transform(data, cb) {
if (data.toString() === 'error') {
return cb(new Error('Transform failed'));
}
cb(null, data.toString().toUpperCase());
}
}),
new Writable({
write(data, cb) {
console.log('Success:', data.toString());
cb();
}
}),
(err) => {
if (err) {
console.error('Pipeline failed:', err.message);
// All streams are automatically cleaned up
}
}
);Install with Tessl CLI
npx tessl i tessl/npm-streamx