tessl/npm-highland

Highland is a comprehensive high-level streams library for Node.js and browsers that unifies synchronous and asynchronous data processing through a single, powerful abstraction.

Flow Control

Methods for controlling stream execution, timing, and data flow including throttling, debouncing, and backpressure management.

Capabilities

Rate Limiting

Control the rate of data flow through streams.

/**
 * Limit number of values per time window
 * @param {number} count - Maximum values per window
 * @param {number} ms - Time window in milliseconds
 * @returns {Stream} Rate-limited stream
 */
Stream.prototype.ratelimit(count, ms);

/**
 * Only emit one value per time interval
 * @param {number} ms - Minimum milliseconds between values
 * @returns {Stream} Throttled stream
 */
Stream.prototype.throttle(ms);

/**
 * Wait for quiet period before emitting latest value
 * @param {number} ms - Milliseconds to wait for quiet period
 * @returns {Stream} Debounced stream
 */
Stream.prototype.debounce(ms);

Usage Examples:

// Rate limiting - max 5 values per second
_([1, 2, 3, 4, 5, 6, 7, 8])
  .ratelimit(5, 1000)
  .each(console.log); // First 5 immediately, then 3 more after 1 second

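// Throttling - max one value per 100ms
// Assumption: the emitter exposes an .on() method (Node EventEmitter or jQuery object);
// a raw DOM node may need wrapping first, e.g. $(document)
_('mousemove', document)
  .throttle(100)
  .each(handleMouseMove); // At most 10 events per second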

// Debouncing - wait 300ms after last keypress
_('keyup', searchInput)
  .debounce(300)
  .each(performSearch); // Only search after user stops typing
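
The difference between throttle and debounce is easiest to see on a fixed timeline. The sketch below is a plain-JavaScript model, not Highland's implementation: throttleModel and debounceModel are hypothetical helpers. It assumes throttle passes the first value of each interval, and that debounce passes a value only when no newer one arrives within ms, so the final value always passes.

```javascript
// Each event is { t: <timestamp in ms>, value }. Events are sorted by t.

// Assumed throttle behaviour: pass a value if at least `ms` elapsed
// since the last value that was passed; drop everything else.
function throttleModel(events, ms) {
  const out = [];
  let last = -Infinity;
  for (const e of events) {
    if (e.t - last >= ms) {
      out.push(e.value);
      last = e.t;
    }
  }
  return out;
}

// Assumed debounce behaviour: pass a value only if the next one arrives
// at least `ms` later. The final value passes because nothing follows it.
function debounceModel(events, ms) {
  return events
    .filter((e, i) => i === events.length - 1 || events[i + 1].t - e.t >= ms)
    .map(e => e.value);
}

const events = [
  { t: 0, value: 'a' },
  { t: 50, value: 'b' },
  { t: 120, value: 'c' },
  { t: 130, value: 'd' },
  { t: 400, value: 'e' },
];

const throttled = throttleModel(events, 100);
const debounced = debounceModel(events, 100);
console.log(throttled); // [ 'a', 'c', 'e' ]
console.log(debounced); // [ 'd', 'e' ]
```

Throttle keeps a steady sample of a busy source, while debounce reports only the value that ends each burst.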

Stream Consumption Control

Control how values are consumed from streams.

/**
 * Get latest value when queried (non-blocking)
 * @returns {Stream} Stream that provides latest value on demand
 */
Stream.prototype.latest();

Usage Examples:

// Always get latest mouse position when needed
const mousePosition = _('mousemove', document)
  .map(e => ({ x: e.clientX, y: e.clientY }))
  .latest();

// Some slow operation that needs current mouse position
function slowOperation() {
  return mousePosition.take(1).toPromise(Promise);
}

slowOperation().then(position => {
  console.log('Current mouse:', position);
});

Low-Level Stream Control

Direct control over stream consumption and generation.

/**
 * Low-level stream consumption with manual flow control
 * @param {Function} consumer - (err, value, push, next) => void
 * @returns {Stream} New stream controlled by consumer
 */
Stream.prototype.consume(consumer);

/**
 * Pull single value from stream
 * @param {Function} callback - (err, value) => void
 * @returns {void}
 */
Stream.prototype.pull(callback);

Usage Examples:

// Custom transform with consume
const customTransform = source => source.consume((err, x, push, next) => {
  if (err) {
    push(err);
    next();
  } else if (x === _.nil) {
    push(null, _.nil);
  } else {
    // Custom logic here
    if (x > 0) {
      push(null, x * 2);
    }
    next(); // Continue processing
  }
});

// Pull single values
const stream = _([1, 2, 3]);
stream.pull((err, value) => {
  console.log('First value:', value); // 1
  
  stream.pull((err, value) => {
    console.log('Second value:', value); // 2
  });
});
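
Draining a stream with pull means calling it again from inside the callback until the end marker arrives. The sketch below shows that recursion against a stand-in source. makeSource, drain and the END sentinel are hypothetical: they only mimic the (err, value) callback shape described above, with END playing the role of _.nil.

```javascript
const END = Symbol('end'); // stand-in for Highland's _.nil

// Hypothetical array-backed source exposing a pull(callback) method.
function makeSource(items) {
  let i = 0;
  return {
    pull(callback) {
      callback(null, i < items.length ? items[i++] : END);
    },
  };
}

// Pull one value at a time until the end marker is seen.
function drain(source, onValue, onDone) {
  source.pull((err, value) => {
    if (err) return onDone(err);
    if (value === END) return onDone(null);
    onValue(value);
    drain(source, onValue, onDone); // ask for the next value
  });
}

const seen = [];
let finished = false;
drain(makeSource([1, 2, 3]), v => seen.push(v), err => { finished = !err; });
console.log(seen, finished); // [ 1, 2, 3 ] true
```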

Stream State Management

Control stream execution state.

/**
 * Pause stream processing
 * @returns {void}
 */
Stream.prototype.pause();

/**
 * Resume stream processing
 * @returns {void}
 */
Stream.prototype.resume();

/**
 * End stream by emitting nil
 * @returns {void}
 */
Stream.prototype.end();

/**
 * Destroy stream and clean up resources
 * @returns {void}
 */
Stream.prototype.destroy();

/**
 * Write value to stream (for writable streams)
 * @param {*} value - Value to write
 * @returns {boolean} False if stream is paused
 */
Stream.prototype.write(value);

Usage Examples:

// Manual stream control
const stream = _([1, 2, 3, 4, 5]);

stream.each(value => {
  console.log(value);
  if (value === 3) {
    stream.pause();
    setTimeout(() => {
      console.log('Resuming...');
      stream.resume();
    }, 1000);
  }
});

// Writable stream pattern
const writableStream = _();
writableStream.each(console.log);

writableStream.write(1);
writableStream.write(2);
writableStream.end(); // or writableStream.write(_.nil);
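
The return value of write is a backpressure hint: per the signature above, it is false when the stream is paused. The class below is a toy model of that contract, not Highland code. ToyStream and its buffering policy are assumptions made for illustration.

```javascript
// Toy model: values written while paused are buffered and delivered on resume().
class ToyStream {
  constructor(onValue) {
    this.onValue = onValue;
    this.paused = true; // starts paused until resume() is called
    this.buffer = [];
  }

  write(value) {
    if (this.paused) {
      this.buffer.push(value);
    } else {
      this.onValue(value);
    }
    return !this.paused; // false tells the producer to slow down
  }

  pause() {
    this.paused = true;
  }

  resume() {
    this.paused = false;
    // Deliver buffered values in order; stop if the consumer pauses again.
    while (!this.paused && this.buffer.length > 0) {
      this.onValue(this.buffer.shift());
    }
  }
}

const received = [];
const toy = new ToyStream(v => received.push(v));
const acceptedWhilePaused = toy.write(1);  // buffered
toy.resume();                              // delivers 1
const acceptedWhileFlowing = toy.write(2); // delivered immediately
console.log(received, acceptedWhilePaused, acceptedWhileFlowing); // [ 1, 2 ] false true
```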

Error Handling

Control error propagation and recovery in streams.

/**
 * Handle errors without stopping stream
 * @param {Function} handler - (err, push) => void
 * @returns {Stream} Stream with error handling
 */
Stream.prototype.errors(handler);

/**
 * Stop stream on first error
 * @param {Function} handler - (err, push) => void
 * @returns {Stream} Stream that stops on error
 */
Stream.prototype.stopOnError(handler);

Usage Examples:

// Continue processing despite errors
_([1, 'invalid', 3, 'bad', 5])
  .map(x => {
    if (typeof x !== 'number') throw new Error('Not a number');
    return x * 2;
  })
  .errors((err, push) => {
    console.log('Error:', err.message);
    push(null, 0); // Emit default value
  })
  .toArray(console.log); // [2, 0, 6, 0, 10]

// Stop on first error
_([1, 2, 'invalid', 4, 5])
  .map(x => {
    if (typeof x !== 'number') throw new Error('Not a number');
    return x * 2;
  })
  .stopOnError(err => console.log('Stopped due to:', err.message))
  .toArray(console.log); // [2, 4] (stops at 'invalid')
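
The two handlers differ only in what happens after the first error. The following model runs both policies over a list of tokens. withErrors and withStopOnError are hypothetical helpers that imitate the (err, push) handler shape, not the library's code.

```javascript
// A token is either { err } or { value }.
const tokens = [{ value: 2 }, { err: new Error('bad') }, { value: 6 }];

// Model of errors(): call the handler for each error, then keep going.
function withErrors(input, handler) {
  const out = [];
  const push = (err, value) => { if (!err) out.push(value); };
  for (const t of input) {
    if (t.err) handler(t.err, push);
    else out.push(t.value);
  }
  return out;
}

// Model of stopOnError(): call the handler for the first error, then end.
function withStopOnError(input, handler) {
  const out = [];
  const push = (err, value) => { if (!err) out.push(value); };
  for (const t of input) {
    if (t.err) {
      handler(t.err, push);
      break;
    }
    out.push(t.value);
  }
  return out;
}

const recovered = withErrors(tokens, (err, push) => push(null, 0));
const stopped = withStopOnError(tokens, () => {});
console.log(recovered); // [ 2, 0, 6 ]
console.log(stopped);   // [ 2 ]
```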

Advanced Control Patterns

Combine multiple control mechanisms for sophisticated flow control.

/**
 * Filter using async predicate function
 * @param {Function} predicate - Function returning stream of boolean
 * @returns {Stream} Filtered stream
 */
Stream.prototype.flatFilter(predicate);

Usage Examples:

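// Async filtering
// fs.access calls back with an error when the file is not accessible
const checkFileExists = _.wrapCallback(require('fs').access);

_(['file1.txt', 'file2.txt', 'missing.txt'])
  .flatFilter(filename =>
    checkFileExists(filename)
      .map(() => true)
      .errors((err, push) => push(null, false)) // Treat an access error as "does not exist"
  )
  .toArray(console.log); // Only existing files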

Backpressure and Flow Control

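Highland streams are pull-based, so backpressure is managed automatically: a generator is asked for more data only after it calls next() and the consumer is ready. You can still shape the flow through how you write sources and consumers: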

// Source with backpressure awareness
const slowSource = _(function(push, next) {
  // Only push when consumer is ready
  setTimeout(() => {
    push(null, Math.random());
    next(); // Signal ready for more
  }, 100);
});

// Consumer that controls flow
slowSource
  .map(heavyProcessing) // Slow operation
  .each(result => {
    // Consumer automatically provides backpressure
    console.log(result);
  });
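
Pull-driven production can be illustrated without any library. In this sketch a built-in JavaScript generator stands in for the Highland generator function: the producer body runs only when the consumer asks for a value. produce and the produced counter are illustrative names, and the analogy is an assumption about the general model rather than a description of Highland internals.

```javascript
let produced = 0;

// Infinite producer; each value is computed only when next() is called.
function* produce() {
  while (true) {
    produced += 1;
    yield produced * 10;
  }
}

const source = produce();
const consumed = [];
for (let i = 0; i < 3; i++) {
  consumed.push(source.next().value); // the consumer sets the pace
}
console.log(consumed, produced); // [ 10, 20, 30 ] 3
```

Although the producer is infinite, it did exactly three units of work, one per request from the consumer.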

Stream Coordination

Coordinate multiple streams with different timing:

// Wait for all streams to have data
const stream1 = _([1, 2, 3]);
const stream2 = _();
stream2.write('a'); // write() returns a boolean, so calls cannot be chained
stream2.write('b');
stream2.end();
const stream3 = _(Promise.resolve('ready'));

_([stream1, stream2, stream3])
  .zipAll0() // Wait for all to emit
  .each(([num, letter, status]) => {
    console.log('Synchronized:', num, letter, status);
  });
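
When sources have different lengths, a zip can only emit as many tuples as the shortest one provides. Below is a plain-array model of that rule. zipArrays is a hypothetical helper, and the stop-at-shortest behaviour is assumed to match zipAll0.

```javascript
// Combine arrays position by position, stopping at the shortest input.
function zipArrays(arrays) {
  if (arrays.length === 0) return [];
  const length = Math.min(...arrays.map(a => a.length));
  const out = [];
  for (let i = 0; i < length; i++) {
    out.push(arrays.map(a => a[i]));
  }
  return out;
}

const zipped = zipArrays([[1, 2, 3], ['a', 'b'], ['ready']]);
console.log(zipped); // [ [ 1, 'a', 'ready' ] ]
```

Under that assumption, inputs shaped like the three streams above would yield a single synchronized tuple.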

Performance Considerations

Control stream performance with these patterns:

// Batch processing for efficiency
_([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
  .batch(3)
  .map(batch => {
    // Process batch efficiently
    return batch.reduce((sum, x) => sum + x, 0);
  })
  .toArray(console.log); // [6, 15, 24, 10]

// Controlled parallelism
// fetchUrl is assumed to return a Promise; parallel() needs a stream of streams
const urls = ['url1', 'url2', 'url3', 'url4', 'url5'];
_(urls)
  .map(url => _(fetchUrl(url))) // Wrap each promise in a Highland stream
  .parallel(2) // Only 2 concurrent requests
  .errors((err, push) => push(null, null)) // Replace failures with null
  .compact() // Remove failed requests
  .toArray(console.log);
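
The batch sums in the first example can be checked with a plain-array equivalent. chunk below is a hypothetical helper that mirrors what batch(3) is described as doing, with the final batch holding whatever is left over.

```javascript
// Split an array into consecutive groups of at most `size` items.
function chunk(items, size) {
  const out = [];
  for (let i = 0; i < items.length; i += size) {
    out.push(items.slice(i, i + size));
  }
  return out;
}

const batches = chunk([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 3);
const sums = batches.map(batch => batch.reduce((sum, x) => sum + x, 0));
console.log(batches); // [ [ 1, 2, 3 ], [ 4, 5, 6 ], [ 7, 8, 9 ], [ 10 ] ]
console.log(sums);    // [ 6, 15, 24, 10 ]
```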
