CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/npm-merge-stream

Create a stream that emits events from multiple other streams

Pending
Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

SecuritybySnyk

Pending

The risk profile of this skill

Overview
Eval results
Files

index.mddocs/

merge-stream

Merge-stream is a lightweight Node.js streaming utility that enables merging (interleaving) multiple readable streams into a single output stream. Built on Node.js Streams3 API using PassThrough streams, it provides a simple yet powerful interface for stream aggregation with automatic error handling and lifecycle management.

Package Information

  • Package Name: merge-stream
  • Package Type: npm
  • Language: JavaScript (Node.js)
  • Installation: npm install merge-stream

Core Imports

const mergeStream = require('merge-stream');

For ES modules:

import mergeStream from 'merge-stream';

Basic Usage

const mergeStream = require('merge-stream');
const fs = require('fs');

// Merge multiple readable streams
const stream1 = fs.createReadStream('file1.txt');
const stream2 = fs.createReadStream('file2.txt');

const merged = mergeStream(stream1, stream2);

merged.on('data', (chunk) => {
  console.log('Data:', chunk.toString());
});

merged.on('end', () => {
  console.log('All streams completed');
});

// Dynamic addition of streams
const stream3 = fs.createReadStream('file3.txt');
merged.add(stream3);

// Check if merged stream has any sources
if (!merged.isEmpty()) {
  // Process the merged stream
  merged.pipe(process.stdout);
}

Architecture

merge-stream is built around several key components:

  • PassThrough Stream: Uses Node.js PassThrough stream in object mode as the base
  • Source Management: Tracks active source streams and manages their lifecycle
  • Event Propagation: Forwards data, error, and end events from source streams
  • Dynamic Addition: Allows adding new source streams during runtime
  • Auto-cleanup: Automatically removes ended streams and closes output when no sources remain

Capabilities

Stream Creation and Merging

Creates a merged stream from multiple source streams with variadic argument support.

/**
 * Creates a merged stream from multiple source streams
 * @param {...(Stream|Stream[])} streams - Source streams or arrays of streams to merge
 * @returns {PassThrough} PassThrough stream with additional methods (add, isEmpty)
 */
function mergeStream(...streams);

The returned stream is a PassThrough stream in object mode with the following properties:

  • objectMode: true - Handles both object and buffer data
  • maxListeners: 0 - Unlimited event listeners to prevent warnings
  • Additional methods: add() and isEmpty()

Usage Examples:

const mergeStream = require('merge-stream');

// Create merged stream with initial sources
const merged = mergeStream(stream1, stream2);

// Create empty merged stream
const empty = mergeStream();

// Create with array of streams
const fromArray = mergeStream([stream1, stream2, stream3]);

// Mixed arguments
const mixed = mergeStream(stream1, [stream2, stream3], stream4);

Dynamic Stream Addition

Dynamically adds more source streams to an existing merged stream.

/**
 * Dynamically adds source streams to the merged stream
 * @param {Stream|Stream[]} source - A readable stream or array of streams to add
 * @returns {PassThrough} Returns this for method chaining
 */
merged.add(source);

Usage Examples:

const merged = mergeStream();

// Add single stream
merged.add(stream1);

// Add array of streams
merged.add([stream2, stream3]);

// Method chaining
merged
  .add(stream1)
  .add([stream2, stream3])
  .add(stream4);

Stream State Checking

Checks if the merged stream has any active source streams.

/**
 * Checks if the merged stream has any active source streams
 * @returns {boolean} true if no sources are active, false otherwise
 */
merged.isEmpty();

Usage Examples:

const merged = mergeStream();

console.log(merged.isEmpty()); // true

merged.add(stream1);
console.log(merged.isEmpty()); // false

// Conditional processing based on stream state
return merged.isEmpty() ? null : merged;

Stream Lifecycle and Event Handling

The merged stream automatically handles the complete lifecycle of source streams:

Automatic Event Management

  • Data Events: All data from source streams is forwarded to the merged stream
  • End Events: Source streams are automatically removed when they end
  • Error Events: Errors from source streams are propagated to the merged stream
  • Unpipe Events: Streams that are manually unpiped are removed from tracking

End Behavior

The merged stream ends automatically when:

  • All source streams have ended
  • No more source streams are active
  • The merged stream is readable (not already ended)

Error Propagation

Errors from any source stream are automatically propagated to the merged stream without terminating other active sources.

merged.on('error', (err) => {
  console.error('Stream error:', err.message);
  // Error came from one of the source streams
  // Other source streams continue operating normally
});

Advanced Usage Patterns

Gulp Integration

Commonly used in Gulp build tasks to combine multiple processing streams:

const gulp = require('gulp');
const htmlValidator = require('gulp-w3c-html-validator');
const jsHint = require('gulp-jshint');
const mergeStream = require('merge-stream');

function lint() {
  return mergeStream(
    gulp.src('src/*.html')
      .pipe(htmlValidator())
      .pipe(htmlValidator.reporter()),
    gulp.src('src/*.js')
      .pipe(jsHint())
      .pipe(jsHint.reporter())
  );
}

Conditional Stream Processing

Using isEmpty() for conditional returns in build systems:

function processFiles(patterns) {
  const stream = mergeStream();
  
  patterns.forEach(pattern => {
    if (glob.sync(pattern).length > 0) {
      stream.add(gulp.src(pattern).pipe(processFile()));
    }
  });
  
  // Return null if no files match, otherwise return the stream
  return stream.isEmpty() ? null : stream;
}

Dynamic Stream Addition

Adding streams based on runtime conditions:

const merged = mergeStream();

// Add streams conditionally
if (process.env.NODE_ENV === 'development') {
  merged.add(devStream);
}

if (enableFeature) {
  merged.add(featureStream);
}

// Process only if streams were added
if (!merged.isEmpty()) {
  merged.pipe(destination);
}

docs

index.md

tile.json