Functions for transforming stream data using synchronous and asynchronous operations with built-in error handling and flow control.
const es = require('event-stream');

Creates a transform stream from an asynchronous function that processes each data item with callback-based error handling.
/**
* Create a transform stream from an async function
* @param {Function} asyncFunction - Function called with (data, callback)
* @returns {TransformStream} Stream that transforms data asynchronously
*/
function map(asyncFunction);

The callback function must be called with one of:
callback() - Drop this data (acts like a filter)
callback(null, newData) - Transform the data to newData
callback(error) - Emit an error for this item

Usage Examples:
const es = require('event-stream');
// Async transformation
const asyncTransformer = es.map(function(data, callback) {
// Simulate async operation
setTimeout(() => {
const result = data.toString().toUpperCase();
callback(null, result);
}, 10);
});
es.readArray(['hello', 'world'])
.pipe(asyncTransformer)
.pipe(es.writeArray(function(err, result) {
console.log(result); // ['HELLO', 'WORLD']
}));
// With error handling
const errorHandler = es.map(function(data, callback) {
if (typeof data !== 'string') {
return callback(new Error('Expected string'));
}
callback(null, data.length);
});
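Errors passed to the callback are emitted as 'error' events on the map stream, so attach a listener somewhere in the pipeline. A minimal sketch using the errorHandler above (the sample input values are illustrative):
es.readArray(['abc', 42, 'def'])
  .pipe(errorHandler)
  .on('error', function(err) {
    // Fired when the mapper calls callback(error); without an
    // 'error' listener the emitted error would be thrown.
    console.error('map error:', err.message); // 'Expected string'
  });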
// Filter by not calling callback
const evenFilter = es.map(function(number, callback) {
if (number % 2 === 0) {
callback(null, number); // Keep even numbers
} else {
callback(); // Drop odd numbers
}
});

Creates a transform stream using a synchronous function that processes data immediately, with automatic error handling: exceptions thrown by the function are caught and emitted as 'error' events on the stream.
/**
* Create a transform stream with synchronous transformation
* @param {Function} syncFunction - Function that transforms data synchronously
* @returns {TransformStream} Stream that transforms data synchronously
*/
function mapSync(syncFunction);

Usage Examples:
const es = require('event-stream');
// Synchronous transformation
const doubler = es.mapSync(function(data) {
return data * 2;
});
es.readArray([1, 2, 3, 4])
.pipe(doubler)
.pipe(es.writeArray(function(err, result) {
console.log(result); // [2, 4, 6, 8]
}));
// Object transformation
const userFormatter = es.mapSync(function(user) {
return {
id: user.id,
name: user.name.toUpperCase(),
active: true
};
});
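Exceptions thrown inside the synchronous function are caught by mapSync and re-emitted as 'error' events rather than crashing the write. A small sketch (the input objects are illustrative):
const nameUpper = es.mapSync(function(user) {
  // Throws a TypeError when user is null; mapSync catches the
  // exception and re-emits it as an 'error' event.
  return user.name.toUpperCase();
});

es.readArray([{ name: 'alice' }, null])
  .pipe(nameUpper)
  .on('error', function(err) {
    console.error('mapSync error:', err.message);
  });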
// Return undefined to filter out data
const positiveOnly = es.mapSync(function(number) {
return number > 0 ? number : undefined;
});

Creates a transform stream that filters data based on a synchronous predicate function.
/**
* Filter stream data based on a test function
* @param {Function} testFunction - Function that returns boolean for each item
* @returns {TransformStream} Stream that only emits items passing the test
*/
function filterSync(testFunction);

Usage Examples:
const es = require('event-stream');
// Filter numbers
const evenNumbers = es.filterSync(function(number) {
return number % 2 === 0;
});
es.readArray([1, 2, 3, 4, 5, 6])
.pipe(evenNumbers)
.pipe(es.writeArray(function(err, result) {
console.log(result); // [2, 4, 6]
}));
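Because each helper returns an ordinary stream, filterSync composes with the other transforms by piping. A sketch with illustrative values that keeps the even numbers and then squares them with mapSync:
es.readArray([1, 2, 3, 4, 5, 6])
  .pipe(es.filterSync(function(n) { return n % 2 === 0; })) // keep 2, 4, 6
  .pipe(es.mapSync(function(n) { return n * n; }))          // square them
  .pipe(es.writeArray(function(err, result) {
    console.log(result); // [4, 16, 36]
  }));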
// Filter objects
const activeUsers = es.filterSync(function(user) {
return user.active === true;
});
const users = [
{ name: 'Alice', active: true },
{ name: 'Bob', active: false },
{ name: 'Charlie', active: true }
];
es.readArray(users)
.pipe(activeUsers)
.pipe(es.writeArray(function(err, result) {
console.log(result); // [{ name: 'Alice', active: true }, { name: 'Charlie', active: true }]
}));

Creates a transform stream that applies a mapping function to each element of incoming arrays and flattens the results, emitting every mapped element as a separate chunk.
/**
* Map and flatten nested data structures
* @param {Function} mapperFunction - Function that maps each array element
* @returns {TransformStream} Stream that flattens mapped results
*/
function flatmapSync(mapperFunction);

Usage Examples:
const es = require('event-stream');
// Flatten arrays of numbers
const multiplier = es.flatmapSync(function(number) {
return number * 2;
});
// Input: array of arrays, each element gets mapped
es.readArray([[1, 2], [3, 4]])
.pipe(multiplier)
.pipe(es.writeArray(function(err, result) {
console.log(result); // [2, 4, 6, 8]
}));
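Each chunk reaching flatmapSync is expected to be an array; if the upstream emits plain values, a mapSync step can turn them into arrays first. A sketch assuming whitespace-separated input strings:
es.readArray(['red green', 'blue'])
  .pipe(es.mapSync(function(line) {
    return line.split(' ');            // turn each string into an array
  }))
  .pipe(es.flatmapSync(function(word) {
    return word.toUpperCase();         // applied to every array element
  }))
  .pipe(es.writeArray(function(err, result) {
    console.log(result); // ['RED', 'GREEN', 'BLUE']
  }));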
// Transform object properties
const propertyExtractor = es.flatmapSync(function(obj) {
return obj.name + '_processed';
});
es.readArray([
[{ name: 'item1' }, { name: 'item2' }],
[{ name: 'item3' }]
])
.pipe(propertyExtractor)
.pipe(es.writeArray(function(err, result) {
console.log(result); // ['item1_processed', 'item2_processed', 'item3_processed']
}));

Breaks up a stream by a delimiter and reassembles it so that each split part becomes a separate chunk.
/**
* Split stream data by delimiter (string or regex)
* @param {string|RegExp} matcher - Delimiter to split on (defaults to '\n')
* @returns {TransformStream} Stream that emits split chunks
*/
function split(matcher);

Usage Examples:
const es = require('event-stream');
const fs = require('fs');
// Split on newlines (default)
fs.createReadStream('file.txt')
.pipe(es.split())
.pipe(es.map(function(line, cb) {
console.log('Line:', line);
cb(null, line);
}));
// Split on a custom delimiter
// split treats the stream as one continuous string, so without the trailing
// comma in the first chunk, 'c' and 'd' would arrive as a single piece 'cd'
const csvSplitter = es.split(',');
es.readArray(['a,b,c,', 'd,e,f'])
.pipe(csvSplitter)
.pipe(es.writeArray(function(err, result) {
console.log(result); // ['a', 'b', 'c', 'd', 'e', 'f']
}));
// Keep line breaks using regex
fs.createReadStream('file.txt')
.pipe(es.split(/(\r?\n)/))
.pipe(es.map(function(chunk, cb) {
// The captured line breaks are emitted as separate chunks
cb(null, chunk);
}));
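A common combination is split followed by the synchronous helpers to process a file line by line. The sketch below assumes a newline-delimited JSON file named 'events.ndjson' (hypothetical) and skips the empty chunks that blank lines produce:
fs.createReadStream('events.ndjson')      // hypothetical input file
  .pipe(es.split())                       // one chunk per line
  .pipe(es.filterSync(function(line) {
    return line.trim() !== '';            // drop blank lines
  }))
  .pipe(es.mapSync(function(line) {
    return JSON.parse(line);              // parse each remaining line
  }))
  .on('error', function(err) {
    console.error('bad line:', err.message);
  })
  .pipe(es.writeArray(function(err, events) {
    console.log('parsed %d events', events.length);
  }));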
// Split on regex pattern
const wordSplitter = es.split(/\s+/);

All transformation functions include built-in error handling: errors are emitted as 'error' events on the stream rather than thrown, and streams continue processing after errors unless they are explicitly destroyed.