CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/npm-event-stream

Construct pipes of streams of events with functional programming patterns for Node.js

Pending
Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

SecuritybySnyk

Pending

The risk profile of this skill

Overview
Eval results
Files

stream-combination.mddocs/

Stream Combination

Functions for merging, connecting, and combining multiple streams into unified data flows with proper backpressure handling and lifecycle management.

const es = require('event-stream');

Capabilities

Merge

Merges multiple streams into one readable stream, emitting data as soon as it arrives from any source stream without ordering guarantees.

/**
 * Merge multiple streams into a single stream
 * @param {...Stream} streams - Variable number of streams to merge
 * @returns {ReadableStream} Stream that emits data from all input streams
 */
function merge(...streams);

/**
 * Merge an array of streams into a single stream
 * @param {Stream[]} streamArray - Array of streams to merge
 * @returns {ReadableStream} Stream that emits data from all input streams
 */
function merge(streamArray);

// Alias for merge (concat is the same as merge)
const concat = merge;

Usage Examples:

const es = require('event-stream');
const fs = require('fs');

// Merge multiple streams
const stream1 = es.readArray([1, 2, 3]);
const stream2 = es.readArray([4, 5, 6]);
const stream3 = es.readArray([7, 8, 9]);

es.merge(stream1, stream2, stream3)
  .pipe(es.writeArray(function(err, result) {
    console.log(result); // [1, 2, 3, 4, 5, 6, 7, 8, 9] (order may vary)
  }));

// Merge from array
const streams = [
  fs.createReadStream('file1.txt'),
  fs.createReadStream('file2.txt'),
  fs.createReadStream('file3.txt')
];

es.merge(streams)
  .pipe(process.stdout);

// Merge process streams
es.merge(process.stderr, process.stdout)
  .pipe(fs.createWriteStream('output.log'));

// Empty merge
es.merge([]).pipe(es.writeArray(function(err, result) {
  console.log(result); // []
}));

// Using concat alias (same as merge)
es.concat(stream1, stream2, stream3)
  .pipe(es.writeArray(function(err, result) {
    console.log('Concatenated:', result);
  }));

Pipeline / Connect / Pipe

Connects multiple streams in sequence, where the output of each stream becomes the input of the next stream.

/**
 * Connect streams in pipeline sequence
 * @param {...Stream} streams - Streams to connect in order
 * @returns {Stream} Combined pipeline stream
 */
function pipeline(...streams);

// Aliases for pipeline (all three function names do the same thing)
const connect = pipeline;
const pipe = pipeline;

Usage Examples:

const es = require('event-stream');
const fs = require('fs');

// Simple pipeline
const processor = es.pipeline(
  fs.createReadStream('input.txt'),
  es.split(),
  es.map(function(line, cb) {
    cb(null, line.toUpperCase());
  }),
  es.join('\n'),
  fs.createWriteStream('output.txt')
);

processor.on('error', console.error);
processor.on('end', () => console.log('Pipeline complete'));

// Transform pipeline
const dataProcessor = es.connect(
  es.readArray(['hello world', 'foo bar']),
  es.split(' '),
  es.filterSync(word => word.length > 3),
  es.mapSync(word => word.toUpperCase())
);

dataProcessor.pipe(es.writeArray(function(err, result) {
  console.log(result); // ['HELLO', 'WORLD']
}));

// Using pipe alias
const textProcessor = es.pipe(
  es.split('\n'),
  es.filterSync(line => line.trim().length > 0),
  es.mapSync(line => line.trim())
);

Duplex

Combines a writable stream and a readable stream into a single duplex stream, assuming they are connected in some way.

/**
 * Create duplex stream from separate readable and writable streams
 * @param {WritableStream} writeStream - Stream to write to
 * @param {ReadableStream} readStream - Stream to read from
 * @returns {DuplexStream} Combined duplex stream
 */
function duplex(writeStream, readStream);

Usage Examples:

const es = require('event-stream');
const cp = require('child_process');

// Create duplex from child process
const grep = cp.spawn('grep', ['pattern']);
const grepStream = es.duplex(grep.stdin, grep.stdout);

// Use as normal duplex stream
process.stdin
  .pipe(grepStream)
  .pipe(process.stdout);

// Duplex with transform streams
const processor = cp.spawn('sed', ['s/old/new/g']);
const sedStream = es.duplex(processor.stdin, processor.stdout);

es.readArray(['old text', 'more old stuff'])
  .pipe(es.join('\n'))
  .pipe(sedStream)
  .pipe(es.writeArray(function(err, result) {
    console.log(result);
  }));

Pause

Creates a stream that buffers all chunks when paused, providing flow control for stream processing.

/**
 * Create a pauseable stream that buffers data when paused
 * @returns {TransformStream} Stream with enhanced pause/resume behavior
 */
function pause();

Usage Examples:

const es = require('event-stream');

// Create pauseable stream
const pauseStream = es.pause();

// Buffer data while paused
pauseStream.pause();

es.readArray([1, 2, 3, 4, 5])
  .pipe(pauseStream)
  .pipe(es.writeArray(function(err, result) {
    console.log('Received after resume:', result);
  }));

// Resume after delay
setTimeout(() => {
  console.log('Resuming stream...');
  pauseStream.resume();
}, 2000);

// Use in pipeline for flow control
const controlledFlow = es.pipeline(
  es.readArray(largeDataSet),
  es.pause(), // Control flow here
  es.map(function(data, cb) {
    // Process data
    setTimeout(() => cb(null, data), 100);
  })
);

// Pause and resume based on conditions
controlledFlow.on('data', function(data) {
  if (needToSlowDown) {
    this.pause();
    setTimeout(() => this.resume(), 1000);
  }
});

Stream Lifecycle Management

All combination functions properly handle stream lifecycle events:

  • end: Emitted only when all input streams have ended
  • error: Propagated from any input stream to the combined stream
  • close: Emitted when streams are destroyed
  • destroy: Properly destroys all constituent streams

Backpressure Handling

Stream combination functions respect Node.js backpressure mechanisms:

  • merge: Pauses source streams when destination is not ready
  • pipeline: Maintains proper flow control between connected streams
  • duplex: Preserves backpressure signals between read and write sides
  • pause: Provides explicit flow control with buffering

docs

data-processing.md

index.md

stream-combination.md

stream-creation.md

stream-transformation.md

utility-functions.md

tile.json