tessl/npm-duplexify

Turn a writable and readable stream into a streams2 duplex stream with support for async initialization and streams1/streams2 input

Duplexify

Duplexify combines a separate writable stream and a separate readable stream into a single streams2 duplex stream. Either component can be attached after construction (async initialization), and both streams1 and streams2 streams are accepted as input. The result behaves as a standard duplex stream.

Package Information

  • Package Name: duplexify
  • Package Type: npm
  • Language: JavaScript
  • Installation: npm install duplexify

Core Imports

const duplexify = require('duplexify');

For ES modules:

import duplexify from 'duplexify';

Basic Usage

const duplexify = require('duplexify');
const fs = require('fs');

// Create duplex stream from separate readable and writable streams
const readStream = fs.createReadStream('input.txt');
const writeStream = fs.createWriteStream('output.txt');
const dup = duplexify(writeStream, readStream);

// Use as a standard duplex stream
dup.write('hello world');
dup.on('data', (data) => {
  console.log('received:', data.toString());
});

// Async initialization - set streams later
const asyncDup = duplexify();

asyncDup.write('buffered until writable is set');

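// Set streams asynchronously. Fresh streams are created here because the
// ones above already belong to `dup`; 'async-output.txt' is an example name.
setTimeout(() => {
  asyncDup.setWritable(fs.createWriteStream('async-output.txt'));
  asyncDup.setReadable(fs.createReadStream('input.txt'));
}, 100);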

Architecture

Duplexify uses several key design patterns:

  • Stream Bridging: Combines separate readable and writable streams into a unified duplex interface
  • Lazy Initialization: Supports setting readable/writable components after construction
  • Buffering: Automatically buffers write operations until the writable stream is available
  • Error Propagation: Forwards errors and close events between component streams and the duplex stream
  • Compatibility Layer: Provides seamless support for both streams1 and streams2 APIs
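
The lazy-initialization and buffering ideas above can be sketched with Node's core stream module alone. This is a simplified illustration, not duplexify's implementation: the class name LazyDuplex and its attachWritable/attachReadable methods are hypothetical, and the sketch ignores backpressure, error forwarding, and streams1 input.

```javascript
const { Duplex, Writable } = require('stream');

// Hypothetical class for illustration only; not duplexify's implementation.
class LazyDuplex extends Duplex {
  constructor(options) {
    super(options);
    this._target = null; // writable component, attached later
    this._parked = null; // the one write waiting for a target
  }

  // Attach the writable component and release the parked write, if any.
  attachWritable(target) {
    this._target = target;
    const parked = this._parked;
    this._parked = null;
    if (parked) parked();
  }

  // Attach the readable component and forward its output to our readable side.
  attachReadable(source) {
    source.on('data', (chunk) => this.push(chunk));
    source.on('end', () => this.push(null));
  }

  _write(chunk, encoding, callback) {
    if (!this._target) {
      // Hold this write. Node queues any later writes internally until
      // `callback` runs, so a single parked write is enough.
      this._parked = () => this._target.write(chunk, callback);
      return;
    }
    this._target.write(chunk, callback);
  }

  _read() {
    // Data arrives via push() in attachReadable; nothing to pull here.
  }
}

// A sink that records what reaches the writable component.
const received = [];
const sink = new Writable({
  write(chunk, encoding, callback) {
    received.push(chunk.toString());
    callback();
  },
});

const lazy = new LazyDuplex();
lazy.write('first');       // parked: no writable component yet
lazy.write('second');      // queued behind the parked write
lazy.attachWritable(sink); // 'first' is delivered now, 'second' follows
```

Holding the write callback is what produces the buffering: until it fires, the duplex's own writable side keeps later writes queued, so no separate queue is needed.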

Capabilities

Constructor

Creates a duplex stream from separate writable and readable streams.

/**
 * Create a duplex stream from separate writable and readable streams
 * @param {stream.Writable} [writable] - Writable stream component
 * @param {stream.Readable} [readable] - Readable stream component  
 * @param {Object} [opts] - Stream options
 * @param {boolean} [opts.autoDestroy=true] - Auto-destroy on error
 * @param {boolean} [opts.destroy=true] - Forward destroy to component streams
 * @param {boolean} [opts.end=true] - Forward end to writable stream
 * @returns {Duplexify} Duplex stream instance
 */
function duplexify(writable, readable, opts);

Object Mode Constructor

Creates an object-mode duplex stream. This is a convenience wrapper that presets objectMode: true and highWaterMark: 16.

/**
 * Create an object-mode duplex stream
 * @param {stream.Writable} [writable] - Writable stream component
 * @param {stream.Readable} [readable] - Readable stream component
 * @param {Object} [opts] - Additional stream options
 * @returns {Duplexify} Object-mode duplex stream instance
 */
duplexify.obj = function(writable, readable, opts);
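
To show what object mode changes in practice, the sketch below applies the same kind of presets to a core PassThrough stream. It assumes the presets are objectMode: true and highWaterMark: 16; the helper name objectModeOptions is hypothetical and not part of duplexify.

```javascript
const { PassThrough } = require('stream');

// Hypothetical helper: merges caller options with the assumed presets.
// The presets win over anything the caller passes for the same keys.
function objectModeOptions(opts) {
  return Object.assign({}, opts, { objectMode: true, highWaterMark: 16 });
}

const channel = new PassThrough(objectModeOptions());

// In object mode, chunks are arbitrary values rather than Buffers or strings.
channel.write({ id: 1, name: 'first' });
const received = channel.read();
```

In object mode, highWaterMark counts objects rather than bytes, which is why a small value such as 16 is a sensible preset.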

Stream Management

Methods for dynamically setting and managing the component streams.

/**
 * Set or replace the writable component of the duplex stream
 * @param {stream.Writable|null|false} writable - Writable stream or null/false to disable
 */
setWritable(writable);

/**
 * Set or replace the readable component of the duplex stream
 * @param {stream.Readable|null|false} readable - Readable stream or null/false to disable
 */
setReadable(readable);

Flow Control

Methods for controlling stream flow and buffering behavior.

/**
 * Cork the stream to buffer write operations
 */
cork();

/**
 * Uncork the stream to flush buffered writes
 */
uncork();
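
The sketch below models how nested cork() and uncork() calls might pair up. It assumes corking is counted, as it is for Node's own Writable.cork(), and that the 'cork' and 'uncork' events fire only on the outermost calls. The CorkCounter class is hypothetical and exists only for this illustration.

```javascript
const { EventEmitter } = require('events');

// Hypothetical helper for illustration; not part of duplexify's API.
class CorkCounter extends EventEmitter {
  constructor() {
    super();
    this.corked = 0;
  }

  cork() {
    // Emit only on the 0 -> 1 transition.
    if (++this.corked === 1) this.emit('cork');
  }

  uncork() {
    // Emit only when the last outstanding cork is released.
    if (this.corked > 0 && --this.corked === 0) this.emit('uncork');
  }
}

const events = [];
const gate = new CorkCounter();
gate.on('cork', () => events.push('cork'));
gate.on('uncork', () => events.push('uncork'));

gate.cork();
gate.cork();   // nested: no second 'cork' event
gate.uncork(); // still corked once
gate.uncork(); // last release: 'uncork' fires
```

Under this model every cork() needs a matching uncork() before buffered writes can flow again.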

Lifecycle Management

Methods for managing the stream lifecycle and cleanup.

/**
 * Destroy the duplex stream and optionally underlying streams
 * @param {Error} [err] - Optional error to emit during destruction
 * @param {Function} [cb] - Optional callback called when destruction is complete
 */
destroy(err, cb);

/**
 * End the writable side of the duplex stream
 * @param {*} [data] - Optional final data to write
 * @param {string} [encoding] - Optional encoding for data
 * @param {Function} [callback] - Optional callback when finished
 * @returns {Duplexify} this instance for chaining
 */
end(data, encoding, callback);

Properties

/**
 * Boolean indicating if the stream has been destroyed
 * @type {boolean}
 * @readonly
 */
destroyed;

Options

Configuration options for customizing duplexify behavior:

interface DuplexifyOptions {
  /** Whether to automatically destroy the stream on error (default: true) */
  autoDestroy?: boolean;
  
  /** Whether to destroy underlying streams when this stream is destroyed (default: true) */
  destroy?: boolean;
  
  /** Whether to end underlying writable stream when this stream ends (default: true) */
  end?: boolean;
  
  /** Standard Node.js stream options */
  objectMode?: boolean;
  highWaterMark?: number;
  encoding?: string;
}
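
One plausible reading of the "default: true" flags above is that only an explicit false turns a behavior off. The helper below sketches that reading; resolveFlags is a hypothetical name and is not exported by duplexify.

```javascript
// Hypothetical helper: resolves the three duplexify-specific flags so that
// a missing or undefined value falls back to true.
function resolveFlags(opts) {
  return {
    autoDestroy: !opts || opts.autoDestroy !== false,
    destroy: !opts || opts.destroy !== false,
    end: !opts || opts.end !== false,
  };
}

const defaults = resolveFlags();               // no options object at all
const keepOpen = resolveFlags({ end: false }); // leave the writable open on end
```

Passing { end: false } is the typical choice when the underlying writable is shared and must stay open after the duplex ends.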

Events

Duplexify inherits all standard Node.js stream events and adds custom events:

// Standard stream events
dup.on('data', (chunk) => {});        // Readable data available
dup.on('end', () => {});              // Readable side ended
dup.on('finish', () => {});           // Writable side finished
dup.on('close', () => {});            // Stream closed
dup.on('error', (err) => {});         // Error occurred

// Duplexify-specific events  
dup.on('cork', () => {});             // Stream was corked
dup.on('uncork', () => {});           // Stream was uncorked
dup.on('preend', () => {});           // Before stream ends
dup.on('prefinish', () => {});        // Before stream finishes (writable side)

Error Handling

Duplexify automatically handles error propagation between component streams:

  • Errors from either component stream bubble up to the duplex stream
  • Only the first error is emitted (subsequent errors are ignored)
  • 'premature close' errors are filtered out (converted to null)
  • The autoDestroy option controls automatic destruction on error
  • Write errors from the underlying writable stream are propagated via the error event
  • Errors during destroy() are emitted unless a callback is provided

dup.on('error', (err) => {
  // Handle errors from component streams or write operations
  console.error('Stream error:', err.message);
});

// Destroy with error
dup.destroy(new Error('custom error'));
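
The first-error-only and 'premature close' rules listed above can be modeled with plain event emitters. This sketch only illustrates the filtering logic; the function forwardFirstError is hypothetical, and emitting 'close' for a filtered error is an assumption about what a clean shutdown looks like.

```javascript
const { EventEmitter } = require('events');

// Hypothetical helper: forwards at most one error from `sources` to `target`
// and treats 'premature close' as a clean close rather than an error.
function forwardFirstError(target, sources) {
  let settled = false;
  for (const source of sources) {
    source.on('error', (err) => {
      if (settled) return; // only the first error is surfaced
      settled = true;
      const filtered = err.message === 'premature close' ? null : err;
      if (filtered) target.emit('error', filtered);
      else target.emit('close');
    });
  }
}

const events = [];

// Scenario 1: two component errors, only the first reaches the duplex.
const dup = new EventEmitter();
const readable = new EventEmitter();
const writable = new EventEmitter();
dup.on('error', (err) => events.push('error: ' + err.message));
forwardFirstError(dup, [readable, writable]);
writable.emit('error', new Error('disk full'));
readable.emit('error', new Error('late failure')); // ignored

// Scenario 2: a 'premature close' is filtered and surfaces as a close.
const quietDup = new EventEmitter();
const quietSource = new EventEmitter();
quietDup.on('close', () => events.push('close'));
forwardFirstError(quietDup, [quietSource]);
quietSource.emit('error', new Error('premature close'));
```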

Usage Examples

HTTP Request Wrapper

const duplexify = require('duplexify');
const http = require('http');

function request(opts) {
  const req = http.request(opts);
  const dup = duplexify(req);
  
  req.on('response', (res) => {
    dup.setReadable(res);
  });
  
  return dup;
}

const req = request({
  method: 'GET',
  host: 'www.example.com',
  port: 80
});

req.end();
req.pipe(process.stdout);

Stream Processing Pipeline

const duplexify = require('duplexify');
const through = require('through2');

// Create processing pipeline: data written to `input` flows into `output`
const input = through();
const output = through();
input.pipe(output);

// Writable side is `input`, readable side is `output`
const processor = duplexify(input, output);

processor.on('data', (chunk) => {
  console.log('processed:', chunk.toString());
});

// Process data
processor.write('hello');
processor.write(' world');
processor.end();

Async Stream Setup

const duplexify = require('duplexify');

// Create duplex before streams are available
const dup = duplexify();

// Writes are buffered
dup.write('this will be buffered');
dup.write('until writable is set');

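// Set up streams asynchronously.
// getWritableStream() and getReadableStream() are placeholders for your own async factories.
async function setupStreams() {
  const writable = await getWritableStream();
  const readable = await getReadableStream();
  
  dup.setWritable(writable);
  dup.setReadable(readable);
}

// Tear the duplex down if setup fails, so buffered writes do not wait forever
setupStreams().catch((err) => dup.destroy(err));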

Cork/Uncork Flow Control

const duplexify = require('duplexify');
const through = require('through2');

const passthrough = through();
const dup = duplexify(passthrough, passthrough);

// Cork inside 'prefinish' to delay 'finish' until async cleanup completes
dup.on('prefinish', () => {
  console.log('Stream about to finish, corking for cleanup...');
  dup.cork();
  
  // Perform cleanup operations
  setTimeout(() => {
    console.log('Cleanup complete, uncorking...');
    dup.uncork();
  }, 100);
});

dup.on('finish', () => {
  console.log('Stream finished');
});

dup.write('data');
dup.end();

Install with Tessl CLI

npx tessl i tessl/npm-duplexify
Describes
npmpkg:npm/duplexify@4.1.x