Additional utilities for debugging, child process integration, and specialized stream operations that support the core streaming functionality.
const es = require('event-stream');Debug utility that logs all data passing through the stream to console.error, useful for monitoring stream flow and debugging pipelines.
/**
* Log stream data to console.error for debugging
* @param {string} name - Optional prefix for log messages
* @returns {TransformStream} Stream that logs data and passes it through
*/
function log(name);Usage Examples:
const es = require('event-stream');
// Basic logging
es.readArray([1, 2, 3])
.pipe(es.log()) // Logs each item to console.error
.pipe(es.writeArray(function(err, result) {
console.log('Final result:', result);
}));
// Named logging for pipeline debugging
es.readArray(['hello', 'world'])
.pipe(es.log('input'))
.pipe(es.mapSync(str => str.toUpperCase()))
.pipe(es.log('after uppercase'))
.pipe(es.filterSync(str => str.length > 4))
.pipe(es.log('after filter'))
.pipe(es.writeArray(function(err, result) {
console.log(result);
}));
// Multiple log points in complex pipeline
const dataProcessor = es.pipeline(
es.readArray([1, 2, 3, 4, 5]),
es.log('original'),
es.mapSync(n => n * 2),
es.log('doubled'),
es.filterSync(n => n > 5),
es.log('filtered')
);Creates a duplex stream from a child process, allowing stream-based communication with external processes.
/**
* Create duplex stream from child process
* @param {Object} childProcess - Child process with stdin/stdout
* @returns {DuplexStream} Stream connected to child process
*/
function child(childProcess);Usage Examples:
const es = require('event-stream');
const cp = require('child_process');
// Stream through grep
const grepProcess = cp.exec('grep "pattern"');
const grepStream = es.child(grepProcess);
es.readArray(['line with pattern', 'other line', 'pattern here too'])
.pipe(es.join('\n'))
.pipe(grepStream)
.pipe(es.writeArray(function(err, result) {
console.log('Grep results:', result);
}));
// Stream through sed for text replacement
const sedProcess = cp.exec('sed s/old/new/g');
es.readArray(['old text', 'more old stuff'])
.pipe(es.join('\n'))
.pipe(es.child(sedProcess))
.pipe(process.stdout);
// Pipe files through external command
const fs = require('fs');
const sortProcess = cp.exec('sort');
fs.createReadStream('unsorted.txt')
.pipe(es.child(sortProcess))
.pipe(fs.createWriteStream('sorted.txt'));
// Handle process errors
const failingProcess = cp.exec('nonexistent-command');
const processStream = es.child(failingProcess);
processStream.on('error', function(err) {
console.error('Process error:', err);
});Re-exports Node.js core Stream class for direct access to the underlying stream implementation.
/**
* Node.js core Stream class
* @type {typeof require('stream').Stream}
*/
const Stream = require('stream').Stream;Usage Examples:
const es = require('event-stream');
// Access core Stream class
const coreStream = new es.Stream();
coreStream.readable = true;
// Create custom stream extending core Stream
function CustomStream() {
es.Stream.call(this);
this.readable = true;
this.writable = false;
}
CustomStream.prototype = Object.create(es.Stream.prototype);
// Check if object is a stream
function isStream(obj) {
return obj instanceof es.Stream;
}
// Use in stream detection
const testStream = es.readArray([1, 2, 3]);
console.log(isStream(testStream)); // trueThis function is deprecated and throws an error when called. It was removed from the API and should not be used.
/**
* @deprecated This function is deprecated and throws an error
* @throws {Error} Always throws deprecation error
*/
function pipeable();Migration:
// Don't use - will throw error
// es.pipeable(); // Error: [EVENT-STREAM] es.pipeable is deprecated
// Use pipeline/connect/pipe instead
const combined = es.pipeline(stream1, stream2, stream3);
// or
const combined = es.connect(stream1, stream2, stream3);Event Stream re-exports several useful stream utilities from its dependencies:
/**
* Create through stream (from 'through' package)
* @param {Function} writeFunction - Optional write handler
* @param {Function} endFunction - Optional end handler
* @returns {TransformStream} Through stream
*/
const through = require('through');Creates a readable stream from a function that generates data chunks on demand.
/**
* Create readable stream from function (from 'from' package)
* @param {Function} getChunk - Function called with (count, next) to generate chunks
* @returns {ReadableStream} Generated readable stream
*/
function from(getChunk);Usage Examples:
const es = require('event-stream');
// Create stream from function
const numberStream = es.from(function(count, next) {
if (count >= 5) {
return next(null, null); // End stream
}
next(null, count * 2); // Emit doubled count
});
numberStream.pipe(es.writeArray(function(err, result) {
console.log(result); // [0, 2, 4, 6, 8]
}));
// Create stream with error handling
const errorStream = es.from(function(count, next) {
if (count === 3) {
return next(new Error('Test error'));
}
if (count >= 5) {
return next(null, null);
}
next(null, count);
});
errorStream.on('error', function(err) {
console.log('Error:', err.message);
});/**
* Async map transformation (from 'map-stream' package)
* @param {Function} asyncFunction - Async transformation function
* @returns {TransformStream} Async map stream
*/
const map = require('map-stream');/**
* Pauseable stream (from 'pause-stream' package)
* @returns {TransformStream} Pauseable transform stream
*/
const pause = require('pause-stream');/**
* Stream splitter (from 'split' package)
* @param {string|RegExp} matcher - Split delimiter
* @returns {TransformStream} Splitting transform stream
*/
const split = require('split');/**
* Stream combiner (from 'stream-combiner' package)
* @param {...Stream} streams - Streams to combine
* @returns {Stream} Combined stream
*/
const pipeline = require('stream-combiner');/**
* Duplex stream creator (from 'duplexer' package)
* @param {WritableStream} writable - Writable side
* @param {ReadableStream} readable - Readable side
* @returns {DuplexStream} Combined duplex stream
*/
const duplex = require('duplexer');Use utility functions effectively for stream debugging:
Common patterns for using child processes with streams: