A stream that emits multiple other streams one after another
npx @tessl/cli install tessl/npm-combined-stream@1.0.0

Combined Stream is a Node.js streaming utility that combines multiple streams into a single stream that emits them sequentially, one after another. It provides flexible stream management with configurable back pressure control, memory management via buffer size limits, and support for both immediate and lazy stream loading.
npm install combined-stream

const CombinedStream = require('combined-stream');

const CombinedStream = require('combined-stream');
const fs = require('fs');
// Create a combined stream
const combinedStream = CombinedStream.create();
// Append multiple streams
combinedStream.append(fs.createReadStream('file1.txt'));
combinedStream.append(fs.createReadStream('file2.txt'));
// Pipe to destination
combinedStream.pipe(fs.createWriteStream('combined.txt'));

Create a new combined stream instance with optional configuration.
/**
* Create a new CombinedStream instance
* @param {Object} [options] - Configuration options
* @param {number} [options.maxDataSize] - Maximum buffer size in bytes (default: 2MB)
* @param {boolean} [options.pauseStreams] - Whether to pause streams (default: true)
* @returns {CombinedStream} New CombinedStream instance
*/
CombinedStream.create(options);

Usage Examples:
// Basic creation
const combinedStream = CombinedStream.create();
// With custom options
const combinedStream = CombinedStream.create({
maxDataSize: 5 * 1024 * 1024, // 5MB limit
pauseStreams: false // Don't pause source streams
});

Test whether an object is stream-like for internal processing.
/**
* Test if an object is stream-like
* @param {any} stream - Object to test
* @returns {boolean} True if stream-like, false otherwise
*/
CombinedStream.isStreamLike(stream);

Append streams, strings, buffers, or callback functions to the combined stream.
/**
* Append a stream, string, buffer, or callback to the combined stream
* @param {Stream|Function|string|Buffer} stream - Item to append
* @returns {CombinedStream} This instance for chaining
*/
combinedStream.append(stream);

Usage Examples:
const combinedStream = CombinedStream.create();
// Append file streams
combinedStream.append(fs.createReadStream('file1.txt'));
combinedStream.append(fs.createReadStream('file2.txt'));
// Append strings and buffers
combinedStream.append('Hello ');
combinedStream.append(Buffer.from('World!'));
// Append lazy streams via callback
combinedStream.append(function(next) {
// Stream is created only when needed
next(fs.createReadStream('lazy-file.txt'));
});
// Method chaining
combinedStream
.append(fs.createReadStream('file1.txt'))
.append('separator')
.append(fs.createReadStream('file2.txt'));

Pipe the combined stream to a destination and start flowing.
/**
* Pipe combined stream to destination and start processing
* @param {Stream} dest - Destination stream
* @param {Object} [options] - Pipe options
* @returns {Stream} The destination stream
*/
combinedStream.pipe(dest, options);

Control stream processing with pause and resume operations.
/**
* Pause stream processing (if pauseStreams is enabled)
*/
combinedStream.pause();
/**
* Resume stream processing and start draining queued streams
*/
combinedStream.resume();

Usage Example:
const combinedStream = CombinedStream.create({pauseStreams: true});
// Add streams
combinedStream.append(fs.createReadStream('file1.txt'));
combinedStream.append(fs.createReadStream('file2.txt'));
// Control processing
combinedStream.pause(); // Pause current stream
combinedStream.resume(); // Resume processing

End or destroy the combined stream.
/**
* End the stream gracefully, emitting 'end' event
*/
combinedStream.end();
/**
* Destroy the stream immediately, emitting 'close' event
*/
combinedStream.destroy();

Write data directly to the stream (primarily for internal use).
/**
* Write data to stream (emits 'data' event)
* @param {any} data - Data to write
*/
combinedStream.write(data);

Available options for CombinedStream.create():
interface CombinedStreamOptions {
/** Maximum buffer size in bytes (default: 2 * 1024 * 1024) */
maxDataSize?: number;
/** Whether to pause underlying streams (default: true) */
pauseStreams?: boolean;
}

interface CombinedStream {
/** Whether the stream accepts writes */
writable: boolean;
/** Whether the stream produces data */
readable: boolean;
/** Current buffer size in bytes */
dataSize: number;
/** Maximum allowed buffer size */
maxDataSize: number;
/** Whether to pause underlying streams */
pauseStreams: boolean;
}

Combined streams emit standard Node.js stream events, including 'data', 'end', 'error', and 'close'.
Usage Example:
const combinedStream = CombinedStream.create();
combinedStream.on('data', (chunk) => {
console.log('Received:', chunk.toString());
});
combinedStream.on('end', () => {
console.log('All streams completed');
});
combinedStream.on('error', (err) => {
console.error('Stream error:', err);
});

Combined Stream automatically handles errors from appended streams and re-emits them. When maxDataSize is exceeded, an error is emitted:
const combinedStream = CombinedStream.create({maxDataSize: 1024});
combinedStream.on('error', (err) => {
if (err.message.includes('maxDataSize')) {
console.error('Buffer limit exceeded');
}
});

Use callback functions to defer stream creation until needed:
const combinedStream = CombinedStream.create();
combinedStream.append(function(next) {
// This runs only when it's time to process this stream
const stream = someExpensiveStreamCreation();
next(stream);
});

Control memory usage with maxDataSize and monitoring:
const combinedStream = CombinedStream.create({
maxDataSize: 10 * 1024 * 1024 // 10MB limit
});
// Monitor buffer size
setInterval(() => {
console.log('Buffer size:', combinedStream.dataSize, 'bytes');
}, 1000);

Manage stream pressure with the pauseStreams setting:
// Immediate processing (no back pressure)
const fastStream = CombinedStream.create({pauseStreams: false});
// Controlled processing (with back pressure)
const controlledStream = CombinedStream.create({pauseStreams: true});