CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/npm-event-stream

Construct pipes of streams of events with functional programming patterns for Node.js

Pending
Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

SecuritybySnyk

Pending

The risk profile of this skill

Overview
Eval results
Files

utility-functions.mddocs/

Utility Functions

Additional utilities for debugging, child process integration, and specialized stream operations that support the core streaming functionality.

const es = require('event-stream');

Capabilities

Log

Debug utility that logs all data passing through the stream to console.error, useful for monitoring stream flow and debugging pipelines.

/**
 * Log stream data to console.error for debugging
 * @param {string} name - Optional prefix for log messages
 * @returns {TransformStream} Stream that logs data and passes it through
 */
function log(name);

Usage Examples:

const es = require('event-stream');

// Basic logging
es.readArray([1, 2, 3])
  .pipe(es.log()) // Logs each item to console.error
  .pipe(es.writeArray(function(err, result) {
    console.log('Final result:', result);
  }));

// Named logging for pipeline debugging
es.readArray(['hello', 'world'])
  .pipe(es.log('input'))
  .pipe(es.mapSync(str => str.toUpperCase()))
  .pipe(es.log('after uppercase'))
  .pipe(es.filterSync(str => str.length > 4))
  .pipe(es.log('after filter'))
  .pipe(es.writeArray(function(err, result) {
    console.log(result);
  }));

// Multiple log points in complex pipeline
const dataProcessor = es.pipeline(
  es.readArray([1, 2, 3, 4, 5]),
  es.log('original'),
  es.mapSync(n => n * 2),
  es.log('doubled'),
  es.filterSync(n => n > 5),
  es.log('filtered')
);

Child

Creates a duplex stream from a child process, allowing stream-based communication with external processes.

/**
 * Create duplex stream from child process
 * @param {Object} childProcess - Child process with stdin/stdout
 * @returns {DuplexStream} Stream connected to child process
 */
function child(childProcess);

Usage Examples:

const es = require('event-stream');
const cp = require('child_process');

// Stream through grep
const grepProcess = cp.exec('grep "pattern"');
const grepStream = es.child(grepProcess);

es.readArray(['line with pattern', 'other line', 'pattern here too'])
  .pipe(es.join('\n'))
  .pipe(grepStream)
  .pipe(es.writeArray(function(err, result) {
    console.log('Grep results:', result);
  }));

// Stream through sed for text replacement
const sedProcess = cp.exec('sed s/old/new/g');
es.readArray(['old text', 'more old stuff'])
  .pipe(es.join('\n'))
  .pipe(es.child(sedProcess))
  .pipe(process.stdout);

// Pipe files through external command
const fs = require('fs');
const sortProcess = cp.exec('sort');

fs.createReadStream('unsorted.txt')
  .pipe(es.child(sortProcess))
  .pipe(fs.createWriteStream('sorted.txt'));

// Handle process errors
const failingProcess = cp.exec('nonexistent-command');
const processStream = es.child(failingProcess);

processStream.on('error', function(err) {
  console.error('Process error:', err);
});

Stream (Re-export)

Re-exports Node.js core Stream class for direct access to the underlying stream implementation.

/**
 * Node.js core Stream class
 * @type {typeof require('stream').Stream}
 */
const Stream = require('stream').Stream;

Usage Examples:

const es = require('event-stream');

// Access core Stream class
const coreStream = new es.Stream();
coreStream.readable = true;

// Create custom stream extending core Stream
function CustomStream() {
  es.Stream.call(this);
  this.readable = true;
  this.writable = false;
}
CustomStream.prototype = Object.create(es.Stream.prototype);

// Check if object is a stream
function isStream(obj) {
  return obj instanceof es.Stream;
}

// Use in stream detection
const testStream = es.readArray([1, 2, 3]);
console.log(isStream(testStream)); // true

Deprecated Functions

Pipeable (Deprecated)

This function is deprecated and throws an error when called. It was removed from the API and should not be used.

/**
 * @deprecated This function is deprecated and throws an error
 * @throws {Error} Always throws deprecation error
 */
function pipeable();

Migration:

// Don't use - will throw error
// es.pipeable(); // Error: [EVENT-STREAM] es.pipeable is deprecated

// Use pipeline/connect/pipe instead
const combined = es.pipeline(stream1, stream2, stream3);
// or
const combined = es.connect(stream1, stream2, stream3);

Re-exported Dependencies

Event Stream re-exports several useful stream utilities from its dependencies:

Through

/**
 * Create through stream (from 'through' package)
 * @param {Function} writeFunction - Optional write handler
 * @param {Function} endFunction - Optional end handler
 * @returns {TransformStream} Through stream
 */
const through = require('through');

From

Creates a readable stream from a function that generates data chunks on demand.

/**
 * Create readable stream from function (from 'from' package)
 * @param {Function} getChunk - Function called with (count, next) to generate chunks
 * @returns {ReadableStream} Generated readable stream
 */
function from(getChunk);

Usage Examples:

const es = require('event-stream');

// Create stream from function
const numberStream = es.from(function(count, next) {
  if (count >= 5) {
    return next(null, null); // End stream
  }
  next(null, count * 2); // Emit doubled count
});

numberStream.pipe(es.writeArray(function(err, result) {
  console.log(result); // [0, 2, 4, 6, 8]
}));

// Create stream with error handling
const errorStream = es.from(function(count, next) {
  if (count === 3) {
    return next(new Error('Test error'));
  }
  if (count >= 5) {
    return next(null, null);
  }
  next(null, count);
});

errorStream.on('error', function(err) {
  console.log('Error:', err.message);
});

Map (Async)

/**
 * Async map transformation (from 'map-stream' package)
 * @param {Function} asyncFunction - Async transformation function
 * @returns {TransformStream} Async map stream
 */
const map = require('map-stream');

Pause

/**
 * Pauseable stream (from 'pause-stream' package)
 * @returns {TransformStream} Pauseable transform stream
 */
const pause = require('pause-stream');

Split

/**
 * Stream splitter (from 'split' package)
 * @param {string|RegExp} matcher - Split delimiter
 * @returns {TransformStream} Splitting transform stream
 */
const split = require('split');

Pipeline/Stream Combiner

/**
 * Stream combiner (from 'stream-combiner' package)
 * @param {...Stream} streams - Streams to combine
 * @returns {Stream} Combined stream
 */
const pipeline = require('stream-combiner');

Duplex

/**
 * Duplex stream creator (from 'duplexer' package)
 * @param {WritableStream} writable - Writable side
 * @param {ReadableStream} readable - Readable side
 * @returns {DuplexStream} Combined duplex stream
 */
const duplex = require('duplexer');

Debugging Best Practices

Use utility functions effectively for stream debugging:

  1. Strategic Logging: Place log() calls at key pipeline points
  2. Named Logs: Use descriptive names to identify pipeline stages
  3. Process Integration: Use child() for external tool integration
  4. Error Handling: Monitor both stream and process errors
  5. Performance: Remove log() calls in production for better performance

Child Process Integration Patterns

Common patterns for using child processes with streams:

  • Text Processing: sed, awk, grep for text transformation
  • Data Sorting: sort, uniq for data organization
  • Compression: gzip, deflate for data compression
  • Format Conversion: jq, xmllint for data format changes
  • Validation: External validators and linters

docs

data-processing.md

index.md

stream-combination.md

stream-creation.md

stream-transformation.md

utility-functions.md

tile.json