or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

advanced-transforms.mdbasic-transforms.mdconsumption.mdflow-control.mdfunctional-programming.mdhigher-order-streams.mdindex.mdstream-creation.md
tile.json

higher-order-streams.mddocs/

Higher-Order Streams

Operations for working with streams of streams, including flattening, parallel processing, and merging.

Capabilities

Stream Flattening

Operations for working with nested streams and flattening them into single streams.

/**
 * Map each value to a stream and flatten the results
 * @param {Function} fn - Function that returns a stream for each value
 * @returns {Stream} Flattened stream of results
 */
Stream.prototype.flatMap(fn);

/**
 * Flatten a stream of streams/arrays sequentially (one level deep)
 * @returns {Stream} Flattened stream
 */
Stream.prototype.sequence();

/**
 * Alias for sequence()
 * @returns {Stream} Flattened stream
 */
Stream.prototype.series();

/**
 * Recursively flatten nested streams and arrays
 * @returns {Stream} Completely flattened stream
 */
Stream.prototype.flatten();

Usage Examples:

// FlatMap for async operations
const readFile = _.wrapCallback(require('fs').readFile);
_(['file1.txt', 'file2.txt'])
  .flatMap(filename => readFile(filename))
  .each(console.log); // Contents of both files

// Sequence streams
_([_([1, 2]), _([3, 4]), _([5, 6])])
  .sequence()
  .toArray(console.log); // [1, 2, 3, 4, 5, 6]

// Flatten nested structures
_([1, [2, 3], [[4]], [[[5]]]])
  .flatten()
  .toArray(console.log); // [1, 2, 3, 4, 5]

Parallel Processing

Process multiple streams concurrently with controlled parallelism.

/**
 * Process streams in parallel with concurrency limit
 * @param {number} concurrency - Maximum concurrent streams
 * @returns {Stream} Results in original order
 */
Stream.prototype.parallel(concurrency);

/**
 * Merge streams with concurrency limit
 * @param {number} limit - Maximum concurrent streams
 * @returns {Stream} Merged results
 */
Stream.prototype.mergeWithLimit(limit);

/**
 * Merge all streams without order preservation
 * @returns {Stream} Merged results
 */
Stream.prototype.merge();

Usage Examples:

const readFile = _.wrapCallback(require('fs').readFile);

// Read files in parallel, preserve order
_(['file1.txt', 'file2.txt', 'file3.txt'])
  .map(readFile)
  .parallel(2) // Max 2 concurrent reads
  .each(console.log); // Results in original order

// Merge streams without order preservation
const stream1 = _([1, 2, 3]);
const stream2 = _([4, 5, 6]);
_([stream1, stream2])
  .merge()
  .toArray(console.log); // [1, 4, 2, 5, 3, 6] (order may vary)

Stream Forking

Create multiple consumers for a single stream with shared backpressure.

/**
 * Fork stream for multiple consumers with shared backpressure
 * @returns {Stream} Forked stream
 */
Stream.prototype.fork();

/**
 * Create passive observer that doesn't affect source stream
 * @returns {Stream} Observer stream
 */
Stream.prototype.observe();

Usage Examples:

const source = _([1, 2, 3, 4]);

// Fork for parallel processing
const fork1 = source.fork().filter(x => x % 2 === 0);
const fork2 = source.fork().map(x => x * 2);

fork1.each(x => console.log('Even:', x));
fork2.each(x => console.log('Doubled:', x));

// Observer doesn't affect main stream
const main = source.map(x => x * 10);
const observer = source.observe().each(x => console.log('Observed:', x));

main.each(x => console.log('Main:', x));

Stream Combination

Combine multiple streams in various ways.

/**
 * Concatenate another stream to the end
 * @param {Stream|Array} other - Stream to concatenate
 * @returns {Stream} Concatenated stream
 */
Stream.prototype.concat(other);

/**
 * Zip with another stream into pairs
 * @param {Stream|Array} other - Stream to zip with
 * @returns {Stream} Stream of [value1, value2] pairs
 */
Stream.prototype.zip(other);

