CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/npm-event-stream

Construct pipes of streams of events with functional programming patterns for Node.js

Pending
Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Security by Snyk

Pending

The risk profile of this skill

Overview
Eval results
Files

docs/stream-creation.md

Stream Creation

Functions for creating readable and writable streams from various data sources with proper pause/resume support and lifecycle management.

const es = require('event-stream');

Capabilities

Read Array

Creates a readable stream from an array, emitting each item as a data event while respecting pause and resume.

/**
 * Create a readable stream from an array
 * @param {Array} array - Array of items to emit as stream data
 * @returns {ReadableStream} Stream that emits array items
 * @throws {Error} If array parameter is not an array
 */
function readArray(array);

Usage Examples:

const es = require('event-stream');

// Create stream from array
const numberStream = es.readArray([1, 2, 3, 4, 5]);
numberStream.pipe(es.writeArray(function(err, result) {
  console.log(result); // [1, 2, 3, 4, 5]
}));

// With objects
const users = [
  { id: 1, name: 'Alice' },
  { id: 2, name: 'Bob' }
];
es.readArray(users)
  .pipe(es.map(function(user, cb) {
    cb(null, user.name.toUpperCase());
  }))
  .pipe(es.writeArray(function(err, names) {
    console.log(names); // ['ALICE', 'BOB']
  }));

Write Array / Collect

Creates a writable stream that collects all data events into an array and calls a callback when the stream ends.

/**
 * Create a writable stream that collects data into an array
 * @param {Function} callback - Called with (error, array) when stream ends
 * @returns {WritableStream} Stream that collects all written data
 * @throws {Error} If callback is not a function
 */
function writeArray(callback);

// Alias for writeArray
function collect(callback);

Usage Examples:

const es = require('event-stream');

// Collect stream data
const collector = es.writeArray(function(err, array) {
  if (err) throw err;
  console.log('Collected:', array);
});

// Write data to collector
collector.write('hello');
collector.write('world');
collector.end(); // Triggers callback with ['hello', 'world']

// Use with pipe
es.readArray(['a', 'b', 'c'])
  .pipe(es.collect(function(err, result) {
    console.log(result); // ['a', 'b', 'c']
  }));

Readable

Creates a readable stream that calls an async function repeatedly while the stream is not paused, allowing for dynamic data generation.

/**
 * Create a readable stream from an async function
 * @param {Function} func - Function called with (count, callback)
 * @param {boolean} continueOnError - Whether to continue on errors (optional)
 * @returns {ReadableStream} Stream that calls function for data
 * @throws {Error} If func is not a function
 */
function readable(func, continueOnError);

Usage Examples:

const es = require('event-stream');

// Generate sequential numbers
const numberGenerator = es.readable(function(count, callback) {
  if (count >= 5) {
    return this.emit('end'); // Stop after 5 items
  }
  
  // Simulate async operation
  setTimeout(() => {
    callback(null, count * 2); // Emit doubled count
  }, 100);
});

numberGenerator.pipe(es.writeArray(function(err, result) {
  console.log(result); // [0, 2, 4, 6, 8]
}));

// With error handling
const errorProneStream = es.readable(function(count, callback) {
  if (count === 3) {
    return callback(new Error('Test error'));
  }
  if (count >= 5) {
    return this.emit('end');
  }
  callback(null, count);
}, true); // Continue on error

errorProneStream.on('error', function(err) {
  console.log('Error caught:', err.message);
});

Through

Creates a transform stream with optional write and end functions, providing the foundation for most synchronous streams in event-stream.

/**
 * Create a through stream with custom write and end handlers
 * @param {Function} writeFunction - Called for each data chunk with (data)
 * @param {Function} endFunction - Called when stream ends
 * @returns {TransformStream} Through stream with custom handlers
 */
function through(writeFunction, endFunction);

Usage Examples:

const es = require('event-stream');

// Simple pass-through
const passThrough = es.through(function(data) {
  this.emit('data', data); // Re-emit data
});

// Transform data
const doubler = es.through(
  function write(data) {
    this.emit('data', data * 2);
  },
  function end() {
    this.emit('end');
  }
);

es.readArray([1, 2, 3])
  .pipe(doubler)
  .pipe(es.writeArray(function(err, result) {
    console.log(result); // [2, 4, 6]
  }));

// With flow control
const pauser = es.through(function(data) {
  if (data > 3) {
    this.pause(); // Pause stream
    setTimeout(() => this.resume(), 1000); // Resume after delay
  }
  this.emit('data', data);
});

Stream Properties

All created streams have standard Node.js stream properties and methods:

  • readable/writable: Boolean flags indicating whether the stream can be read from or written to
  • pause(): Pause the stream
  • resume(): Resume a paused stream
  • destroy(): Destroy the stream and clean up resources
  • pipe(): Connect to another stream
  • on(event, handler): Listen for stream events (data, end, error, close)

docs

data-processing.md

index.md

stream-combination.md

stream-creation.md

stream-transformation.md

utility-functions.md

tile.json