or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

advanced-transforms.mdbasic-transforms.mdconsumption.mdflow-control.mdfunctional-programming.mdhigher-order-streams.mdindex.mdstream-creation.md
tile.json

stream-creation.mddocs/

Stream Creation

Highland provides flexible ways to create streams from various data sources including arrays, generators, promises, Node streams, and events.

Capabilities

Main Constructor

The primary Highland constructor that creates streams from various sources.

/**
 * Creates a Highland stream from various sources
 * @param {Array|Function|Promise|Stream|EventEmitter|Iterator|Iterable|String} source - The data source
 * @param {Function} onFinished - (Optional) Completion detection for Readable streams
 * @param {Array|Function|Number} mappingHint - (Optional) Event argument mapping for EventEmitters
 * @returns {Stream} Highland stream
 */
function _(source, onFinished, mappingHint);

Usage Examples:

const _ = require('highland');

// From arrays
const arrayStream = _([1, 2, 3, 4]);

// From generator functions
const generatorStream = _(function(push, next) {
  push(null, 1);
  push(null, 2);
  push(null, _.nil); // End the stream
});

// From promises
const promiseStream = _(Promise.resolve('hello'));

// From Node readable streams
const fs = require('fs');
const fileStream = _(fs.createReadStream('file.txt'));

// From events
const eventStream = _('click', document.body);

// From iterables
const mapStream = _(new Map([['a', 1], ['b', 2]]));

Factory Functions

Utility functions for creating streams with specific values or errors.

/**
 * Creates a stream containing a single value
 * @param {*} value - The value to emit
 * @returns {Stream} Stream containing the single value
 */
function _.of(value);

/**
 * Creates a stream that emits a single error
 * @param {Error} error - The error to emit
 * @returns {Stream} Stream that emits the error
 */
function _.fromError(error);

Usage Examples:

// Single value stream
_.of(42).toArray(result => {
  console.log(result); // [42]
});

// Error stream
_.fromError(new Error('Something went wrong'))
  .errors((err, push) => {
    console.log('Caught error:', err.message);
    push(null, 'recovered');
  })
  .toArray(console.log); // ['recovered']

Stream End Marker

The sentinel value used to signal the end of a stream.

/**
 * Global end-of-stream marker
 * @type {Object}
 */
const _.nil;

/**
 * Check if a value is the nil marker
 * @param {*} value - Value to test
 * @returns {boolean} True if value is nil
 */
function _.isNil(value);

Type Checking Functions

Utility functions for identifying different types of values and streams.

/**
 * Check if value is a Highland stream
 * @param {*} value - Value to test
 * @returns {boolean} True if value is a Highland stream
 */
function _.isStream(value);

/**
 * Check if value is undefined
 * @param {*} value - Value to test
 * @returns {boolean} True if value is undefined
 */
function _.isUndefined(value);

/**
 * Check if value is a function
 * @param {*} value - Value to test
 * @returns {boolean} True if value is a function
 */
function _.isFunction(value);

/**
 * Check if value is an object (non-null)
 * @param {*} value - Value to test
 * @returns {boolean} True if value is an object
 */
function _.isObject(value);

/**
 * Check if value is a string
 * @param {*} value - Value to test
 * @returns {boolean} True if value is a string
 */
function _.isString(value);

/**
 * Check if value is an array
 * @param {*} value - Value to test
 * @returns {boolean} True if value is an array
 */
function _.isArray(value);

Advanced Stream Creation

From Generator Functions

Generator functions provide the most flexible way to create streams with custom logic.

const customStream = _(function(push, next) {
  // Push values asynchronously
  setTimeout(() => {
    push(null, 'first');
    next(); // Continue generating
  }, 100);
  
  setTimeout(() => {
    push(null, 'second');
    next();
  }, 200);
  
  setTimeout(() => {
    push(null, _.nil); // End the stream
  }, 300);
});

From Events with Mapping

Event streams can map event arguments in various ways.

// Map to array of first 2 arguments
_('request', server, 2).each(args => {
  console.log(args); // [req, res]
});

// Map to object with named properties
_('request', server, ['req', 'res']).each(obj => {
  console.log(obj); // { req: IncomingMessage, res: ServerResponse }
});

// Map with custom function
_('request', server, (req, res) => res).each(response => {
  // Only get the response object
});

From Node Streams with Custom Completion

Some Node streams need custom completion detection.

const stream = _(req, function(readable, callback) {
  // Custom completion detection
  readable.on('end', callback);
  readable.on('close', callback);
  readable.on('error', callback);
  
  // Return cleanup function
  return function() {
    readable.removeListener('end', callback);
    readable.removeListener('close', callback);
    readable.removeListener('error', callback);
  };
});

Types

// Generator function signature
type GeneratorFunction = (
  push: (error: Error | null, value?: any) => void,
  next: (redirectStream?: Stream) => void
) => void;

// Event mapping hint types
type MappingHint = 
  | number // Take first N arguments as array
  | string[] // Map arguments to object properties
  | ((...args: any[]) => any); // Custom mapping function

// Node stream completion detector
type OnFinishedFunction = (
  readable: NodeJS.ReadableStream,
  callback: (error?: Error) => void
) => void | (() => void) | { onDestroy?: () => void, continueOnError?: boolean };