CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/npm-highland

Highland is a comprehensive high-level streams library for Node.js and browsers that unifies synchronous and asynchronous data processing through a single, powerful abstraction.

Pending
Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

SecuritybySnyk

Pending

The risk profile of this skill

Overview
Eval results
Files

consumption.mddocs/

Consumption

Methods for consuming stream data and converting to other formats including arrays, promises, and Node streams.

Capabilities

Basic Consumption

Core methods for consuming stream values.

/**
 * Iterate over each value in the stream
 * @param {Function} iterator - Function to call with each value
 * @returns {Stream} Stream for chaining (only with done())
 */
Stream.prototype.each(iterator);

/**
 * Call function when stream ends
 * @param {Function} callback - Function to call on completion
 * @returns {void}
 */
Stream.prototype.done(callback);

Usage Examples:

// Basic iteration
_([1, 2, 3, 4]).each(x => console.log(x));
// Logs: 1, 2, 3, 4

// Chaining with done
let total = 0;
_([1, 2, 3, 4])
  .each(x => total += x)
  .done(() => console.log('Total:', total));
// Logs: Total: 10

Array Conversion

Convert streams to arrays.

/**
 * Collect all values into an array
 * @param {Function} callback - Function to call with completed array
 * @returns {void}
 */
Stream.prototype.toArray(callback);

/**
 * Collect all values into an array and pass down stream
 * @returns {Stream} Stream containing single array
 */
Stream.prototype.collect();

Usage Examples:

// Convert to array
_([1, 2, 3]).toArray(arr => {
  console.log(arr); // [1, 2, 3]
  console.log('Length:', arr.length); // Length: 3
});

// Collect for further processing
_([1, 2, 3])
  .collect()
  .map(arr => arr.length)
  .toArray(console.log); // [3]

Function Application

Apply stream values as function arguments.

/**
 * Apply all stream values as arguments to a function
 * @param {Function} fn - Function to apply arguments to
 * @returns {void}
 */
Stream.prototype.apply(fn);

Usage Examples:

// Apply values as arguments
_([1, 2, 3]).apply((a, b, c) => {
  console.log('Arguments:', a, b, c); // Arguments: 1 2 3
  console.log('Sum:', a + b + c); // Sum: 6
});

// Apply with Math functions
_([1, 5, 3, 9, 2]).apply(Math.max); // Logs: 9

Callback Conversion

Convert streams to Node.js style callbacks.

/**
 * Convert single-value stream to Node.js style callback
 * @param {Function} callback - (err, result) => void
 * @returns {void}
 */
Stream.prototype.toCallback(callback);

Usage Examples:

// Single value to callback
_.of(42).toCallback((err, result) => {
  if (err) {
    console.error('Error:', err);
  } else {
    console.log('Result:', result); // Result: 42
  }
});

// Error handling
_.fromError(new Error('Something failed')).toCallback((err, result) => {
  console.error('Caught:', err.message); // Caught: Something failed
});

// Multiple values cause error
_([1, 2, 3]).toCallback((err, result) => {
  console.error('Error:', err.message); // Error: toCallback called on stream emitting multiple values
});

Promise Conversion

Convert streams to promises.

/**
 * Convert single-value stream to Promise
 * @param {Function} PromiseConstructor - Promise constructor to use
 * @returns {Promise} Promise resolving to stream value
 */
Stream.prototype.toPromise(PromiseConstructor);

Usage Examples:

// Single value to promise
_.of(42)
  .toPromise(Promise)
  .then(result => console.log('Result:', result)) // Result: 42
  .catch(err => console.error('Error:', err));

// Error handling
_.fromError(new Error('Failed'))
  .toPromise(Promise)
  .then(result => console.log('Result:', result))
  .catch(err => console.error('Caught:', err.message)); // Caught: Failed

// Empty stream resolves to undefined
_([])
  .toPromise(Promise)
  .then(result => console.log('Result:', result)); // Result: undefined

// Async/await usage
async function processStream() {
  try {
    const result = await _([1, 2, 3]).collect().toPromise(Promise);
    console.log('Collected:', result); // Collected: [1, 2, 3]
  } catch (err) {
    console.error('Error:', err);
  }
}

Node Stream Conversion

Convert Highland streams to Node.js readable streams.

/**
 * Convert to Node.js Readable stream
 * @param {Object} options - Readable stream options
 * @returns {Readable} Node.js Readable stream
 */
Stream.prototype.toNodeStream(options);

Usage Examples:

const fs = require('fs');

// Convert to Node stream
const highlandStream = _(['line 1\n', 'line 2\n', 'line 3\n']);
const nodeStream = highlandStream.toNodeStream();

// Pipe to file
nodeStream.pipe(fs.createWriteStream('output.txt'));

