tessl/npm-streamx

An iteration of the Node.js core streams with a series of improvements


Duplex and Transform Streams

Duplex streams are both readable and writable, while Transform streams map data written to them into data read from them. StreamX implements both with explicit lifecycle hooks (_open, _destroy, _predestroy) and destroys the stream when a callback receives an error.

Capabilities

Duplex Class

A duplex stream is both readable and writable, inheriting from Readable and implementing the Writable API.

/**
 * Creates a duplex stream that is both readable and writable
 * @param options - Configuration options for the duplex stream
 */
class Duplex extends Readable {
  constructor(options?: DuplexOptions);
  
  // Inherits all Readable methods
  _read(cb: () => void): void;
  push(data: any): boolean;
  read(): any;
  
  // Implements Writable methods
  _write(data: any, cb: (err?: Error) => void): void;
  _writev(batch: any[], cb: (err?: Error) => void): void;
  _final(cb: (err?: Error) => void): void;
  write(data: any): boolean;
  end(): Duplex;
  
  // Shared lifecycle methods
  _open(cb: (err?: Error) => void): void;
  _destroy(cb: (err?: Error) => void): void;
  _predestroy(): void;
  destroy(err?: Error): void;
}

interface DuplexOptions extends ReadableOptions {
  /** Map function for readable side only */
  mapReadable?: (data: any) => any;
  
  /** ByteLength function for readable side only */
  byteLengthReadable?: (data: any) => number;
  
  /** Map function for writable side only */
  mapWritable?: (data: any) => any;
  
  /** ByteLength function for writable side only */
  byteLengthWritable?: (data: any) => number;
  
  /** Shorthand for _write method */
  write?: (data: any, cb: (err?: Error) => void) => void;
  
  /** Shorthand for _writev method */
  writev?: (batch: any[], cb: (err?: Error) => void) => void;
  
  /** Shorthand for _final method */
  final?: (cb: (err?: Error) => void) => void;
}

Usage Examples:

const { Duplex } = require('streamx');

// Basic duplex stream
const echo = new Duplex({
  write(data, cb) {
    // Echo data back to readable side
    this.push(data);
    cb();
  },
  
  read(cb) {
    // Data is pushed from write side
    cb();
  }
});

// Write data and read it back
echo.write('Hello');
echo.write('World');

echo.on('data', (chunk) => {
  console.log('Echoed:', chunk.toString());
});

// More complex duplex with separate read/write logic
const processor = new Duplex({
  write(data, cb) {
    console.log('Processing input:', data.toString());
    // Process and push to readable side
    this.push(`Processed: ${data}`);
    cb();
  },
  
  read(cb) {
    // Readable side is fed by write operations
    cb();
  },
  
  final(cb) {
    console.log('Processing complete');
    this.push(null); // End readable side
    cb();
  }
});

Transform Class

A transform stream is a duplex stream that transforms data from its writable side to its readable side.

/**
 * Creates a transform stream that maps input data to output data
 * @param options - Configuration options for the transform stream
 */
class Transform extends Duplex {
  constructor(options?: TransformOptions);
  
  /** Override this method to implement data transformation */
  _transform(data: any, cb: (err?: Error, output?: any) => void): void;
  
  /** Override this method for final transformation operations */
  _flush(cb: (err?: Error, output?: any) => void): void;
}

interface TransformOptions extends DuplexOptions {
  /** Shorthand for _transform method */
  transform?: (data: any, cb: (err?: Error, output?: any) => void) => void;
  
  /** Shorthand for _flush method */
  flush?: (cb: (err?: Error, output?: any) => void) => void;
}

Usage Examples:

const { Transform } = require('streamx');

// Basic transformation
const upperCase = new Transform({
  transform(data, cb) {
    const transformed = data.toString().toUpperCase();
    cb(null, transformed);
  }
});

upperCase.write('hello');
upperCase.write('world');
upperCase.end();

upperCase.on('data', (chunk) => {
  console.log('Uppercase:', chunk.toString());
});

// JSON parser transform
const jsonParser = new Transform({
  transform(data, cb) {
    try {
      const parsed = JSON.parse(data.toString());
      cb(null, parsed);
    } catch (err) {
      cb(err);
    }
  }
});

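// Line-by-line processor
const lineProcessor = new Transform({
  // An options object cannot define a constructor, so state is
  // initialized in open(), which runs before the first write is processed
  open(cb) {
    this.buffer = '';
    cb();
  },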
  
  transform(chunk, cb) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split('\n');
    this.buffer = lines.pop(); // Keep incomplete line
    
    lines.forEach(line => {
      if (line.trim()) {
        this.push(`Processed: ${line}\n`);
      }
    });
    
    cb();
  },
  
  flush(cb) {
    if (this.buffer.trim()) {
      this.push(`Processed: ${this.buffer}\n`);
    }
    cb();
  }
});

PassThrough Class

A PassThrough stream is a Transform stream that passes data through unchanged.

/**
 * Creates a pass-through stream (identity transform)
 * @param options - Configuration options for the pass-through stream
 */
class PassThrough extends Transform {
  constructor(options?: TransformOptions);
  // Automatically passes data through without transformation
}

Usage Examples:

const { PassThrough } = require('streamx');

// Basic pass-through
const passThrough = new PassThrough();

passThrough.write('data flows through');
passThrough.end();

passThrough.on('data', (chunk) => {
  console.log('Passed through:', chunk.toString());
});

// Use as a proxy with monitoring
const monitor = new PassThrough();

monitor.on('data', (chunk) => {
  console.log(`Data passing through: ${chunk.length} bytes`);
});

// Pipe data through the monitor
// (someReadable and someWritable stand in for existing streams)
someReadable.pipe(monitor).pipe(someWritable);

Advanced Transform Patterns

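The patterns below cover three common cases: buffering input into larger chunks, awaiting asynchronous work inside a transform, and emitting several outputs for a single input.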

Buffering Transform:

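const bufferingTransform = new Transform({
  // An options object cannot define a constructor, so state is
  // initialized in open(), which runs before the first write is processed
  open(cb) {
    this.chunks = [];
    this.totalSize = 0;
    cb();
  },
  
  transform(chunk, cb) {
    // Assumes every chunk is a Buffer, as Buffer.concat requires
    this.chunks.push(chunk);
    this.totalSize += chunk.length;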
    
    // Emit when we have enough data
    if (this.totalSize >= 1024) {
      const combined = Buffer.concat(this.chunks);
      this.chunks = [];
      this.totalSize = 0;
      cb(null, combined);
    } else {
      cb();
    }
  },
  
  flush(cb) {
    if (this.chunks.length > 0) {
      const combined = Buffer.concat(this.chunks);
      cb(null, combined);
    } else {
      cb();
    }
  }
});

Async Transform:

const asyncTransform = new Transform({
  async transform(data, cb) {
    try {
      // Simulate async operation
      const processed = await processDataAsync(data.toString());
      cb(null, processed);
    } catch (err) {
      cb(err);
    }
  }
});

async function processDataAsync(data) {
  return new Promise((resolve) => {
    setTimeout(() => {
      resolve(`Async processed: ${data}`);
    }, 100);
  });
}

Multi-output Transform:

const multiOutput = new Transform({
  transform(data, cb) {
    const input = data.toString();
    
    // Push multiple outputs for single input
    this.push(`Original: ${input}`);
    this.push(`Reversed: ${input.split('').reverse().join('')}`);
    this.push(`Length: ${input.length}`);
    
    cb(); // Don't pass data to cb, we used push instead
  }
});

Error Handling

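When the transform callback receives an error, the stream is destroyed: it emits 'error' with that error and then 'close', so cleanup can be done in the close handler or in _destroy.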

const errorHandlingTransform = new Transform({
  transform(data, cb) {
    try {
      if (data.toString().includes('poison')) {
        throw new Error('Poisoned data detected');
      }
      
      const result = data.toString().toUpperCase();
      cb(null, result);
    } catch (err) {
      cb(err); // Pass error to callback
    }
  }
});

errorHandlingTransform.on('error', (err) => {
  console.error('Transform error:', err.message);
});

errorHandlingTransform.on('close', () => {
  console.log('Transform stream closed');
});

// This will cause an error
errorHandlingTransform.write('poison pill');

Events

Duplex and Transform streams emit events from both readable and writable sides.

interface DuplexTransformEvents {
  // Readable events
  'readable': () => void;
  'data': (chunk: any) => void;
  'end': () => void;
  
  // Writable events
  'drain': () => void;
  'finish': () => void;
  
  // Shared events
  'close': () => void;
  'error': (err: Error) => void;
  'pipe': (src: Readable) => void;
  'piping': (dest: Writable) => void;
}

Install with Tessl CLI

npx tessl i tessl/npm-streamx
