CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/npm-mafintosh--streamx

An iteration of the Node.js core streams with a series of improvements

Pending
Overview
Eval results
Files

duplex.mddocs/

Duplex Streams

Duplex streams that are both readable and writable, inheriting from Readable while implementing the complete Writable interface.

Capabilities

Duplex Constructor

Creates a new Duplex stream with configuration options for both readable and writable sides.

/**
 * Creates a new Duplex stream
 * @param opts - Configuration options for both readable and writable functionality
 */
class Duplex extends Readable {
  constructor(opts?: DuplexOptions);
}

interface DuplexOptions extends ReadableOptions {
  /** Map function for readable side only */
  mapReadable?: (data: any) => any;
  /** ByteLength function for readable side only */
  byteLengthReadable?: (data: any) => number;
  /** Map function for writable side only */
  mapWritable?: (data: any) => any;
  /** ByteLength function for writable side only */
  byteLengthWritable?: (data: any) => number;
  /** Write function shorthand */
  write?: (data: any, cb: (error?: Error) => void) => void;
  /** Final function shorthand */
  final?: (cb: (error?: Error) => void) => void;
}

Usage Example:

const { Duplex } = require("@mafintosh/streamx");

const duplex = new Duplex({
  read(cb) {
    this.push(`read-${Date.now()}`);
    cb(null);
  },
  write(data, cb) {
    console.log("Writing:", data.toString());
    cb(null);
  }
});

Readable Side

All methods and events from Readable streams are available:

Data Production Methods

/**
 * Called when stream wants new data for reading
 * @param cb - Callback to call when read operation is complete
 */
_read(cb: (error?: Error) => void): void;

/**
 * Push data to the readable buffer
 * @param data - Data to push, or null to end readable side
 * @returns True if buffer is not full
 */
push(data: any): boolean;

/**
 * Read data from the readable buffer
 * @returns Data from buffer, or null if empty
 */
read(): any;

Writable Side

All methods and events from Writable streams are available:

Data Consumption Methods

/**
 * Called when stream wants to write data
 * @param data - Data to write
 * @param callback - Callback to call when write is complete
 */
_write(data: any, callback: (error?: Error) => void): void;

/**
 * Write data to the stream
 * @param data - Data to write
 * @returns True if buffer is not full
 */
write(data: any): boolean;

/**
 * End the writable side gracefully
 * @param data - Optional final data to write before ending
 */
end(data?: any): void;

/**
 * Called before finish event for cleanup
 * @param callback - Callback to call when final is complete
 */
_final(callback: (error?: Error) => void): void;

Combined Usage

Echo Server Example:

const { Duplex } = require("@mafintosh/streamx");

class EchoStream extends Duplex {
  constructor() {
    super();
    this.buffer = [];
  }

  _read(cb) {
    if (this.buffer.length > 0) {
      this.push(this.buffer.shift());
    }
    cb(null);
  }

  _write(data, cb) {
    // Echo written data back to readable side
    this.buffer.push(`Echo: ${data}`);
    this.push(`Echo: ${data}`);
    cb(null);
  }
}

const echo = new EchoStream();

// Write data
echo.write("Hello");
echo.write("World");

// Read echoed data
echo.on('data', (chunk) => {
  console.log('Received:', chunk.toString());
});

echo.end();

Proxy Stream Example:

const { Duplex } = require("@mafintosh/streamx");

class ProxyStream extends Duplex {
  constructor(target) {
    super();
    this.target = target;
    
    // Forward data from target to our readable side
    this.target.on('data', (chunk) => {
      this.push(chunk);
    });
    
    this.target.on('end', () => {
      this.push(null);
    });
  }

  _write(data, cb) {
    // Forward written data to target
    this.target.write(data);
    cb(null);
  }

  _final(cb) {
    this.target.end();
    cb(null);
  }
}

Bidirectional Transform Example:

const { Duplex } = require("@mafintosh/streamx");

class BidirectionalTransform extends Duplex {
  constructor() {
    super();
    this.readCounter = 0;
    this.writeCounter = 0;
  }

  _read(cb) {
    // Generate data for reading
    if (this.readCounter < 5) {
      this.push(`generated-${this.readCounter++}`);
    } else {
      this.push(null);
    }
    cb(null);
  }

  _write(data, cb) {
    // Process written data
    const processed = data.toString().toUpperCase();
    console.log(`Processed write #${this.writeCounter++}:`, processed);
    cb(null);
  }
}

const transform = new BidirectionalTransform();

// Read generated data
transform.on('data', (chunk) => {
  console.log('Read:', chunk.toString());
});

// Write data to be processed
transform.write("hello");
transform.write("world");
transform.end();

Events

Duplex streams emit events from both Readable and Writable:

Readable Events

  • readable - Data available to read
  • data - Data chunk read (auto-resumes stream)
  • end - Readable side ended

Writable Events

  • drain - Buffer drained, safe to write more
  • finish - All writes completed

Shared Events

  • close - Stream fully closed
  • error - Error occurred

Complete Event Handling Example:

const { Duplex } = require("@mafintosh/streamx");

const duplex = new Duplex({
  read(cb) {
    this.push(`data-${Date.now()}`);
    setTimeout(() => cb(null), 100);
  },
  write(data, cb) {
    console.log("Writing:", data.toString());
    setTimeout(() => cb(null), 50);
  }
});

// Readable events
duplex.on('readable', () => {
  console.log('Readable event');
});

duplex.on('data', (chunk) => {
  console.log('Data:', chunk.toString());
});

duplex.on('end', () => {
  console.log('Readable end');
});

// Writable events
duplex.on('drain', () => {
  console.log('Drain event');
});

duplex.on('finish', () => {
  console.log('Writable finish');
});

// Shared events
duplex.on('close', () => {
  console.log('Stream closed');
});

duplex.on('error', (err) => {
  console.error('Error:', err);
});

// Use both sides
duplex.write("test data");
duplex.end();

Advanced Configuration

Separate Readable/Writable Configuration:

const duplex = new Duplex({
  // Readable configuration
  highWaterMark: 8192,
  mapReadable: (data) => data.toString().toUpperCase(),
  byteLengthReadable: (data) => Buffer.byteLength(data),
  
  // Writable configuration
  mapWritable: (data) => Buffer.from(data),
  byteLengthWritable: (data) => data.length,
  
  // Implementation
  read(cb) {
    this.push("readable data");
    cb(null);
  },
  
  write(data, cb) {
    console.log("Wrote:", data);
    cb(null);
  }
});

Install with Tessl CLI

npx tessl i tessl/npm-mafintosh--streamx

docs

duplex.md

index.md

readable.md

stream.md

transform.md

writable.md

tile.json