Construct pipes of streams of events with functional programming patterns for Node.js
—
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Pending
The risk profile of this skill
Additional utilities for debugging, child process integration, and specialized stream operations that support the core streaming functionality.
const es = require('event-stream');Debug utility that logs all data passing through the stream to console.error, useful for monitoring stream flow and debugging pipelines.
/**
* Log stream data to console.error for debugging
* @param {string} name - Optional prefix for log messages
* @returns {TransformStream} Stream that logs data and passes it through
*/
function log(name);Usage Examples:
const es = require('event-stream');
// Basic logging
es.readArray([1, 2, 3])
.pipe(es.log()) // Logs each item to console.error
.pipe(es.writeArray(function(err, result) {
console.log('Final result:', result);
}));
// Named logging for pipeline debugging
es.readArray(['hello', 'world'])
.pipe(es.log('input'))
.pipe(es.mapSync(str => str.toUpperCase()))
.pipe(es.log('after uppercase'))
.pipe(es.filterSync(str => str.length > 4))
.pipe(es.log('after filter'))
.pipe(es.writeArray(function(err, result) {
console.log(result);
}));
// Multiple log points in complex pipeline
const dataProcessor = es.pipeline(
es.readArray([1, 2, 3, 4, 5]),
es.log('original'),
es.mapSync(n => n * 2),
es.log('doubled'),
es.filterSync(n => n > 5),
es.log('filtered')
);Creates a duplex stream from a child process, allowing stream-based communication with external processes.
/**
* Create duplex stream from child process
* @param {Object} childProcess - Child process with stdin/stdout
* @returns {DuplexStream} Stream connected to child process
*/
function child(childProcess);Usage Examples:
const es = require('event-stream');
const cp = require('child_process');
// Stream through grep
const grepProcess = cp.exec('grep "pattern"');
const grepStream = es.child(grepProcess);
es.readArray(['line with pattern', 'other line', 'pattern here too'])
.pipe(es.join('\n'))
.pipe(grepStream)
.pipe(es.writeArray(function(err, result) {
console.log('Grep results:', result);
}));
// Stream through sed for text replacement
const sedProcess = cp.exec('sed s/old/new/g');
es.readArray(['old text', 'more old stuff'])
.pipe(es.join('\n'))
.pipe(es.child(sedProcess))
.pipe(process.stdout);
// Pipe files through external command
const fs = require('fs');
const sortProcess = cp.exec('sort');
fs.createReadStream('unsorted.txt')
.pipe(es.child(sortProcess))
.pipe(fs.createWriteStream('sorted.txt'));
// Handle process errors
const failingProcess = cp.exec('nonexistent-command');
const processStream = es.child(failingProcess);
processStream.on('error', function(err) {
console.error('Process error:', err);
});Re-exports Node.js core Stream class for direct access to the underlying stream implementation.
/**
* Node.js core Stream class
* @type {typeof require('stream').Stream}
*/
const Stream = require('stream').Stream;Usage Examples:
const es = require('event-stream');
// Access core Stream class
const coreStream = new es.Stream();
coreStream.readable = true;
// Create custom stream extending core Stream
function CustomStream() {
es.Stream.call(this);
this.readable = true;
this.writable = false;
}
CustomStream.prototype = Object.create(es.Stream.prototype);
// Check if object is a stream
function isStream(obj) {
return obj instanceof es.Stream;
}
// Use in stream detection
const testStream = es.readArray([1, 2, 3]);
console.log(isStream(testStream)); // trueThis function is deprecated and throws an error when called. It was removed from the API and should not be used.
/**
* @deprecated This function is deprecated and throws an error
* @throws {Error} Always throws deprecation error
*/
function pipeable();Migration:
// Don't use - will throw error
// es.pipeable(); // Error: [EVENT-STREAM] es.pipeable is deprecated
// Use pipeline/connect/pipe instead
const combined = es.pipeline(stream1, stream2, stream3);
// or
const combined = es.connect(stream1, stream2, stream3);Event Stream re-exports several useful stream utilities from its dependencies:
/**
* Create through stream (from 'through' package)
* @param {Function} writeFunction - Optional write handler
* @param {Function} endFunction - Optional end handler
* @returns {TransformStream} Through stream
*/
const through = require('through');Creates a readable stream from a function that generates data chunks on demand.
/**
* Create readable stream from function (from 'from' package)
* @param {Function} getChunk - Function called with (count, next) to generate chunks
* @returns {ReadableStream} Generated readable stream
*/
function from(getChunk);Usage Examples:
const es = require('event-stream');
// Create stream from function
const numberStream = es.from(function(count, next) {
if (count >= 5) {
return next(null, null); // End stream
}
next(null, count * 2); // Emit doubled count
});
numberStream.pipe(es.writeArray(function(err, result) {
console.log(result); // [0, 2, 4, 6, 8]
}));
// Create stream with error handling
const errorStream = es.from(function(count, next) {
if (count === 3) {
return next(new Error('Test error'));
}
if (count >= 5) {
return next(null, null);
}
next(null, count);
});
errorStream.on('error', function(err) {
console.log('Error:', err.message);
});/**
* Async map transformation (from 'map-stream' package)
* @param {Function} asyncFunction - Async transformation function
* @returns {TransformStream} Async map stream
*/
const map = require('map-stream');/**
* Pauseable stream (from 'pause-stream' package)
* @returns {TransformStream} Pauseable transform stream
*/
const pause = require('pause-stream');/**
* Stream splitter (from 'split' package)
* @param {string|RegExp} matcher - Split delimiter
* @returns {TransformStream} Splitting transform stream
*/
const split = require('split');/**
* Stream combiner (from 'stream-combiner' package)
* @param {...Stream} streams - Streams to combine
* @returns {Stream} Combined stream
*/
const pipeline = require('stream-combiner');/**
* Duplex stream creator (from 'duplexer' package)
* @param {WritableStream} writable - Writable side
* @param {ReadableStream} readable - Readable side
* @returns {DuplexStream} Combined duplex stream
*/
const duplex = require('duplexer');Use utility functions effectively for stream debugging:
Common patterns for using child processes with streams: