Construct pipes of streams of events with functional programming patterns for Node.js
```bash
npx @tessl/cli install tessl/npm-event-stream@4.0.0
```

Event Stream is a comprehensive Node.js library that provides utilities for creating and working with streams using functional programming patterns. It transforms, filters, splits, joins, and merges streams with an API that resembles array functions but operates on data laid out in time rather than in memory.
```bash
npm install event-stream
```

```js
const es = require('event-stream');
```

Or import specific functions (all functions are properties of the main export):

```js
const {
  map, mapSync,
  through, split, merge, concat,
  pipeline, connect, pipe,
  readArray, writeArray, collect,
  readable, from, duplex, pause,
  parse, stringify, replace, join, wait,
  log, child, filterSync, flatmapSync,
  Stream
} = require('event-stream');
```

```js
const es = require('event-stream');
const fs = require('fs');
// Transform and process line-by-line data
fs.createReadStream('data.txt')
  .pipe(es.split())                   // Split on newlines
  .pipe(es.map(function (line, cb) {  // Transform each line
    const processed = line.toUpperCase();
    cb(null, processed);
  }))
  .pipe(es.join('\n'))                // Join with newlines
  .pipe(process.stdout);

// Merge multiple streams
const stream1 = es.readArray([1, 2, 3]);
const stream2 = es.readArray([4, 5, 6]);
es.merge(stream1, stream2)
  .pipe(es.writeArray(function (err, array) {
    console.log('Merged data:', array);
  }));
```

Event Stream is built around the Node.js Stream API and provides:
Create readable streams from arrays, functions, or other data sources with proper pause/resume support.

```ts
// Create readable stream from array
function readArray(array: any[]): ReadableStream;
// Create readable stream from async function
function readable(func: Function, continueOnError?: boolean): ReadableStream;
```
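For illustration, a minimal sketch of both creators: `readArray` streams the elements of an array, while `readable` generates chunks on demand (its `count` argument is the number of times the function has been called so far):

```js
const es = require('event-stream');

// Stream three array elements as individual 'data' events
es.readArray(['a', 'b', 'c']).pipe(es.log('array'));

// Generate chunks on demand until we emit 'end'
es.readable(function (count, callback) {
  if (count === 3) return this.emit('end'); // stop after three chunks
  this.emit('data', 'chunk-' + count);
  callback(); // signal readiness for the next call
}).pipe(es.log('generated'));
```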
Transform stream data using synchronous and asynchronous functions with built-in error handling.

```ts
// Asynchronous transformation with callback
function map(asyncFunction: (data: any, callback: Function) => void): TransformStream;
// Synchronous transformation
function mapSync(syncFunction: (data: any) => any): TransformStream;
// Stream filtering
function filterSync(testFunction: (data: any) => boolean): TransformStream;
```
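A small sketch combining the three: filter odd numbers, transform them asynchronously with `map` (an `Error` passed as the callback's first argument fails the stream), then format synchronously:

```js
const es = require('event-stream');

es.readArray([1, 2, 3, 4, 5])
  .pipe(es.filterSync(function (n) { return n % 2 === 1; })) // keep odd values
  .pipe(es.map(function (n, cb) {
    // asynchronous work; report errors via cb(err)
    setImmediate(function () { cb(null, n * 10); });
  }))
  .pipe(es.mapSync(function (n) { return 'value: ' + n; }))
  .pipe(es.writeArray(function (err, results) {
    console.log(results); // ['value: 10', 'value: 30', 'value: 50']
  }));
```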
Merge, connect, and combine multiple streams into unified data flows.

```ts
// Merge multiple streams
function merge(...streams: Stream[]): ReadableStream;
function merge(streamArray: Stream[]): ReadableStream;
// Connect streams in pipeline
function pipeline(...streams: Stream[]): Stream;
```
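`merge` is shown in the quick-start example above; the sketch below uses `pipeline` to wrap a split/transform/join chain into a single reusable stream (writes go into the first stream, reads come from the last):

```js
const es = require('event-stream');

// One duplex stream that uppercases line-oriented text
const upcaseLines = es.pipeline(
  es.split(),
  es.mapSync(function (line) { return line.toUpperCase(); }),
  es.join('\n')
);

es.readArray(['one\ntwo', '\nthree'])
  .pipe(upcaseLines)
  .pipe(es.writeArray(function (err, chunks) {
    console.log(chunks.join('')); // 'ONE\nTWO\nTHREE'
  }));
```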
Parse JSON, stringify objects, replace text, and collect stream data with specialized processing functions.

```ts
// Parse JSON lines
function parse(options?: {error?: boolean}): TransformStream;
// Stringify objects to JSON
function stringify(): TransformStream;
// Replace text in stream
function replace(from: string | RegExp, to: string): TransformStream;
```
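A round-trip sketch: `stringify` emits one line of JSON per object, `replace` rewrites text in flight, and `parse` (fed one document per chunk via `split`) turns the lines back into objects:

```js
const es = require('event-stream');

es.readArray([{ id: 1 }, { id: 2 }])
  .pipe(es.stringify())          // '{"id":1}\n', '{"id":2}\n'
  .pipe(es.replace('id', 'key')) // textual substitution on the stream
  .pipe(es.split())              // one JSON document per chunk for parse()
  .pipe(es.parse())              // empty lines are ignored
  .pipe(es.writeArray(function (err, objects) {
    console.log(objects); // [ { key: 1 }, { key: 2 } ]
  }));
```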
Additional utilities for debugging, child processes, and stream management.

```ts
// Debug stream data
function log(name?: string): TransformStream;
// Create duplex stream from child process
function child(childProcess: Object): DuplexStream;
// Create through stream with custom write/end handlers
function through(writeFunction?: Function, endFunction?: Function): TransformStream;
```
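A sketch of a custom `through` stream: inside the handlers, `this` is the stream itself, so chunks are forwarded with `this.emit('data', ...)` (or `this.queue(...)`) and the stream is closed with `this.emit('end')`; `log` serves here as a pass-through debugger:

```js
const es = require('event-stream');

// Re-emit every chunk twice, then append a final marker before ending
const doubler = es.through(
  function write(data) {
    this.emit('data', data);
    this.emit('data', data);
  },
  function end() {
    this.emit('data', 'done');
    this.emit('end');
  }
);

es.readArray(['a', 'b'])
  .pipe(doubler)
  .pipe(es.log('chunk')); // chunk a, chunk a, chunk b, chunk b, chunk done
```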
```ts
// Re-exported from Node.js core
const Stream: typeof require('stream').Stream;
// Basic stream interfaces (conceptual - JavaScript doesn't have static typing)
interface ReadableStream {
  readable: boolean;
  pause(): void;
  resume(): void;
  destroy(): void;
  pipe(destination: WritableStream): WritableStream; // returns the destination, enabling chaining
}

interface WritableStream {
  writable: boolean;
  write(data: any): boolean;
  end(): void;
  destroy(): void;
}

interface TransformStream extends ReadableStream, WritableStream {}
interface DuplexStream extends ReadableStream, WritableStream {}
```