Construct pipes of streams of events with functional programming patterns for Node.js
—
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Pending
The risk profile of this skill
Functions for creating readable and writable streams from various data sources with proper pause/resume support and lifecycle management.
const es = require('event-stream');Creates a readable stream from an array, emitting each item as a data event while respecting pause and resume.
/**
* Create a readable stream from an array
* @param {Array} array - Array of items to emit as stream data
* @returns {ReadableStream} Stream that emits array items
* @throws {Error} If array parameter is not an array
*/
function readArray(array);Usage Examples:
const es = require('event-stream');
// Create stream from array
const numberStream = es.readArray([1, 2, 3, 4, 5]);
numberStream.pipe(es.writeArray(function(err, result) {
console.log(result); // [1, 2, 3, 4, 5]
}));
// With objects
const users = [
{ id: 1, name: 'Alice' },
{ id: 2, name: 'Bob' }
];
es.readArray(users)
.pipe(es.map(function(user, cb) {
cb(null, user.name.toUpperCase());
}))
.pipe(es.writeArray(function(err, names) {
console.log(names); // ['ALICE', 'BOB']
}));Creates a writable stream that collects all data events into an array and calls a callback when the stream ends.
/**
* Create a writable stream that collects data into an array
* @param {Function} callback - Called with (error, array) when stream ends
* @returns {WritableStream} Stream that collects all written data
* @throws {Error} If callback is not a function
*/
function writeArray(callback);
// Alias for writeArray
function collect(callback);Usage Examples:
const es = require('event-stream');
// Collect stream data
const collector = es.writeArray(function(err, array) {
if (err) throw err;
console.log('Collected:', array);
});
// Write data to collector
collector.write('hello');
collector.write('world');
collector.end(); // Triggers callback with ['hello', 'world']
// Use with pipe
es.readArray(['a', 'b', 'c'])
.pipe(es.collect(function(err, result) {
console.log(result); // ['a', 'b', 'c']
}));Creates a readable stream that calls an async function repeatedly while the stream is not paused, allowing for dynamic data generation.
/**
* Create a readable stream from an async function
* @param {Function} func - Function called with (count, callback)
* @param {boolean} continueOnError - Whether to continue on errors (optional)
* @returns {ReadableStream} Stream that calls function for data
* @throws {Error} If func is not a function
*/
function readable(func, continueOnError);Usage Examples:
const es = require('event-stream');
// Generate sequential numbers
const numberGenerator = es.readable(function(count, callback) {
if (count >= 5) {
return this.emit('end'); // Stop after 5 items
}
// Simulate async operation
setTimeout(() => {
callback(null, count * 2); // Emit doubled count
}, 100);
});
numberGenerator.pipe(es.writeArray(function(err, result) {
console.log(result); // [0, 2, 4, 6, 8]
}));
// With error handling
const errorProneStream = es.readable(function(count, callback) {
if (count === 3) {
return callback(new Error('Test error'));
}
if (count >= 5) {
return this.emit('end');
}
callback(null, count);
}, true); // Continue on error
errorProneStream.on('error', function(err) {
console.log('Error caught:', err.message);
});Creates a transform stream with optional write and end functions, providing the foundation for most synchronous streams in event-stream.
/**
* Create a through stream with custom write and end handlers
* @param {Function} writeFunction - Called for each data chunk with (data)
* @param {Function} endFunction - Called when stream ends
* @returns {TransformStream} Through stream with custom handlers
*/
function through(writeFunction, endFunction);Usage Examples:
const es = require('event-stream');
// Simple pass-through
const passThrough = es.through(function(data) {
this.emit('data', data); // Re-emit data
});
// Transform data
const doubler = es.through(
function write(data) {
this.emit('data', data * 2);
},
function end() {
this.emit('end');
}
);
es.readArray([1, 2, 3])
.pipe(doubler)
.pipe(es.writeArray(function(err, result) {
console.log(result); // [2, 4, 6]
}));
// With flow control
const pauser = es.through(function(data) {
if (data > 3) {
this.pause(); // Pause stream
setTimeout(() => this.resume(), 1000); // Resume after delay
}
this.emit('data', data);
});All created streams have standard Node.js stream properties and methods: