or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

index.md
tile.json

tessl/npm-duplexify

Turn a writable and readable stream into a streams2 duplex stream with support for async initialization and streams1/streams2 input

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
npmpkg:npm/duplexify@4.1.x

To install, run

npx @tessl/cli install tessl/npm-duplexify@4.1.0

index.mddocs/

Duplexify

Duplexify turns separate writable and readable streams into a single streams2 duplex stream with support for async initialization and streams1/streams2 input. It enables combining independent readable and writable stream components into a unified interface that behaves as a standard duplex stream.

Package Information

  • Package Name: duplexify
  • Package Type: npm
  • Language: JavaScript
  • Installation: npm install duplexify

Core Imports

const duplexify = require('duplexify');

For ES modules:

import duplexify from 'duplexify';

Basic Usage

const duplexify = require('duplexify');
const fs = require('fs');

// Create duplex stream from separate readable and writable streams
const readStream = fs.createReadStream('input.txt');
const writeStream = fs.createWriteStream('output.txt');
const dup = duplexify(writeStream, readStream);

// Use as a standard duplex stream
dup.write('hello world');
dup.on('data', (data) => {
  console.log('received:', data.toString());
});

// Async initialization - set streams later
const asyncDup = duplexify();

asyncDup.write('buffered until writable is set');

// Set streams asynchronously
setTimeout(() => {
  asyncDup.setWritable(writeStream);
  asyncDup.setReadable(readStream);
}, 100);

Architecture

Duplexify uses several key design patterns:

  • Stream Bridging: Combines separate readable and writable streams into a unified duplex interface
  • Lazy Initialization: Supports setting readable/writable components after construction
  • Buffering: Automatically buffers write operations until the writable stream is available
  • Error Propagation: Forwards errors and close events between component streams and the duplex stream
  • Compatibility Layer: Provides seamless support for both streams1 and streams2 APIs

Capabilities

Constructor

Creates a duplex stream from separate writable and readable streams.

/**
 * Create a duplex stream from separate writable and readable streams
 * @param {stream.Writable} [writable] - Writable stream component
 * @param {stream.Readable} [readable] - Readable stream component  
 * @param {Object} [opts] - Stream options
 * @param {boolean} [opts.autoDestroy=true] - Auto-destroy on error
 * @param {boolean} [opts.destroy=true] - Forward destroy to component streams
 * @param {boolean} [opts.end=true] - Forward end to writable stream
 * @returns {Duplexify} Duplex stream instance
 */
function duplexify(writable, readable, opts);

Object Mode Constructor

Creates an object-mode duplex stream with preset options.

/**
 * Create an object-mode duplex stream
 * @param {stream.Writable} [writable] - Writable stream component
 * @param {stream.Readable} [readable] - Readable stream component
 * @param {Object} [opts] - Additional stream options
 * @returns {Duplexify} Object-mode duplex stream instance
 */
duplexify.obj = function(writable, readable, opts);

Stream Management

Methods for dynamically setting and managing the component streams.

/**
 * Set or replace the writable component of the duplex stream
 * @param {stream.Writable|null|false} writable - Writable stream or null/false to disable
 */
setWritable(writable);

/**
 * Set or replace the readable component of the duplex stream
 * @param {stream.Readable|null|false} readable - Readable stream or null/false to disable
 */
setReadable(readable);

Flow Control

Methods for controlling stream flow and buffering behavior.

/**
 * Cork the stream to buffer write operations
 */
cork();

/**
 * Uncork the stream to flush buffered writes
 */
uncork();

Lifecycle Management

Methods for managing the stream lifecycle and cleanup.

/**
 * Destroy the duplex stream and optionally underlying streams
 * @param {Error} [err] - Optional error to emit during destruction
 * @param {Function} [cb] - Optional callback called when destruction is complete
 */
destroy(err, cb);

/**
 * End the writable side of the duplex stream
 * @param {*} [data] - Optional final data to write
 * @param {string} [encoding] - Optional encoding for data
 * @param {Function} [callback] - Optional callback when finished
 * @returns {Duplexify} this instance for chaining
 */
end(data, encoding, callback);

Properties

/**
 * Boolean indicating if the stream has been destroyed
 * @type {boolean}
 * @readonly
 */
destroyed;

Options

Configuration options for customizing duplexify behavior:

interface DuplexifyOptions {
  /** Whether to automatically destroy the stream on error (default: true) */
  autoDestroy?: boolean;
  
  /** Whether to destroy underlying streams when this stream is destroyed (default: true) */
  destroy?: boolean;
  
  /** Whether to end underlying writable stream when this stream ends (default: true) */
  end?: boolean;
  
  /** Standard Node.js stream options */
  objectMode?: boolean;
  highWaterMark?: number;
  encoding?: string;
}

Events

Duplexify inherits all standard Node.js stream events and adds custom events:

// Standard stream events
dup.on('data', (chunk) => {});        // Readable data available
dup.on('end', () => {});              // Readable side ended
dup.on('finish', () => {});           // Writable side finished
dup.on('close', () => {});            // Stream closed
dup.on('error', (err) => {});         // Error occurred

// Duplexify-specific events  
dup.on('cork', () => {});             // Stream was corked
dup.on('uncork', () => {});           // Stream was uncorked
dup.on('preend', () => {});           // Before stream ends
dup.on('prefinish', () => {});        // Before stream finishes (writable side)

Error Handling

Duplexify automatically handles error propagation between component streams:

  • Errors from either component stream bubble up to the duplex stream
  • Only the first error is emitted (subsequent errors are ignored)
  • 'premature close' errors are filtered out (converted to null)
  • The autoDestroy option controls automatic destruction on error
  • Write errors from the underlying writable stream are propagated via the error event
  • Errors during destroy() are emitted unless a callback is provided
dup.on('error', (err) => {
  // Handle errors from component streams or write operations
  console.error('Stream error:', err.message);
});

// Destroy with error
dup.destroy(new Error('custom error'));

Usage Examples

HTTP Request Wrapper

const duplexify = require('duplexify');
const http = require('http');

function request(opts) {
  const req = http.request(opts);
  const dup = duplexify(req);
  
  req.on('response', (res) => {
    dup.setReadable(res);
  });
  
  return dup;
}

const req = request({
  method: 'GET',
  host: 'www.example.com',
  port: 80
});

req.end();
req.pipe(process.stdout);

Stream Processing Pipeline

const duplexify = require('duplexify');
const through = require('through2');

// Create processing pipeline
const input = through();
const output = through();
const processor = duplexify(output, input);

// Process data
processor.write('hello');
processor.write(' world');
processor.end();

processor.on('data', (chunk) => {
  console.log('processed:', chunk.toString());
});

Async Stream Setup

const duplexify = require('duplexify');

// Create duplex before streams are available
const dup = duplexify();

// Writes are buffered
dup.write('this will be buffered');
dup.write('until writable is set');

// Set up streams asynchronously
async function setupStreams() {
  const writable = await getWritableStream();
  const readable = await getReadableStream();
  
  dup.setWritable(writable);
  dup.setReadable(readable);
}

setupStreams();

Cork/Uncork Flow Control

const duplexify = require('duplexify');
const through = require('through2');

const passthrough = through();
const dup = duplexify(passthrough, passthrough);

// Cork stream to buffer writes
dup.on('prefinish', () => {
  console.log('Stream about to finish, corking for cleanup...');
  dup.cork();
  
  // Perform cleanup operations
  setTimeout(() => {
    console.log('Cleanup complete, uncorking...');
    dup.uncork();
  }, 100);
});

dup.on('finish', () => {
  console.log('Stream finished');
});

dup.write('data');
dup.end();