Object transformations implementing the Node.js stream.Transform API
—
Core streaming transformation functionality for scalable data processing. The stream API extends Node.js stream.Transform to provide object transformations with full backpressure support and event-driven processing.
Creates a transform stream for processing records with configurable options and handler functions.
/**
* Create a transform stream with handler function
* @param handler - Function to transform each record
* @param callback - Optional completion callback for auto-consumption
* @returns Transformer stream instance
*/
function transform<T, U>(handler: Handler<T, U>, callback?: Callback): Transformer;
/**
* Create a transform stream with options and handler
* @param options - Configuration options for the transformer
* @param handler - Function to transform each record
* @param callback - Optional completion callback for auto-consumption
* @returns Transformer stream instance
*/
function transform<T, U>(options: Options, handler: Handler<T, U>, callback?: Callback): Transformer;

Usage Examples:
import { transform } from "stream-transform";
import { createReadStream } from "fs";
// Basic stream transformation
const transformer = transform((record) => {
return record.map(field => field.toUpperCase());
});
// Stream with options
const transformer = transform({
parallel: 50,
params: { prefix: "processed_" }
}, (record, params) => {
return params.prefix + record.join(",");
});
// Auto-consumption with callback
const transformer = transform((record) => record, (err, results) => {
if (err) throw err;
console.log(`Processed ${results.length} records`);
});
// Pipe data through transformer
createReadStream("input.csv")
.pipe(csvParser())
.pipe(transformer)
.pipe(createWriteStream("output.csv"));

Transform stream class that extends Node.js stream.Transform with additional state tracking and configuration.
/**
* Transform stream class for data processing
*/
class Transformer extends stream.Transform {
/**
* Create a new Transformer instance
* @param options - Configuration options
*/
constructor(options: Options);
/** Configuration options (read-only) */
readonly options: Options;
/** Current transformation state (read-only) */
readonly state: State;
}

Properties:
- options: Immutable configuration object containing transformer settings
- state: Real-time statistics about transformation progress

Usage Examples:
import { transform } from "stream-transform";
const transformer = transform({
parallel: 10,
consume: true,
params: { multiplier: 2 }
}, (record, params) => record * params.multiplier);
// Access configuration
console.log(transformer.options.parallel); // 10
console.log(transformer.options.params.multiplier); // 2
// Monitor progress
transformer.on('readable', () => {
console.log(`Progress: ${transformer.state.finished}/${transformer.state.started}`);
});

Transformer streams emit standard Node.js stream events plus custom events for monitoring.
// Standard stream events
transformer.on('readable', () => { /* Data available for reading */ });
transformer.on('end', () => { /* No more data will be written */ });
transformer.on('finish', () => { /* All data has been processed */ });
transformer.on('error', (err) => { /* Error occurred during processing */ });
transformer.on('close', () => { /* Stream has been destroyed */ });
// Pipe to destination
transformer.on('pipe', (src) => { /* Stream was piped from src */ });
transformer.on('unpipe', (src) => { /* Stream was unpiped from src */ });

Usage Examples:
import { transform } from "stream-transform";
const transformer = transform((record) => processRecord(record));
// Handle events
transformer.on('readable', function() {
let record;
while ((record = this.read()) !== null) {
console.log('Processed:', record);
}
});
transformer.on('error', (err) => {
console.error('Transformation error:', err);
});
transformer.on('finish', () => {
console.log('All records processed');
console.log(`Final stats: ${transformer.state.finished} completed`);
});

User-defined transformation functions that process individual records. Supports multiple execution patterns.
/**
* Synchronous handler - returns result directly
*/
type SyncHandler<T, U> = (record: T, params?: any) => U;
/**
* Asynchronous handler - uses callback for result
*/
type AsyncHandler<T, U> = (record: T, callback: HandlerCallback<U>, params?: any) => void;
/**
* Promise-based handler - returns Promise of result
*/
type PromiseHandler<T, U> = (record: T, params?: any) => Promise<U>;
/**
* Generic handler type (union of all patterns)
*/
type Handler<T, U> = SyncHandler<T, U> | AsyncHandler<T, U> | PromiseHandler<T, U>;
/**
* Callback function for asynchronous handlers
*/
type HandlerCallback<T = any> = (err?: null | Error, record?: T) => void;

Handler Detection:
The library automatically detects the handler type based on the function signature:
- Declares a second callback argument: asynchronous handler
- Returned value exposes a .then method: Promise-based handler
- Otherwise: synchronous handler

Usage Examples:
import { transform } from "stream-transform";
// Synchronous handler
const syncTransformer = transform((record) => {
return record.map(field => field.trim());
});
// Asynchronous handler
const asyncTransformer = transform((record, callback) => {
setTimeout(() => {
callback(null, record.join("|"));
}, 10);
});
// Promise-based handler
const promiseTransformer = transform(async (record) => {
const result = await processAsync(record);
return result;
});
// Handler with params
const paramsTransformer = transform({
params: { separator: "|", prefix: "row_" }
}, (record, params) => {
return params.prefix + record.join(params.separator);
});

Configuration object for customizing transformer behavior.
interface Options extends stream.TransformOptions {
/**
* Auto-consume stream when no consumer is present
* @default false
*/
consume?: boolean;
/**
* Number of parallel transformation callbacks (async handlers only)
* @default 100
*/
parallel?: number;
/**
* User-defined parameters passed to handler function
* @default null
*/
params?: any;
}

Usage Examples:
import { transform } from "stream-transform";
// Auto-consumption for standalone processing
const autoConsumer = transform({
consume: true
}, (record) => processRecord(record));
// High concurrency for I/O-bound operations
const highConcurrency = transform({
parallel: 500
}, async (record) => {
return await fetchDataForRecord(record);
});
// Custom parameters
const withParams = transform({
params: {
apiKey: process.env.API_KEY,
timeout: 5000
}
}, (record, params) => {
return enrichRecord(record, params.apiKey, params.timeout);
});
// Inherit stream options
const customStream = transform({
highWaterMark: 64 * 1024,
objectMode: true // automatically set by transform
}, (record) => record);

Real-time statistics about transformation progress and performance.
interface State {
/** Number of transformations that have completed successfully */
finished: number;
/** Number of transformations currently being processed */
running: number;
/** Total number of transformations that have been started */
started: number;
/** Whether the stream is currently paused due to backpressure */
paused: boolean;
}

Usage Examples:
import { transform } from "stream-transform";
const transformer = transform((record) => processRecord(record));
// Monitor progress
const progressInterval = setInterval(() => {
const { started, running, finished } = transformer.state;
const completion = started ? (finished / started * 100).toFixed(1) : 0;
console.log(`Progress: ${completion}% (${finished}/${started}, ${running} running)`);
}, 1000);
transformer.on('finish', () => {
clearInterval(progressInterval);
console.log(`Final: ${transformer.state.finished} records processed`);
});
// Detect processing bottlenecks
transformer.on('readable', () => {
if (transformer.state.running > transformer.options.parallel * 0.8) {
console.warn('High concurrency usage detected');
}
});

Comprehensive error handling for stream processing with proper cleanup and recovery.
// Errors are emitted as 'error' events
transformer.on('error', (err: Error) => { /* handle error */ });
// Handler errors automatically destroy the stream
// Callback-based handlers: call callback(error)
// Sync handlers: throw error
// Promise handlers: reject promise

Usage Examples:
import { transform } from "stream-transform";
const transformer = transform((record) => {
if (!record || record.length === 0) {
throw new Error('Invalid record: empty or null');
}
return processRecord(record);
});
// Handle stream errors
transformer.on('error', (err) => {
console.error('Transform error:', err.message);
// Stream is automatically destroyed
cleanup();
});
// Async handler error handling
const asyncTransformer = transform((record, callback) => {
processRecordAsync(record, (err, result) => {
if (err) {
return callback(err); // Will emit 'error' event
}
callback(null, result);
});
});
// Promise handler error handling
const promiseTransformer = transform(async (record) => {
try {
return await riskyOperation(record);
} catch (err) {
throw new Error(`Failed to process record: ${err.message}`);
}
});

Install with Tessl CLI
npx tessl i tessl/npm-stream-transform