or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

advanced-transforms.md
basic-transforms.md
consumption.md
flow-control.md
functional-programming.md
higher-order-streams.md
index.md
stream-creation.md
tile.json

docs/consumption.md

Consumption

Methods for consuming stream data and converting to other formats including arrays, promises, and Node streams.

Capabilities

Basic Consumption

Core methods for consuming stream values.

/**
 * Iterate over each value in the stream
 * @param {Function} iterator - Function to call with each value
 * @returns {Stream} Stream returned so that done() can be chained onto it
 */
Stream.prototype.each(iterator);

/**
 * Call function when stream ends
 * @param {Function} callback - Function to call on completion
 * @returns {void}
 */
Stream.prototype.done(callback);

Usage Examples:

// Basic iteration
_([1, 2, 3, 4]).each(x => console.log(x));
// Logs: 1, 2, 3, 4

// Chaining with done
let total = 0;
_([1, 2, 3, 4])
  .each(x => total += x)
  .done(() => console.log('Total:', total));
// Logs: Total: 10

Array Conversion

Convert streams to arrays.

/**
 * Collect all values into an array
 * @param {Function} callback - Function to call with completed array
 * @returns {void}
 */
Stream.prototype.toArray(callback);

/**
 * Collect all values into an array and pass it downstream
 * @returns {Stream} Stream containing single array
 */
Stream.prototype.collect();

Usage Examples:

// Convert to array
_([1, 2, 3]).toArray(arr => {
  console.log(arr); // [1, 2, 3]
  console.log('Length:', arr.length); // Length: 3
});

// Collect for further processing
_([1, 2, 3])
  .collect()
  .map(arr => arr.length)
  .toArray(console.log); // [3]

Function Application

Apply stream values as function arguments.

/**
 * Apply all stream values as arguments to a function
 * @param {Function} fn - Function to apply arguments to
 * @returns {void}
 */
Stream.prototype.apply(fn);

Usage Examples:

// Apply values as arguments
_([1, 2, 3]).apply((a, b, c) => {
  console.log('Arguments:', a, b, c); // Arguments: 1 2 3
  console.log('Sum:', a + b + c); // Sum: 6
});

// Apply with Math functions
_([1, 5, 3, 9, 2]).apply(Math.max); // Calls Math.max(1, 5, 3, 9, 2); the result (9) is discarded, nothing is logged

Callback Conversion

Deliver the single value (or the error) of a stream to a Node.js-style callback.

/**
 * Convert single-value stream to Node.js style callback
 * @param {Function} callback - (err, result) => void
 * @returns {void}
 */
Stream.prototype.toCallback(callback);

Usage Examples:

// Single value to callback
_.of(42).toCallback((err, result) => {
  if (err) {
    console.error('Error:', err);
  } else {
    console.log('Result:', result); // Result: 42
  }
});

// Error handling
_.fromError(new Error('Something failed')).toCallback((err, result) => {
  console.error('Caught:', err.message); // Caught: Something failed
});

// Multiple values cause an error
_([1, 2, 3]).toCallback((err, result) => {
  console.error('Error:', err.message); // Error: toCallback called on stream emitting multiple values
});

Promise Conversion

Convert streams to promises.

/**
 * Convert single-value stream to Promise
 * @param {Function} PromiseConstructor - Promise constructor to use
 * @returns {Promise} Promise resolving to stream value
 */
Stream.prototype.toPromise(PromiseConstructor);

Usage Examples:

// Single value to promise
_.of(42)
  .toPromise(Promise)
  .then(result => console.log('Result:', result)) // Result: 42
  .catch(err => console.error('Error:', err));

// Error handling
_.fromError(new Error('Failed'))
  .toPromise(Promise)
  .then(result => console.log('Result:', result))
  .catch(err => console.error('Caught:', err.message)); // Caught: Failed

// Empty stream resolves to undefined
_([])
  .toPromise(Promise)
  .then(result => console.log('Result:', result)); // Result: undefined

// Async/await usage
async function processStream() {
  try {
    const result = await _([1, 2, 3]).collect().toPromise(Promise);
    console.log('Collected:', result); // Collected: [1, 2, 3]
  } catch (err) {
    console.error('Error:', err);
  }
}

Node Stream Conversion

Convert Highland streams to Node.js readable streams.

/**
 * Convert to Node.js Readable stream
 * @param {Object} options - Readable stream options
 * @returns {Readable} Node.js Readable stream
 */
Stream.prototype.toNodeStream(options);

Usage Examples:

const fs = require('fs');

// Convert to Node stream
const highlandStream = _(['line 1\n', 'line 2\n', 'line 3\n']);
const nodeStream = highlandStream.toNodeStream();

