Methods for controlling stream execution, timing, and data flow, including throttling, debouncing, and backpressure management.
Control the rate of data flow through streams.
/**
* Limit number of values per time window
* @param {number} count - Maximum values per window
* @param {number} ms - Time window in milliseconds
* @returns {Stream} Rate-limited stream
*/
Stream.prototype.ratelimit(count, ms);
/**
* Only emit one value per time interval
* @param {number} ms - Minimum milliseconds between values
* @returns {Stream} Throttled stream
*/
Stream.prototype.throttle(ms);
/**
* Wait for quiet period before emitting latest value
* @param {number} ms - Milliseconds to wait for quiet period
* @returns {Stream} Debounced stream
*/
Stream.prototype.debounce(ms);
Usage Examples:
// Rate limiting - max 5 values per second
_([1, 2, 3, 4, 5, 6, 7, 8])
  .ratelimit(5, 1000)
  .each(console.log); // First 5 immediately, then 3 more after 1 second

// Throttling - max one value per 100ms
_('mousemove', document)
  .throttle(100)
  .each(handleMouseMove); // At most 10 events per second

// Debouncing - wait 300ms after last keypress
_('keyup', searchInput)
  .debounce(300)
  .each(performSearch); // Only search after user stops typing

Control how values are consumed from streams.
/**
* Get latest value when queried (non-blocking)
* @returns {Stream} Stream that provides latest value on demand
*/
Stream.prototype.latest();
Usage Examples:
// Always get latest mouse position when needed
const mousePosition = _('mousemove', document)
  .map(e => ({ x: e.clientX, y: e.clientY }))
  .latest();

// Some slow operation that needs current mouse position
function slowOperation() {
  return mousePosition.take(1).toPromise(Promise);
}

slowOperation().then(position => {
  console.log('Current mouse:', position);
});

Direct control over stream consumption and generation.
/**
* Low-level stream consumption with manual flow control
* @param {Function} consumer - (err, value, push, next) => void
* @returns {Stream} New stream controlled by consumer
*/
Stream.prototype.consume(consumer);
/**
* Pull single value from stream
* @param {Function} callback - (err, value) => void
* @returns {void}
*/
Stream.prototype.pull(callback);
Usage Examples:
// Custom transform with consume
const customTransform = source => source.consume((err, x, push, next) => {
  if (err) {
    push(err);
    next();
  } else if (x === _.nil) {
    push(null, _.nil);
  } else {
    // Custom logic here
    if (x > 0) {
      push(null, x * 2);
    }
    next(); // Continue processing
  }
});
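The transform above can then be applied like any other pipeline stage via through; a minimal sketch (the input values are arbitrary):
// Doubles positive numbers and drops everything else
_([1, -2, 3])
  .through(customTransform)
  .toArray(console.log); // [2, 6]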
// Pull single values
const stream = _([1, 2, 3]);
stream.pull((err, value) => {
  console.log('First value:', value); // 1
  stream.pull((err, value) => {
    console.log('Second value:', value); // 2
  });
});

Control stream execution state.
/**
* Pause stream processing
* @returns {void}
*/
Stream.prototype.pause();
/**
* Resume stream processing
* @returns {void}
*/
Stream.prototype.resume();
/**
* End stream by emitting nil
* @returns {void}
*/
Stream.prototype.end();
/**
* Destroy stream and clean up resources
* @returns {void}
*/
Stream.prototype.destroy();
/**
* Write value to stream (for writable streams)
* @param {*} value - Value to write
* @returns {boolean} False if stream is paused
*/
Stream.prototype.write(value);
Usage Examples:
// Manual stream control
const stream = _([1, 2, 3, 4, 5]);
stream.each(value => {
  console.log(value);
  if (value === 3) {
    stream.pause();
    setTimeout(() => {
      console.log('Resuming...');
      stream.resume();
    }, 1000);
  }
});
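Unlike pause(), destroy() tears a stream down for good, unlinking it from its source and consumers. A minimal sketch (the 10-second timeout is illustrative):
// Consume click events, then tear the stream down after 10 seconds
const clicks = _('click', document);
clicks.each(e => console.log('clicked:', e.type));
setTimeout(() => {
  clicks.destroy(); // Ends the stream and unlinks it from sources and consumers
}, 10000);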
// Writable stream pattern
const writableStream = _();
writableStream.each(console.log);
writableStream.write(1);
writableStream.write(2);
writableStream.end(); // or writableStream.write(_.nil);

Control error propagation and recovery in streams.
/**
* Handle errors without stopping stream
* @param {Function} handler - (err, push) => void
* @returns {Stream} Stream with error handling
*/
Stream.prototype.errors(handler);
/**
* Stop stream on first error
* @param {Function} handler - (err, push) => void
* @returns {Stream} Stream that stops on error
*/
Stream.prototype.stopOnError(handler);
Usage Examples:
// Continue processing despite errors
_([1, 'invalid', 3, 'bad', 5])
  .map(x => {
    if (typeof x !== 'number') throw new Error('Not a number');
    return x * 2;
  })
  .errors((err, push) => {
    console.log('Error:', err.message);
    push(null, 0); // Emit default value
  })
  .toArray(console.log); // [2, 0, 6, 0, 10]
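An errors handler can also recover selectively, pushing errors it does not recognize back onto the stream instead of swallowing them; a sketch:
// Recover from known errors, re-emit everything else
_([1, 'invalid', 3])
  .map(x => {
    if (typeof x !== 'number') throw new Error('Not a number');
    return x * 2;
  })
  .errors((err, push) => {
    if (err.message === 'Not a number') {
      push(null, 0); // Recover with a default value
    } else {
      push(err); // Propagate unexpected errors downstream
    }
  })
  .toArray(console.log); // [2, 0, 6]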
// Stop on first error
_([1, 2, 'invalid', 4, 5])
  .map(x => {
    if (typeof x !== 'number') throw new Error('Not a number');
    return x * 2;
  })
  .stopOnError(err => console.log('Stopped due to:', err.message))
  .toArray(console.log); // [2, 4] (stops at 'invalid')

Combine multiple control mechanisms for sophisticated flow control.
/**
* Filter using async predicate function
* @param {Function} predicate - Function returning stream of boolean
* @returns {Stream} Filtered stream
*/
Stream.prototype.flatFilter(predicate);
Usage Examples:
// Async filtering
const checkFileExists = _.wrapCallback(require('fs').access);
_(['file1.txt', 'file2.txt', 'missing.txt'])
  .flatFilter(filename =>
    checkFileExists(filename)
      .map(() => true)
      .errors((err, push) => push(null, false)) // Missing files map to false
  )
  .toArray(console.log); // Only existing files

Highland automatically manages backpressure, but you can influence it:
// Source with backpressure awareness
const slowSource = _(function (push, next) {
  // Only push when consumer is ready
  setTimeout(() => {
    push(null, Math.random());
    next(); // Signal ready for more
  }, 100);
});

// Consumer that controls flow
slowSource
  .map(heavyProcessing) // Slow operation
  .each(result => {
    // Consumer automatically provides backpressure
    console.log(result);
  });

Coordinate multiple streams with different timing:
// Wait for all streams to have data
const stream1 = _([1, 2, 3]);
const stream2 = _();
stream2.write('a'); // write() returns a boolean, so calls are not chainable
stream2.write('b');
const stream3 = _(Promise.resolve('ready'));

_([stream1, stream2, stream3])
  .zipAll0() // Zip corresponding values from all streams
  .each(([num, letter, status]) => {
    console.log('Synchronized:', num, letter, status);
  });

Control stream performance with these patterns:
// Batch processing for efficiency
_([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
  .batch(3)
  .map(batch => {
    // Process batch efficiently
    return batch.reduce((sum, x) => sum + x, 0);
  })
  .toArray(console.log); // [6, 15, 24, 10]
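When the source trickles in slowly, batching by count alone can leave a partial batch waiting indefinitely. If your Highland version provides batchWithTimeOrCount(ms, n), it flushes on whichever limit is reached first; a sketch (socket and saveBatchToDatabase are placeholders for your own source and sink):
// Flush after 500ms or 100 items, whichever comes first
_('data', socket)
  .batchWithTimeOrCount(500, 100)
  .each(saveBatchToDatabase);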
// Controlled parallelism
const urls = ['url1', 'url2', 'url3', 'url4', 'url5'];
_(urls)
  .map(fetchUrl) // Create stream of promise streams
  .parallel(2) // Only 2 concurrent requests
  .errors((err, push) => push(null, null)) // Handle failures
  .compact() // Remove failed requests
  .toArray(console.log);
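fetchUrl above is assumed to map each URL to a Highland stream, for example by wrapping a promise; a minimal sketch (fetch assumes Node 18+ or a browser):
// Hypothetical helper: one-value stream per request
const fetchUrl = url => _(fetch(url).then(res => res.text()));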