Highland is a comprehensive high-level streams library for Node.js and browsers that unifies synchronous and asynchronous data processing through a single, powerful abstraction.
—
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Pending
The risk profile of this skill
Operations for working with streams of streams, including flattening, parallel processing, and merging.
Operations for working with nested streams and flattening them into single streams.
/**
* Map each value to a stream and flatten the results
* @param {Function} fn - Function that returns a stream for each value
* @returns {Stream} Flattened stream of results
*/
Stream.prototype.flatMap(fn);
/**
* Flatten a stream of streams/arrays sequentially (one level deep)
* @returns {Stream} Flattened stream
*/
Stream.prototype.sequence();
/**
* Alias for sequence()
* @returns {Stream} Flattened stream
*/
Stream.prototype.series();
/**
* Recursively flatten nested streams and arrays
* @returns {Stream} Completely flattened stream
*/
Stream.prototype.flatten();Usage Examples:
// FlatMap for async operations
const readFile = _.wrapCallback(require('fs').readFile);
_(['file1.txt', 'file2.txt'])
.flatMap(filename => readFile(filename))
.each(console.log); // Contents of both files
// Sequence streams
_([_([1, 2]), _([3, 4]), _([5, 6])])
.sequence()
.toArray(console.log); // [1, 2, 3, 4, 5, 6]
// Flatten nested structures
_([1, [2, 3], [[4]], [[[5]]]])
.flatten()
.toArray(console.log); // [1, 2, 3, 4, 5]Process multiple streams concurrently with controlled parallelism.
/**
* Process streams in parallel with concurrency limit
* @param {number} concurrency - Maximum concurrent streams
* @returns {Stream} Results in original order
*/
Stream.prototype.parallel(concurrency);
/**
* Merge streams with concurrency limit
* @param {number} limit - Maximum concurrent streams
* @returns {Stream} Merged results
*/
Stream.prototype.mergeWithLimit(limit);
/**
* Merge all streams without order preservation
* @returns {Stream} Merged results
*/
Stream.prototype.merge();Usage Examples:
const readFile = _.wrapCallback(require('fs').readFile);
// Read files in parallel, preserve order
_(['file1.txt', 'file2.txt', 'file3.txt'])
.map(readFile)
.parallel(2) // Max 2 concurrent reads
.each(console.log); // Results in original order
// Merge streams without order preservation
const stream1 = _([1, 2, 3]);
const stream2 = _([4, 5, 6]);
_([stream1, stream2])
.merge()
.toArray(console.log); // [1, 4, 2, 5, 3, 6] (order may vary)Create multiple consumers for a single stream with shared backpressure.
/**
* Fork stream for multiple consumers with shared backpressure
* @returns {Stream} Forked stream
*/
Stream.prototype.fork();
/**
* Create passive observer that doesn't affect source stream
* @returns {Stream} Observer stream
*/
Stream.prototype.observe();Usage Examples:
const source = _([1, 2, 3, 4]);
// Fork for parallel processing
const fork1 = source.fork().filter(x => x % 2 === 0);
const fork2 = source.fork().map(x => x * 2);
fork1.each(x => console.log('Even:', x));
fork2.each(x => console.log('Doubled:', x));
// Observer doesn't affect main stream
const main = source.map(x => x * 10);
const observer = source.observe().each(x => console.log('Observed:', x));
main.each(x => console.log('Main:', x));Combine multiple streams in various ways.
/**
* Concatenate another stream to the end
* @param {Stream|Array} other - Stream to concatenate
* @returns {Stream} Concatenated stream
*/
Stream.prototype.concat(other);
/**
* Zip with another stream into pairs
* @param {Stream|Array} other - Stream to zip with
* @returns {Stream} Stream of [value1, value2] pairs
*/
Stream.prototype.zip(other);
/**
* Zip with multiple streams
* @param {Array} streams - Array of streams to zip with
* @returns {Stream} Stream of arrays with values from all streams
*/
Stream.prototype.zipAll(streams);
/**
* Zip a stream of streams (internal use)
* @returns {Stream} Zipped results
*/
Stream.prototype.zipAll0();Usage Examples:
// Concatenation
_([1, 2]).concat([3, 4]).toArray(console.log); // [1, 2, 3, 4]
// Zip two streams
_([1, 2, 3]).zip(['a', 'b', 'c']).toArray(console.log);
// [[1, 'a'], [2, 'b'], [3, 'c']]
// Zip multiple streams
_([1, 2, 3]).zipAll([['a', 'b', 'c'], ['x', 'y', 'z']]).toArray(console.log);
// [[1, 'a', 'x'], [2, 'b', 'y'], [3, 'c', 'z']]Change the source of a stream or provide alternatives.
/**
* Use alternative stream if current stream is empty
* @param {Stream|Array|Function} alternative - Alternative source
* @returns {Stream} Stream with fallback
*/
Stream.prototype.otherwise(alternative);
/**
* Append a value to the end of the stream
* @param {*} value - Value to append
* @returns {Stream} Stream with appended value
*/
Stream.prototype.append(value);Usage Examples:
// Use alternative for empty streams
_([]).otherwise(['default']).toArray(console.log); // ['default']
_([1, 2]).otherwise(['default']).toArray(console.log); // [1, 2]
// Alternative with function
_([]).otherwise(() => _(['generated'])).toArray(console.log); // ['generated']
// Append value
_([1, 2, 3]).append(4).toArray(console.log); // [1, 2, 3, 4]Transform streams using custom functions or Node.js duplex streams.
/**
* Transform using function or Node.js duplex stream
* @param {Function|Stream} transform - Transformation function or duplex stream
* @returns {Stream} Transformed stream
*/
Stream.prototype.through(transform);Usage Examples:
// Transform with function
function doubleAndFilter(stream) {
return stream.map(x => x * 2).filter(x => x > 5);
}
_([1, 2, 3, 4]).through(doubleAndFilter).toArray(console.log); // [6, 8]
// Transform with Node.js duplex stream
const zlib = require('zlib');
const fs = require('fs');
_(fs.createReadStream('file.txt'))
.through(zlib.createGzip())
.pipe(fs.createWriteStream('file.txt.gz'));Create reusable transformation pipelines.
/**
* Create a reusable transformation pipeline
* @param {...Function} transforms - Series of transformation functions
* @returns {Stream} Pipeline stream
*/
function _.pipeline(...transforms);Usage Examples:
// Create reusable pipeline
const processNumbers = _.pipeline(
_.map(x => parseInt(x)),
_.filter(x => !isNaN(x)),
_.map(x => x * 2)
);
// Use pipeline
_(['1', '2', 'invalid', '3'])
.through(processNumbers)
.toArray(console.log); // [2, 4, 6]
// Pipeline as standalone transform
const readable = require('fs').createReadStream('numbers.txt');
const writable = require('fs').createWriteStream('processed.txt');
readable.pipe(processNumbers).pipe(writable);Highland excels at async control flow patterns similar to the async library:
// Series execution (like async.series)
_([task1, task2, task3]).nfcall([]).series();
// Parallel with limit (like async.parallelLimit)
_([task1, task2, task3]).nfcall([]).parallel(2);
// Apply each (like async.applyEach)
_([fn1, fn2, fn3]).nfcall(['arg1', 'arg2']);// Recover from errors in parallel processing
_(urls)
.map(url => _(fetch(url)).errors((err, push) => {
push(null, { url, error: err.message });
}))
.parallel(3)
.each(result => {
if (result.error) {
console.log('Failed:', result.url, result.error);
} else {
console.log('Success:', result);
}
});