or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

data-processing.mdindex.mdstream-combination.mdstream-creation.mdstream-transformation.mdutility-functions.md
tile.json

stream-transformation.mddocs/

Stream Transformation

Functions for transforming stream data using synchronous and asynchronous operations with built-in error handling and flow control.

const es = require('event-stream');

Capabilities

Map (Asynchronous)

Creates a transform stream from an asynchronous function that processes each data item with callback-based error handling.

/**
 * Create a transform stream from an async function
 * @param {Function} asyncFunction - Function called with (data, callback)
 * @returns {TransformStream} Stream that transforms data asynchronously
 */
function map(asyncFunction);

The callback function must be called with one of:

  • callback() - Drop this data (acts like filter)
  • callback(null, newData) - Transform data to newData
  • callback(error) - Emit an error for this item

Usage Examples:

const es = require('event-stream');

// Async transformation
const asyncTransformer = es.map(function(data, callback) {
  // Simulate async operation
  setTimeout(() => {
    const result = data.toString().toUpperCase();
    callback(null, result);
  }, 10);
});

es.readArray(['hello', 'world'])
  .pipe(asyncTransformer)
  .pipe(es.writeArray(function(err, result) {
    console.log(result); // ['HELLO', 'WORLD']
  }));

// With error handling
const errorHandler = es.map(function(data, callback) {
  if (typeof data !== 'string') {
    return callback(new Error('Expected string'));
  }
  callback(null, data.length);
});

// Filter by not calling callback
const evenFilter = es.map(function(number, callback) {
  if (number % 2 === 0) {
    callback(null, number); // Keep even numbers
  } else {
    callback(); // Drop odd numbers
  }
});

Map Sync

Creates a transform stream using a synchronous function that processes data immediately with automatic error handling.

/**
 * Create a transform stream with synchronous transformation
 * @param {Function} syncFunction - Function that transforms data synchronously
 * @returns {TransformStream} Stream that transforms data synchronously
 */
function mapSync(syncFunction);

Usage Examples:

const es = require('event-stream');

// Synchronous transformation
const doubler = es.mapSync(function(data) {
  return data * 2;
});

es.readArray([1, 2, 3, 4])
  .pipe(doubler)
  .pipe(es.writeArray(function(err, result) {
    console.log(result); // [2, 4, 6, 8]
  }));

// Object transformation
const userFormatter = es.mapSync(function(user) {
  return {
    id: user.id,
    name: user.name.toUpperCase(),
    active: true
  };
});

// Return undefined to filter out data
const positiveOnly = es.mapSync(function(number) {
  return number > 0 ? number : undefined;
});

Filter Sync

Creates a transform stream that filters data based on a synchronous predicate function.

/**
 * Filter stream data based on a test function
 * @param {Function} testFunction - Function that returns boolean for each item
 * @returns {TransformStream} Stream that only emits items passing the test
 */
function filterSync(testFunction);

Usage Examples:

const es = require('event-stream');

// Filter numbers
const evenNumbers = es.filterSync(function(number) {
  return number % 2 === 0;
});

es.readArray([1, 2, 3, 4, 5, 6])
  .pipe(evenNumbers)
  .pipe(es.writeArray(function(err, result) {
    console.log(result); // [2, 4, 6]
  }));

// Filter objects
const activeUsers = es.filterSync(function(user) {
  return user.active === true;
});

const users = [
  { name: 'Alice', active: true },
  { name: 'Bob', active: false },
  { name: 'Charlie', active: true }
];

es.readArray(users)
  .pipe(activeUsers)
  .pipe(es.writeArray(function(err, result) {
    console.log(result); // [{ name: 'Alice', active: true }, { name: 'Charlie', active: true }]
  }));

Flatmap Sync

Creates a transform stream that applies a mapping function to nested data and flattens the results.

/**
 * Map and flatten nested data structures
 * @param {Function} mapperFunction - Function that maps each array element
 * @returns {TransformStream} Stream that flattens mapped results
 */
function flatmapSync(mapperFunction);

Usage Examples:

const es = require('event-stream');

// Flatten arrays of numbers
const multiplier = es.flatmapSync(function(number) {
  return number * 2;
});

// Input: array of arrays, each element gets mapped
es.readArray([[1, 2], [3, 4]])
  .pipe(multiplier)
  .pipe(es.writeArray(function(err, result) {
    console.log(result); // [2, 4, 6, 8]
  }));

// Transform object properties
const propertyExtractor = es.flatmapSync(function(obj) {
  return obj.name + '_processed';
});

es.readArray([
  [{ name: 'item1' }, { name: 'item2' }],
  [{ name: 'item3' }]
])
  .pipe(propertyExtractor)
  .pipe(es.writeArray(function(err, result) {
    console.log(result); // ['item1_processed', 'item2_processed', 'item3_processed']
  }));

Split

Breaks up a stream by a delimiter and reassembles it so that each split part becomes a separate chunk.

/**
 * Split stream data by delimiter (string or regex)
 * @param {string|RegExp} matcher - Delimiter to split on (defaults to '\n')
 * @returns {TransformStream} Stream that emits split chunks
 */
function split(matcher);

Usage Examples:

const es = require('event-stream');
const fs = require('fs');

// Split on newlines (default)
fs.createReadStream('file.txt')
  .pipe(es.split())
  .pipe(es.map(function(line, cb) {
    console.log('Line:', line);
    cb(null, line);
  }));

// Split on custom delimiter
const csvSplitter = es.split(',');
es.readArray(['a,b,c', 'd,e,f'])
  .pipe(csvSplitter)
  .pipe(es.writeArray(function(err, result) {
    console.log(result); // ['a', 'b', 'c', 'd', 'e', 'f']
  }));

// Keep line breaks using regex
fs.createReadStream('file.txt')
  .pipe(es.split(/(\r?\n)/))
  .pipe(es.map(function(chunk, cb) {
    // Chunks will include line breaks
    cb(null, chunk);
  }));

// Split on regex pattern
const wordSplitter = es.split(/\s+/);

Error Handling

All transformation functions include built-in error handling:

  • mapSync: Catches exceptions and emits error events
  • map: Supports error-first callback pattern
  • filterSync: Catches exceptions in predicate functions
  • flatmapSync: Handles errors in mapping functions
  • split: Handles regex and string matching errors

Streams continue processing after errors unless explicitly destroyed.