Utility Functions

Additional utilities for debugging, child process integration, and specialized stream operations that support the core streaming functionality.

const es = require('event-stream');

Capabilities

Log

Debug utility that logs all data passing through the stream to console.error, useful for monitoring stream flow and debugging pipelines.

/**
 * Log stream data to console.error for debugging
 * @param {string} name - Optional prefix for log messages
 * @returns {TransformStream} Stream that logs data and passes it through
 */
function log(name);

Usage Examples:

const es = require('event-stream');

// Basic logging
es.readArray([1, 2, 3])
  .pipe(es.log()) // Logs each item to console.error
  .pipe(es.writeArray(function(err, result) {
    console.log('Final result:', result);
  }));

// Named logging for pipeline debugging
es.readArray(['hello', 'world'])
  .pipe(es.log('input'))
  .pipe(es.mapSync(str => str.toUpperCase()))
  .pipe(es.log('after uppercase'))
  .pipe(es.filterSync(str => str.length > 4))
  .pipe(es.log('after filter'))
  .pipe(es.writeArray(function(err, result) {
    console.log(result);
  }));

// Multiple log points in complex pipeline
const dataProcessor = es.pipeline(
  es.readArray([1, 2, 3, 4, 5]),
  es.log('original'),
  es.mapSync(n => n * 2),
  es.log('doubled'),
  es.filterSync(n => n > 5),
  es.log('filtered')
);

// Consume the combined stream to observe the final output
dataProcessor.pipe(es.writeArray(function(err, result) {
  console.log(result); // [6, 8, 10]
}));

Child

Creates a duplex stream from a child process, allowing stream-based communication with external processes.

/**
 * Create duplex stream from child process
 * @param {Object} childProcess - Child process with stdin/stdout
 * @returns {DuplexStream} Stream connected to child process
 */
function child(childProcess);

Usage Examples:

const es = require('event-stream');
const cp = require('child_process');

// Stream through grep
const grepProcess = cp.exec('grep "pattern"');
const grepStream = es.child(grepProcess);

es.readArray(['line with pattern', 'other line', 'pattern here too'])
  .pipe(es.join('\n'))
  .pipe(grepStream)
  .pipe(es.writeArray(function(err, result) {
    console.log('Grep results:', result);
  }));

// Stream through sed for text replacement
const sedProcess = cp.exec('sed s/old/new/g');
es.readArray(['old text', 'more old stuff'])
  .pipe(es.join('\n'))
  .pipe(es.child(sedProcess))
  .pipe(process.stdout);

// Pipe files through external command
const fs = require('fs');
const sortProcess = cp.exec('sort');

fs.createReadStream('unsorted.txt')
  .pipe(es.child(sortProcess))
  .pipe(fs.createWriteStream('sorted.txt'));

// Handle process errors
const failingProcess = cp.exec('nonexistent-command');
const processStream = es.child(failingProcess);

processStream.on('error', function(err) {
  console.error('Process error:', err);
});

Stream (Re-export)

Re-exports Node.js core Stream class for direct access to the underlying stream implementation.

/**
 * Node.js core Stream class
 * @type {typeof require('stream').Stream}
 */
const Stream = require('stream').Stream;

Usage Examples:

const es = require('event-stream');

// Access core Stream class
const coreStream = new es.Stream();
coreStream.readable = true;

// Create custom stream extending core Stream
function CustomStream() {
  es.Stream.call(this);
  this.readable = true;
  this.writable = false;
}
CustomStream.prototype = Object.create(es.Stream.prototype);
CustomStream.prototype.constructor = CustomStream; // restore the constructor reference

// Check if object is a stream
function isStream(obj) {
  return obj instanceof es.Stream;
}

// Use in stream detection
const testStream = es.readArray([1, 2, 3]);
console.log(isStream(testStream)); // true

Deprecated Functions

Pipeable (Deprecated)

This function is deprecated: its implementation was removed, and the export now throws an error when called. Do not use it.

/**
 * @deprecated This function is deprecated and throws an error
 * @throws {Error} Always throws deprecation error
 */
function pipeable();

Migration:

// Don't use - will throw error
// es.pipeable(); // Error: [EVENT-STREAM] es.pipeable is deprecated

// Use pipeline/connect/pipe instead (all three are aliases)
const combined = es.pipeline(stream1, stream2, stream3);
// or: const combined = es.connect(stream1, stream2, stream3);

Re-exported Dependencies

Event Stream re-exports several useful stream utilities from its dependencies:

Through

/**
 * Create through stream (from 'through' package)
 * @param {Function} writeFunction - Optional write handler
 * @param {Function} endFunction - Optional end handler
 * @returns {TransformStream} Through stream
 */
const through = require('through');
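
A minimal sketch of the through API, assuming the standard 'through' conventions (queue data from the write handler; queue(null) ends the stream):

const es = require('event-stream');

// Uppercase each chunk as it passes through; queue(null) signals end
const upper = es.through(function write(data) {
  this.queue(String(data).toUpperCase());
}, function end() {
  this.queue(null);
});

es.readArray(['a', 'b', 'c'])
  .pipe(upper)
  .pipe(es.writeArray(function(err, result) {
    console.log(result); // ['A', 'B', 'C']
  }));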

From

Creates a readable stream from a function that generates data chunks on demand.

/**
 * Create readable stream from function (from 'from' package)
 * @param {Function} getChunk - Function called with (count, next) to generate chunks
 * @returns {ReadableStream} Generated readable stream
 */
function from(getChunk);

Usage Examples:

const es = require('event-stream');

// Create stream from function
const numberStream = es.from(function(count, next) {
  if (count >= 5) {
    return next(null, null); // End stream
  }
  next(null, count * 2); // Emit doubled count
});

numberStream.pipe(es.writeArray(function(err, result) {
  console.log(result); // [0, 2, 4, 6, 8]
}));

// Create stream with error handling
const errorStream = es.from(function(count, next) {
  if (count === 3) {
    return next(new Error('Test error'));
  }
  if (count >= 5) {
    return next(null, null);
  }
  next(null, count);
});

errorStream.on('error', function(err) {
  console.log('Error:', err.message);
});

Map (Async)

/**
 * Async map transformation (from 'map-stream' package)
 * @param {Function} asyncFunction - Async transformation function
 * @returns {TransformStream} Async map stream
 */
const map = require('map-stream');
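
A minimal sketch of async mapping, assuming the standard map-stream convention of callback(err, mappedData):

const es = require('event-stream');

// Double each item asynchronously
es.readArray([1, 2, 3])
  .pipe(es.map(function(item, callback) {
    setImmediate(function() {
      callback(null, item * 2);
    });
  }))
  .pipe(es.writeArray(function(err, result) {
    console.log(result); // [2, 4, 6]
  }));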

Pause

/**
 * Pauseable stream (from 'pause-stream' package)
 * @returns {TransformStream} Pauseable transform stream
 */
const pause = require('pause-stream');
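
A small sketch of buffering with a pause stream; the timeout is only illustrative:

const es = require('event-stream');

const ps = es.pause();
ps.pause(); // buffer anything written while paused

es.readArray([1, 2, 3])
  .pipe(ps)
  .pipe(es.writeArray(function(err, result) {
    console.log(result); // [1, 2, 3], delivered after resume()
  }));

// release the buffered data later
setTimeout(function() {
  ps.resume();
}, 100);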

Split

/**
 * Stream splitter (from 'split' package)
 * @param {string|RegExp} matcher - Split delimiter
 * @returns {TransformStream} Splitting transform stream
 */
const split = require('split');
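
A minimal line-splitting sketch; es.split() splits on newlines by default, and 'app.log' is a hypothetical input file:

const es = require('event-stream');
const fs = require('fs');

// Read a file line by line, transform each line, and re-join
fs.createReadStream('app.log')
  .pipe(es.split())
  .pipe(es.mapSync(function(line) {
    return line.toUpperCase();
  }))
  .pipe(es.join('\n'))
  .pipe(process.stdout);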

Pipeline/Stream Combiner

/**
 * Stream combiner (from 'stream-combiner' package)
 * @param {...Stream} streams - Streams to combine
 * @returns {Stream} Combined stream
 */
const pipeline = require('stream-combiner');
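
A sketch of building a reusable combined stream; stream-combiner re-emits errors from any inner stage on the combined stream:

const es = require('event-stream');

// Combine split + parse into a single duplex stream
const parseLines = es.pipeline(
  es.split(),
  es.mapSync(function(line) {
    if (!line) return undefined; // mapSync drops undefined results
    return JSON.parse(line);
  })
);

parseLines.on('error', function(err) {
  console.error('Pipeline error:', err.message);
});

process.stdin.pipe(parseLines).pipe(es.log('parsed'));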

Duplex

/**
 * Duplex stream creator (from 'duplexer' package)
 * @param {WritableStream} writable - Writable side
 * @param {ReadableStream} readable - Readable side
 * @returns {DuplexStream} Combined duplex stream
 */
const duplex = require('duplexer');
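
A sketch of hand-wiring a duplex stream from a process's stdin and stdout, which is roughly what child() does internally:

const es = require('event-stream');
const cp = require('child_process');

// stdin is the writable side, stdout the readable side
const proc = cp.spawn('sort');
const sorter = es.duplex(proc.stdin, proc.stdout);

es.readArray(['banana\n', 'apple\n', 'cherry\n'])
  .pipe(sorter)
  .pipe(process.stdout);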

Debugging Best Practices

Use utility functions effectively for stream debugging:

  1. Strategic Logging: Place log() calls at key pipeline points
  2. Named Logs: Use descriptive names to identify pipeline stages
  3. Process Integration: Use child() for external tool integration
  4. Error Handling: Monitor both stream and process errors
  5. Performance: Remove or gate log() calls in production for better performance (see the sketch after this list)
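
A sketch of point 5, gating log() behind an environment variable; debugLog is a hypothetical helper, not part of the event-stream API:

const es = require('event-stream');

// Hypothetical helper: log in development, plain pass-through otherwise
function debugLog(name) {
  return process.env.DEBUG ? es.log(name) : es.through();
}

es.readArray([1, 2, 3])
  .pipe(debugLog('input'))
  .pipe(es.mapSync(n => n + 1))
  .pipe(debugLog('output'))
  .pipe(es.writeArray(function(err, result) {
    console.log(result); // [2, 3, 4]
  }));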

Child Process Integration Patterns

Common patterns for using child processes with streams:

  • Text Processing: sed, awk, grep for text transformation
  • Data Sorting: sort, uniq for data organization
  • Compression: gzip, deflate for data compression (see the sketch after this list)
  • Format Conversion: jq, xmllint for data format changes
  • Validation: External validators and linters
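
As an example of the compression pattern, a sketch that streams a file through the external gzip command; 'input.txt' is a hypothetical file, and spawn is used because it streams without exec's output buffering:

const es = require('event-stream');
const cp = require('child_process');
const fs = require('fs');

// Stream a file through gzip (Node's built-in zlib is usually preferable)
const gzip = cp.spawn('gzip', ['-c']);

fs.createReadStream('input.txt')
  .pipe(es.child(gzip))
  .pipe(fs.createWriteStream('input.txt.gz'));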