/**
 * Zip with multiple streams
 * @param {Array} streams - Array of streams to zip with
 * @returns {Stream} Stream of arrays with values from all streams
 */
Stream.prototype.zipAll(streams);

/**
 * Zip a stream of streams (internal use)
 * @returns {Stream} Zipped results
 */
Stream.prototype.zipAll0();

Usage Examples:

// Concatenation
_([1, 2]).concat([3, 4]).toArray(console.log); // [1, 2, 3, 4]

// Zip two streams
_([1, 2, 3]).zip(['a', 'b', 'c']).toArray(console.log);
// [[1, 'a'], [2, 'b'], [3, 'c']]

// Zip multiple streams
_([1, 2, 3]).zipAll([['a', 'b', 'c'], ['x', 'y', 'z']]).toArray(console.log);
// [[1, 'a', 'x'], [2, 'b', 'y'], [3, 'c', 'z']]

Stream Redirection

Change the source of a stream or provide alternatives.

/**
 * Use alternative stream if current stream is empty
 * @param {Stream|Array|Function} alternative - Alternative source
 * @returns {Stream} Stream with fallback
 */
Stream.prototype.otherwise(alternative);

/**
 * Append a value to the end of the stream
 * @param {*} value - Value to append
 * @returns {Stream} Stream with appended value
 */
Stream.prototype.append(value);

Usage Examples:

// Use alternative for empty streams
_([]).otherwise(['default']).toArray(console.log); // ['default']
_([1, 2]).otherwise(['default']).toArray(console.log); // [1, 2]

// Alternative with function
_([]).otherwise(() => _(['generated'])).toArray(console.log); // ['generated']

// Append value
_([1, 2, 3]).append(4).toArray(console.log); // [1, 2, 3, 4]

Stream Transformation

Transform streams using custom functions or Node.js duplex streams.

/**
 * Transform using function or Node.js duplex stream
 * @param {Function|Stream} transform - Transformation function or duplex stream
 * @returns {Stream} Transformed stream
 */
Stream.prototype.through(transform);

Usage Examples:

// Transform with function
function doubleAndFilter(stream) {
  return stream.map(x => x * 2).filter(x => x > 5);
}

_([1, 2, 3, 4]).through(doubleAndFilter).toArray(console.log); // [6, 8]

// Transform with Node.js duplex stream
const zlib = require('zlib');
const fs = require('fs');

_(fs.createReadStream('file.txt'))
  .through(zlib.createGzip())
  .pipe(fs.createWriteStream('file.txt.gz'));

Pipeline Creation

Create reusable transformation pipelines.

/**
 * Create a reusable transformation pipeline
 * @param {...Function} transforms - Series of transformation functions
 * @returns {Stream} Pipeline stream
 */
function _.pipeline(...transforms);

Usage Examples:

// Create reusable pipeline
const processNumbers = _.pipeline(
  _.map(x => parseInt(x)),
  _.filter(x => !isNaN(x)),
  _.map(x => x * 2)
);

// Use pipeline
_(['1', '2', 'invalid', '3'])
  .through(processNumbers)
  .toArray(console.log); // [2, 4, 6]

// Pipeline as standalone transform
const readable = require('fs').createReadStream('numbers.txt');
const writable = require('fs').createWriteStream('processed.txt');

readable.pipe(processNumbers).pipe(writable);

Advanced Patterns

Async Control Flow

Highland excels at async control flow patterns similar to the

async
library:

// Series execution (like async.series)
_([task1, task2, task3]).nfcall([]).series();

// Parallel with limit (like async.parallelLimit)
_([task1, task2, task3]).nfcall([]).parallel(2);

// Apply each (like async.applyEach)
_([fn1, fn2, fn3]).nfcall(['arg1', 'arg2']);

Error Recovery in Streams

// Recover from errors in parallel processing
_(urls)
  .map(url => _(fetch(url)).errors((err, push) => {
    push(null, { url, error: err.message });
  }))
  .parallel(3)
  .each(result => {
    if (result.error) {
      console.log('Failed:', result.url, result.error);
    } else {
      console.log('Success:', result);
    }
  });