Highland is a comprehensive high-level streams library for Node.js and browsers that unifies synchronous and asynchronous data processing through a single, powerful abstraction.
—
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Pending
The risk profile of this skill
Methods for controlling stream execution, timing, and data flow including throttling, debouncing, and backpressure management.
Control the rate of data flow through streams.
/**
* Limit number of values per time window
* @param {number} count - Maximum values per window
* @param {number} ms - Time window in milliseconds
* @returns {Stream} Rate-limited stream
*/
Stream.prototype.ratelimit(count, ms);
/**
* Only emit one value per time interval
* @param {number} ms - Minimum milliseconds between values
* @returns {Stream} Throttled stream
*/
Stream.prototype.throttle(ms);
/**
* Wait for quiet period before emitting latest value
* @param {number} ms - Milliseconds to wait for quiet period
* @returns {Stream} Debounced stream
*/
Stream.prototype.debounce(ms);
Usage Examples:
// Rate limiting - max 5 values per second
_([1, 2, 3, 4, 5, 6, 7, 8])
.ratelimit(5, 1000)
.each(console.log); // First 5 immediately, then 3 more after 1 second
// Throttling - max one value per 100ms
_('mousemove', document)
.throttle(100)
.each(handleMouseMove); // At most 10 events per second
// Debouncing - wait 300ms after last keypress
_('keyup', searchInput)
.debounce(300)
.each(performSearch); // Only search after user stops typing
Control how values are consumed from streams.
/**
* Get latest value when queried (non-blocking)
* @returns {Stream} Stream that provides latest value on demand
*/
Stream.prototype.latest();
Usage Examples:
// Always get latest mouse position when needed
const mousePosition = _('mousemove', document)
.map(e => ({ x: e.clientX, y: e.clientY }))
.latest();
// Some slow operation that needs current mouse position
/**
* Take one value from the `mousePosition` stream and expose it as a promise.
* @returns {Promise} Result of calling `toPromise(Promise)` on the single-value stream
*/
function slowOperation() {
  const singleValue = mousePosition.take(1);
  return singleValue.toPromise(Promise);
}
slowOperation().then(position => {
console.log('Current mouse:', position);
});
Direct control over stream consumption and generation.
/**
* Low-level stream consumption with manual flow control
* @param {Function} consumer - (err, value, push, next) => void
* @returns {Stream} New stream controlled by consumer
*/
Stream.prototype.consume(consumer);
/**
* Pull single value from stream
* @param {Function} callback - (err, value) => void
* @returns {void}
*/
Stream.prototype.pull(callback);
Usage Examples:
// Custom transform with consume
/**
* Build a stream from `source` that doubles positive values and drops the rest.
* Errors are forwarded unchanged and the `_.nil` end marker is re-emitted.
* @param {Stream} source - Stream to consume from
* @returns {Stream} Whatever `source.consume` returns for the consumer below
*/
const customTransform = (source) => {
  const step = (err, x, push, next) => {
    if (err) {
      // Forward the error, then keep pulling from the source.
      push(err);
      next();
      return;
    }
    if (x === _.nil) {
      // Re-emit the end marker; next() is intentionally not called afterwards.
      push(null, _.nil);
      return;
    }
    // Custom logic here
    if (x > 0) {
      push(null, x * 2);
    }
    next(); // Continue processing
  };
  return source.consume(step);
};
// Pull single values
const stream = _([1, 2, 3]);
stream.pull((err, value) => {
console.log('First value:', value); // 1
stream.pull((err, value) => {
console.log('Second value:', value); // 2
});
});
Control stream execution state.
/**
* Pause stream processing
* @returns {void}
*/
Stream.prototype.pause();
/**
* Resume stream processing
* @returns {void}
*/
Stream.prototype.resume();
/**
* End stream by emitting nil
* @returns {void}
*/
Stream.prototype.end();
/**
* Destroy stream and clean up resources
* @returns {void}
*/
Stream.prototype.destroy();
/**
* Write value to stream (for writable streams)
* @param {*} value - Value to write
* @returns {boolean} False if stream is paused
*/
Stream.prototype.write(value);
Usage Examples:
// Manual stream control
const stream = _([1, 2, 3, 4, 5]);
stream.each(value => {
console.log(value);
if (value === 3) {
stream.pause();
setTimeout(() => {
console.log('Resuming...');
stream.resume();
}, 1000);
}
});
// Writable stream pattern
const writableStream = _();
writableStream.each(console.log);
writableStream.write(1);
writableStream.write(2);
writableStream.end(); // or writableStream.write(_.nil);
Control error propagation and recovery in streams.
/**
* Handle errors without stopping stream
* @param {Function} handler - (err, push) => void
* @returns {Stream} Stream with error handling
*/
Stream.prototype.errors(handler);
/**
* Stop stream on first error
* @param {Function} handler - (err, push) => void
* @returns {Stream} Stream that stops on error
*/
Stream.prototype.stopOnError(handler);
Usage Examples:
// Continue processing despite errors
_([1, 'invalid', 3, 'bad', 5])
.map(x => {
if (typeof x !== 'number') throw new Error('Not a number');
return x * 2;
})
.errors((err, push) => {
console.log('Error:', err.message);
push(null, 0); // Emit default value
})
.toArray(console.log); // [2, 0, 6, 0, 10]
// Stop on first error
_([1, 2, 'invalid', 4, 5])
.map(x => {
if (typeof x !== 'number') throw new Error('Not a number');
return x * 2;
})
.stopOnError(err => console.log('Stopped due to:', err.message))
.toArray(console.log); // [2, 4] (stops at 'invalid')
Combine multiple control mechanisms for sophisticated flow control.
/**
* Filter using async predicate function
* @param {Function} predicate - Function returning stream of boolean
* @returns {Stream} Filtered stream
*/
Stream.prototype.flatFilter(predicate);
Usage Examples:
// Async filtering: keep only the filenames for which fs.access succeeds
const checkFileExists = _.wrapCallback(require('fs').access);
_(['file1.txt', 'file2.txt', 'missing.txt'])
  .flatFilter(filename =>
    checkFileExists(filename)
      .map(() => true)
      // errors() handlers are documented as (err, push) => void, so the
      // fallback `false` must be pushed explicitly rather than returned.
      .errors((err, push) => push(null, false))
  )
.toArray(console.log); // Only existing files
Highland automatically manages backpressure, but you can influence it:
// Source with backpressure awareness: each generator call schedules one
// random value 100 ms later and then asks to be called again.
const slowSource = _((push, next) => {
  const emitOne = () => {
    push(null, Math.random());
    next(); // Signal ready for more
  };
  // Only push when consumer is ready
  setTimeout(emitOne, 100);
});
// Consumer that controls flow
slowSource
.map(heavyProcessing) // Slow operation
.each(result => {
// Consumer automatically provides backpressure
console.log(result);
});
Coordinate multiple streams with different timing:
// Wait for all streams to have data
const stream1 = _([1, 2, 3]);
// write() returns a boolean, not the stream, so the calls cannot be chained;
// create the writable stream first and write to it separately.
const stream2 = _();
stream2.write('a');
stream2.write('b');
const stream3 = _(Promise.resolve('ready'));
_([stream1, stream2, stream3])
.zipAll0() // Wait for all to emit
.each(([num, letter, status]) => {
console.log('Synchronized:', num, letter, status);
});
Control stream performance with these patterns:
// Batch processing for efficiency
_([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
.batch(3)
.map(batch => {
// Process batch efficiently
return batch.reduce((sum, x) => sum + x, 0);
})
.toArray(console.log); // [6, 15, 24, 10]
// Controlled parallelism
const urls = ['url1', 'url2', 'url3', 'url4', 'url5'];
_(urls)
.map(fetchUrl) // Create stream of promise streams
.parallel(2) // Only 2 concurrent requests
.errors((err, push) => push(null, null)) // Handle failures
.compact() // Remove failed requests
.toArray(console.log);