CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/npm-highland

Highland is a comprehensive high-level streams library for Node.js and browsers that unifies synchronous and asynchronous data processing through a single, powerful abstraction.

Pending
Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

SecuritybySnyk

Pending

The risk profile of this skill

Overview
Eval results
Files

higher-order-streams.mddocs/

Higher-Order Streams

Operations for working with streams of streams, including flattening, parallel processing, and merging.

Capabilities

Stream Flattening

Operations for working with nested streams and flattening them into single streams.

/**
 * Map each value to a stream and flatten the results
 * @param {Function} fn - Function that returns a stream for each value
 * @returns {Stream} Flattened stream of results
 */
Stream.prototype.flatMap(fn);

/**
 * Flatten a stream of streams/arrays sequentially (one level deep)
 * @returns {Stream} Flattened stream
 */
Stream.prototype.sequence();

/**
 * Alias for sequence()
 * @returns {Stream} Flattened stream
 */
Stream.prototype.series();

/**
 * Recursively flatten nested streams and arrays
 * @returns {Stream} Completely flattened stream
 */
Stream.prototype.flatten();

Usage Examples:

// FlatMap for async operations
const readFile = _.wrapCallback(require('fs').readFile);
_(['file1.txt', 'file2.txt'])
  .flatMap(filename => readFile(filename))
  .each(console.log); // Contents of both files

// Sequence streams
_([_([1, 2]), _([3, 4]), _([5, 6])])
  .sequence()
  .toArray(console.log); // [1, 2, 3, 4, 5, 6]

// Flatten nested structures
_([1, [2, 3], [[4]], [[[5]]]])
  .flatten()
  .toArray(console.log); // [1, 2, 3, 4, 5]

Parallel Processing

Process multiple streams concurrently with controlled parallelism.

/**
 * Process streams in parallel with concurrency limit
 * @param {number} concurrency - Maximum concurrent streams
 * @returns {Stream} Results in original order
 */
Stream.prototype.parallel(concurrency);

/**
 * Merge streams with concurrency limit
 * @param {number} limit - Maximum concurrent streams
 * @returns {Stream} Merged results
 */
Stream.prototype.mergeWithLimit(limit);

/**
 * Merge all streams without order preservation
 * @returns {Stream} Merged results
 */
Stream.prototype.merge();

Usage Examples:

const readFile = _.wrapCallback(require('fs').readFile);

// Read files in parallel, preserve order
_(['file1.txt', 'file2.txt', 'file3.txt'])
  .map(readFile)
  .parallel(2) // Max 2 concurrent reads
  .each(console.log); // Results in original order

// Merge streams without order preservation
const stream1 = _([1, 2, 3]);
const stream2 = _([4, 5, 6]);
_([stream1, stream2])
  .merge()
  .toArray(console.log); // [1, 4, 2, 5, 3, 6] (order may vary)

Stream Forking

Create multiple consumers for a single stream with shared backpressure.

/**
 * Fork stream for multiple consumers with shared backpressure
 * @returns {Stream} Forked stream
 */
Stream.prototype.fork();

/**
 * Create passive observer that doesn't affect source stream
 * @returns {Stream} Observer stream
 */
Stream.prototype.observe();

Usage Examples:

const source = _([1, 2, 3, 4]);

// Fork for parallel processing
const fork1 = source.fork().filter(x => x % 2 === 0);
const fork2 = source.fork().map(x => x * 2);

fork1.each(x => console.log('Even:', x));
fork2.each(x => console.log('Doubled:', x));

// Observer doesn't affect main stream
const main = source.map(x => x * 10);
const observer = source.observe().each(x => console.log('Observed:', x));

main.each(x => console.log('Main:', x));

Stream Combination

Combine multiple streams in various ways.

/**
 * Concatenate another stream to the end
 * @param {Stream|Array} other - Stream to concatenate
 * @returns {Stream} Concatenated stream
 */
Stream.prototype.concat(other);

/**
 * Zip with another stream into pairs
 * @param {Stream|Array} other - Stream to zip with
 * @returns {Stream} Stream of [value1, value2] pairs
 */
Stream.prototype.zip(other);

