Duplexify turns separate writable and readable streams into a single streams2 duplex stream with support for async initialization and streams1/streams2 input. It enables combining independent readable and writable stream components into a unified interface that behaves as a standard duplex stream.
npm install duplexify

const duplexify = require('duplexify');

For ES modules:
import duplexify from 'duplexify';

const duplexify = require('duplexify');
const fs = require('fs');
// Create duplex stream from separate readable and writable streams
const readStream = fs.createReadStream('input.txt');
const writeStream = fs.createWriteStream('output.txt');
const dup = duplexify(writeStream, readStream);
// Use as a standard duplex stream
dup.write('hello world');
dup.on('data', (data) => {
console.log('received:', data.toString());
});
// Async initialization - set streams later
const asyncDup = duplexify();
asyncDup.write('buffered until writable is set');
// Set streams asynchronously
setTimeout(() => {
asyncDup.setWritable(writeStream);
asyncDup.setReadable(readStream);
}, 100);

Duplexify uses several key design patterns:
Creates a duplex stream from separate writable and readable streams.
/**
* Create a duplex stream from separate writable and readable streams
* @param {stream.Writable} [writable] - Writable stream component
* @param {stream.Readable} [readable] - Readable stream component
* @param {Object} [opts] - Stream options
* @param {boolean} [opts.autoDestroy=true] - Auto-destroy on error
* @param {boolean} [opts.destroy=true] - Forward destroy to component streams
* @param {boolean} [opts.end=true] - Forward end to writable stream
* @returns {Duplexify} Duplex stream instance
*/
function duplexify(writable, readable, opts);

Creates an object-mode duplex stream with preset options.
/**
* Create an object-mode duplex stream
* @param {stream.Writable} [writable] - Writable stream component
* @param {stream.Readable} [readable] - Readable stream component
* @param {Object} [opts] - Additional stream options
* @returns {Duplexify} Object-mode duplex stream instance
*/
duplexify.obj = function(writable, readable, opts);

Methods for dynamically setting and managing the component streams.
/**
* Set or replace the writable component of the duplex stream
* @param {stream.Writable|null|false} writable - Writable stream or null/false to disable
*/
setWritable(writable);
/**
* Set or replace the readable component of the duplex stream
* @param {stream.Readable|null|false} readable - Readable stream or null/false to disable
*/
setReadable(readable);

Methods for controlling stream flow and buffering behavior.
/**
* Cork the stream to buffer write operations
*/
cork();
/**
* Uncork the stream to flush buffered writes
*/
uncork();

Methods for managing the stream lifecycle and cleanup.
/**
* Destroy the duplex stream and optionally underlying streams
* @param {Error} [err] - Optional error to emit during destruction
* @param {Function} [cb] - Optional callback called when destruction is complete
*/
destroy(err, cb);
/**
* End the writable side of the duplex stream
* @param {*} [data] - Optional final data to write
* @param {string} [encoding] - Optional encoding for data
* @param {Function} [callback] - Optional callback when finished
* @returns {Duplexify} this instance for chaining
*/
end(data, encoding, callback);

/**
* Boolean indicating if the stream has been destroyed
* @type {boolean}
* @readonly
*/
destroyed;

Configuration options for customizing duplexify behavior:
interface DuplexifyOptions {
/** Whether to automatically destroy the stream on error (default: true) */
autoDestroy?: boolean;
/** Whether to destroy underlying streams when this stream is destroyed (default: true) */
destroy?: boolean;
/** Whether to end underlying writable stream when this stream ends (default: true) */
end?: boolean;
/** Standard Node.js stream options */
objectMode?: boolean;
highWaterMark?: number;
encoding?: string;
}

Duplexify inherits all standard Node.js stream events and adds custom events:
// Standard stream events
dup.on('data', (chunk) => {}); // Readable data available
dup.on('end', () => {}); // Readable side ended
dup.on('finish', () => {}); // Writable side finished
dup.on('close', () => {}); // Stream closed
dup.on('error', (err) => {}); // Error occurred
// Duplexify-specific events
dup.on('cork', () => {}); // Stream was corked
dup.on('uncork', () => {}); // Stream was uncorked
dup.on('preend', () => {}); // Before stream ends
dup.on('prefinish', () => {}); // Before stream finishes (writable side)

Duplexify automatically handles error propagation between component streams:
- The `autoDestroy` option controls automatic destruction on error
- Errors are reported through the standard `error` event
- Errors passed to `destroy()` are emitted unless a callback is provided

dup.on('error', (err) => {
// Handle errors from component streams or write operations
console.error('Stream error:', err.message);
});
// Destroy with error
dup.destroy(new Error('custom error'));

const duplexify = require('duplexify');
const http = require('http');
function request(opts) {
const req = http.request(opts);
const dup = duplexify(req);
req.on('response', (res) => {
dup.setReadable(res);
});
return dup;
}
const req = request({
method: 'GET',
host: 'www.example.com',
port: 80
});
req.end();
req.pipe(process.stdout);

const duplexify = require('duplexify');
const through = require('through2');
// Create processing pipeline
const input = through();
const output = through();
const processor = duplexify(output, input);
// Process data
processor.write('hello');
processor.write(' world');
processor.end();
processor.on('data', (chunk) => {
console.log('processed:', chunk.toString());
});

const duplexify = require('duplexify');
// Create duplex before streams are available
const dup = duplexify();
// Writes are buffered
dup.write('this will be buffered');
dup.write('until writable is set');
// Set up streams asynchronously
async function setupStreams() {
const writable = await getWritableStream();
const readable = await getReadableStream();
dup.setWritable(writable);
dup.setReadable(readable);
}
setupStreams();

const duplexify = require('duplexify');
const through = require('through2');
const passthrough = through();
const dup = duplexify(passthrough, passthrough);
// Cork stream to buffer writes
dup.on('prefinish', () => {
console.log('Stream about to finish, corking for cleanup...');
dup.cork();
// Perform cleanup operations
setTimeout(() => {
console.log('Cleanup complete, uncorking...');
dup.uncork();
}, 100);
});
dup.on('finish', () => {
console.log('Stream finished');
});
dup.write('data');
dup.end();