or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

data-processing.md index.md stream-combination.md stream-creation.md stream-transformation.md utility-functions.md
tile.json

docs/stream-creation.md

Stream Creation

Functions for creating readable and writable streams from various data sources with proper pause/resume support and lifecycle management.

const es = require('event-stream');

Capabilities

Read Array

Creates a readable stream from an array, emitting each item as a data event while respecting pause and resume.

/**
 * Create a readable stream from an array
 * @param {Array} array - Array of items to emit as stream data
 * @returns {ReadableStream} Stream that emits array items
 * @throws {Error} If array parameter is not an array
 */
function readArray(array);

Usage Examples:

const es = require('event-stream');

// Create stream from array
const numberStream = es.readArray([1, 2, 3, 4, 5]);
numberStream.pipe(es.writeArray(function(err, result) {
  console.log(result); // [1, 2, 3, 4, 5]
}));

// With objects
const users = [
  { id: 1, name: 'Alice' },
  { id: 2, name: 'Bob' }
];
es.readArray(users)
  .pipe(es.map(function(user, cb) {
    cb(null, user.name.toUpperCase());
  }))
  .pipe(es.writeArray(function(err, names) {
    console.log(names); // ['ALICE', 'BOB']
  }));

Write Array / Collect

Creates a writable stream that collects all data events into an array and calls a callback when the stream ends.

/**
 * Create a writable stream that collects data into an array
 * @param {Function} callback - Called with (error, array) when stream ends
 * @returns {WritableStream} Stream that collects all written data
 * @throws {Error} If callback is not a function
 */
function writeArray(callback);

// Alias for writeArray
function collect(callback);

Usage Examples:

const es = require('event-stream');

// Collect stream data
const collector = es.writeArray(function(err, array) {
  if (err) throw err;
  console.log('Collected:', array);
});

// Write data to collector
collector.write('hello');
collector.write('world');
collector.end(); // Triggers callback with ['hello', 'world']

// Use with pipe
es.readArray(['a', 'b', 'c'])
  .pipe(es.collect(function(err, result) {
    console.log(result); // ['a', 'b', 'c']
  }));

Readable

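Creates a readable stream that repeatedly calls an asynchronous function while the stream is not paused, allowing for dynamic data generation. The function receives a zero-based call count and a callback: pass a value to the callback to emit it as data, or emit 'end' on the stream (available as this) to stop.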

/**
 * Create a readable stream from an async function
 * @param {Function} func - Function called with (count, callback)
 * @param {boolean} continueOnError - Whether to continue on errors (optional)
 * @returns {ReadableStream} Stream that calls function for data
 * @throws {Error} If func is not a function
 */
function readable(func, continueOnError);

Usage Examples:

const es = require('event-stream');

// Generate sequential numbers
const numberGenerator = es.readable(function(count, callback) {
  if (count >= 5) {
    return this.emit('end'); // Stop after 5 items
  }
  
  // Simulate async operation
  setTimeout(() => {
    callback(null, count * 2); // Emit doubled count
  }, 100);
});

numberGenerator.pipe(es.writeArray(function(err, result) {
  console.log(result); // [0, 2, 4, 6, 8]
}));

// With error handling
const errorProneStream = es.readable(function(count, callback) {
  if (count === 3) {
    return callback(new Error('Test error'));
  }
  if (count >= 5) {
    return this.emit('end');
  }
  callback(null, count);
}, true); // Continue on error

errorProneStream.on('error', function(err) {
  console.log('Error caught:', err.message);
});

Through

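Creates a transform stream from optional write and end functions, providing the foundation for most synchronous streams in event-stream. Inside both functions, this refers to the stream itself, so output is produced by calling this.emit('data', ...) and the stream is finished by calling this.emit('end').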

/**
 * Create a through stream with custom write and end handlers
 * @param {Function} writeFunction - Called for each data chunk with (data)
 * @param {Function} endFunction - Called when stream ends
 * @returns {TransformStream} Through stream with custom handlers
 */
function through(writeFunction, endFunction);

Usage Examples:

const es = require('event-stream');

// Simple pass-through
const passThrough = es.through(function(data) {
  this.emit('data', data); // Re-emit data
});

// Transform data
const doubler = es.through(
  function write(data) {
    this.emit('data', data * 2);
  },
  function end() {
    this.emit('end');
  }
);

es.readArray([1, 2, 3])
  .pipe(doubler)
  .pipe(es.writeArray(function(err, result) {
    console.log(result); // [2, 4, 6]
  }));

// With flow control
const pauser = es.through(function(data) {
  if (data > 3) {
    this.pause(); // Pause stream
    setTimeout(() => this.resume(), 1000); // Resume after delay
  }
  this.emit('data', data);
});

Stream Properties

All created streams have standard Node.js stream properties and methods:

  • readable / writable: Boolean flags indicating whether the stream can currently be read from or written to
  • pause(): Pause the stream
  • resume(): Resume a paused stream
  • destroy(): Destroy the stream and clean up resources
  • pipe(): Connect to another stream
  • on(event, handler): Listen for stream events (data, end, error, close)