Operations for working with streams of streams, including flattening, parallel processing, and merging.
Flattening operations convert nested streams and arrays into a single flat stream.
/**
* Map each value to a stream and flatten the results
* @param {Function} fn - Function that returns a stream for each value
* @returns {Stream} Flattened stream of results
*/
Stream.prototype.flatMap(fn);
/**
* Flatten a stream of streams/arrays sequentially (one level deep)
* @returns {Stream} Flattened stream
*/
Stream.prototype.sequence();
/**
* Alias for sequence()
* @returns {Stream} Flattened stream
*/
Stream.prototype.series();
/**
* Recursively flatten nested streams and arrays
* @returns {Stream} Completely flattened stream
*/
Stream.prototype.flatten();

Usage Examples:
// FlatMap for async operations
const readFile = _.wrapCallback(require('fs').readFile);
_(['file1.txt', 'file2.txt'])
.flatMap(filename => readFile(filename))
.each(console.log); // Contents of both files
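For intuition, a minimal sketch with assumed in-memory values: flatMap expands each value into its own stream and flattens the results in order.
// flatMap expands each value into a stream, then flattens sequentially
_([1, 2, 3])
.flatMap(x => _([x, x * 10]))
.toArray(console.log); // [1, 10, 2, 20, 3, 30]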
// Sequence streams
_([_([1, 2]), _([3, 4]), _([5, 6])])
.sequence()
.toArray(console.log); // [1, 2, 3, 4, 5, 6]
// Flatten nested structures
_([1, [2, 3], [[4]], [[[5]]]])
.flatten()
.toArray(console.log); // [1, 2, 3, 4, 5]

Process multiple streams concurrently with controlled parallelism.
/**
* Process streams in parallel with concurrency limit
* @param {number} concurrency - Maximum concurrent streams
* @returns {Stream} Results in original order
*/
Stream.prototype.parallel(concurrency);
/**
* Merge streams with concurrency limit
* @param {number} limit - Maximum concurrent streams
* @returns {Stream} Merged results
*/
Stream.prototype.mergeWithLimit(limit);
/**
* Merge all streams without order preservation
* @returns {Stream} Merged results
*/
Stream.prototype.merge();

Usage Examples:
const readFile = _.wrapCallback(require('fs').readFile);
// Read files in parallel, preserve order
_(['file1.txt', 'file2.txt', 'file3.txt'])
.map(readFile)
.parallel(2) // Max 2 concurrent reads
.each(console.log); // Results in original order
// Merge streams without order preservation
const stream1 = _([1, 2, 3]);
const stream2 = _([4, 5, 6]);
_([stream1, stream2])
.merge()
.toArray(console.log); // [1, 4, 2, 5, 3, 6] (order may vary)
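mergeWithLimit() sits between parallel() and merge(): results may interleave, but at most limit streams are consumed at once. A minimal sketch, assuming the readFile wrapper and file names from the example above:
_(['file1.txt', 'file2.txt', 'file3.txt'])
.map(readFile)
.mergeWithLimit(2) // at most 2 concurrent reads
.each(console.log); // results as they complete, not necessarily in input order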
Create multiple consumers for a single stream with shared backpressure.

/**
* Fork stream for multiple consumers with shared backpressure
* @returns {Stream} Forked stream
*/
Stream.prototype.fork();
/**
* Create passive observer that doesn't affect source stream
* @returns {Stream} Observer stream
*/
Stream.prototype.observe();

Usage Examples:
// Fork for parallel processing (every fork must be consumed for data to flow)
const source = _([1, 2, 3, 4]);
const fork1 = source.fork().filter(x => x % 2 === 0);
const fork2 = source.fork().map(x => x * 2);
fork1.each(x => console.log('Even:', x));
fork2.each(x => console.log('Doubled:', x));
// Observer doesn't affect the main stream (attach it before consuming)
const source2 = _([1, 2, 3, 4]);
source2.observe().each(x => console.log('Observed:', x));
source2.map(x => x * 10).each(x => console.log('Main:', x));
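Because forks share backpressure, the slowest branch sets the pace for every branch. A minimal sketch (the values and rate limit here are illustrative assumptions):
const src = _([1, 2, 3, 4, 5, 6]);
const slow = src.fork().ratelimit(1, 1000); // illustrative: at most 1 value per second
const fast = src.fork();
slow.each(x => console.log('slow:', x));
fast.each(x => console.log('fast:', x)); // receives values at the slow branch's pace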
Combine multiple streams in various ways.

/**
* Concatenate another stream to the end
* @param {Stream|Array} other - Stream to concatenate
* @returns {Stream} Concatenated stream
*/
Stream.prototype.concat(other);
/**
* Zip with another stream into pairs
* @param {Stream|Array} other - Stream to zip with
* @returns {Stream} Stream of [value1, value2] pairs
*/
Stream.prototype.zip(other);
/**
* Zip with multiple streams
* @param {Array} streams - Array of streams to zip with
* @returns {Stream} Stream of arrays with values from all streams
*/
Stream.prototype.zipAll(streams);
/**
* Zip a stream of streams together into a stream of arrays
* @returns {Stream} Zipped results
*/
Stream.prototype.zipAll0();

Usage Examples:
// Concatenation
_([1, 2]).concat([3, 4]).toArray(console.log); // [1, 2, 3, 4]
// Zip two streams
_([1, 2, 3]).zip(['a', 'b', 'c']).toArray(console.log);
// [[1, 'a'], [2, 'b'], [3, 'c']]
// Zip multiple streams
_([1, 2, 3]).zipAll([['a', 'b', 'c'], ['x', 'y', 'z']]).toArray(console.log);
// [[1, 'a', 'x'], [2, 'b', 'y'], [3, 'c', 'z']]
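zipAll0() covers the case where the streams to zip arrive as a stream of streams. A minimal sketch with assumed values:
// Zip a stream of streams directly
_([_([1, 2]), _(['a', 'b']), _(['x', 'y'])])
.zipAll0()
.toArray(console.log); // [[1, 'a', 'x'], [2, 'b', 'y']]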
Change the source of a stream or provide alternatives.

/**
* Use alternative stream if current stream is empty
* @param {Stream|Array|Function} alternative - Alternative source
* @returns {Stream} Stream with fallback
*/
Stream.prototype.otherwise(alternative);
/**
* Append a value to the end of the stream
* @param {*} value - Value to append
* @returns {Stream} Stream with appended value
*/
Stream.prototype.append(value);

Usage Examples:
// Use alternative for empty streams
_([]).otherwise(['default']).toArray(console.log); // ['default']
_([1, 2]).otherwise(['default']).toArray(console.log); // [1, 2]
// Alternative with function
_([]).otherwise(() => _(['generated'])).toArray(console.log); // ['generated']
// Append value
_([1, 2, 3]).append(4).toArray(console.log); // [1, 2, 3, 4]
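otherwise() calls chain into a fallback ladder where the first non-empty source wins; a minimal sketch with assumed values:
// Chained fallbacks
_([])
.otherwise(_([]))
.otherwise(_(['last resort']))
.toArray(console.log); // ['last resort']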
Transform streams using custom functions or Node.js duplex streams.

/**
* Transform using function or Node.js duplex stream
* @param {Function|Stream} transform - Transformation function or duplex stream
* @returns {Stream} Transformed stream
*/
Stream.prototype.through(transform);

Usage Examples:
// Transform with function
function doubleAndFilter(stream) {
return stream.map(x => x * 2).filter(x => x > 5);
}
_([1, 2, 3, 4]).through(doubleAndFilter).toArray(console.log); // [6, 8]
// Transform with Node.js duplex stream
const zlib = require('zlib');
const fs = require('fs');
_(fs.createReadStream('file.txt'))
.through(zlib.createGzip())
.pipe(fs.createWriteStream('file.txt.gz'));

Create reusable transformation pipelines.
/**
* Create a reusable transformation pipeline
* @param {...Function} transforms - Series of transformation functions
* @returns {Stream} Pipeline stream
*/
_.pipeline(...transforms);

Usage Examples:
// Create reusable pipeline
const processNumbers = _.pipeline(
_.map(x => parseInt(x, 10)),
_.filter(x => !isNaN(x)),
_.map(x => x * 2)
);
// Use pipeline
_(['1', '2', 'invalid', '3'])
.through(processNumbers)
.toArray(console.log); // [2, 4, 6]
// Pipeline as standalone transform
// (pipeline streams are single-use, so build a fresh one per source)
const processLines = _.pipeline(
_.split(), // split raw file chunks into lines
_.map(x => parseInt(x, 10)),
_.filter(x => !isNaN(x)),
_.map(x => x * 2 + '\n')
);
const readable = require('fs').createReadStream('numbers.txt');
const writable = require('fs').createWriteStream('processed.txt');
readable.pipe(processLines).pipe(writable);

Highland excels at async control flow patterns similar to the async library.
// Series execution (like async.series)
_([task1, task2, task3]).nfcall([]).series();
// Parallel with limit (like async.parallelLimit)
_([task1, task2, task3]).nfcall([]).parallel(2);
// Apply each (like async.applyEach)
_([fn1, fn2, fn3]).nfcall(['arg1', 'arg2']);
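For concreteness, a minimal sketch of the series pattern with real node-style tasks (the file names are hypothetical):
const fs = require('fs');
const tasks = ['a.txt', 'b.txt'].map(name => cb => fs.readFile(name, 'utf8', cb));
_(tasks).nfcall([]).series().each(console.log); // file contents, in order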
// Recover from errors in parallel processing
_(urls)
.map(url => _(fetch(url)).errors((err, push) => {
push(null, { url, error: err.message });
}))
.parallel(3)
.each(result => {
if (result.error) {
console.log('Failed:', result.url, result.error);
} else {
console.log('Success:', result);
}
});