Data Processing

Specialized functions for parsing JSON, stringifying objects, replacing text, and collecting data, with configurable error handling and format conversion.

const es = require('event-stream');

Capabilities

Parse

Parses JSON chunks in a stream, typically used after splitting data into lines. Provides configurable error handling for malformed JSON.

/**
 * Parse JSON chunks with configurable error handling
 * @param {Object} options - Configuration options
 * @param {boolean} options.error - If true, emit error events; if false (the default), log to console.error and continue
 * @returns {TransformStream} Stream that parses JSON data
 */
function parse(options);

Usage Examples:

const es = require('event-stream');
const fs = require('fs');

// Parse JSON lines
fs.createReadStream('data.jsonl')
  .pipe(es.split())
  .pipe(es.parse())
  .pipe(es.map(function(obj, cb) {
    console.log('Parsed object:', obj);
    cb(null, obj);
  }));

// With error emission
const errorParser = es.parse({ error: true });
errorParser.on('error', function(err) {
  console.error('Parse error:', err.message);
});

es.readArray(['{"valid": true}', 'invalid json', '{"also": "valid"}'])
  .pipe(errorParser)
  .pipe(es.writeArray(function(err, result) {
    console.log('Valid objects:', result);
  }));

// Default error logging (to console.error)
es.readArray(['{"name": "Alice"}', 'broken', '{"name": "Bob"}'])
  .pipe(es.parse()) // Logs errors but continues processing
  .pipe(es.writeArray(function(err, result) {
    console.log(result); // [{"name": "Alice"}, {"name": "Bob"}]
  }));

// Only truly empty lines are skipped; whitespace-only lines are not
// valid JSON and take the error path
es.readArray([
  '{"id": 1}',
  '',          // Empty line - skipped
  '{"id": 2}'
])
  .pipe(es.parse())
  .pipe(es.writeArray(function(err, result) {
    console.log(result); // [{ id: 1 }, { id: 2 }]
  }));

Stringify

Converts JavaScript objects to JSON strings, appending a newline to each, which makes the output compatible with split() + parse() and other line-based processing.

/**
 * Convert objects to JSON strings with newlines
 * @returns {TransformStream} Stream that stringifies objects
 */
function stringify();

Usage Examples:

const es = require('event-stream');
const fs = require('fs');

// Stringify objects to file
const objects = [
  { name: 'Alice', age: 30 },
  { name: 'Bob', age: 25 },
  { name: 'Charlie', age: 35 }
];

es.readArray(objects)
  .pipe(es.stringify())
  .pipe(fs.createWriteStream('output.jsonl'));

// Round-trip with parse
es.readArray(objects)
  .pipe(es.stringify())
  .pipe(es.split())
  .pipe(es.parse())
  .pipe(es.writeArray(function(err, result) {
    console.log('Round-trip result:', result);
  }));

// Handle Buffer objects
const bufferData = [
  Buffer.from('hello'),
  { text: 'world' },
  Buffer.from('buffer data')
];

es.readArray(bufferData)
  .pipe(es.stringify())
  .pipe(es.writeArray(function(err, result) {
    // Buffers are converted to strings before JSON.stringify
    console.log(result);
  }));

Replace

Replaces every occurrence of a string or regular expression with a replacement string. It works like String.prototype.split() followed by Array.prototype.join(), but for streams, so only string replacements are supported (not replacer functions).

/**
 * Replace occurrences of text in stream data
 * @param {string|RegExp} from - Text or pattern to replace
 * @param {string} to - Replacement text
 * @returns {TransformStream} Stream that performs text replacement
 */
function replace(from, to);

Usage Examples:

const es = require('event-stream');
const fs = require('fs');

// Simple string replacement
fs.createReadStream('template.txt')
  .pipe(es.replace('{{NAME}}', 'Alice'))
  .pipe(es.replace('{{AGE}}', '30'))
  .pipe(fs.createWriteStream('output.txt'));

// Escape HTML special characters with chained string replacements
// (replace() takes a string replacement, not a replacer function,
// because it is built on split().join() semantics)
const htmlEscaper = es.pipeline(
  es.replace('&', '&amp;'), // escape & first so original ampersands are handled
  es.replace('<', '&lt;'),
  es.replace('>', '&gt;'),
  es.replace('"', '&quot;')
);

es.readArray(['<script>alert("hello")</script>'])
  .pipe(htmlEscaper)
  .pipe(es.wait(function(err, result) {
    console.log(result); // '&lt;script&gt;alert(&quot;hello&quot;)&lt;/script&gt;'
  }));

// Multiple replacements in pipeline
const textProcessor = es.pipeline(
  es.replace(/\s+/g, ' '),      // Normalize whitespace
  es.replace(/^\s+|\s+$/g, ''), // Trim
  es.replace(/old/g, 'new')     // Replace content
);

// Case-insensitive replacement
const caseInsensitive = es.replace(/hello/gi, 'hi');
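
Since replace() is just split() piped into join(), its behavior can be approximated directly. The sketch below (replaceSketch is a hypothetical helper, for illustration only) shows the composition:

const es = require('event-stream');

// A rough equivalent of es.replace(from, to), assuming the
// split()/join() composition described above (a sketch, not
// the library's exact source)
function replaceSketch(from, to) {
  return es.pipeline(es.split(from), es.join(to));
}

// Behaves like es.replace('{{NAME}}', 'Alice')
const nameReplacer = replaceSketch('{{NAME}}', 'Alice');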

Join

Emits a separator between stream chunks, similar to Array.join(), except that chunks and separators remain separate data events rather than being buffered into one string; pipe through wait() to collect a single joined string. Passing a function instead of a separator invokes the legacy behavior, equivalent to wait().

/**
 * Join stream chunks with separator
 * @param {string} separator - String to insert between chunks
 * @returns {TransformStream} Stream that joins chunks
 */
function join(separator);

/**
 * Legacy: wait for all data and call callback (same as wait())
 * @param {Function} callback - Function called with complete data
 * @returns {TransformStream} Stream that collects and calls callback
 */
function join(callback);

Usage Examples:

const es = require('event-stream');

// Join with separator (collect with wait() to get a single string)
es.readArray(['apple', 'banana', 'cherry'])
  .pipe(es.join(', '))
  .pipe(es.wait(function(err, result) {
    console.log(result); // 'apple, banana, cherry'
  }));

// Join lines with newlines
es.readArray(['line 1', 'line 2', 'line 3'])
  .pipe(es.join('\n'))
  .pipe(process.stdout);

// Legacy callback usage (equivalent to wait())
es.readArray(['hello', ' ', 'world'])
  .pipe(es.join(function(err, result) {
    console.log('Complete text:', result); // 'hello world'
  }));

// Use in text processing pipeline
const csvFormatter = es.pipeline(
  es.split(','),           // Split CSV fields
  es.mapSync(field => field.trim()), // Clean fields
  es.join(' | ')           // Join with pipe separator
);
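
Because join() emits chunks and separators as separate data events rather than one buffered string, collecting with writeArray() makes the event structure visible. A small sketch:

const es = require('event-stream');

// Five data events arrive: 'apple', ', ', 'banana', ', ', 'cherry'
es.readArray(['apple', 'banana', 'cherry'])
  .pipe(es.join(', '))
  .pipe(es.writeArray(function(err, chunks) {
    console.log(chunks.length); // 5 - chunks and separators interleaved
  }));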

Wait

Waits for the stream to end and joins all chunks into a single string or buffer, with optional callback for the complete data.

/**
 * Collect all stream chunks into single string/buffer
 * @param {Function} callback - Optional callback called with complete data
 * @returns {TransformStream} Stream that emits single data event with all chunks
 */
function wait(callback);

Usage Examples:

const es = require('event-stream');
const fs = require('fs');

// Read entire file as single string
fs.createReadStream('document.txt')
  .pipe(es.wait(function(err, body) {
    if (err) throw err;
    console.log('File contents:', body);
  }));

// Collect transformed data
es.readArray(['hello', ' ', 'world', '!'])
  .pipe(es.mapSync(chunk => chunk.toUpperCase()))
  .pipe(es.wait(function(err, result) {
    console.log(result); // 'HELLO WORLD!'
  }));

// Buffer handling
const binaryData = [
  Buffer.from('hello'),
  Buffer.from(' '),
  Buffer.from('world')
];

es.readArray(binaryData)
  .pipe(es.wait(function(err, buffer) {
    console.log(buffer.toString()); // 'hello world'
    console.log('Buffer length:', buffer.length);
  }));

// Without callback - emits single data event
es.readArray(['a', 'b', 'c'])
  .pipe(es.wait())
  .on('data', function(completeData) {
    console.log('All data:', completeData); // 'abc'
  });

Data Format Handling

The data processing functions handle various data formats:

  • parse(): Handles JSON strings, skips empty lines (whitespace-only lines are not valid JSON and take the error path)
  • stringify(): Converts objects to JSON, handles Buffer objects by converting to strings
  • replace(): Works with strings and Buffer objects
  • join(): Handles any data types, converts to strings when joining
  • wait(): Preserves data types (strings remain strings, Buffers are concatenated as Buffers) - see the sketch below
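
As a quick illustration of the wait() rule in the last bullet, this sketch checks the output type for string and Buffer inputs:

const es = require('event-stream');

// String chunks join into a single string
es.readArray(['a', 'b', 'c'])
  .pipe(es.wait(function(err, out) {
    console.log(typeof out, out); // 'string' 'abc'
  }));

// Buffer chunks concatenate into a single Buffer
es.readArray([Buffer.from('a'), Buffer.from('b')])
  .pipe(es.wait(function(err, out) {
    console.log(Buffer.isBuffer(out)); // true
  }));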

Error Handling Strategies

Each function takes a different approach to errors; a combined sketch follows the list:

  • parse(): Configurable - emits error events with { error: true }, otherwise logs to console.error and continues
  • stringify(): Converts Buffers to strings before JSON.stringify(); serialization failures such as circular references throw
  • replace(): No special handling - built from split() and join(), so errors propagate from those streams
  • join(): No special handling - chunks and separators pass through as-is
  • wait(): No special handling - upstream errors surface as ordinary stream error events
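
To make the parse() strategies concrete, here is a compact sketch showing both modes side by side, with the behavior documented above:

const es = require('event-stream');

// Strict mode: emit and handle parse errors explicitly
const strict = es.parse({ error: true });
strict.on('error', function(err) {
  console.error('Parse failed:', err.message);
});
es.readArray(['not json']).pipe(strict);

// Default mode: log to console.error and keep going
es.readArray(['{"ok": 1}', 'not json', '{"ok": 2}'])
  .pipe(es.parse())
  .pipe(es.writeArray(function(err, result) {
    console.log(result.length); // 2 - the bad chunk is dropped
  }));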