Highland is a comprehensive high-level streams library for Node.js and browsers that unifies synchronous and asynchronous data processing through a single, powerful abstraction. It provides functional programming utilities similar to Underscore.js or Lodash but designed specifically for handling data distributed over time, allowing developers to seamlessly work with arrays, streams, promises, events, and callbacks using the same API.
npm install highland
const _ = require('highland');
For ES modules:
import _ from 'highland';
The main export is both a function and a namespace: it serves as the primary entry point for creating streams and for accessing utility functions.
const _ = require('highland');
// Create streams from arrays
_([1, 2, 3, 4])
.map(x => x * 2)
.filter(x => x > 4)
.toArray(result => {
console.log(result); // [6, 8]
});
// Work with async data sources
_(fs.createReadStream('data.txt'))
.split()
.map(line => line.trim())
.compact()
.each(console.log);
// Consume events from an emitter or DOM element (promises can be wrapped the same way with _(promise))
_('click', button)
.throttle(1000)
.each(handleClick);
Highland is built around several key architectural components:
Core functionality for creating Highland streams from various data sources including arrays, generators, promises, Node streams, and events.
// Main constructor function
function _(source, onFinished, mappingHint) { /* ... */ }
// Factory functions
function _.of(value) { /* ... */ }
function _.fromError(error) { /* ... */ }
// Special values
const _.nil; // End-of-stream marker
Type checking, debugging, and internal utility functions.
// Type checking utilities
function _.isUndefined(x);
function _.isFunction(x);
function _.isObject(x);
function _.isString(x);
function _.isArray(x);
function _.isNil(x);
function _.isStream(x);
// Internal utilities (less commonly used)
function _._isStreamError(x);
function _._isStreamRedirect(x);
// Object utilities
function _.keys(obj);
function _.values(obj);
function _.pairs(obj);
function _.extend(extensions, target);
function _.get(prop, obj);
function _.set(prop, val, obj);
// Mathematical operators
function _.add(a, b); // Curried
function _.not(x);
// Debugging
function _.log(...args);
// Stream utilities
function _.pipeline(...transforms); // Create reusable transformation pipeline
function _.wrapCallback(fn, mappingHint); // Wrap Node callback to return stream
function _.streamifyAll(obj); // Add stream methods to object
Essential stream transformation operations for mapping, filtering, and basic data manipulation.
// Transform values
Stream.prototype.map(fn);
Stream.prototype.filter(fn);
Stream.prototype.reject(fn);
// Object operations
Stream.prototype.pluck(property);
Stream.prototype.pick(properties);
Stream.prototype.where(properties);
Sophisticated transformation operations including grouping, sorting, unique filtering, and batch processing.
// Grouping and sorting
Stream.prototype.group(fn);
Stream.prototype.sortBy(fn);
Stream.prototype.uniq();
Stream.prototype.uniqBy(fn);
// Batch operations
Stream.prototype.batch(n);
Stream.prototype.batchWithTimeOrCount(ms, n);
Operations for working with streams of streams, including flattening, parallel processing, and merging.
// Stream composition
Stream.prototype.flatMap(fn);
Stream.prototype.sequence();
Stream.prototype.flatten();
// Parallel processing
Stream.prototype.parallel(n);
Stream.prototype.merge();
Stream.prototype.fork();
Methods for controlling stream execution, timing, and data flow, including throttling, debouncing, and backpressure management.
// Timing control
Stream.prototype.throttle(ms);
Stream.prototype.debounce(ms);
Stream.prototype.ratelimit(num, ms);
// Flow control
Stream.prototype.take(n);
Stream.prototype.drop(n);
Stream.prototype.slice(start, end);
Methods for consuming stream data and converting it to other formats, including arrays, promises, and Node streams.
// Consumption methods
Stream.prototype.each(fn);
Stream.prototype.toArray(fn);
Stream.prototype.toCallback(cb);
Stream.prototype.toPromise(PromiseCtor);
Stream.prototype.collect();
Curried utility functions and functional programming helpers for composition and partial application.
// Function utilities
function _.curry(fn, ...args);
function _.partial(fn, ...args);
function _.compose(...fns);
function _.seq(...fns);
// Limited top-level method aliases (only these two)
const _.tap = _.doto; // Alias for doto method
const _.series = _.sequence; // Alias for sequence method
// Core stream class
class Stream extends EventEmitter {
constructor(source, onFinished, mappingHint);
// Stream control
pause();
resume();
end();
destroy();
write(value);
// Consumption
consume(fn);
pull(fn);
}
// Special marker object
const nil = {}; // End-of-stream sentinel value
// Error wrapper for stream errors
class StreamError {
constructor(error);
error; // The wrapped error
}
// Redirect wrapper for stream redirection
class StreamRedirect {
constructor(stream);
to; // The target stream
}