// Object mode
const objectStream = _([{id: 1}, {id: 2}, {id: 3}])
  .toNodeStream({objectMode: true});

objectStream.on('data', obj => console.log('Object:', obj));

Piping to Node Streams

Pipe Highland streams to Node.js writable streams.

/**
 * Pipe to Node.js writable stream
 * @param {Writable} dest - Destination writable stream
 * @param {Object} options - Pipe options
 * @returns {Writable} The destination stream
 */
Stream.prototype.pipe(dest, options);

Usage Examples:

const fs = require('fs');

// Pipe to file
_(['Hello', ' ', 'World', '!'])
  .pipe(fs.createWriteStream('greeting.txt'));

// Pipe with options
_(['line1\n', 'line2\n'])
  .pipe(process.stdout, { end: false }); // Don't end stdout

// Chain pipes
_([1, 2, 3, 4])
  .map(x => x + '\n')
  .pipe(fs.createWriteStream('numbers.txt'))
  .on('finish', () => console.log('Writing complete'));

Reduction Operations

Reduce streams to single values.

/**
 * Reduce stream to single value with initial accumulator
 * @param {*} memo - Initial accumulator value
 * @param {Function} reducer - (accumulator, value) => newAccumulator
 * @returns {Stream} Stream with single reduced value
 */
Stream.prototype.reduce(memo, reducer);

/**
 * Reduce using first value as initial accumulator
 * @param {Function} reducer - (accumulator, value) => newAccumulator
 * @returns {Stream} Stream with single reduced value
 */
Stream.prototype.reduce1(reducer);

/**
 * Like reduce but emits intermediate values
 * @param {*} memo - Initial accumulator value
 * @param {Function} reducer - (accumulator, value) => newAccumulator
 * @returns {Stream} Stream of intermediate values
 */
Stream.prototype.scan(memo, reducer);

/**
 * Scan using first value as initial accumulator
 * @param {Function} reducer - (accumulator, value) => newAccumulator
 * @returns {Stream} Stream of intermediate values
 */
Stream.prototype.scan1(reducer);

Usage Examples:

// Basic reduction
_([1, 2, 3, 4])
  .reduce(0, (sum, x) => sum + x)
  .toArray(console.log); // [10]

// Reduce without initial value
_([1, 2, 3, 4])
  .reduce1((sum, x) => sum + x)
  .toArray(console.log); // [10]

// Scan shows intermediate values
_([1, 2, 3, 4])
  .scan(0, (sum, x) => sum + x)
  .toArray(console.log); // [0, 1, 3, 6, 10]

// Scan without initial value
_([1, 2, 3, 4])
  .scan1((sum, x) => sum + x)
  .toArray(console.log); // [1, 3, 6, 10]

// Complex reduction - find maximum
_([{name: 'Alice', score: 85}, {name: 'Bob', score: 92}, {name: 'Charlie', score: 78}])
  .reduce1((max, current) => current.score > max.score ? current : max)
  .toArray(result => console.log('Winner:', result[0].name)); // Winner: Bob

Advanced Consumption Patterns

Stream Processing Pipelines

Combine consumption with transformation for complete data processing:

// File processing pipeline
const fs = require('fs');
const _ = require('highland');

_(fs.createReadStream('data.csv'))
  .split() // Split by lines
  .drop(1) // Skip header
  .map(line => line.split(',')) // Parse CSV
  .filter(row => row.length === 3) // Valid rows only
  .map(([name, age, city]) => ({ name, age: parseInt(age), city }))
  .filter(person => person.age >= 18) // Adults only
  .group('city') // Group by city
  .toArray(results => {
    console.log('Adults by city:', results);
  });

Error Handling in Consumption

// Robust error handling
_([1, 2, 'invalid', 4, 5])
  .map(x => {
    if (typeof x !== 'number') throw new Error('Invalid number');
    return x * 2;
  })
  .errors((err, push) => {
    console.log('Skipping invalid value:', err.message);
    // Don't push anything, effectively filtering out the error
  })
  .toArray(results => {
    console.log('Valid results:', results); // [2, 4, 8, 10]
  });

Async Consumption with Promises

// Process stream asynchronously
async function processData() {
  const data = await _([1, 2, 3, 4, 5])
    .map(x => x * 2)
    .filter(x => x > 5)
    .collect()
    .toPromise(Promise);
    
  console.log('Processed data:', data); // [6, 8, 10]
  
  // Further async processing
  const sum = data.reduce((a, b) => a + b, 0);
  return sum;
}

processData().then(sum => console.log('Sum:', sum)); // Sum: 24

docs

advanced-transforms.md

basic-transforms.md

consumption.md

flow-control.md

functional-programming.md

higher-order-streams.md

index.md

stream-creation.md

tile.json