Functions for merging, connecting, and combining multiple streams into unified data flows with proper backpressure handling and lifecycle management.
Merges multiple streams into one readable stream, emitting data as soon as it arrives from any source stream, without ordering guarantees.
/**
* Merge multiple streams into a single stream
* @param {...Stream} streams - Variable number of streams to merge
* @returns {ReadableStream} Stream that emits data from all input streams
*/
function merge(...streams);
/**
* Merge an array of streams into a single stream
* @param {Stream[]} streamArray - Array of streams to merge
* @returns {ReadableStream} Stream that emits data from all input streams
*/
function merge(streamArray);
// Alias for merge (concat is the same as merge)
const concat = merge;

Usage Examples:
const es = require('event-stream');
const fs = require('fs');
// Merge multiple streams
const stream1 = es.readArray([1, 2, 3]);
const stream2 = es.readArray([4, 5, 6]);
const stream3 = es.readArray([7, 8, 9]);
es.merge(stream1, stream2, stream3)
  .pipe(es.writeArray(function(err, result) {
    console.log(result); // [1, 2, 3, 4, 5, 6, 7, 8, 9] (order may vary)
  }));
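// Note: the merged stream emits 'end' only after every source stream has ended.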
// Merge from array
const streams = [
  fs.createReadStream('file1.txt'),
  fs.createReadStream('file2.txt'),
  fs.createReadStream('file3.txt')
];

es.merge(streams)
  .pipe(process.stdout);
// Merge a child process's stdout and stderr into a single log
// ('long-task' is a placeholder command)
const child = require('child_process').spawn('long-task');
es.merge(child.stdout, child.stderr)
  .pipe(fs.createWriteStream('output.log'));
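// Error handling sketch: 'error' events are not forwarded across pipe(),
// so attach a handler to each source stream as well as to the destination
// ('maybe-missing.txt' is a placeholder path used only for illustration)
const risky = fs.createReadStream('maybe-missing.txt');
risky.on('error', err => console.error('source failed:', err.message));
es.merge(risky, es.readArray([1, 2, 3]))
  .pipe(es.writeArray(function(err, result) {
    console.log(result);
  }));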
// Empty merge
es.merge([]).pipe(es.writeArray(function(err, result) {
  console.log(result); // []
}));
// Using concat alias (same as merge)
es.concat(stream1, stream2, stream3)
  .pipe(es.writeArray(function(err, result) {
    console.log('Concatenated:', result);
  }));

Connects multiple streams in sequence, where the output of each stream becomes the input of the next.
/**
* Connect streams in pipeline sequence
* @param {...Stream} streams - Streams to connect in order
* @returns {Stream} Combined pipeline stream
*/
function pipeline(...streams);
// Aliases for pipeline (all three function names do the same thing)
const connect = pipeline;
const pipe = pipeline;

Usage Examples:
const es = require('event-stream');
const fs = require('fs');
// Simple pipeline
const output = fs.createWriteStream('output.txt');

const processor = es.pipeline(
  fs.createReadStream('input.txt'),
  es.split(),
  es.map(function(line, cb) {
    cb(null, line.toUpperCase());
  }),
  es.join('\n'),
  output
);

processor.on('error', console.error);
// the last stage is a Writable, so completion is signalled by its 'finish' event
output.on('finish', () => console.log('Pipeline complete'));
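// Note: errors emitted by any stream inside the pipeline are re-emitted on the
// combined stream, so the single 'error' handler above covers every stage.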
// Transform pipeline
const dataProcessor = es.connect(
  es.readArray(['hello world foo bar']),
  es.split(' '),
  es.filterSync(word => word.length > 3),
  es.mapSync(word => word.toUpperCase())
);

dataProcessor.pipe(es.writeArray(function(err, result) {
  console.log(result); // ['HELLO', 'WORLD']
}));
// Using pipe alias
const textProcessor = es.pipe(
  es.split('\n'),
  es.filterSync(line => line.trim().length > 0),
  es.mapSync(line => line.trim())
);

Combines a writable stream and a readable stream into a single duplex stream, assuming the two are connected in some way (for example, through a child process).
/**
* Create duplex stream from separate readable and writable streams
* @param {WritableStream} writeStream - Stream to write to
* @param {ReadableStream} readStream - Stream to read from
* @returns {DuplexStream} Combined duplex stream
*/
function duplex(writeStream, readStream);

Usage Examples:
const es = require('event-stream');
const cp = require('child_process');
// Create duplex from child process
const grep = cp.spawn('grep', ['pattern']);
const grepStream = es.duplex(grep.stdin, grep.stdout);
// Use as normal duplex stream
process.stdin
  .pipe(grepStream)
  .pipe(process.stdout);
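// es.duplex() only joins the two streams; it does not monitor the child process.
// Surfacing spawn failures on the duplex stream is left to the caller (sketch):
grep.on('error', err => grepStream.emit('error', err));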
// Duplex wrapping a sed child process
const sed = cp.spawn('sed', ['s/old/new/g']);
const sedStream = es.duplex(sed.stdin, sed.stdout);

es.readArray(['old text', 'more old stuff'])
  .pipe(es.join('\n'))
  .pipe(sedStream)
  .pipe(es.writeArray(function(err, result) {
    console.log(result);
  }));

Creates a stream that buffers all incoming chunks while it is paused, providing manual flow control for stream processing.
/**
* Create a pauseable stream that buffers data when paused
* @returns {TransformStream} Stream with enhanced pause/resume behavior
*/
function pause();

Usage Examples:
const es = require('event-stream');
// Create pauseable stream
const pauseStream = es.pause();
// Buffer data while paused
pauseStream.pause();
es.readArray([1, 2, 3, 4, 5])
  .pipe(pauseStream)
  .pipe(es.writeArray(function(err, result) {
    console.log('Received after resume:', result);
  }));
// Resume after delay
setTimeout(() => {
  console.log('Resuming stream...');
  pauseStream.resume();
}, 2000);
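// While paused, incoming chunks are buffered in memory and flushed in order on
// resume(); the buffer is unbounded, so long pauses on large inputs grow memory.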
// Use in pipeline for flow control
const controlledFlow = es.pipeline(
  es.readArray(largeDataSet), // largeDataSet: assume an array defined elsewhere
  es.pause(), // control flow here
  es.map(function(data, cb) {
    // simulate slow asynchronous processing
    setTimeout(() => cb(null, data), 100);
  })
);
// Pause and resume based on conditions
controlledFlow.on('data', function(data) {
  if (needToSlowDown) { // needToSlowDown: some application-specific condition
    this.pause();
    setTimeout(() => this.resume(), 1000);
  }
});

All combination functions properly handle stream lifecycle events:
Stream combination functions respect Node.js backpressure mechanisms: