Highland is a comprehensive high-level streams library for Node.js and browsers that unifies synchronous and asynchronous data processing through a single, powerful abstraction.
—
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Pending
The risk profile of this skill
Methods for consuming stream data and converting to other formats including arrays, promises, and Node streams.
Core methods for consuming stream values.
/**
* Iterate over each value in the stream
* @param {Function} iterator - Function to call with each value
* @returns {Stream} Stream for chaining (only with done())
*/
Stream.prototype.each(iterator);
/**
* Call function when stream ends
* @param {Function} callback - Function to call on completion
* @returns {void}
*/
Stream.prototype.done(callback);
Usage Examples:
// Basic iteration
_([1, 2, 3, 4]).each(x => console.log(x));
// Logs: 1, 2, 3, 4
// Chaining with done
let total = 0;
_([1, 2, 3, 4])
.each(x => total += x)
.done(() => console.log('Total:', total));
// Logs: Total: 10
Convert streams to arrays.
/**
* Collect all values into an array
* @param {Function} callback - Function to call with completed array
* @returns {void}
*/
Stream.prototype.toArray(callback);
/**
* Collect all values into an array and pass down stream
* @returns {Stream} Stream containing single array
*/
Stream.prototype.collect();
Usage Examples:
// Convert to array
_([1, 2, 3]).toArray(arr => {
console.log(arr); // [1, 2, 3]
console.log('Length:', arr.length); // Length: 3
});
// Collect for further processing
_([1, 2, 3])
.collect()
.map(arr => arr.length)
.toArray(console.log); // [3]
Apply stream values as function arguments.
/**
* Apply all stream values as arguments to a function
* @param {Function} fn - Function to apply arguments to
* @returns {void}
*/
Stream.prototype.apply(fn);
Usage Examples:
// Apply values as arguments
_([1, 2, 3]).apply((a, b, c) => {
console.log('Arguments:', a, b, c); // Arguments: 1 2 3
console.log('Sum:', a + b + c); // Sum: 6
});
// Apply with Math functions
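_([1, 5, 3, 9, 2]).apply(Math.max); // Math.max is called with (1, 5, 3, 9, 2) and returns 9; apply() returns void, so nothing is logged unless fn logs it
Convert streams to Node.js style callbacks.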
/**
* Convert single-value stream to Node.js style callback
* @param {Function} callback - (err, result) => void
* @returns {void}
*/
Stream.prototype.toCallback(callback);
Usage Examples:
// Single value to callback
_.of(42).toCallback((err, result) => {
if (err) {
console.error('Error:', err);
} else {
console.log('Result:', result); // Result: 42
}
});
// Error handling
_.fromError(new Error('Something failed')).toCallback((err, result) => {
console.error('Caught:', err.message); // Caught: Something failed
});
// Multiple values cause error
_([1, 2, 3]).toCallback((err, result) => {
console.error('Error:', err.message); // Error: toCallback called on stream emitting multiple values
});
Convert streams to promises.
/**
* Convert single-value stream to Promise
* @param {Function} PromiseConstructor - Promise constructor to use
* @returns {Promise} Promise resolving to stream value
*/
Stream.prototype.toPromise(PromiseConstructor);
Usage Examples:
// Single value to promise
_.of(42)
.toPromise(Promise)
.then(result => console.log('Result:', result)) // Result: 42
.catch(err => console.error('Error:', err));
// Error handling
_.fromError(new Error('Failed'))
.toPromise(Promise)
.then(result => console.log('Result:', result))
.catch(err => console.error('Caught:', err.message)); // Caught: Failed
// Empty stream resolves to undefined
_([])
.toPromise(Promise)
.then(result => console.log('Result:', result)); // Result: undefined
// Async/await usage
async function processStream() {
try {
const result = await _([1, 2, 3]).collect().toPromise(Promise);
console.log('Collected:', result); // Collected: [1, 2, 3]
} catch (err) {
console.error('Error:', err);
}
}
Convert Highland streams to Node.js readable streams.
/**
* Convert to Node.js Readable stream
* @param {Object} options - Readable stream options
* @returns {Readable} Node.js Readable stream
*/
Stream.prototype.toNodeStream(options);
Usage Examples:
const fs = require('fs');
// Convert to Node stream
const highlandStream = _(['line 1\n', 'line 2\n', 'line 3\n']);
const nodeStream = highlandStream.toNodeStream();
// Pipe to file
nodeStream.pipe(fs.createWriteStream('output.txt'));
// Object mode
const objectStream = _([{id: 1}, {id: 2}, {id: 3}])
.toNodeStream({objectMode: true});
objectStream.on('data', obj => console.log('Object:', obj));
Pipe Highland streams to Node.js writable streams.
/**
* Pipe to Node.js writable stream
* @param {Writable} dest - Destination writable stream
* @param {Object} options - Pipe options
* @returns {Writable} The destination stream
*/
Stream.prototype.pipe(dest, options);
Usage Examples:
const fs = require('fs');
// Pipe to file
_(['Hello', ' ', 'World', '!'])
.pipe(fs.createWriteStream('greeting.txt'));
// Pipe with options
_(['line1\n', 'line2\n'])
.pipe(process.stdout, { end: false }); // Don't end stdout
// Chain pipes
_([1, 2, 3, 4])
.map(x => x + '\n')
.pipe(fs.createWriteStream('numbers.txt'))
.on('finish', () => console.log('Writing complete'));
Reduce streams to single values.
/**
* Reduce stream to single value with initial accumulator
* @param {*} memo - Initial accumulator value
* @param {Function} reducer - (accumulator, value) => newAccumulator
* @returns {Stream} Stream with single reduced value
*/
Stream.prototype.reduce(memo, reducer);
/**
* Reduce using first value as initial accumulator
* @param {Function} reducer - (accumulator, value) => newAccumulator
* @returns {Stream} Stream with single reduced value
*/
Stream.prototype.reduce1(reducer);
/**
* Like reduce but emits intermediate values
* @param {*} memo - Initial accumulator value
* @param {Function} reducer - (accumulator, value) => newAccumulator
* @returns {Stream} Stream of intermediate values
*/
Stream.prototype.scan(memo, reducer);
/**
* Scan using first value as initial accumulator
* @param {Function} reducer - (accumulator, value) => newAccumulator
* @returns {Stream} Stream of intermediate values
*/
Stream.prototype.scan1(reducer);
Usage Examples:
// Basic reduction
_([1, 2, 3, 4])
.reduce(0, (sum, x) => sum + x)
.toArray(console.log); // [10]
// Reduce without initial value
_([1, 2, 3, 4])
.reduce1((sum, x) => sum + x)
.toArray(console.log); // [10]
// Scan shows intermediate values
_([1, 2, 3, 4])
.scan(0, (sum, x) => sum + x)
.toArray(console.log); // [0, 1, 3, 6, 10]
// Scan without initial value
_([1, 2, 3, 4])
.scan1((sum, x) => sum + x)
.toArray(console.log); // [1, 3, 6, 10]
// Complex reduction - find maximum
_([{name: 'Alice', score: 85}, {name: 'Bob', score: 92}, {name: 'Charlie', score: 78}])
.reduce1((max, current) => current.score > max.score ? current : max)
.toArray(result => console.log('Winner:', result[0].name)); // Winner: Bob
Combine consumption with transformation for complete data processing:
// File processing pipeline
const fs = require('fs');
const _ = require('highland');
_(fs.createReadStream('data.csv'))
.split() // Split by lines
.drop(1) // Skip header
.map(line => line.split(',')) // Parse CSV
.filter(row => row.length === 3) // Valid rows only
.map(([name, age, city]) => ({ name, age: parseInt(age), city }))
.filter(person => person.age >= 18) // Adults only
.group('city') // Group by city
.toArray(results => {
console.log('Adults by city:', results);
});
// Robust error handling
_([1, 2, 'invalid', 4, 5])
.map(x => {
if (typeof x !== 'number') throw new Error('Invalid number');
return x * 2;
})
.errors((err, push) => {
console.log('Skipping invalid value:', err.message);
// Don't push anything, effectively filtering out the error
})
.toArray(results => {
console.log('Valid results:', results); // [2, 4, 8, 10]
});
// Process stream asynchronously
/**
 * Doubles the values 1 through 5, keeps the results greater than 5,
 * gathers them into a single array via the stream, and resolves with
 * the sum of that array.
 * @returns {Promise<number>} Sum of the values that passed the filter
 */
async function processData() {
  // Build the pipeline first; nothing is awaited until toPromise().
  const collected = _([1, 2, 3, 4, 5])
    .map((value) => value * 2)
    .filter((value) => value > 5)
    .collect();

  const data = await collected.toPromise(Promise);
  console.log('Processed data:', data); // [6, 8, 10]

  // The stream is fully consumed; the rest is ordinary array work.
  let sum = 0;
  for (const value of data) {
    sum += value;
  }
  return sum;
}
processData().then(sum => console.log('Sum:', sum)); // Sum: 24