tessl/npm-combined-stream

A stream that emits multiple other streams one after another

Workspace: tessl
Visibility: Public
Describes: npmpkg:npm/combined-stream@1.0.x

To install, run

npx @tessl/cli install tessl/npm-combined-stream@1.0.0


Combined Stream

Combined Stream is a Node.js streaming utility that combines multiple streams into a single stream, emitting their data sequentially in the order they were appended. It offers configurable back pressure control, a buffer size limit for memory management, and both immediate and lazy stream loading.

Package Information

  • Package Name: combined-stream
  • Package Type: npm
  • Language: JavaScript
  • Installation: npm install combined-stream

Core Imports

const CombinedStream = require('combined-stream');

Basic Usage

const CombinedStream = require('combined-stream');
const fs = require('fs');

// Create a combined stream
const combinedStream = CombinedStream.create();

// Append multiple streams
combinedStream.append(fs.createReadStream('file1.txt'));
combinedStream.append(fs.createReadStream('file2.txt'));

// Pipe to destination
combinedStream.pipe(fs.createWriteStream('combined.txt'));

Capabilities

Stream Creation

Create a new combined stream instance with optional configuration.

/**
 * Create a new CombinedStream instance
 * @param {Object} [options] - Configuration options
 * @param {number} [options.maxDataSize] - Maximum buffer size in bytes (default: 2MB)
 * @param {boolean} [options.pauseStreams] - Whether to pause streams (default: true)
 * @returns {CombinedStream} New CombinedStream instance
 */
CombinedStream.create(options);

Usage Examples:

// Basic creation
const combinedStream = CombinedStream.create();

// With custom options (separate name, so both examples can share one file)
const limitedStream = CombinedStream.create({
  maxDataSize: 5 * 1024 * 1024,  // 5MB limit
  pauseStreams: false            // Don't pause source streams
});

Stream Testing

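Test whether a value should be treated as a stream rather than as a literal value (string or buffer) or a callback. This check is used internally when items are appended.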

/**
 * Test if an object is stream-like
 * @param {any} stream - Object to test
 * @returns {boolean} True if stream-like, false otherwise
 */
CombinedStream.isStreamLike(stream);
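
One way to picture this check is as a negative type test. The sketch below is an illustration under that assumption, not the package's source: `looksStreamLike` is a hypothetical name, and it treats anything that is not a function, string, boolean, number, or Buffer as stream-like. Under this reading a plain object would also pass, so the result says "handle this as a stream", not "this is a valid stream".

```javascript
const { Stream } = require('stream');

// Hypothetical stand-in for CombinedStream.isStreamLike, assuming the check
// simply rules out the non-stream types that append() also accepts.
function looksStreamLike(value) {
  return typeof value !== 'function' &&
    typeof value !== 'string' &&
    typeof value !== 'boolean' &&
    typeof value !== 'number' &&
    !Buffer.isBuffer(value);
}

console.log(looksStreamLike(new Stream()));        // true
console.log(looksStreamLike('text'));              // false
console.log(looksStreamLike(Buffer.from('text'))); // false
console.log(looksStreamLike(() => {}));            // false
```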

Stream Appending

Append streams, strings, buffers, or callback functions to the combined stream.

/**
 * Append a stream, string, buffer, or callback to the combined stream
 * @param {Stream|Function|string|Buffer} stream - Item to append
 * @returns {CombinedStream} This instance for chaining
 */
combinedStream.append(stream);

Usage Examples:

const combinedStream = CombinedStream.create();

// Append file streams
combinedStream.append(fs.createReadStream('file1.txt'));
combinedStream.append(fs.createReadStream('file2.txt'));

// Append strings and buffers
combinedStream.append('Hello ');
combinedStream.append(Buffer.from('World!'));

// Append lazy streams via callback
combinedStream.append(function(next) {
  // Stream is created only when needed
  next(fs.createReadStream('lazy-file.txt'));
});

// Method chaining
combinedStream
  .append(fs.createReadStream('file1.txt'))
  .append('separator')
  .append(fs.createReadStream('file2.txt'));

Stream Piping

Pipe the combined stream to a destination and start flowing.

/**
 * Pipe combined stream to destination and start processing
 * @param {Stream} dest - Destination stream
 * @param {Object} [options] - Pipe options
 * @returns {Stream} The destination stream
 */
combinedStream.pipe(dest, options);
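
The sequencing that begins once `pipe()` starts the flow can be pictured with Node's built-in streams alone. The following is a minimal sketch under stated assumptions, not the package's implementation: `concatStreams` is a hypothetical helper, it accepts only ready-made readable streams (no strings, buffers, or callbacks), and it relies on `pipe(dest, { end: false })` to keep the output open between sources.

```javascript
const { PassThrough, Readable } = require('stream');

// Hypothetical helper: forwards each source into one output, in order.
function concatStreams(sources) {
  const output = new PassThrough();
  const queue = sources.slice();

  function next() {
    const source = queue.shift();
    if (source === undefined) {
      output.end(); // nothing left: finish the combined output
      return;
    }
    // Start the following source only after the current one has ended.
    source.once('end', next);
    source.once('error', (err) => output.destroy(err));
    // end: false keeps the output open when this source finishes.
    source.pipe(output, { end: false });
  }

  next();
  return output;
}

const combined = concatStreams([
  Readable.from(['a', 'b']),
  Readable.from(['c']),
]);

const chunks = [];
combined.on('data', (chunk) => chunks.push(chunk.toString()));
combined.on('end', () => console.log(chunks.join(''))); // prints "abc"
```

The key design point is that a source is attached only after its predecessor emits 'end', which is what keeps the output in append order.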

Flow Control

Control stream processing with pause and resume operations.

/**
 * Pause stream processing (if pauseStreams is enabled)
 */
combinedStream.pause();

/**
 * Resume stream processing and start draining queued streams
 */
combinedStream.resume();

Usage Example:

const combinedStream = CombinedStream.create({pauseStreams: true});

// Add streams
combinedStream.append(fs.createReadStream('file1.txt'));
combinedStream.append(fs.createReadStream('file2.txt'));

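// Start the flow, then control processing
combinedStream.pipe(fs.createWriteStream('combined.txt'));
combinedStream.pause();   // Pause the stream currently being read
combinedStream.resume();  // Continue where it left off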
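
The "if pauseStreams is enabled" condition on `pause()` can be modeled in a few lines. This is a hypothetical miniature built on `EventEmitter`, not the package's class; in particular, the assumption that `pause()` does nothing at all (including emitting no 'pause' event) when `pauseStreams` is false is a reading of the description above.

```javascript
const { EventEmitter } = require('events');

// Hypothetical miniature: models only the pauseStreams gate and the events
// emitted by pause() and resume().
class MiniFlow extends EventEmitter {
  constructor({ pauseStreams = true } = {}) {
    super();
    this.pauseStreams = pauseStreams;
  }

  pause() {
    if (!this.pauseStreams) return; // assumption: a no-op when disabled
    this.emit('pause');
  }

  resume() {
    this.emit('resume');
  }
}

// Records which events a pause()/resume() pair produces.
function recordEvents(flow) {
  const seen = [];
  flow.on('pause', () => seen.push('pause'));
  flow.on('resume', () => seen.push('resume'));
  flow.pause();
  flow.resume();
  return seen;
}

console.log(recordEvents(new MiniFlow({ pauseStreams: true })));  // [ 'pause', 'resume' ]
console.log(recordEvents(new MiniFlow({ pauseStreams: false }))); // [ 'resume' ]
```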

Stream Termination

End or destroy the combined stream.

/**
 * End the stream gracefully, emitting 'end' event
 */
combinedStream.end();

/**
 * Destroy the stream immediately, emitting 'close' event
 */
combinedStream.destroy();

Manual Data Writing

Write data directly to the stream (primarily for internal use).

/**
 * Write data to stream (emits 'data' event)
 * @param {any} data - Data to write
 */
combinedStream.write(data);

Configuration Options

Available options for CombinedStream.create():

interface CombinedStreamOptions {
  /** Maximum buffer size in bytes (default: 2 * 1024 * 1024) */
  maxDataSize?: number;
  /** Whether to pause underlying streams (default: true) */
  pauseStreams?: boolean;
}
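
To make the defaults concrete, the sketch below resolves a partial options object against them. The default values come from the interface above; the merge itself and the `resolveOptions` name are assumptions for illustration, not a description of how `create()` is implemented.

```javascript
// Defaults as documented above.
const DEFAULTS = Object.freeze({
  maxDataSize: 2 * 1024 * 1024, // 2 MiB = 2097152 bytes
  pauseStreams: true,
});

// Hypothetical helper: caller-supplied options override the defaults.
function resolveOptions(options = {}) {
  return { ...DEFAULTS, ...options };
}

console.log(resolveOptions());
// { maxDataSize: 2097152, pauseStreams: true }
console.log(resolveOptions({ pauseStreams: false }));
// { maxDataSize: 2097152, pauseStreams: false }
```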

Properties

interface CombinedStream {
  /** Whether the stream accepts writes */
  writable: boolean;
  /** Whether the stream produces data */
  readable: boolean;
  /** Current buffer size in bytes */
  dataSize: number;
  /** Maximum allowed buffer size */
  maxDataSize: number;
  /** Whether to pause underlying streams */
  pauseStreams: boolean;
}

Events

Combined streams emit standard Node.js stream events:

  • 'data' - Emitted when data is available from any appended stream
  • 'end' - Emitted when all streams have ended
  • 'close' - Emitted when stream is destroyed
  • 'error' - Emitted when an error occurs in any appended stream
  • 'pause' - Emitted when stream is paused
  • 'resume' - Emitted when stream resumes

Usage Example:

const combinedStream = CombinedStream.create();

combinedStream.on('data', (chunk) => {
  console.log('Received:', chunk.toString());
});

combinedStream.on('end', () => {
  console.log('All streams completed');
});

combinedStream.on('error', (err) => {
  console.error('Stream error:', err);
});

Error Handling

Combined Stream automatically handles errors from appended streams and re-emits them. When maxDataSize is exceeded, an error is emitted:

const combinedStream = CombinedStream.create({maxDataSize: 1024});

combinedStream.on('error', (err) => {
  if (err.message.includes('maxDataSize')) {
    console.error('Buffer limit exceeded');
  }
});
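
The limit check behind this error can be sketched as a comparison between the bytes currently held and `maxDataSize`. The helper below is hypothetical (`checkDataSize` and its array argument are illustrative names), and the error wording is made up for the sketch; the real message may differ, which is why matching on the `maxDataSize` substring, as above, is the safer test.

```javascript
// Hypothetical helper: sums the bytes held by not-yet-consumed sources and
// returns an Error once the total passes the limit, otherwise null.
function checkDataSize(bufferedSizes, maxDataSize) {
  const dataSize = bufferedSizes.reduce((total, size) => total + size, 0);
  if (dataSize <= maxDataSize) {
    return null;
  }
  // Illustrative wording only, not the package's exact message.
  return new Error(`maxDataSize of ${maxDataSize} bytes exceeded.`);
}

console.log(checkDataSize([400, 500], 1024));              // null (900 <= 1024)
console.log(checkDataSize([400, 500, 200], 1024).message); // maxDataSize of 1024 bytes exceeded.
```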

Advanced Usage Patterns

Lazy Stream Loading

Use callback functions to defer stream creation until needed:

const combinedStream = CombinedStream.create();

combinedStream.append(function(next) {
  // This runs only when it's time to process this stream
  const stream = someExpensiveStreamCreation();
  next(stream);
});
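
The deferral can be seen in a miniature queue that uses plain values instead of streams. This is a sketch under stated assumptions, not the package's code: `drain` is a hypothetical helper, values stand in for streams, and a function item is invoked with a `next` callback only when the queue reaches it.

```javascript
// Hypothetical miniature queue: plain values are emitted as-is, functions are
// called with a `next` callback only when their turn comes.
function drain(queue, emit, done) {
  const item = queue.shift();
  if (item === undefined) {
    done();
    return;
  }
  if (typeof item === 'function') {
    // The producer runs now, not when it was added to the queue.
    item((value) => {
      emit(value);
      drain(queue, emit, done);
    });
    return;
  }
  emit(item);
  drain(queue, emit, done);
}

const log = [];
const queue = [
  'first',
  (next) => { log.push('creating second'); next('second'); },
  'third',
];

log.push('queue built');
drain(queue, (value) => log.push(`emit ${value}`), () => log.push('done'));

console.log(log);
// log holds, in order: 'queue built', 'emit first', 'creating second',
// 'emit second', 'emit third', 'done'
```

Because the queue advances only inside `next`, the same structure keeps items in order even if a producer calls `next` asynchronously.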

Memory Management

Control memory usage with maxDataSize and monitoring:

const combinedStream = CombinedStream.create({
  maxDataSize: 10 * 1024 * 1024  // 10MB limit
});

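// Monitor buffer size until the stream finishes
const monitor = setInterval(() => {
  console.log('Buffer size:', combinedStream.dataSize, 'bytes');
}, 1000);

// Stop the timer so it does not keep the process alive
combinedStream.on('end', () => clearInterval(monitor));
combinedStream.on('close', () => clearInterval(monitor));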

Back Pressure Control

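Use the pauseStreams setting to decide whether appended streams are held back until it is their turn:

// Sources are not paused: they keep producing data while they wait,
// so that data is buffered (subject to maxDataSize)
const fastStream = CombinedStream.create({pauseStreams: false});

// Sources are paused until the combined stream reaches them (default)
const controlledStream = CombinedStream.create({pauseStreams: true});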