Construct pipes of streams of events with functional programming patterns for Node.js
—
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Pending
The risk profile of this skill
Specialized functions for parsing JSON, stringifying objects, text replacement, and data collection with robust error handling and format conversion.
const es = require('event-stream');
Parses JSON chunks in a stream, typically used after splitting data into lines. Provides configurable error handling for malformed JSON.
/**
* Parse JSON chunks with configurable error handling
* @param {Object} options - Configuration options
* @param {boolean} options.error - If true, emit error events; if false, log to console.error
* @returns {TransformStream} Stream that parses JSON data
*/
function parse(options);
Usage Examples:
const es = require('event-stream');
const fs = require('fs');
// Parse JSON lines
fs.createReadStream('data.jsonl')
.pipe(es.split())
.pipe(es.parse())
.pipe(es.map(function(obj, cb) {
console.log('Parsed object:', obj);
cb(null, obj);
}));
// With error emission
const errorParser = es.parse({ error: true });
errorParser.on('error', function(err) {
console.error('Parse error:', err.message);
});
es.readArray(['{"valid": true}', 'invalid json', '{"also": "valid"}'])
.pipe(errorParser)
.pipe(es.writeArray(function(err, result) {
console.log('Valid objects:', result);
}));
// Default error logging (to console.error)
es.readArray(['{"name": "Alice"}', 'broken', '{"name": "Bob"}'])
.pipe(es.parse()) // Logs errors but continues processing
.pipe(es.writeArray(function(err, result) {
console.log(result); // [{"name": "Alice"}, {"name": "Bob"}]
}));
// Empty lines are ignored
const jsonData = [
'{"id": 1}',
'', // Empty line - ignored
' ', // Whitespace only - ignored
'{"id": 2}'
];
Converts JavaScript objects to JSON strings with newlines, making them compatible with parse() and line-based processing.
/**
* Convert objects to JSON strings with newlines
* @returns {TransformStream} Stream that stringifies objects
*/
function stringify();
Usage Examples:
const es = require('event-stream');
const fs = require('fs');
// Stringify objects to file
const objects = [
{ name: 'Alice', age: 30 },
{ name: 'Bob', age: 25 },
{ name: 'Charlie', age: 35 }
];
es.readArray(objects)
.pipe(es.stringify())
.pipe(fs.createWriteStream('output.jsonl'));
// Round-trip with parse
es.readArray(objects)
.pipe(es.stringify())
.pipe(es.split())
.pipe(es.parse())
.pipe(es.writeArray(function(err, result) {
console.log('Round-trip result:', result);
}));
// Handle Buffer objects
const bufferData = [
Buffer.from('hello'),
{ text: 'world' },
Buffer.from('buffer data')
];
es.readArray(bufferData)
.pipe(es.stringify())
.pipe(es.writeArray(function(err, result) {
// Buffers are converted to strings before JSON.stringify
console.log(result);
}));
Replaces all occurrences of a string or regular expression with a replacement string, working like string.split().join() but for streams.
/**
* Replace occurrences of text in stream data
* @param {string|RegExp} from - Text or pattern to replace
* @param {string} to - Replacement text
* @returns {TransformStream} Stream that performs text replacement
*/
function replace(from, to);
Usage Examples:
const es = require('event-stream');
const fs = require('fs');
// Simple string replacement
fs.createReadStream('template.txt')
.pipe(es.replace('{{NAME}}', 'Alice'))
.pipe(es.replace('{{AGE}}', '30'))
.pipe(fs.createWriteStream('output.txt'));
// Regular expression replacement
const htmlEscaper = es.replace(/[<>&"]/g, function(match) {
const escapes = {
'<': '&lt;',
'>': '&gt;',
'&': '&amp;',
'"': '&quot;'
};
return escapes[match];
});
es.readArray(['<script>alert("hello")</script>'])
.pipe(htmlEscaper)
.pipe(es.writeArray(function(err, result) {
console.log(result); // ['&lt;script&gt;alert(&quot;hello&quot;)&lt;/script&gt;']
}));
// Multiple replacements in pipeline
const textProcessor = es.pipeline(
es.replace(/\s+/g, ' '), // Normalize whitespace
es.replace(/^\s+|\s+$/g, ''), // Trim
es.replace(/old/g, 'new') // Replace content
);
// Case-insensitive replacement
const caseInsensitive = es.replace(/hello/gi, 'hi');
Joins stream chunks with a separator, similar to Array.join(). Also provides legacy callback-based functionality.
/**
* Join stream chunks with separator
* @param {string} separator - String to insert between chunks
* @returns {TransformStream} Stream that joins chunks
*/
function join(separator);
/**
* Legacy: wait for all data and call callback (same as wait())
* @param {Function} callback - Function called with complete data
* @returns {TransformStream} Stream that collects and calls callback
*/
function join(callback);
Usage Examples:
const es = require('event-stream');
// Join with separator
es.readArray(['apple', 'banana', 'cherry'])
.pipe(es.join(', '))
.pipe(es.writeArray(function(err, result) {
console.log(result); // ['apple, banana, cherry']
}));
// Join lines with newlines
es.readArray(['line 1', 'line 2', 'line 3'])
.pipe(es.join('\n'))
.pipe(process.stdout);
// Legacy callback usage (equivalent to wait())
es.readArray(['hello', ' ', 'world'])
.pipe(es.join(function(err, result) {
console.log('Complete text:', result); // 'hello world'
}));
// Use in text processing pipeline
const csvFormatter = es.pipeline(
es.split(','), // Split CSV fields
es.mapSync(field => field.trim()), // Clean fields
es.join(' | ') // Join with pipe separator
);
Waits for the stream to end and joins all chunks into a single string or buffer, with optional callback for the complete data.
/**
* Collect all stream chunks into single string/buffer
* @param {Function} callback - Optional callback called with complete data
* @returns {TransformStream} Stream that emits single data event with all chunks
*/
function wait(callback);
Usage Examples:
const es = require('event-stream');
const fs = require('fs');
// Read entire file as single string
fs.createReadStream('document.txt')
.pipe(es.wait(function(err, body) {
if (err) throw err;
console.log('File contents:', body);
}));
// Collect transformed data
es.readArray(['hello', ' ', 'world', '!'])
.pipe(es.mapSync(chunk => chunk.toUpperCase()))
.pipe(es.wait(function(err, result) {
console.log(result); // 'HELLO WORLD!'
}));
// Buffer handling
const binaryData = [
Buffer.from('hello'),
Buffer.from(' '),
Buffer.from('world')
];
es.readArray(binaryData)
.pipe(es.wait(function(err, buffer) {
console.log(buffer.toString()); // 'hello world'
console.log('Buffer length:', buffer.length);
}));
// Without callback - emits single data event
es.readArray(['a', 'b', 'c'])
.pipe(es.wait())
.on('data', function(completeData) {
console.log('All data:', completeData); // 'abc'
});
The data processing functions handle various data formats:
Each function provides different error handling approaches: