Functions for creating readable and writable streams from various data sources with proper pause/resume support and lifecycle management.
const es = require('event-stream');Creates a readable stream from an array, emitting each item as a data event while respecting pause and resume.
/**
* Create a readable stream from an array
* @param {Array} array - Array of items to emit as stream data
* @returns {ReadableStream} Stream that emits array items
* @throws {Error} If array parameter is not an array
*/
function readArray(array);Usage Examples:
const es = require('event-stream');
// Create stream from array
const numberStream = es.readArray([1, 2, 3, 4, 5]);
numberStream.pipe(es.writeArray(function(err, result) {
console.log(result); // [1, 2, 3, 4, 5]
}));
// With objects
const users = [
{ id: 1, name: 'Alice' },
{ id: 2, name: 'Bob' }
];
es.readArray(users)
.pipe(es.map(function(user, cb) {
cb(null, user.name.toUpperCase());
}))
.pipe(es.writeArray(function(err, names) {
console.log(names); // ['ALICE', 'BOB']
}));Creates a writable stream that collects all data events into an array and calls a callback when the stream ends.
/**
* Create a writable stream that collects data into an array
* @param {Function} callback - Called with (error, array) when stream ends
* @returns {WritableStream} Stream that collects all written data
* @throws {Error} If callback is not a function
*/
function writeArray(callback);
// Alias for writeArray
function collect(callback);Usage Examples:
const es = require('event-stream');
// Collect stream data
const collector = es.writeArray(function(err, array) {
if (err) throw err;
console.log('Collected:', array);
});
// Write data to collector
collector.write('hello');
collector.write('world');
collector.end(); // Triggers callback with ['hello', 'world']
// Use with pipe
es.readArray(['a', 'b', 'c'])
.pipe(es.collect(function(err, result) {
console.log(result); // ['a', 'b', 'c']
}));Creates a readable stream that calls an async function repeatedly while the stream is not paused, allowing for dynamic data generation.
/**
* Create a readable stream from an async function
* @param {Function} func - Function called with (count, callback)
* @param {boolean} continueOnError - Whether to continue on errors (optional)
* @returns {ReadableStream} Stream that calls function for data
* @throws {Error} If func is not a function
*/
function readable(func, continueOnError);Usage Examples:
const es = require('event-stream');
// Generate sequential numbers
const numberGenerator = es.readable(function(count, callback) {
if (count >= 5) {
return this.emit('end'); // Stop after 5 items
}
// Simulate async operation
setTimeout(() => {
callback(null, count * 2); // Emit doubled count
}, 100);
});
numberGenerator.pipe(es.writeArray(function(err, result) {
console.log(result); // [0, 2, 4, 6, 8]
}));
// With error handling
const errorProneStream = es.readable(function(count, callback) {
if (count === 3) {
return callback(new Error('Test error'));
}
if (count >= 5) {
return this.emit('end');
}
callback(null, count);
}, true); // Continue on error
errorProneStream.on('error', function(err) {
console.log('Error caught:', err.message);
});Creates a transform stream with optional write and end functions, providing the foundation for most synchronous streams in event-stream.
/**
* Create a through stream with custom write and end handlers
* @param {Function} writeFunction - Called for each data chunk with (data)
* @param {Function} endFunction - Called when stream ends
* @returns {TransformStream} Through stream with custom handlers
*/
function through(writeFunction, endFunction);Usage Examples:
const es = require('event-stream');
// Simple pass-through
const passThrough = es.through(function(data) {
this.emit('data', data); // Re-emit data
});
// Transform data
const doubler = es.through(
function write(data) {
this.emit('data', data * 2);
},
function end() {
this.emit('end');
}
);
es.readArray([1, 2, 3])
.pipe(doubler)
.pipe(es.writeArray(function(err, result) {
console.log(result); // [2, 4, 6]
}));
// With flow control
const pauser = es.through(function(data) {
if (data > 3) {
this.pause(); // Pause stream
setTimeout(() => this.resume(), 1000); // Resume after delay
}
this.emit('data', data);
});All created streams have standard Node.js stream properties and methods: