or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

advanced-transforms.md
basic-transforms.md
consumption.md
flow-control.md
functional-programming.md
higher-order-streams.md
index.md
stream-creation.md
tile.json

docs/flow-control.md

Flow Control

Methods for controlling stream execution, timing, and data flow including throttling, debouncing, and backpressure management.

Capabilities

Rate Limiting

Control the rate of data flow through streams.

/**
 * Limit number of values per time window
 * @param {number} count - Maximum values per window
 * @param {number} ms - Time window in milliseconds
 * @returns {Stream} Rate-limited stream
 */
Stream.prototype.ratelimit(count, ms);

/**
 * Only emit one value per time interval
 * @param {number} ms - Minimum milliseconds between values
 * @returns {Stream} Throttled stream
 */
Stream.prototype.throttle(ms);

/**
 * Wait for quiet period before emitting latest value
 * @param {number} ms - Milliseconds to wait for quiet period
 * @returns {Stream} Debounced stream
 */
Stream.prototype.debounce(ms);

Usage Examples:

// Rate limiting: at most 5 values per 1000 ms window.
// The first 5 values are logged immediately, the remaining 3 after 1 second.
const limitedNumbers = _([1, 2, 3, 4, 5, 6, 7, 8]).ratelimit(5, 1000);
limitedNumbers.each(console.log);

// Throttling: at most one value per 100 ms, i.e. at most 10 events per second.
const throttledMoves = _('mousemove', document).throttle(100);
throttledMoves.each(handleMouseMove);

// Debouncing: wait for 300 ms of silence after the last keypress,
// so the search only runs once the user stops typing.
const settledKeyups = _('keyup', searchInput).debounce(300);
settledKeyups.each(performSearch);

Stream Consumption Control

Control how values are consumed from streams.

/**
 * Get latest value when queried (non-blocking)
 * @returns {Stream} Stream that provides latest value on demand
 */
Stream.prototype.latest();

Usage Examples:

// Always get latest mouse position when needed
const mousePosition = _('mousemove', document)
  .map(e => ({ x: e.clientX, y: e.clientY }))
  .latest();

/**
 * Some slow operation that needs the current mouse position.
 * @returns {Promise} Promise built from the first value taken from mousePosition
 */
function slowOperation() {
  return mousePosition.take(1).toPromise(Promise);
}

slowOperation()
  .then(position => {
    console.log('Current mouse:', position);
  })
  .catch(err => {
    // Surface failures instead of leaving an unhandled promise rejection
    console.error('Failed to read mouse position:', err);
  });

Low-Level Stream Control

Direct control over stream consumption and generation.

/**
 * Low-level stream consumption with manual flow control
 * @param {Function} consumer - (err, value, push, next) => void
 * @returns {Stream} New stream controlled by consumer
 */
Stream.prototype.consume(consumer);

/**
 * Pull single value from stream
 * @param {Function} callback - (err, value) => void
 * @returns {void}
 */
Stream.prototype.pull(callback);

Usage Examples:

// Custom transform with consume: errors are forwarded, nil ends the stream,
// and every positive value is emitted doubled (other values are dropped).
const customTransform = source => {
  const handle = (err, x, push, next) => {
    // Pass the error downstream, then keep consuming
    if (err) {
      push(err);
      next();
      return;
    }

    // End of stream: forward nil and do not ask for more input
    if (x === _.nil) {
      push(null, _.nil);
      return;
    }

    // Custom logic here
    if (x > 0) {
      push(null, x * 2);
    }
    next(); // Continue processing
  };

  return source.consume(handle);
};

// Pull single values: each pull() call hands one value to its callback
const stream = _([1, 2, 3]);

const logSecond = (err, second) => {
  console.log('Second value:', second); // 2
};

stream.pull((err, first) => {
  console.log('First value:', first); // 1

  // Request the next value only after the first one has arrived
  stream.pull(logSecond);
});

Stream State Management

Control stream execution state.

/**
 * Pause stream processing
 * @returns {void}
 */
Stream.prototype.pause();

/**
 * Resume stream processing
 * @returns {void}
 */
Stream.prototype.resume();

/**
 * End stream by emitting nil
 * @returns {void}
 */
Stream.prototype.end();

/**
 * Destroy stream and clean up resources
 * @returns {void}
 */
Stream.prototype.destroy();

/**
 * Write value to stream (for writable streams)
 * @param {*} value - Value to write
 * @returns {boolean} False if stream is paused
 */
Stream.prototype.write(value);

Usage Examples:

// Manual stream control: pause when the value 3 is seen, resume one second later
const stream = _([1, 2, 3, 4, 5]);