// Pipe to file
nodeStream.pipe(fs.createWriteStream('output.txt'));

// Object mode
const objectStream = _([{id: 1}, {id: 2}, {id: 3}])
  .toNodeStream({objectMode: true});

objectStream.on('data', obj => console.log('Object:', obj));

Piping to Node Streams

Pipe Highland streams to Node.js writable streams.

/**
 * Pipe to Node.js writable stream
 * @param {Writable} dest - Destination writable stream
 * @param {Object} options - Pipe options
 * @returns {Writable} The destination stream
 */
Stream.prototype.pipe(dest, options);

Usage Examples:

const fs = require('fs');

// Pipe to file
_(['Hello', ' ', 'World', '!'])
  .pipe(fs.createWriteStream('greeting.txt'));

// Pipe with options
_(['line1\n', 'line2\n'])
  .pipe(process.stdout, { end: false }); // Don't end stdout

// Chain pipes
_([1, 2, 3, 4])
  .map(x => x + '\n')
  .pipe(fs.createWriteStream('numbers.txt'))
  .on('finish', () => console.log('Writing complete'));

Reduction Operations

Reduce streams to single values.

/**
 * Reduce stream to single value with initial accumulator
 * @param {*} memo - Initial accumulator value
 * @param {Function} reducer - (accumulator, value) => newAccumulator
 * @returns {Stream} Stream with single reduced value
 */
Stream.prototype.reduce(memo, reducer);

/**
 * Reduce using first value as initial accumulator
 * @param {Function} reducer - (accumulator, value) => newAccumulator
 * @returns {Stream} Stream with single reduced value
 */
Stream.prototype.reduce1(reducer);

/**
 * Like reduce but emits intermediate values
 * @param {*} memo - Initial accumulator value
 * @param {Function} reducer - (accumulator, value) => newAccumulator
 * @returns {Stream} Stream of intermediate values
 */
Stream.prototype.scan(memo, reducer);

/**
 * Scan using first value as initial accumulator
 * @param {Function} reducer - (accumulator, value) => newAccumulator
 * @returns {Stream} Stream of intermediate values
 */
Stream.prototype.scan1(reducer);

Usage Examples:

// Basic reduction
_([1, 2, 3, 4])
  .reduce(0, (sum, x) => sum + x)
  .toArray(console.log); // [10]

// Reduce without initial value
_([1, 2, 3, 4])
  .reduce1((sum, x) => sum + x)
  .toArray(console.log); // [10]

// Scan shows intermediate values
_([1, 2, 3, 4])
  .scan(0, (sum, x) => sum + x)
  .toArray(console.log); // [0, 1, 3, 6, 10]

// Scan without initial value
_([1, 2, 3, 4])
  .scan1((sum, x) => sum + x)
  .toArray(console.log); // [1, 3, 6, 10]

// Complex reduction - find maximum
_([{name: 'Alice', score: 85}, {name: 'Bob', score: 92}, {name: 'Charlie', score: 78}])
  .reduce1((max, current) => current.score > max.score ? current : max)
  .toArray(result => console.log('Winner:', result[0].name)); // Winner: Bob

Advanced Consumption Patterns

Stream Processing Pipelines

Combine consumption with transformation for complete data processing:

// File processing pipeline
const fs = require('fs');
const _ = require('highland');

_(fs.createReadStream('data.csv'))
  .split() // Split by lines
  .drop(1) // Skip header
  .map(line => line.split(',')) // Parse CSV
  .filter(row => row.length === 3) // Valid rows only
  .map(([name, age, city]) => ({ name, age: parseInt(age), city }))
  .filter(person => person.age >= 18) // Adults only
  .group('city') // Group by city
  .toArray(results => {
    console.log('Adults by city:', results);
  });

Error Handling in Consumption

// Robust error handling
_([1, 2, 'invalid', 4, 5])
  .map(x => {
    if (typeof x !== 'number') throw new Error('Invalid number');
    return x * 2;
  })
  .errors((err, push) => {
    console.log('Skipping invalid value:', err.message);
    // Don't push anything, effectively filtering out the error
  })
  .toArray(results => {
    console.log('Valid results:', results); // [2, 4, 8, 10]
  });

Async Consumption with Promises

// Process stream asynchronously
async function processData() {
  const data = await _([1, 2, 3, 4, 5])
    .map(x => x * 2)
    .filter(x => x > 5)
    .collect()
    .toPromise(Promise);
    
  console.log('Processed data:', data); // [6, 8, 10]
  
  // Further async processing
  const sum = data.reduce((a, b) => a + b, 0);
  return sum;
}

processData().then(sum => console.log('Sum:', sum)); // Sum: 24