Create a stream that emits events from multiple other streams
npx @tessl/cli install tessl/npm-merge-stream@2.0.0Merge-stream is a lightweight Node.js streaming utility that enables merging (interleaving) multiple readable streams into a single output stream. Built on Node.js Streams3 API using PassThrough streams, it provides a simple yet powerful interface for stream aggregation with automatic error handling and lifecycle management.
npm install merge-streamconst mergeStream = require('merge-stream');For ES modules:
import mergeStream from 'merge-stream';const mergeStream = require('merge-stream');
const fs = require('fs');
// Merge multiple readable streams
const stream1 = fs.createReadStream('file1.txt');
const stream2 = fs.createReadStream('file2.txt');
const merged = mergeStream(stream1, stream2);
merged.on('data', (chunk) => {
console.log('Data:', chunk.toString());
});
merged.on('end', () => {
console.log('All streams completed');
});
// Dynamic addition of streams
const stream3 = fs.createReadStream('file3.txt');
merged.add(stream3);
// Check if merged stream has any sources
if (!merged.isEmpty()) {
// Process the merged stream
merged.pipe(process.stdout);
}merge-stream is built around several key components:
Creates a merged stream from multiple source streams with variadic argument support.
/**
* Creates a merged stream from multiple source streams
* @param {...(Stream|Stream[])} streams - Source streams or arrays of streams to merge
* @returns {PassThrough} PassThrough stream with additional methods (add, isEmpty)
*/
function mergeStream(...streams);The returned stream is a PassThrough stream in object mode with the following properties:
objectMode: true - Handles both object and buffer datamaxListeners: 0 - Unlimited event listeners to prevent warningsadd() and isEmpty()Usage Examples:
const mergeStream = require('merge-stream');
// Create merged stream with initial sources
const merged = mergeStream(stream1, stream2);
// Create empty merged stream
const empty = mergeStream();
// Create with array of streams
const fromArray = mergeStream([stream1, stream2, stream3]);
// Mixed arguments
const mixed = mergeStream(stream1, [stream2, stream3], stream4);Dynamically adds more source streams to an existing merged stream.
/**
* Dynamically adds source streams to the merged stream
* @param {Stream|Stream[]} source - A readable stream or array of streams to add
* @returns {PassThrough} Returns this for method chaining
*/
merged.add(source);Usage Examples:
const merged = mergeStream();
// Add single stream
merged.add(stream1);
// Add array of streams
merged.add([stream2, stream3]);
// Method chaining
merged
.add(stream1)
.add([stream2, stream3])
.add(stream4);Checks if the merged stream has any active source streams.
/**
* Checks if the merged stream has any active source streams
* @returns {boolean} true if no sources are active, false otherwise
*/
merged.isEmpty();Usage Examples:
const merged = mergeStream();
console.log(merged.isEmpty()); // true
merged.add(stream1);
console.log(merged.isEmpty()); // false
// Conditional processing based on stream state
return merged.isEmpty() ? null : merged;The merged stream automatically handles the complete lifecycle of source streams:
The merged stream ends automatically when:
Errors from any source stream are automatically propagated to the merged stream without terminating other active sources.
merged.on('error', (err) => {
console.error('Stream error:', err.message);
// Error came from one of the source streams
// Other source streams continue operating normally
});Commonly used in Gulp build tasks to combine multiple processing streams:
const gulp = require('gulp');
const htmlValidator = require('gulp-w3c-html-validator');
const jsHint = require('gulp-jshint');
const mergeStream = require('merge-stream');
function lint() {
return mergeStream(
gulp.src('src/*.html')
.pipe(htmlValidator())
.pipe(htmlValidator.reporter()),
gulp.src('src/*.js')
.pipe(jsHint())
.pipe(jsHint.reporter())
);
}Using isEmpty() for conditional returns in build systems:
function processFiles(patterns) {
const stream = mergeStream();
patterns.forEach(pattern => {
if (glob.sync(pattern).length > 0) {
stream.add(gulp.src(pattern).pipe(processFile()));
}
});
// Return null if no files match, otherwise return the stream
return stream.isEmpty() ? null : stream;
}Adding streams based on runtime conditions:
const merged = mergeStream();
// Add streams conditionally
if (process.env.NODE_ENV === 'development') {
merged.add(devStream);
}
if (enableFeature) {
merged.add(featureStream);
}
// Process only if streams were added
if (!merged.isEmpty()) {
merged.pipe(destination);
}