CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/npm-event-stream

Construct pipes of streams of events with functional programming patterns for Node.js

Pending
Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Security by Snyk

Pending

The risk profile of this skill

Overview
Eval results
Files

data-processing.mddocs/

Data Processing

Specialized functions for parsing JSON, stringifying objects, text replacement, and data collection with robust error handling and format conversion.

const es = require('event-stream');

Capabilities

Parse

Parses JSON chunks in a stream, typically used after splitting data into lines. Provides configurable error handling for malformed JSON.

/**
 * Parse JSON chunks with configurable error handling
 * @param {Object} options - Configuration options
 * @param {boolean} options.error - If true, emit error events; if false, log to console.error
 * @returns {TransformStream} Stream that parses JSON data
 */
function parse(options);

Usage Examples:

const es = require('event-stream');
const fs = require('fs');

// Parse JSON lines
fs.createReadStream('data.jsonl')
  .pipe(es.split())
  .pipe(es.parse())
  .pipe(es.map(function(obj, cb) {
    console.log('Parsed object:', obj);
    cb(null, obj);
  }));

// With error emission
const errorParser = es.parse({ error: true });
errorParser.on('error', function(err) {
  console.error('Parse error:', err.message);
});

es.readArray(['{"valid": true}', 'invalid json', '{"also": "valid"}'])
  .pipe(errorParser)
  .pipe(es.writeArray(function(err, result) {
    console.log('Valid objects:', result);
  }));

// Default error logging (to console.error)
es.readArray(['{"name": "Alice"}', 'broken', '{"name": "Bob"}'])
  .pipe(es.parse()) // Logs errors but continues processing
  .pipe(es.writeArray(function(err, result) {
    console.log(result); // [{"name": "Alice"}, {"name": "Bob"}]
  }));

// Empty lines are ignored
const jsonData = [
  '{"id": 1}',
  '',  // Empty line - ignored
  '   ', // Whitespace only - NOT silently ignored: JSON.parse('   ') throws, so it goes through the error path (logged or emitted)
  '{"id": 2}'
];

Stringify

Converts JavaScript objects to JSON strings with newlines, making them compatible with parse() and line-based processing.

/**
 * Convert objects to JSON strings with newlines
 * @returns {TransformStream} Stream that stringifies objects
 */
function stringify();

Usage Examples:

const es = require('event-stream');
const fs = require('fs');

// Stringify objects to file
const objects = [
  { name: 'Alice', age: 30 },
  { name: 'Bob', age: 25 },
  { name: 'Charlie', age: 35 }
];

es.readArray(objects)
  .pipe(es.stringify())
  .pipe(fs.createWriteStream('output.jsonl'));

// Round-trip with parse
es.readArray(objects)
  .pipe(es.stringify())
  .pipe(es.split())
  .pipe(es.parse())
  .pipe(es.writeArray(function(err, result) {
    console.log('Round-trip result:', result);
  }));

// Handle Buffer objects
const bufferData = [
  Buffer.from('hello'),
  { text: 'world' },
  Buffer.from('buffer data')
];

es.readArray(bufferData)
  .pipe(es.stringify())
  .pipe(es.writeArray(function(err, result) {
    // Buffers are converted to strings before JSON.stringify
    console.log(result);
  }));

Replace

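Replaces all occurrences of a string or regular expression with a replacement string, working like string.split(from).join(to) but for streams. It is built from split(from) piped into join(to), so the replacement must be a string: a function passed as the replacement is not called per match the way it is with String.prototype.replace().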

/**
 * Replace occurrences of text in stream data
 * @param {string|RegExp} from - Text or pattern to replace
 * @param {string} to - Replacement text
 * @returns {TransformStream} Stream that performs text replacement
 */
function replace(from, to);

Usage Examples:

const es = require('event-stream');
const fs = require('fs');

// Simple string replacement
fs.createReadStream('template.txt')
  .pipe(es.replace('{{NAME}}', 'Alice'))
  .pipe(es.replace('{{AGE}}', '30'))
  .pipe(fs.createWriteStream('output.txt'));

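// Regular expression replacement
// NOTE(review): replace(from, to) is split(from) piped into join(to), and join()
// treats a function argument as the legacy wait() callback. The replacer function
// below is therefore NOT invoked once per match; use a string for `to` instead
// (for example, chain one es.replace(char, entity) call per character to escape).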
const htmlEscaper = es.replace(/[<>&"]/g, function(match) {
  const escapes = {
    '<': '&lt;',
    '>': '&gt;',
    '&': '&amp;',
    '"': '&quot;'
  };
  return escapes[match];
});

es.readArray(['<script>alert("hello")</script>'])
  .pipe(htmlEscaper)
  .pipe(es.writeArray(function(err, result) {
    console.log(result); // NOTE(review): not the escaped markup - a function `to` is not applied per match, so no entities are inserted
  }));

// Multiple replacements in pipeline
const textProcessor = es.pipeline(
  es.replace(/\s+/g, ' '),      // Normalize whitespace
  es.replace(/^\s+|\s+$/g, ''), // Trim
  es.replace(/old/g, 'new')     // Replace content
);

// Case-insensitive replacement
const caseInsensitive = es.replace(/hello/gi, 'hi');

Join

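Emits a separator between consecutive stream chunks, analogous to Array.join(). The separator is emitted as its own chunk and the original chunks pass through unchanged, so the output is still a sequence of chunks rather than one concatenated string (pipe into wait() to collect a single value). Also provides legacy callback-based functionality.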

/**
 * Join stream chunks with separator
 * @param {string} separator - String to insert between chunks
 * @returns {TransformStream} Stream that joins chunks
 */
function join(separator);

/**
 * Legacy: wait for all data and call callback (same as wait())
 * @param {Function} callback - Function called with complete data
 * @returns {TransformStream} Stream that collects and calls callback
 */
function join(callback);

Usage Examples:

const es = require('event-stream');

// Join with separator
es.readArray(['apple', 'banana', 'cherry'])
  .pipe(es.join(', '))
  .pipe(es.writeArray(function(err, result) {
    console.log(result); // ['apple', ', ', 'banana', ', ', 'cherry'] - the separator is emitted as its own chunk
  }));

// Join lines with newlines
es.readArray(['line 1', 'line 2', 'line 3'])
  .pipe(es.join('\n'))
  .pipe(process.stdout);

// Legacy callback usage (equivalent to wait())
es.readArray(['hello', ' ', 'world'])
  .pipe(es.join(function(err, result) {
    console.log('Complete text:', result); // 'hello world'
  }));

// Use in text processing pipeline
const csvFormatter = es.pipeline(
  es.split(','),           // Split CSV fields
  es.mapSync(field => field.trim()), // Clean fields
  es.join(' | ')           // Join with pipe separator
);

Wait

Waits for the stream to end and joins all chunks into a single string or buffer, with optional callback for the complete data.

/**
 * Collect all stream chunks into single string/buffer
 * @param {Function} callback - Optional callback called with complete data
 * @returns {TransformStream} Stream that emits single data event with all chunks
 */
function wait(callback);

Usage Examples:

const es = require('event-stream');
const fs = require('fs');

// Read entire file as single string
fs.createReadStream('document.txt')
  .pipe(es.wait(function(err, body) {
    if (err) throw err;
    console.log('File contents:', body);
  }));

// Collect transformed data
es.readArray(['hello', ' ', 'world', '!'])
  .pipe(es.mapSync(chunk => chunk.toUpperCase()))
  .pipe(es.wait(function(err, result) {
    console.log(result); // 'HELLO WORLD!'
  }));

// Buffer handling
const binaryData = [
  Buffer.from('hello'),
  Buffer.from(' '),
  Buffer.from('world')
];

es.readArray(binaryData)
  .pipe(es.wait(function(err, buffer) {
    console.log(buffer.toString()); // 'hello world'
    console.log('Buffer length:', buffer.length);
  }));

// Without callback - emits single data event
es.readArray(['a', 'b', 'c'])
  .pipe(es.wait())
  .on('data', function(completeData) {
    console.log('All data:', completeData); // 'abc'
  });

Data Format Handling

The data processing functions handle various data formats:

  • parse(): Parses JSON strings and skips empty chunks; whitespace-only chunks fail JSON.parse and go through the configured error handling
  • stringify(): Converts objects to JSON, handles Buffer objects by converting to strings
  • replace(): Works with strings and Buffer objects
  • join(): Passes chunks of any type through unchanged and emits the separator between them; no string conversion or concatenation is performed
  • wait(): Preserves data types (strings remain strings, Buffers are concatenated as Buffers)

Error Handling Strategies

Each function provides different error handling approaches:

  • parse(): Configurable - either emit errors or log to console
  • stringify(): Handles serialization errors for non-JSON data
  • replace(): No dedicated error handling; it is a pipeline of split() and join(), so only errors raised by those stages surface
  • join(): No dedicated error handling; chunks and separators are forwarded as-is
  • wait(): Collects data even if individual chunks have issues

docs

data-processing.md

index.md

stream-combination.md

stream-creation.md

stream-transformation.md

utility-functions.md

tile.json