or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

index.md
tile.json

tessl/npm-merge-stream

Create a stream that emits events from multiple other streams

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
npmpkg:npm/merge-stream@2.0.x

To install, run

npx @tessl/cli install tessl/npm-merge-stream@2.0.0

index.mddocs/

merge-stream

Merge-stream is a lightweight Node.js streaming utility that enables merging (interleaving) multiple readable streams into a single output stream. Built on Node.js Streams3 API using PassThrough streams, it provides a simple yet powerful interface for stream aggregation with automatic error handling and lifecycle management.

Package Information

  • Package Name: merge-stream
  • Package Type: npm
  • Language: JavaScript (Node.js)
  • Installation: npm install merge-stream

Core Imports

const mergeStream = require('merge-stream');

For ES modules:

import mergeStream from 'merge-stream';

Basic Usage

const mergeStream = require('merge-stream');
const fs = require('fs');

// Merge multiple readable streams
const stream1 = fs.createReadStream('file1.txt');
const stream2 = fs.createReadStream('file2.txt');

const merged = mergeStream(stream1, stream2);

merged.on('data', (chunk) => {
  console.log('Data:', chunk.toString());
});

merged.on('end', () => {
  console.log('All streams completed');
});

// Dynamic addition of streams
const stream3 = fs.createReadStream('file3.txt');
merged.add(stream3);

// Check if merged stream has any sources
if (!merged.isEmpty()) {
  // Process the merged stream
  merged.pipe(process.stdout);
}

Architecture

merge-stream is built around several key components:

  • PassThrough Stream: Uses Node.js PassThrough stream in object mode as the base
  • Source Management: Tracks active source streams and manages their lifecycle
  • Event Propagation: Forwards data, error, and end events from source streams
  • Dynamic Addition: Allows adding new source streams during runtime
  • Auto-cleanup: Automatically removes ended streams and closes output when no sources remain

Capabilities

Stream Creation and Merging

Creates a merged stream from multiple source streams with variadic argument support.

/**
 * Creates a merged stream from multiple source streams
 * @param {...(Stream|Stream[])} streams - Source streams or arrays of streams to merge
 * @returns {PassThrough} PassThrough stream with additional methods (add, isEmpty)
 */
function mergeStream(...streams);

The returned stream is a PassThrough stream in object mode with the following properties:

  • objectMode: true - Handles both object and buffer data
  • maxListeners: 0 - Unlimited event listeners to prevent warnings
  • Additional methods: add() and isEmpty()

Usage Examples:

const mergeStream = require('merge-stream');

// Create merged stream with initial sources
const merged = mergeStream(stream1, stream2);

// Create empty merged stream
const empty = mergeStream();

// Create with array of streams
const fromArray = mergeStream([stream1, stream2, stream3]);

// Mixed arguments
const mixed = mergeStream(stream1, [stream2, stream3], stream4);

Dynamic Stream Addition

Dynamically adds more source streams to an existing merged stream.

/**
 * Dynamically adds source streams to the merged stream
 * @param {Stream|Stream[]} source - A readable stream or array of streams to add
 * @returns {PassThrough} Returns this for method chaining
 */
merged.add(source);

Usage Examples:

const merged = mergeStream();

// Add single stream
merged.add(stream1);

// Add array of streams
merged.add([stream2, stream3]);

// Method chaining
merged
  .add(stream1)
  .add([stream2, stream3])
  .add(stream4);

Stream State Checking

Checks if the merged stream has any active source streams.

/**
 * Checks if the merged stream has any active source streams
 * @returns {boolean} true if no sources are active, false otherwise
 */
merged.isEmpty();

Usage Examples:

const merged = mergeStream();

console.log(merged.isEmpty()); // true

merged.add(stream1);
console.log(merged.isEmpty()); // false

// Conditional processing based on stream state
return merged.isEmpty() ? null : merged;

Stream Lifecycle and Event Handling

The merged stream automatically handles the complete lifecycle of source streams:

Automatic Event Management

  • Data Events: All data from source streams is forwarded to the merged stream
  • End Events: Source streams are automatically removed when they end
  • Error Events: Errors from source streams are propagated to the merged stream
  • Unpipe Events: Streams that are manually unpiped are removed from tracking

End Behavior

The merged stream ends automatically when:

  • All source streams have ended
  • No more source streams are active
  • The merged stream is readable (not already ended)

Error Propagation

Errors from any source stream are automatically propagated to the merged stream without terminating other active sources.

merged.on('error', (err) => {
  console.error('Stream error:', err.message);
  // Error came from one of the source streams
  // Other source streams continue operating normally
});

Advanced Usage Patterns

Gulp Integration

Commonly used in Gulp build tasks to combine multiple processing streams:

const gulp = require('gulp');
const htmlValidator = require('gulp-w3c-html-validator');
const jsHint = require('gulp-jshint');
const mergeStream = require('merge-stream');

function lint() {
  return mergeStream(
    gulp.src('src/*.html')
      .pipe(htmlValidator())
      .pipe(htmlValidator.reporter()),
    gulp.src('src/*.js')
      .pipe(jsHint())
      .pipe(jsHint.reporter())
  );
}

Conditional Stream Processing

Using isEmpty() for conditional returns in build systems:

function processFiles(patterns) {
  const stream = mergeStream();
  
  patterns.forEach(pattern => {
    if (glob.sync(pattern).length > 0) {
      stream.add(gulp.src(pattern).pipe(processFile()));
    }
  });
  
  // Return null if no files match, otherwise return the stream
  return stream.isEmpty() ? null : stream;
}

Dynamic Stream Addition

Adding streams based on runtime conditions:

const merged = mergeStream();

// Add streams conditionally
if (process.env.NODE_ENV === 'development') {
  merged.add(devStream);
}

if (enableFeature) {
  merged.add(featureStream);
}

// Process only if streams were added
if (!merged.isEmpty()) {
  merged.pipe(destination);
}