or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

data-processing.mdindex.mdstream-combination.mdstream-creation.mdstream-transformation.mdutility-functions.md
tile.json

stream-combination.mddocs/

Stream Combination

Functions for merging, connecting, and combining multiple streams into unified data flows with proper backpressure handling and lifecycle management.

const es = require('event-stream');

Capabilities

Merge

Merges multiple streams into one readable stream, emitting data as soon as it arrives from any source stream without ordering guarantees.

/**
 * Merge multiple streams into a single stream
 * @param {...Stream} streams - Variable number of streams to merge
 * @returns {ReadableStream} Stream that emits data from all input streams
 */
function merge(...streams);

/**
 * Merge an array of streams into a single stream
 * @param {Stream[]} streamArray - Array of streams to merge
 * @returns {ReadableStream} Stream that emits data from all input streams
 */
function merge(streamArray);

// Alias for merge (concat is the same as merge)
const concat = merge;

Usage Examples:

const es = require('event-stream');
const fs = require('fs');

// Merge multiple streams
const stream1 = es.readArray([1, 2, 3]);
const stream2 = es.readArray([4, 5, 6]);
const stream3 = es.readArray([7, 8, 9]);

es.merge(stream1, stream2, stream3)
  .pipe(es.writeArray(function(err, result) {
    console.log(result); // [1, 2, 3, 4, 5, 6, 7, 8, 9] (order may vary)
  }));

// Merge from array
const streams = [
  fs.createReadStream('file1.txt'),
  fs.createReadStream('file2.txt'),
  fs.createReadStream('file3.txt')
];

es.merge(streams)
  .pipe(process.stdout);

// Merge process streams
es.merge(process.stderr, process.stdout)
  .pipe(fs.createWriteStream('output.log'));

// Empty merge
es.merge([]).pipe(es.writeArray(function(err, result) {
  console.log(result); // []
}));

// Using concat alias (same as merge)
es.concat(stream1, stream2, stream3)
  .pipe(es.writeArray(function(err, result) {
    console.log('Concatenated:', result);
  }));

Pipeline / Connect / Pipe

Connects multiple streams in sequence, where the output of each stream becomes the input of the next stream.

/**
 * Connect streams in pipeline sequence
 * @param {...Stream} streams - Streams to connect in order
 * @returns {Stream} Combined pipeline stream
 */
function pipeline(...streams);

// Aliases for pipeline (all three function names do the same thing)
const connect = pipeline;
const pipe = pipeline;

Usage Examples:

const es = require('event-stream');
const fs = require('fs');

// Simple pipeline
const processor = es.pipeline(
  fs.createReadStream('input.txt'),
  es.split(),
  es.map(function(line, cb) {
    cb(null, line.toUpperCase());
  }),
  es.join('\n'),
  fs.createWriteStream('output.txt')
);

processor.on('error', console.error);
processor.on('end', () => console.log('Pipeline complete'));

// Transform pipeline
const dataProcessor = es.connect(
  es.readArray(['hello world', 'foo bar']),
  es.split(' '),
  es.filterSync(word => word.length > 3),
  es.mapSync(word => word.toUpperCase())
);

dataProcessor.pipe(es.writeArray(function(err, result) {
  console.log(result); // ['HELLO', 'WORLD']
}));

// Using pipe alias
const textProcessor = es.pipe(
  es.split('\n'),
  es.filterSync(line => line.trim().length > 0),
  es.mapSync(line => line.trim())
);

Duplex

Combines a writable stream and a readable stream into a single duplex stream, assuming they are connected in some way.

/**
 * Create duplex stream from separate readable and writable streams
 * @param {WritableStream} writeStream - Stream to write to
 * @param {ReadableStream} readStream - Stream to read from
 * @returns {DuplexStream} Combined duplex stream
 */
function duplex(writeStream, readStream);

Usage Examples:

const es = require('event-stream');
const cp = require('child_process');

// Create duplex from child process
const grep = cp.spawn('grep', ['pattern']);
const grepStream = es.duplex(grep.stdin, grep.stdout);

// Use as normal duplex stream
process.stdin
  .pipe(grepStream)
  .pipe(process.stdout);

// Duplex with transform streams
const processor = cp.spawn('sed', ['s/old/new/g']);
const sedStream = es.duplex(processor.stdin, processor.stdout);

es.readArray(['old text', 'more old stuff'])
  .pipe(es.join('\n'))
  .pipe(sedStream)
  .pipe(es.writeArray(function(err, result) {
    console.log(result);
  }));

Pause

Creates a stream that buffers all chunks when paused, providing flow control for stream processing.

/**
 * Create a pauseable stream that buffers data when paused
 * @returns {TransformStream} Stream with enhanced pause/resume behavior
 */
function pause();

Usage Examples:

const es = require('event-stream');

// Create pauseable stream
const pauseStream = es.pause();

// Buffer data while paused
pauseStream.pause();

es.readArray([1, 2, 3, 4, 5])
  .pipe(pauseStream)
  .pipe(es.writeArray(function(err, result) {
    console.log('Received after resume:', result);
  }));

// Resume after delay
setTimeout(() => {
  console.log('Resuming stream...');
  pauseStream.resume();
}, 2000);

// Use in pipeline for flow control
const controlledFlow = es.pipeline(
  es.readArray(largeDataSet),
  es.pause(), // Control flow here
  es.map(function(data, cb) {
    // Process data
    setTimeout(() => cb(null, data), 100);
  })
);

// Pause and resume based on conditions
controlledFlow.on('data', function(data) {
  if (needToSlowDown) {
    this.pause();
    setTimeout(() => this.resume(), 1000);
  }
});

Stream Lifecycle Management

All combination functions properly handle stream lifecycle events:

  • end: Emitted only when all input streams have ended
  • error: Propagated from any input stream to the combined stream
  • close: Emitted when streams are destroyed
  • destroy: Properly destroys all constituent streams

Backpressure Handling

Stream combination functions respect Node.js backpressure mechanisms:

  • merge: Pauses source streams when destination is not ready
  • pipeline: Maintains proper flow control between connected streams
  • duplex: Preserves backpressure signals between read and write sides
  • pause: Provides explicit flow control with buffering