CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/npm-stream-transform

Object transformations implementing the Node.js stream.Transform API

Pending
Overview
Eval results
Files

docs/stream-api.md

Stream API

Core streaming transformation functionality for scalable data processing. The stream API extends Node.js stream.Transform to provide object transformations with full backpressure support and event-driven processing.

Capabilities

Transform Function

Creates a transform stream for processing records with configurable options and handler functions.

/**
 * Create a transform stream with handler function
 * @param handler - Function to transform each record
 * @param callback - Optional completion callback for auto-consumption
 * @returns Transformer stream instance
 */
function transform<T, U>(handler: Handler<T, U>, callback?: Callback): Transformer;

/**
 * Create a transform stream with options and handler
 * @param options - Configuration options for the transformer
 * @param handler - Function to transform each record  
 * @param callback - Optional completion callback for auto-consumption
 * @returns Transformer stream instance
 */
function transform<T, U>(options: Options, handler: Handler<T, U>, callback?: Callback): Transformer;

Usage Examples:

import { transform } from "stream-transform";
import { createReadStream } from "fs";

// Basic stream transformation
const transformer = transform((record) => {
  return record.map(field => field.toUpperCase());
});

// Stream with options
const transformer = transform({
  parallel: 50,
  params: { prefix: "processed_" }
}, (record, params) => {
  return params.prefix + record.join(",");
});

// Auto-consumption with callback
const transformer = transform((record) => record, (err, results) => {
  if (err) throw err;
  console.log(`Processed ${results.length} records`);
});

// Pipe data through transformer
createReadStream("input.csv")
  .pipe(csvParser())
  .pipe(transformer)
  .pipe(createWriteStream("output.csv"));

Transformer Class

Transform stream class that extends Node.js stream.Transform with additional state tracking and configuration.

/**
 * Transform stream class for data processing
 */
class Transformer extends stream.Transform {
  /**
   * Create a new Transformer instance
   * @param options - Configuration options
   */
  constructor(options: Options);
  
  /** Configuration options (read-only) */
  readonly options: Options;
  
  /** Current transformation state (read-only) */
  readonly state: State;
}

Properties:

  • options: Immutable configuration object containing transformer settings
  • state: Real-time statistics about transformation progress

Usage Examples:

import { transform } from "stream-transform";

const transformer = transform({
  parallel: 10,
  consume: true,
  params: { multiplier: 2 }
}, (record, params) => record * params.multiplier);

// Access configuration
console.log(transformer.options.parallel); // 10
console.log(transformer.options.params.multiplier); // 2

// Monitor progress
transformer.on('readable', () => {
  console.log(`Progress: ${transformer.state.finished}/${transformer.state.started}`);
});

Stream Events

Transformer streams emit standard Node.js stream events plus custom events for monitoring.

// Standard stream events
transformer.on('readable', () => { /* Data available for reading */ });
transformer.on('end', () => { /* No more data will be written */ });
transformer.on('finish', () => { /* All data has been processed */ });
transformer.on('error', (err) => { /* Error occurred during processing */ });
transformer.on('close', () => { /* Stream has been destroyed */ });

// Pipe to destination
transformer.on('pipe', (src) => { /* Stream was piped from src */ });
transformer.on('unpipe', (src) => { /* Stream was unpiped from src */ });

Usage Examples:

import { transform } from "stream-transform";

const transformer = transform((record) => processRecord(record));

// Handle events
transformer.on('readable', function() {
  let record;
  while ((record = this.read()) !== null) {
    console.log('Processed:', record);
  }
});

transformer.on('error', (err) => {
  console.error('Transformation error:', err);
});

transformer.on('finish', () => {
  console.log('All records processed');
  console.log(`Final stats: ${transformer.state.finished} completed`);
});

Handler Functions

User-defined transformation functions that process individual records. Supports multiple execution patterns.

/**
 * Synchronous handler - returns result directly
 */
type SyncHandler<T, U> = (record: T, params?: any) => U;

/**
 * Asynchronous handler - uses callback for result
 */  
type AsyncHandler<T, U> = (record: T, callback: HandlerCallback<U>, params?: any) => void;

/**
 * Promise-based handler - returns Promise of result
 */
type PromiseHandler<T, U> = (record: T, params?: any) => Promise<U>;

/**
 * Generic handler type (union of all patterns)
 */
type Handler<T, U> = SyncHandler<T, U> | AsyncHandler<T, U> | PromiseHandler<T, U>;

/**
 * Callback function for asynchronous handlers
 */
type HandlerCallback<T = any> = (err?: null | Error, record?: T) => void;

Handler Detection:

The library automatically detects handler type based on function signature:

  • 1 parameter (+ optional params): Synchronous handler
  • 2 parameters (+ optional params): Asynchronous handler (second param is callback)
  • Return value with .then method: Promise-based handler

Usage Examples:

import { transform } from "stream-transform";

// Synchronous handler
const syncTransformer = transform((record) => {
  return record.map(field => field.trim());
});

// Asynchronous handler
const asyncTransformer = transform((record, callback) => {
  setTimeout(() => {
    callback(null, record.join("|"));
  }, 10);
});

// Promise-based handler
const promiseTransformer = transform(async (record) => {
  const result = await processAsync(record);
  return result;
});

// Handler with params
const paramsTransformer = transform({
  params: { separator: "|", prefix: "row_" }
}, (record, params) => {
  return params.prefix + record.join(params.separator);
});

Configuration Options

Configuration object for customizing transformer behavior.

interface Options extends stream.TransformOptions {
  /**
   * Auto-consume stream when no consumer is present
   * @default false
   */
  consume?: boolean;
  
  /**
   * Number of parallel transformation callbacks (async handlers only)
   * @default 100
   */
  parallel?: number;
  
  /**
   * User-defined parameters passed to handler function
   * @default null
   */
  params?: any;
}

Usage Examples:

import { transform } from "stream-transform";

// Auto-consumption for standalone processing
const autoConsumer = transform({
  consume: true
}, (record) => processRecord(record));

// High concurrency for I/O-bound operations
const highConcurrency = transform({
  parallel: 500
}, async (record) => {
  return await fetchDataForRecord(record);
});

// Custom parameters
const withParams = transform({
  params: { 
    apiKey: process.env.API_KEY,
    timeout: 5000
  }
}, (record, params) => {
  return enrichRecord(record, params.apiKey, params.timeout);
});

// Inherit stream options
const customStream = transform({
  highWaterMark: 64 * 1024,
  objectMode: true // automatically set by transform
}, (record) => record);

State Monitoring

Real-time statistics about transformation progress and performance.

interface State {
  /** Number of transformations that have completed successfully */
  finished: number;
  
  /** Number of transformations currently being processed */
  running: number;
  
  /** Total number of transformations that have been started */
  started: number;
  
  /** Whether the stream is currently paused due to backpressure */
  paused: boolean;
}

Usage Examples:

import { transform } from "stream-transform";

const transformer = transform((record) => processRecord(record));

// Monitor progress
const progressInterval = setInterval(() => {
  const { started, running, finished } = transformer.state;
  const completion = started ? (finished / started * 100).toFixed(1) : '0.0';
  console.log(`Progress: ${completion}% (${finished}/${started}, ${running} running)`);
}, 1000);

transformer.on('finish', () => {
  clearInterval(progressInterval);
  console.log(`Final: ${transformer.state.finished} records processed`);
});

// Detect processing bottlenecks
transformer.on('readable', () => {
  if (transformer.state.running > transformer.options.parallel * 0.8) {
    console.warn('High concurrency usage detected');
  }
});

Error Handling

Comprehensive error handling for stream processing with proper cleanup and recovery.

// Errors are emitted as 'error' events
transformer.on('error', (err: Error) => { /* handle the error */ });

// Handler errors automatically destroy the stream
// Callback-based handlers: call callback(error)
// Sync handlers: throw error  
// Promise handlers: reject promise

Usage Examples:

import { transform } from "stream-transform";

const transformer = transform((record) => {
  if (!record || record.length === 0) {
    throw new Error('Invalid record: empty or null');
  }
  return processRecord(record);
});

// Handle stream errors
transformer.on('error', (err) => {
  console.error('Transform error:', err.message);
  // Stream is automatically destroyed
  cleanup();
});

// Async handler error handling
const asyncTransformer = transform((record, callback) => {
  processRecordAsync(record, (err, result) => {
    if (err) {
      return callback(err); // Will emit 'error' event
    }
    callback(null, result);
  });
});

// Promise handler error handling  
const promiseTransformer = transform(async (record) => {
  try {
    return await riskyOperation(record);
  } catch (err) {
    throw new Error(`Failed to process record: ${err.message}`);
  }
});

Install with Tessl CLI

npx tessl i tessl/npm-stream-transform

docs

callback-api.md

index.md

stream-api.md

sync-api.md

tile.json