Files

  • docs/
    • data-processing.md
    • index.md
    • stream-combination.md
    • stream-creation.md
    • stream-transformation.md
    • utility-functions.md
  • tile.json

tessl/npm-event-stream

Construct pipes of streams of events with functional programming patterns for Node.js

  • Workspace: tessl
  • Visibility: Public
  • Describes: pkg:npm/event-stream@4.0.x (npm)

To install, run

npx @tessl/cli install tessl/npm-event-stream@4.0.0


Event Stream

Event Stream is a Node.js library of utilities for creating and working with streams using functional programming patterns. Its functions transform, filter, split, join, and merge streams through an API that resembles array methods but operates on data laid out in time rather than in memory.

Package Information

  • Package Name: event-stream
  • Package Type: npm
  • Language: JavaScript
  • Installation: npm install event-stream

Core Imports

const es = require('event-stream');

Or import specific functions (all functions are properties of the main export):

const { 
  map, mapSync, 
  through, split, merge, concat,
  pipeline, connect, pipe,
  readArray, writeArray, collect,
  readable, from, duplex, pause,
  parse, stringify, replace, join, wait,
  log, child, filterSync, flatmapSync,
  Stream
} = require('event-stream');

Basic Usage

const es = require('event-stream');
const fs = require('fs');

// Transform and process line-by-line data
fs.createReadStream('data.txt')
  .pipe(es.split())                    // Split on newlines
  .pipe(es.map(function (line, cb) {   // Transform each line
    const processed = line.toUpperCase();
    cb(null, processed);
  }))
  .pipe(es.join('\n'))                 // Join with newlines
  .pipe(process.stdout);

// Merge multiple streams
const stream1 = es.readArray([1, 2, 3]);
const stream2 = es.readArray([4, 5, 6]);
es.merge(stream1, stream2)
  .pipe(es.writeArray(function(err, array) {
    console.log('Merged data:', array);
  }));

Architecture

Event Stream is built around the Node.js Stream API and provides:

  • Stream Creation: Functions to create readable and writable streams from various sources
  • Stream Transformation: Synchronous and asynchronous data transformation utilities
  • Stream Combination: Tools to merge, connect, and combine multiple streams
  • Data Processing: Specialized functions for JSON parsing, string manipulation, and data collection
  • Compatibility: Works with both Node.js 0.8 and 0.10+ stream APIs
  • Functional Programming: Array-like methods adapted for streaming data

Capabilities

Stream Creation and Reading

Create readable streams from arrays, functions, or other data sources with proper pause/resume support.

// Create readable stream from array
function readArray(array: any[]): ReadableStream;

// Create readable stream from async function
function readable(func: Function, continueOnError?: boolean): ReadableStream;

See docs/stream-creation.md.

Stream Transformation

Transform stream data using synchronous and asynchronous functions with built-in error handling.

// Asynchronous transformation with callback
function map(asyncFunction: (data: any, callback: Function) => void): TransformStream;

// Synchronous transformation
function mapSync(syncFunction: (data: any) => any): TransformStream;

// Stream filtering
function filterSync(testFunction: (data: any) => boolean): TransformStream;

See docs/stream-transformation.md.

Stream Combination

Merge, connect, and combine multiple streams into unified data flows.

// Merge multiple streams
function merge(...streams: Stream[]): ReadableStream;
function merge(streamArray: Stream[]): ReadableStream;

// Connect streams in pipeline
function pipeline(...streams: Stream[]): Stream;

See docs/stream-combination.md.

Data Processing

Parse JSON, stringify objects, replace text, and collect stream data with specialized processing functions.

// Parse JSON lines
function parse(options?: {error?: boolean}): TransformStream;

// Stringify objects to JSON
function stringify(): TransformStream;

// Replace text in stream
function replace(from: string | RegExp, to: string): TransformStream;

See docs/data-processing.md.

Utility Functions

Additional utilities for debugging, child processes, and stream management.

// Debug stream data
function log(name?: string): TransformStream;

// Create duplex stream from child process
function child(childProcess: Object): DuplexStream;

// Create through stream with custom write/end handlers
function through(writeFunction?: Function, endFunction?: Function): TransformStream;

See docs/utility-functions.md.

Types

// Re-exported from Node.js core
const Stream: typeof require('stream').Stream;

// Basic stream interfaces (conceptual - JavaScript doesn't have static typing)
interface ReadableStream {
  readable: boolean;
  pause(): void;
  resume(): void;
  destroy(): void;
  pipe(destination: WritableStream): WritableStream; // returns the destination so calls can chain
}

interface WritableStream {
  writable: boolean;
  write(data: any): boolean;
  end(): void;
  destroy(): void;
}

interface TransformStream extends ReadableStream, WritableStream {}

interface DuplexStream extends ReadableStream, WritableStream {}