const resumeAfterDelay = () => {
  setTimeout(() => {
    console.log('Resuming...');
    stream.resume();
  }, 1000);
};

stream.each(value => {
  console.log(value);
  if (value !== 3) {
    return;
  }
  stream.pause();
  resumeAfterDelay();
});

// Writable stream pattern: attach the consumer first, then feed values by hand
const writableStream = _();
writableStream.each(console.log);

for (const value of [1, 2]) {
  writableStream.write(value);
}

// Finish the stream; writableStream.write(_.nil) is the equivalent spelling
writableStream.end();

Error Handling

Control error propagation and recovery in streams.

/**
 * Handle errors without stopping stream
 * @param {Function} handler - (err, push) => void
 * @returns {Stream} Stream with error handling
 */
Stream.prototype.errors(handler);

/**
 * Stop stream on first error
 * @param {Function} handler - (err, push) => void
 * @returns {Stream} Stream that stops on error
 */
Stream.prototype.stopOnError(handler);

Usage Examples:

// Mapper shared by both examples: doubles numbers, throws on anything else
const doubleOrThrow = x => {
  if (typeof x !== 'number') throw new Error('Not a number');
  return x * 2;
};

// Continue processing despite errors: each failure is logged and
// replaced by a default value of 0
const recovered = _([1, 'invalid', 3, 'bad', 5])
  .map(doubleOrThrow)
  .errors((err, push) => {
    console.log('Error:', err.message);
    push(null, 0); // Emit default value
  });
recovered.toArray(console.log); // [2, 0, 6, 0, 10]

// Stop on first error: values before 'invalid' are kept, the rest is never processed
const reportStop = err => console.log('Stopped due to:', err.message);
const halted = _([1, 2, 'invalid', 4, 5])
  .map(doubleOrThrow)
  .stopOnError(reportStop);
halted.toArray(console.log); // [2, 4] (stops at 'invalid')

Advanced Control Patterns

Combine multiple control mechanisms for sophisticated flow control.

/**
 * Filter using async predicate function
 * @param {Function} predicate - Function returning stream of boolean
 * @returns {Stream} Filtered stream
 */
Stream.prototype.flatFilter(predicate);

Usage Examples:

// Async filtering
// checkFileExists(filename) yields a stream that errors when fs.access fails
const checkFileExists = _.wrapCallback(require('fs').access);

_(['file1.txt', 'file2.txt', 'missing.txt'])
  .flatFilter(filename =>
    checkFileExists(filename)
      .map(() => true)
      // An errors() handler must emit through push; its return value is not
      // used. Turn an access failure into `false` so the file is filtered out.
      .errors((err, push) => push(null, false))
  )
  .toArray(console.log); // Only existing files

Backpressure and Flow Control

Highland automatically manages backpressure, but you can influence it:

// Source with backpressure awareness: one random value every 100 ms,
// with the next one requested only after the current one has been pushed
const slowSource = _((push, next) => {
  const emitOne = () => {
    push(null, Math.random());
    next(); // Signal ready for more
  };
  setTimeout(emitOne, 100);
});

// Consumer that controls flow
const processed = slowSource.map(heavyProcessing); // Slow operation
processed.each(result => {
  // Consumer automatically provides backpressure
  console.log(result);
});

Stream Coordination

Coordinate multiple streams with different timing:

// Wait for all streams to have data
const stream1 = _([1, 2, 3]);

// write() returns a boolean, not the stream, so the calls cannot be chained:
// create the stream first, then write to it.
const stream2 = _();
stream2.write('a');
stream2.write('b');

const stream3 = _(Promise.resolve('ready'));

_([stream1, stream2, stream3])
  .zipAll0() // Wait for all to emit
  .each(([num, letter, status]) => {
    console.log('Synchronized:', num, letter, status);
  });

Performance Considerations

Control stream performance with these patterns:

// Batch processing for efficiency: group values in threes and sum each group
const sumBatch = batch => {
  let total = 0;
  for (const x of batch) {
    total += x;
  }
  return total;
};

const batches = _([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]).batch(3);
batches.map(sumBatch).toArray(console.log); // [6, 15, 24, 10]

// Controlled parallelism
const urls = ['url1', 'url2', 'url3', 'url4', 'url5'];

// NOTE(review): fetchUrl is presumably expected to return a stream per URL,
// giving parallel() a stream of streams - confirm against its definition.
const requests = _(urls).map(fetchUrl);

// A failed request becomes null, which compact() then removes
const replaceFailureWithNull = (err, push) => push(null, null);

requests
  .parallel(2) // Only 2 concurrent requests
  .errors(replaceFailureWithNull)
  .compact()
  .toArray(console.log);