Methods for consuming stream data and converting it to other formats, including arrays, promises, and Node.js streams.
Core methods for consuming stream values.
/**
* Iterate over each value in the stream
* @param {Function} iterator - Function to call with each value
* @returns {Stream} Stream for chaining with done()
*/
Stream.prototype.each(iterator);
/**
* Call function when stream ends
* @param {Function} callback - Function to call on completion
* @returns {void}
*/
Stream.prototype.done(callback);

Usage Examples:
// Basic iteration
_([1, 2, 3, 4]).each(x => console.log(x));
// Logs: 1, 2, 3, 4
// Chaining with done
let total = 0;
_([1, 2, 3, 4])
  .each(x => total += x)
  .done(() => console.log('Total:', total));
// Logs: Total: 10
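
The same pattern works with asynchronous sources. A minimal sketch, assuming a hypothetical promise-returning fetchScore helper (a stand-in for any real async lookup):

// Hypothetical async lookup; any promise-returning function works the same way
const fetchScore = id => Promise.resolve(id * 10);
_([1, 2, 3])
  .flatMap(id => _(fetchScore(id))) // wrap each promise in a stream
  .each(score => console.log('Score:', score))
  .done(() => console.log('All scores fetched'));
// Logs: Score: 10, Score: 20, Score: 30, then: All scores fetched
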
Convert streams to arrays.
/**
* Collect all values into an array
* @param {Function} callback - Function to call with completed array
* @returns {void}
*/
Stream.prototype.toArray(callback);
/**
* Collect all values into an array and pass it downstream as a single value
* @returns {Stream} Stream containing single array
*/
Stream.prototype.collect();

Usage Examples:
// Convert to array
_([1, 2, 3]).toArray(arr => {
  console.log(arr); // [1, 2, 3]
  console.log('Length:', arr.length); // Length: 3
});
// Collect for further processing
_([1, 2, 3])
  .collect()
  .map(arr => arr.length)
  .toArray(console.log); // [3]

Apply stream values as function arguments.
/**
* Apply all stream values as arguments to a function
* @param {Function} fn - Function to apply arguments to
* @returns {void}
*/
Stream.prototype.apply(fn);

Usage Examples:
// Apply values as arguments
_([1, 2, 3]).apply((a, b, c) => {
  console.log('Arguments:', a, b, c); // Arguments: 1 2 3
  console.log('Sum:', a + b + c); // Sum: 6
});
// Apply with Math functions
_([1, 5, 3, 9, 2]).apply(Math.max); // Calls Math.max(1, 5, 3, 9, 2)

Convert streams to Node.js style callbacks.
/**
* Convert single-value stream to Node.js style callback
* @param {Function} callback - (err, result) => void
* @returns {void}
*/
Stream.prototype.toCallback(callback);

Usage Examples:
// Single value to callback
_.of(42).toCallback((err, result) => {
  if (err) {
    console.error('Error:', err);
  } else {
    console.log('Result:', result); // Result: 42
  }
});
// Error handling
_.fromError(new Error('Something failed')).toCallback((err, result) => {
  console.error('Caught:', err.message); // Caught: Something failed
});
// Multiple values cause error
_([1, 2, 3]).toCallback((err, result) => {
  console.error('Error:', err.message); // Error: toCallback called on stream emitting multiple values
});
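
toCallback also makes it straightforward to expose a stream pipeline behind a conventional callback API. A minimal sketch (the sumNumbers wrapper is illustrative, not part of Highland):

// Illustrative wrapper: run a pipeline, deliver the result Node-style
function sumNumbers(numbers, callback) {
  _(numbers)
    .reduce(0, (sum, x) => sum + x) // single-value stream, safe for toCallback
    .toCallback(callback);
}
sumNumbers([1, 2, 3, 4], (err, sum) => {
  if (err) return console.error('Error:', err);
  console.log('Sum:', sum); // Sum: 10
});
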
Convert streams to promises.
/**
* Convert single-value stream to Promise
* @param {Function} PromiseConstructor - Promise constructor to use
* @returns {Promise} Promise resolving to stream value
*/
Stream.prototype.toPromise(PromiseConstructor);

Usage Examples:
// Single value to promise
_.of(42)
  .toPromise(Promise)
  .then(result => console.log('Result:', result)) // Result: 42
  .catch(err => console.error('Error:', err));
// Error handling
_.fromError(new Error('Failed'))
  .toPromise(Promise)
  .then(result => console.log('Result:', result))
  .catch(err => console.error('Caught:', err.message)); // Caught: Failed
// Empty stream resolves to undefined
_([])
  .toPromise(Promise)
  .then(result => console.log('Result:', result)); // Result: undefined
// Async/await usage
async function processStream() {
  try {
    const result = await _([1, 2, 3]).collect().toPromise(Promise);
    console.log('Collected:', result); // Collected: [1, 2, 3]
  } catch (err) {
    console.error('Error:', err);
  }
}
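
The constructor argument means any Promise implementation with a standard constructor can be used in place of the native one. A sketch assuming the bluebird package is installed:

const Bluebird = require('bluebird'); // assumed dependency
_.of('done')
  .toPromise(Bluebird)
  .then(value => console.log('Resolved with:', value)); // Resolved with: done
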
Convert Highland streams to Node.js readable streams.
/**
* Convert to Node.js Readable stream
* @param {Object} options - Readable stream options
* @returns {Readable} Node.js Readable stream
*/
Stream.prototype.toNodeStream(options);

Usage Examples:
const fs = require('fs');
// Convert to Node stream
const highlandStream = _(['line 1\n', 'line 2\n', 'line 3\n']);
const nodeStream = highlandStream.toNodeStream();
// Pipe to file
nodeStream.pipe(fs.createWriteStream('output.txt'));
// Object mode
const objectStream = _([{id: 1}, {id: 2}, {id: 3}])
  .toNodeStream({objectMode: true});
objectStream.on('data', obj => console.log('Object:', obj));
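
Because the result is a standard Readable, it can also be consumed with async iteration (supported on Node.js Readables since Node 10). A minimal sketch:

// Consume the Readable with for await...of
async function printAll() {
  const readable = _([{id: 1}, {id: 2}, {id: 3}]).toNodeStream({objectMode: true});
  for await (const item of readable) {
    console.log('Item:', item);
  }
}
printAll();
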
Pipe Highland streams to Node.js writable streams.
/**
* Pipe to Node.js writable stream
* @param {Writable} dest - Destination writable stream
* @param {Object} options - Pipe options
* @returns {Writable} The destination stream
*/
Stream.prototype.pipe(dest, options);

Usage Examples:
const fs = require('fs');
// Pipe to file
_(['Hello', ' ', 'World', '!'])
  .pipe(fs.createWriteStream('greeting.txt'));
// Pipe with options
_(['line1\n', 'line2\n'])
  .pipe(process.stdout, { end: false }); // Don't end stdout
// Chain pipes
_([1, 2, 3, 4])
  .map(x => x + '\n')
  .pipe(fs.createWriteStream('numbers.txt'))
  .on('finish', () => console.log('Writing complete'));
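
Because pipe returns its destination, a Highland stream can also feed a longer Node pipeline. A sketch that compresses output with the built-in zlib module (reusing the fs require from above):

const zlib = require('zlib');
// Highland -> gzip Transform -> file; each pipe returns its destination
_(['log line 1\n', 'log line 2\n'])
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('log.txt.gz'))
  .on('finish', () => console.log('Compressed file written'));
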
Reduce streams to single values.
/**
* Reduce stream to single value with initial accumulator
* @param {*} memo - Initial accumulator value
* @param {Function} reducer - (accumulator, value) => newAccumulator
* @returns {Stream} Stream with single reduced value
*/
Stream.prototype.reduce(memo, reducer);
/**
* Reduce using first value as initial accumulator
* @param {Function} reducer - (accumulator, value) => newAccumulator
* @returns {Stream} Stream with single reduced value
*/
Stream.prototype.reduce1(reducer);
/**
* Like reduce but emits intermediate values
* @param {*} memo - Initial accumulator value
* @param {Function} reducer - (accumulator, value) => newAccumulator
* @returns {Stream} Stream of intermediate values
*/
Stream.prototype.scan(memo, reducer);
/**
* Scan using first value as initial accumulator
* @param {Function} reducer - (accumulator, value) => newAccumulator
* @returns {Stream} Stream of intermediate values
*/
Stream.prototype.scan1(reducer);

Usage Examples:
// Basic reduction
_([1, 2, 3, 4])
  .reduce(0, (sum, x) => sum + x)
  .toArray(console.log); // [10]
// Reduce without initial value
_([1, 2, 3, 4])
  .reduce1((sum, x) => sum + x)
  .toArray(console.log); // [10]
// Scan shows intermediate values
_([1, 2, 3, 4])
  .scan(0, (sum, x) => sum + x)
  .toArray(console.log); // [0, 1, 3, 6, 10]
// Scan without initial value
_([1, 2, 3, 4])
  .scan1((sum, x) => sum + x)
  .toArray(console.log); // [1, 3, 6, 10]
// Complex reduction - find maximum
_([{name: 'Alice', score: 85}, {name: 'Bob', score: 92}, {name: 'Charlie', score: 78}])
  .reduce1((max, current) => current.score > max.score ? current : max)
  .toArray(result => console.log('Winner:', result[0].name)); // Winner: Bob
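
reduce also works with non-numeric accumulators. A small sketch that builds a word-frequency map (the input words are made up for illustration):

// Build a frequency map with reduce
_(['apple', 'banana', 'apple', 'cherry', 'banana', 'apple'])
  .reduce({}, (counts, word) => {
    counts[word] = (counts[word] || 0) + 1;
    return counts;
  })
  .toArray(([counts]) => console.log(counts)); // { apple: 3, banana: 2, cherry: 1 }
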
Combine consumption with transformation for complete data processing:
// File processing pipeline
const fs = require('fs');
const _ = require('highland');
_(fs.createReadStream('data.csv'))
  .split() // Split by lines
  .drop(1) // Skip header
  .map(line => line.split(',')) // Parse CSV
  .filter(row => row.length === 3) // Valid rows only
  .map(([name, age, city]) => ({ name, age: parseInt(age), city }))
  .filter(person => person.age >= 18) // Adults only
  .group('city') // Group by city
  .toArray(results => {
    console.log('Adults by city:', results);
  });

// Robust error handling
_([1, 2, 'invalid', 4, 5])
  .map(x => {
    if (typeof x !== 'number') throw new Error('Invalid number');
    return x * 2;
  })
  .errors((err, push) => {
    console.log('Skipping invalid value:', err.message);
    // Don't push anything, effectively filtering out the error
  })
  .toArray(results => {
    console.log('Valid results:', results); // [2, 4, 8, 10]
  });
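
A variant of the same errors handler can substitute a fallback value instead of dropping the failed item, since push(null, value) injects a value downstream:

// Substitute a default instead of dropping the failed value
_([1, 2, 'invalid', 4])
  .map(x => {
    if (typeof x !== 'number') throw new Error('Invalid number');
    return x * 2;
  })
  .errors((err, push) => push(null, 0)) // push a fallback value downstream
  .toArray(results => console.log('With fallback:', results)); // With fallback: [2, 4, 0, 8]
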
// Process stream asynchronously
async function processData() {
  const data = await _([1, 2, 3, 4, 5])
    .map(x => x * 2)
    .filter(x => x > 5)
    .collect()
    .toPromise(Promise);
  console.log('Processed data:', data); // [6, 8, 10]
  // Further async processing
  const sum = data.reduce((a, b) => a + b, 0);
  return sum;
}
processData().then(sum => console.log('Sum:', sum)); // Sum: 24