Construct pipes of streams of events with functional programming patterns for Node.js
—
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Pending
The risk profile of this skill
Functions for transforming stream data using synchronous and asynchronous operations with built-in error handling and flow control.
const es = require('event-stream');

Creates a transform stream from an asynchronous function that processes each data item with callback-based error handling.
/**
* Create a transform stream from an async function
* @param {Function} asyncFunction - Function called with (data, callback)
* @returns {TransformStream} Stream that transforms data asynchronously
*/
function map(asyncFunction);

The callback function must be called with one of:
- callback() - Drop this data (acts like filter)
- callback(null, newData) - Transform data to newData
- callback(error) - Emit an error for this item

Usage Examples:
const es = require('event-stream');
// Async transformation
const asyncTransformer = es.map(function(data, callback) {
// Simulate async operation
setTimeout(() => {
const result = data.toString().toUpperCase();
callback(null, result);
}, 10);
});
es.readArray(['hello', 'world'])
.pipe(asyncTransformer)
.pipe(es.writeArray(function(err, result) {
console.log(result); // ['HELLO', 'WORLD']
}));
// With error handling
const errorHandler = es.map(function(data, callback) {
if (typeof data !== 'string') {
return callback(new Error('Expected string'));
}
callback(null, data.length);
});
// Filter by calling callback with no arguments
const evenFilter = es.map(function(number, callback) {
if (number % 2 === 0) {
callback(null, number); // Keep even numbers
} else {
callback(); // Drop odd numbers
}
});

Creates a transform stream using a synchronous function that processes data immediately with automatic error handling.
/**
* Create a transform stream with synchronous transformation
* @param {Function} syncFunction - Function that transforms data synchronously
* @returns {TransformStream} Stream that transforms data synchronously
*/
function mapSync(syncFunction);

Usage Examples:
const es = require('event-stream');
// Synchronous transformation
const doubler = es.mapSync(function(data) {
return data * 2;
});
es.readArray([1, 2, 3, 4])
.pipe(doubler)
.pipe(es.writeArray(function(err, result) {
console.log(result); // [2, 4, 6, 8]
}));
// Object transformation
const userFormatter = es.mapSync(function(user) {
return {
id: user.id,
name: user.name.toUpperCase(),
active: true
};
});
// Return undefined to filter out data
const positiveOnly = es.mapSync(function(number) {
return number > 0 ? number : undefined;
});

Creates a transform stream that filters data based on a synchronous predicate function.
/**
* Filter stream data based on a test function
* @param {Function} testFunction - Function that returns boolean for each item
* @returns {TransformStream} Stream that only emits items passing the test
*/
function filterSync(testFunction);

Usage Examples:
const es = require('event-stream');
// Filter numbers
const evenNumbers = es.filterSync(function(number) {
return number % 2 === 0;
});
es.readArray([1, 2, 3, 4, 5, 6])
.pipe(evenNumbers)
.pipe(es.writeArray(function(err, result) {
console.log(result); // [2, 4, 6]
}));
// Filter objects
const activeUsers = es.filterSync(function(user) {
return user.active === true;
});
const users = [
{ name: 'Alice', active: true },
{ name: 'Bob', active: false },
{ name: 'Charlie', active: true }
];
es.readArray(users)
.pipe(activeUsers)
.pipe(es.writeArray(function(err, result) {
console.log(result); // [{ name: 'Alice', active: true }, { name: 'Charlie', active: true }]
}));

Creates a transform stream that applies a mapping function to nested data and flattens the results.
/**
* Map and flatten nested data structures
* @param {Function} mapperFunction - Function that maps each array element
* @returns {TransformStream} Stream that flattens mapped results
*/
function flatmapSync(mapperFunction);

Usage Examples:
const es = require('event-stream');
// Flatten arrays of numbers
const multiplier = es.flatmapSync(function(number) {
return number * 2;
});
// Input: array of arrays, each element gets mapped
es.readArray([[1, 2], [3, 4]])
.pipe(multiplier)
.pipe(es.writeArray(function(err, result) {
console.log(result); // [2, 4, 6, 8]
}));
// Transform object properties
const propertyExtractor = es.flatmapSync(function(obj) {
return obj.name + '_processed';
});
es.readArray([
[{ name: 'item1' }, { name: 'item2' }],
[{ name: 'item3' }]
])
.pipe(propertyExtractor)
.pipe(es.writeArray(function(err, result) {
console.log(result); // ['item1_processed', 'item2_processed', 'item3_processed']
}));

Breaks up a stream by a delimiter and reassembles it so that each split part becomes a separate chunk.
/**
* Split stream data by delimiter (string or regex)
* @param {string|RegExp} matcher - Delimiter to split on (defaults to '\n')
* @returns {TransformStream} Stream that emits split chunks
*/
function split(matcher);

Usage Examples:
const es = require('event-stream');
const fs = require('fs');
// Split on newlines (default)
fs.createReadStream('file.txt')
.pipe(es.split())
.pipe(es.map(function(line, cb) {
console.log('Line:', line);
cb(null, line);
}));
// Split on custom delimiter
const csvSplitter = es.split(',');
es.readArray(['a,b,c', 'd,e,f'])
.pipe(csvSplitter)
.pipe(es.writeArray(function(err, result) {
console.log(result); // ['a', 'b', 'cd', 'e', 'f'] (the tail of one chunk is joined to the next before splitting, so 'c' and 'd' merge)
}));
// Keep line breaks using regex
fs.createReadStream('file.txt')
.pipe(es.split(/(\r?\n)/))
.pipe(es.map(function(chunk, cb) {
// Chunks will include line breaks
cb(null, chunk);
}));
// Split on regex pattern
const wordSplitter = es.split(/\s+/);

All transformation functions include built-in error handling:
Streams continue processing after errors unless explicitly destroyed.