Specialized functions for parsing JSON, stringifying objects, text replacement, and data collection with robust error handling and format conversion.
const es = require('event-stream');

Parses JSON chunks in a stream, typically used after splitting data into lines. Provides configurable error handling for malformed JSON.
/**
* Parse JSON chunks with configurable error handling
* @param {Object} options - Configuration options
* @param {boolean} options.error - If true, emit error events; if false, log to console.error
* @returns {TransformStream} Stream that parses JSON data
*/
function parse(options);

Usage Examples:
const es = require('event-stream');
const fs = require('fs');
// Parse JSON lines
fs.createReadStream('data.jsonl')
.pipe(es.split())
.pipe(es.parse())
.pipe(es.map(function(obj, cb) {
console.log('Parsed object:', obj);
cb(null, obj);
}));
// With error emission
const errorParser = es.parse({ error: true });
errorParser.on('error', function(err) {
console.error('Parse error:', err.message);
});
es.readArray(['{"valid": true}', 'invalid json', '{"also": "valid"}'])
.pipe(errorParser)
.pipe(es.writeArray(function(err, result) {
console.log('Valid objects:', result);
}));
// Default error logging (to console.error)
es.readArray(['{"name": "Alice"}', 'broken', '{"name": "Bob"}'])
.pipe(es.parse()) // Logs errors but continues processing
.pipe(es.writeArray(function(err, result) {
console.log(result); // [{"name": "Alice"}, {"name": "Bob"}]
}));
// Empty lines are ignored
const jsonData = [
'{"id": 1}',
'', // Empty line - ignored
' ', // Whitespace only - ignored
'{"id": 2}'
];

Converts JavaScript objects to JSON strings with newlines, making them compatible with parse() and line-based processing.
/**
* Convert objects to JSON strings with newlines
* @returns {TransformStream} Stream that stringifies objects
*/
function stringify();

Usage Examples:
const es = require('event-stream');
const fs = require('fs');
// Stringify objects to file
const objects = [
{ name: 'Alice', age: 30 },
{ name: 'Bob', age: 25 },
{ name: 'Charlie', age: 35 }
];
es.readArray(objects)
.pipe(es.stringify())
.pipe(fs.createWriteStream('output.jsonl'));
// Round-trip with parse
es.readArray(objects)
.pipe(es.stringify())
.pipe(es.split())
.pipe(es.parse())
.pipe(es.writeArray(function(err, result) {
console.log('Round-trip result:', result);
}));
// Handle Buffer objects
const bufferData = [
Buffer.from('hello'),
{ text: 'world' },
Buffer.from('buffer data')
];
es.readArray(bufferData)
.pipe(es.stringify())
.pipe(es.writeArray(function(err, result) {
// Buffers are converted to strings before JSON.stringify
console.log(result);
}));

Replaces all occurrences of a string or regular expression with a replacement string, working like string.split().join() but for streams.
/**
* Replace occurrences of text in stream data
* @param {string|RegExp} from - Text or pattern to replace
* @param {string} to - Replacement text
* @returns {TransformStream} Stream that performs text replacement
*/
function replace(from, to);

Usage Examples:
const es = require('event-stream');
const fs = require('fs');
// Simple string replacement
fs.createReadStream('template.txt')
.pipe(es.replace('{{NAME}}', 'Alice'))
.pipe(es.replace('{{AGE}}', '30'))
.pipe(fs.createWriteStream('output.txt'));
// Regular expression replacement
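// NOTE(review): replace(from, to) is documented above with a string `to`;
// confirm that a function replacer is supported before relying on this example.
const htmlEscaper = es.replace(/[<>&"]/g, function(match) {
const escapes = {
'<': '&lt;',
'>': '&gt;',
'&': '&amp;',
'"': '&quot;'
};
return escapes[match];
});
es.readArray(['<script>alert("hello")</script>'])
.pipe(htmlEscaper)
.pipe(es.writeArray(function(err, result) {
console.log(result); // ['&lt;script&gt;alert(&quot;hello&quot;)&lt;/script&gt;']
}));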
// Multiple replacements in pipeline
const textProcessor = es.pipeline(
es.replace(/\s+/g, ' '), // Normalize whitespace
es.replace(/^\s+|\s+$/g, ''), // Trim
es.replace(/old/g, 'new') // Replace content
);
// Case-insensitive replacement
const caseInsensitive = es.replace(/hello/gi, 'hi');

Joins stream chunks with a separator, similar to Array.join(). Also provides legacy callback-based functionality.
/**
* Join stream chunks with separator
* @param {string} separator - String to insert between chunks
* @returns {TransformStream} Stream that joins chunks
*/
function join(separator);
/**
* Legacy: wait for all data and call callback (same as wait())
* @param {Function} callback - Function called with complete data
* @returns {TransformStream} Stream that collects and calls callback
*/
function join(callback);

Usage Examples:
const es = require('event-stream');
// Join with separator
es.readArray(['apple', 'banana', 'cherry'])
.pipe(es.join(', '))
.pipe(es.writeArray(function(err, result) {
console.log(result); // ['apple, banana, cherry']
}));
// Join lines with newlines
es.readArray(['line 1', 'line 2', 'line 3'])
.pipe(es.join('\n'))
.pipe(process.stdout);
// Legacy callback usage (equivalent to wait())
es.readArray(['hello', ' ', 'world'])
.pipe(es.join(function(err, result) {
console.log('Complete text:', result); // 'hello world'
}));
// Use in text processing pipeline
const csvFormatter = es.pipeline(
es.split(','), // Split CSV fields
es.mapSync(field => field.trim()), // Clean fields
es.join(' | ') // Join with pipe separator
);

Waits for the stream to end and joins all chunks into a single string or buffer, with optional callback for the complete data.
/**
* Collect all stream chunks into single string/buffer
* @param {Function} callback - Optional callback called with complete data
* @returns {TransformStream} Stream that emits single data event with all chunks
*/
function wait(callback);

Usage Examples:
const es = require('event-stream');
const fs = require('fs');
// Read entire file as single string
fs.createReadStream('document.txt')
.pipe(es.wait(function(err, body) {
if (err) throw err;
console.log('File contents:', body);
}));
// Collect transformed data
es.readArray(['hello', ' ', 'world', '!'])
.pipe(es.mapSync(chunk => chunk.toUpperCase()))
.pipe(es.wait(function(err, result) {
console.log(result); // 'HELLO WORLD!'
}));
// Buffer handling
const binaryData = [
Buffer.from('hello'),
Buffer.from(' '),
Buffer.from('world')
];
es.readArray(binaryData)
.pipe(es.wait(function(err, buffer) {
console.log(buffer.toString()); // 'hello world'
console.log('Buffer length:', buffer.length);
}));
// Without callback - emits single data event
es.readArray(['a', 'b', 'c'])
.pipe(es.wait())
.on('data', function(completeData) {
console.log('All data:', completeData); // 'abc'
});

The data processing functions handle various data formats:
Each function provides different error handling approaches: