tessl/npm-mafintosh--streamx

An iteration of the Node.js core streams with a series of improvements


Transform Streams

A Transform stream is a Duplex stream whose readable output is computed from the data written to it.

Capabilities

Transform Constructor

Creates a new Transform stream. The transform and flush options are shorthands for overriding _transform and _flush.

/**
 * Creates a new Transform stream
 * @param opts - Configuration options including transformation function
 */
class Transform extends Duplex {
  constructor(opts?: TransformOptions);
}

interface TransformOptions extends DuplexOptions {
  /** Transform function shorthand */
  transform?: (data: any, cb: (error?: Error, result?: any) => void) => void;
  /** Flush function shorthand */
  flush?: (cb: (error?: Error, result?: any) => void) => void;
}

Usage Example:

const { Transform } = require("@mafintosh/streamx");

const upperCaseTransform = new Transform({
  transform(data, cb) {
    const result = data.toString().toUpperCase();
    cb(null, result);
  }
});

Transformation Logic

_transform Method

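Called once for each chunk written to the stream. Override it to implement the transformation, and call the callback once per chunk: with an error, with a result to emit, or with no result when output was already emitted through push().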

/**
 * Transform incoming data
 * @param data - Data to transform
 * @param callback - Callback with error and transformed result, or use push()
 */
_transform(data: any, callback: (error?: Error, result?: any) => void): void;

Usage Example:

class JSONTransform extends Transform {
  _transform(data, cb) {
    try {
      const parsed = JSON.parse(data.toString());
      const transformed = {
        ...parsed,
        processed: true,
        timestamp: Date.now()
      };
      cb(null, JSON.stringify(transformed));
    } catch (err) {
      cb(err);
    }
  }
}

_flush Method

Called after the writable side has ended and all written data has been transformed, so that any buffered data can be emitted before the readable side ends.

/**
 * Called to flush any remaining data at the end of transformation
 * @param callback - Callback with error and optional final data
 */
_flush(callback: (error?: Error, result?: any) => void): void;
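
One typical use of _flush is emitting a value that depends on every chunk, such as a digest of the whole stream. The sketch below keeps that logic in a small helper, createDigest (a hypothetical name, not part of streamx), built on Node's crypto module. The commented wiring into _transform and _flush is an assumption about how it would be used, not code taken from the library.

```javascript
const crypto = require('crypto');

// Hypothetical helper: absorbs chunks and produces one digest at the end.
function createDigest(algorithm = 'sha256') {
  const hash = crypto.createHash(algorithm);
  return {
    // Intended to be called from _transform: absorb the chunk, emit nothing.
    update(chunk) {
      hash.update(chunk);
    },
    // Intended to be called from _flush: digest of everything seen so far.
    finish() {
      return hash.digest('hex');
    }
  };
}

// Assumed wiring inside a Transform subclass:
//   _transform(data, cb) { this.digest.update(data); cb(null); }
//   _flush(cb)           { cb(null, this.digest.finish()); }

const digest = createDigest();
digest.update('hello ');
digest.update('world');
const streamDigest = digest.finish();
console.log(streamDigest); // same digest as hashing 'hello world' in one call
```

Because nothing is emitted until finish() runs, a stream built this way would produce exactly one output chunk regardless of how the input was split.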

_final Method

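Called internally by the stream when the writable side ends. The default implementation calls _flush and then ends the readable side, so custom end-of-stream logic normally belongs in _flush rather than in an overridden _final.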

/**
 * Called internally when the transform stream is ending
 * @param callback - Callback to call when final processing is complete
 */
_final(callback: (error?: Error) => void): void;

Transform Patterns

Using Callback

Transform data and pass result via callback:

const csvToJson = new Transform({
  transform(data, cb) {
    // Assumes each chunk contains whole "name,age" lines
    const result = data.toString().split('\n')
      .filter(line => line.trim()) // skip blank lines, e.g. after a trailing newline
      .map(line => {
        const [name, age] = line.split(',');
        return JSON.stringify({ name, age: parseInt(age, 10) });
      })
      .join('\n');
    cb(null, result);
  }
});

Using Push

Transform data and push result directly:

const splitTransform = new Transform({
  transform(data, cb) {
    const lines = data.toString().split('\n');
    lines.forEach(line => {
      if (line.trim()) {
        this.push(line.trim());
      }
    });
    cb(null); // No result via callback
  }
});

Multiple Output

Generate multiple outputs for a single input:

const duplicateTransform = new Transform({
  transform(data, cb) {
    const str = data.toString();
    this.push(`Original: ${str}`);
    this.push(`Copy: ${str}`);
    this.push(`Uppercase: ${str.toUpperCase()}`);
    cb(null);
  }
});
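
The two styles can be compared without constructing a stream. The following is a minimal sketch of a test harness, runTransform (a hypothetical helper, not part of streamx), that calls a transform function the way the patterns above use it: with this.push available and a callback whose result is collected as output. It assumes the transform calls its callback synchronously and that a non-null callback result is equivalent to pushing that result.

```javascript
// Hypothetical test harness: runs a transform function over inputs and
// collects everything it emits, via this.push() or via cb(null, result).
function runTransform(transformFn, inputs) {
  const output = [];
  const context = { push(chunk) { output.push(chunk); } };

  for (const input of inputs) {
    let called = false;
    transformFn.call(context, input, (err, result) => {
      called = true;
      if (err) throw err;
      // Assumption: a non-null callback result is treated like push(result).
      if (result !== undefined && result !== null) output.push(result);
    });
    if (!called) throw new Error('transform did not call cb synchronously');
  }
  return output;
}

// Callback style: one output per input.
function upperCase(data, cb) {
  cb(null, String(data).toUpperCase());
}

// Push style: several outputs per input, nothing passed to cb.
function splitWords(data, cb) {
  for (const word of String(data).split(' ')) {
    if (word) this.push(word);
  }
  cb(null);
}

const upperOut = runTransform(upperCase, ['a b', 'c']);
const wordsOut = runTransform(splitWords, ['a b', 'c']);
console.log(upperOut); // [ 'A B', 'C' ]
console.log(wordsOut); // [ 'a', 'b', 'c' ]
```

The functions are written with the function keyword rather than as arrow functions so that this refers to the object providing push.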

Common Transform Examples

Text Processing

const { Transform } = require("@mafintosh/streamx");

// Line-by-line processor
class LineProcessor extends Transform {
  constructor() {
    super();
    this.buffer = '';
  }

  _transform(chunk, cb) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split('\n');
    
    // Keep the last incomplete line in buffer
    this.buffer = lines.pop();
    
    // Process complete lines
    lines.forEach(line => {
      if (line.trim()) {
        const processed = `[${new Date().toISOString()}] ${line}`;
        this.push(processed);
      }
    });
    
    cb(null);
  }

  _flush(cb) {
    // Process any remaining data in buffer
    if (this.buffer.trim()) {
      const processed = `[${new Date().toISOString()}] ${this.buffer}`;
      this.push(processed);
    }
    cb(null);
  }
}
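
The buffering matters because chunk boundaries rarely line up with line boundaries. The sketch below isolates that logic in a plain helper, createLineSplitter (a hypothetical name introduced for illustration), so the behavior can be checked without a stream: feed() plays the role of _transform and end() the role of _flush.

```javascript
// Hypothetical helper mirroring the buffering logic of a line-based transform.
function createLineSplitter() {
  let buffer = '';
  return {
    // Returns the complete lines contained in buffer + chunk.
    feed(chunk) {
      buffer += chunk;
      const lines = buffer.split('\n');
      buffer = lines.pop(); // trailing partial line ('' right after a newline)
      return lines;
    },
    // Returns the remaining partial line, if any, and resets the buffer.
    end() {
      const rest = buffer;
      buffer = '';
      return rest ? [rest] : [];
    }
  };
}

const splitter = createLineSplitter();
const seen = [];
// Lines are deliberately split across chunk boundaries.
for (const chunk of ['hel', 'lo\nwor', 'ld\nlast']) {
  seen.push(...splitter.feed(chunk));
}
seen.push(...splitter.end());
console.log(seen); // [ 'hello', 'world', 'last' ]
```

Without the final end() call the last line would be lost, which is the case _flush covers in the class above.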

Data Validation and Filtering

class ValidatorTransform extends Transform {
  constructor(validator) {
    super();
    this.validate = validator;
    this.validCount = 0;
    this.invalidCount = 0;
  }

  _transform(data, cb) {
    try {
      const item = JSON.parse(data.toString());
      if (this.validate(item)) {
        this.validCount++;
        this.push(JSON.stringify(item));
      } else {
        this.invalidCount++;
        console.warn('Invalid item:', item);
      }
    } catch (err) {
      this.invalidCount++;
      console.error('Parse error:', err.message);
    }
    cb(null);
  }

  _flush(cb) {
    console.log(`Processed: ${this.validCount} valid, ${this.invalidCount} invalid`);
    cb(null);
  }
}

// Usage
const validator = new ValidatorTransform(item => 
  typeof item.name === 'string' && typeof item.age === 'number'
);

Encoding and Hashing Transforms

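const crypto = require('crypto');

// Encodes each chunk independently. The concatenated output matches the
// base64 encoding of the whole input only if every chunk except the last
// has a length that is a multiple of 3 bytes.
class Base64Transform extends Transform {
  _transform(data, cb) {
    const encoded = Buffer.from(data).toString('base64');
    cb(null, encoded);
  }
}

// Emits one hex digest per chunk, not a single digest of the whole stream
class HashTransform extends Transform {
  constructor(algorithm = 'sha256') {
    super();
    this.algorithm = algorithm;
  }

  _transform(data, cb) {
    const hash = crypto.createHash(this.algorithm);
    hash.update(data);
    const result = hash.digest('hex');
    cb(null, result);
  }
}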
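
Base64 works on groups of 3 bytes, so encoding arbitrary chunks one at a time can insert padding in the middle of the output. A minimal sketch of one way around this, assuming the goal is output identical to encoding the whole input at once: carry the 0 to 2 leftover bytes into the next chunk and encode them at the end. createBase64Encoder is a hypothetical helper, with encode() meant for _transform and finish() for _flush.

```javascript
// Hypothetical helper: base64-encodes a byte stream chunk by chunk so that the
// concatenated output equals the base64 encoding of the whole input.
function createBase64Encoder() {
  let carry = Buffer.alloc(0); // 0-2 leftover bytes from the previous chunk
  return {
    // Encode the largest prefix whose length is a multiple of 3 bytes.
    encode(chunk) {
      const bytes = Buffer.concat([carry, Buffer.from(chunk)]);
      const usable = bytes.length - (bytes.length % 3);
      carry = bytes.subarray(usable);
      return bytes.subarray(0, usable).toString('base64');
    },
    // Encode the leftover bytes, with padding if needed.
    finish() {
      const tail = carry.toString('base64');
      carry = Buffer.alloc(0);
      return tail;
    }
  };
}

const encoder = createBase64Encoder();
const encodedStream =
  encoder.encode('hel') +
  encoder.encode('lo wo') +
  encoder.encode('rld') +
  encoder.finish();
console.log(encodedStream === Buffer.from('hello world').toString('base64')); // true
```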

Pass-through Behavior

The default _transform passes each chunk through unchanged, so a Transform created without a transform function acts as a pass-through stream:

const passThrough = new Transform(); // Acts as pass-through

// Data written to it is emitted as readable data unchanged
passThrough.write("Hello");
passThrough.end();

passThrough.on('data', (chunk) => {
  console.log('Passed through:', chunk.toString()); // "Hello"
});

Pipeline Usage

Transform streams can be chained with pipe(), with each stage consuming the output of the previous one:

const { Readable, Writable, Transform } = require("@mafintosh/streamx");

// Create source data
const source = new Readable({
  read(cb) {
    this.push('hello world\n');
    this.push('foo bar\n');
    this.push(null);
    cb(null);
  }
});

// Create transforms
const upperCase = new Transform({
  transform(data, cb) {
    cb(null, data.toString().toUpperCase());
  }
});

const addPrefix = new Transform({
  transform(data, cb) {
    cb(null, `>> ${data}`);
  }
});

// Create destination
const destination = new Writable({
  write(data, cb) {
    console.log('Final:', data.toString());
    cb(null);
  }
});

// Pipeline
source.pipe(upperCase).pipe(addPrefix).pipe(destination, (err) => {
  if (err) console.error('Pipeline error:', err);
  else console.log('Pipeline completed');
});

Advanced Transform Features

Stateful Transformation

class CounterTransform extends Transform {
  constructor() {
    super();
    this.lineNumber = 0;
  }

  _transform(data, cb) {
    const lines = data.toString().split('\n');
    const result = lines.map(line => {
      if (line.trim()) {
        return `${++this.lineNumber}: ${line}`;
      }
      return line;
    }).join('\n');
    cb(null, result);
  }
}

Async Transformation

class AsyncTransform extends Transform {
  async _transform(data, cb) {
    try {
      // Simulate async operation
      const result = await this.processAsync(data.toString());
      cb(null, result);
    } catch (err) {
      cb(err);
    }
  }

  async processAsync(data) {
    return new Promise(resolve => {
      setTimeout(() => {
        resolve(data.toUpperCase());
      }, 100);
    });
  }
}

Events

Transform streams inherit all events from Duplex streams:

const transform = new Transform({
  transform(data, cb) {
    cb(null, data.toString().toUpperCase());
  }
});

transform.on('data', (chunk) => {
  console.log('Transformed:', chunk.toString());
});

transform.on('finish', () => {
  console.log('All transforms completed');
});

transform.on('close', () => {
  console.log('Transform stream closed');
});

transform.write('hello');
transform.write('world');
transform.end();
