Highland provides flexible ways to create streams from various data sources including arrays, generators, promises, Node streams, and events.
The primary Highland constructor that creates streams from various sources.
/**
* Creates a Highland stream from various sources
* @param {Array|Function|Promise|Stream|EventEmitter|Iterator|Iterable|String} source - The data source
* @param {Function} onFinished - (Optional) Completion detection for Readable streams
* @param {Array|Function|Number} mappingHint - (Optional) Event argument mapping for EventEmitters
* @returns {Stream} Highland stream
*/
function _(source, onFinished, mappingHint);Usage Examples:
const _ = require('highland');
// From arrays
const arrayStream = _([1, 2, 3, 4]);
// From generator functions
const generatorStream = _(function(push, next) {
push(null, 1);
push(null, 2);
push(null, _.nil); // End the stream
});
// From promises
const promiseStream = _(Promise.resolve('hello'));
// From Node readable streams
const fs = require('fs');
const fileStream = _(fs.createReadStream('file.txt'));
// From events
const eventStream = _('click', document.body);
// From iterables
const mapStream = _(new Map([['a', 1], ['b', 2]]));Utility functions for creating streams with specific values or errors.
/**
* Creates a stream containing a single value
* @param {*} value - The value to emit
* @returns {Stream} Stream containing the single value
*/
function _.of(value);
/**
* Creates a stream that emits a single error
* @param {Error} error - The error to emit
* @returns {Stream} Stream that emits the error
*/
function _.fromError(error);Usage Examples:
// Single value stream
_.of(42).toArray(result => {
console.log(result); // [42]
});
// Error stream
_.fromError(new Error('Something went wrong'))
.errors((err, push) => {
console.log('Caught error:', err.message);
push(null, 'recovered');
})
.toArray(console.log); // ['recovered']The sentinel value used to signal the end of a stream.
/**
* Global end-of-stream marker
* @type {Object}
*/
const _.nil;
/**
* Check if a value is the nil marker
* @param {*} value - Value to test
* @returns {boolean} True if value is nil
*/
function _.isNil(value);Utility functions for identifying different types of values and streams.
/**
* Check if value is a Highland stream
* @param {*} value - Value to test
* @returns {boolean} True if value is a Highland stream
*/
function _.isStream(value);
/**
* Check if value is undefined
* @param {*} value - Value to test
* @returns {boolean} True if value is undefined
*/
function _.isUndefined(value);
/**
* Check if value is a function
* @param {*} value - Value to test
* @returns {boolean} True if value is a function
*/
function _.isFunction(value);
/**
* Check if value is an object (non-null)
* @param {*} value - Value to test
* @returns {boolean} True if value is an object
*/
function _.isObject(value);
/**
* Check if value is a string
* @param {*} value - Value to test
* @returns {boolean} True if value is a string
*/
function _.isString(value);
/**
* Check if value is an array
* @param {*} value - Value to test
* @returns {boolean} True if value is an array
*/
function _.isArray(value);Generator functions provide the most flexible way to create streams with custom logic.
const customStream = _(function(push, next) {
// Push values asynchronously
setTimeout(() => {
push(null, 'first');
next(); // Continue generating
}, 100);
setTimeout(() => {
push(null, 'second');
next();
}, 200);
setTimeout(() => {
push(null, _.nil); // End the stream
}, 300);
});Event streams can map event arguments in various ways.
// Map to array of first 2 arguments
_('request', server, 2).each(args => {
console.log(args); // [req, res]
});
// Map to object with named properties
_('request', server, ['req', 'res']).each(obj => {
console.log(obj); // { req: IncomingMessage, res: ServerResponse }
});
// Map with custom function
_('request', server, (req, res) => res).each(response => {
// Only get the response object
});Some Node streams need custom completion detection.
const stream = _(req, function(readable, callback) {
// Custom completion detection
readable.on('end', callback);
readable.on('close', callback);
readable.on('error', callback);
// Return cleanup function
return function() {
readable.removeListener('end', callback);
readable.removeListener('close', callback);
readable.removeListener('error', callback);
};
});// Generator function signature
type GeneratorFunction = (
push: (error: Error | null, value?: any) => void,
next: (redirectStream?: Stream) => void
) => void;
// Event mapping hint types
type MappingHint =
| number // Take first N arguments as array
| string[] // Map arguments to object properties
| ((...args: any[]) => any); // Custom mapping function
// Node stream completion detector
type OnFinishedFunction = (
readable: NodeJS.ReadableStream,
callback: (error?: Error) => void
) => void | (() => void) | { onDestroy?: () => void, continueOnError?: boolean };