/**
 * Zip with multiple streams
 * @param {Array} streams - Array of streams to zip with
 * @returns {Stream} Stream of arrays with values from all streams
 */
Stream.prototype.zipAll(streams);

/**
 * Zip a stream of streams (internal use)
 * @returns {Stream} Zipped results
 */
Stream.prototype.zipAll0();

Usage Examples:

// Concatenation
_([1, 2]).concat([3, 4]).toArray(console.log); // [1, 2, 3, 4]

// Zip two streams
_([1, 2, 3]).zip(['a', 'b', 'c']).toArray(console.log);
// [[1, 'a'], [2, 'b'], [3, 'c']]

// Zip multiple streams
_([1, 2, 3]).zipAll([['a', 'b', 'c'], ['x', 'y', 'z']]).toArray(console.log);
// [[1, 'a', 'x'], [2, 'b', 'y'], [3, 'c', 'z']]

Stream Redirection

Change the source of a stream or provide alternatives.

/**
 * Use alternative stream if current stream is empty
 * @param {Stream|Array|Function} alternative - Alternative source
 * @returns {Stream} Stream with fallback
 */
Stream.prototype.otherwise(alternative);

/**
 * Append a value to the end of the stream
 * @param {*} value - Value to append
 * @returns {Stream} Stream with appended value
 */
Stream.prototype.append(value);

Usage Examples:

// Use alternative for empty streams
_([]).otherwise(['default']).toArray(console.log); // ['default']
_([1, 2]).otherwise(['default']).toArray(console.log); // [1, 2]

// Alternative with function
_([]).otherwise(() => _(['generated'])).toArray(console.log); // ['generated']

// Append value
_([1, 2, 3]).append(4).toArray(console.log); // [1, 2, 3, 4]

Stream Transformation

Transform streams using custom functions or Node.js duplex streams.

/**
 * Transform using function or Node.js duplex stream
 * @param {Function|Stream} transform - Transformation function or duplex stream
 * @returns {Stream} Transformed stream
 */
Stream.prototype.through(transform);

Usage Examples:

// Transform with function
function doubleAndFilter(stream) {
  return stream.map(x => x * 2).filter(x => x > 5);
}

_([1, 2, 3, 4]).through(doubleAndFilter).toArray(console.log); // [6, 8]

// Transform with Node.js duplex stream
const zlib = require('zlib');
const fs = require('fs');

_(fs.createReadStream('file.txt'))
  .through(zlib.createGzip())
  .pipe(fs.createWriteStream('file.txt.gz'));

Pipeline Creation

Create reusable transformation pipelines.

/**
 * Create a reusable transformation pipeline
 * @param {...Function} transforms - Series of transformation functions
 * @returns {Stream} Pipeline stream
 */
function _.pipeline(...transforms);

Usage Examples:

// Create reusable pipeline
const processNumbers = _.pipeline(
  _.map(x => parseInt(x)),
  _.filter(x => !isNaN(x)),
  _.map(x => x * 2)
);

// Use pipeline
_(['1', '2', 'invalid', '3'])
  .through(processNumbers)
  .toArray(console.log); // [2, 4, 6]

// Pipeline as standalone transform
const readable = require('fs').createReadStream('numbers.txt');
const writable = require('fs').createWriteStream('processed.txt');

readable.pipe(processNumbers).pipe(writable);

Advanced Patterns

Async Control Flow

Highland excels at async control flow patterns similar to the async library:

// Series execution (like async.series)
_([task1, task2, task3]).nfcall([]).series();

// Parallel with limit (like async.parallelLimit)
_([task1, task2, task3]).nfcall([]).parallel(2);

// Apply each (like async.applyEach)
_([fn1, fn2, fn3]).nfcall(['arg1', 'arg2']);

Error Recovery in Streams

// Recover from errors in parallel processing
_(urls)
  .map(url => _(fetch(url)).errors((err, push) => {
    push(null, { url, error: err.message });
  }))
  .parallel(3)
  .each(result => {
    if (result.error) {
      console.log('Failed:', result.url, result.error);
    } else {
      console.log('Success:', result);
    }
  });

docs

advanced-transforms.md

basic-transforms.md

consumption.md

flow-control.md

functional-programming.md

higher-order-streams.md

index.md

stream-creation.md

tile.json