0
# merge-stream
1
2
merge-stream is a lightweight Node.js streaming utility that merges (interleaves) multiple readable streams into a single output stream. Built on the Node.js Streams3 API using a `PassThrough` stream, it provides a simple yet powerful interface for stream aggregation with automatic error propagation and lifecycle management.
3
4
## Package Information
5
6
- **Package Name**: merge-stream
7
- **Package Type**: npm
8
- **Language**: JavaScript (Node.js)
9
- **Installation**: `npm install merge-stream`
10
11
## Core Imports
12
13
```javascript
14
const mergeStream = require('merge-stream');
15
```
16
17
For ES modules:
18
19
```javascript
20
import mergeStream from 'merge-stream';
21
```
22
23
## Basic Usage
24
25
```javascript
26
const mergeStream = require('merge-stream');
27
const fs = require('fs');
28
29
// Merge multiple readable streams
30
const stream1 = fs.createReadStream('file1.txt');
31
const stream2 = fs.createReadStream('file2.txt');
32
33
const merged = mergeStream(stream1, stream2);
34
35
merged.on('data', (chunk) => {
36
console.log('Data:', chunk.toString());
37
});
38
39
merged.on('end', () => {
40
console.log('All streams completed');
41
});
42
43
// Dynamic addition of streams
44
const stream3 = fs.createReadStream('file3.txt');
45
merged.add(stream3);
46
47
// Check if merged stream has any sources
48
if (!merged.isEmpty()) {
49
// Process the merged stream
50
merged.pipe(process.stdout);
51
}
52
```
53
54
## Architecture
55
56
merge-stream is built around several key components:
57
58
- **PassThrough Stream**: Uses a Node.js `PassThrough` stream in object mode as the merged output
59
- **Source Management**: Tracks active source streams and manages their lifecycle
60
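- **Event Propagation**: Forwards data and `error` events from source streams; a source's `end` event is not forwarded directly but is used to stop tracking that source (the output ends only after every source has ended)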
61
- **Dynamic Addition**: Allows adding new source streams during runtime
62
- **Auto-cleanup**: Automatically removes ended streams and closes output when no sources remain
63
64
## Capabilities
65
66
### Stream Creation and Merging
67
68
Creates a merged stream from zero or more source streams. Sources may be passed as individual arguments, as arrays of streams, or as any mix of the two.
69
70
```javascript { .api }
71
/**
72
* Creates a merged stream from multiple source streams
73
* @param {...(Stream|Stream[])} streams - Source streams or arrays of streams to merge
74
* @returns {PassThrough} PassThrough stream with additional methods (add, isEmpty)
75
*/
76
function mergeStream(...streams);
77
```
78
79
The returned stream is a PassThrough stream in object mode with the following properties:
80
- `objectMode: true` - Handles both object and buffer data
81
- `maxListeners: 0` - Removes the listener limit (0 means unlimited) so that piping many sources into the merged stream does not trigger Node.js max-listener warnings
82
- Additional methods: `add()` and `isEmpty()`
83
84
**Usage Examples:**
85
86
```javascript
87
const mergeStream = require('merge-stream');
88
89
// Create merged stream with initial sources
90
const merged = mergeStream(stream1, stream2);
91
92
// Create empty merged stream
93
const empty = mergeStream();
94
95
// Create with array of streams
96
const fromArray = mergeStream([stream1, stream2, stream3]);
97
98
// Mixed arguments
99
const mixed = mergeStream(stream1, [stream2, stream3], stream4);
100
```
101
102
### Dynamic Stream Addition
103
104
Dynamically adds more source streams to an existing merged stream.
105
106
```javascript { .api }
107
/**
108
* Dynamically adds source streams to the merged stream
109
* @param {Stream|Stream[]} source - A readable stream or array of streams to add
110
* @returns {PassThrough} Returns this for method chaining
111
*/
112
merged.add(source);
113
```
114
115
**Usage Examples:**
116
117
```javascript
118
const merged = mergeStream();
119
120
// Add single stream
121
merged.add(stream1);
122
123
// Add array of streams
124
merged.add([stream2, stream3]);
125
126
// Method chaining
127
merged
128
.add(stream1)
129
.add([stream2, stream3])
130
.add(stream4);
131
```
132
133
### Stream State Checking
134
135
Checks if the merged stream has any active source streams.
136
137
```javascript { .api }
138
/**
139
* Checks if the merged stream has any active source streams
140
* @returns {boolean} true if no sources are active, false otherwise
141
*/
142
merged.isEmpty();
143
```
144
145
**Usage Examples:**
146
147
```javascript
148
const merged = mergeStream();
149
150
console.log(merged.isEmpty()); // true
151
152
merged.add(stream1);
153
console.log(merged.isEmpty()); // false
154
155
// Conditional processing based on stream state
156
return merged.isEmpty() ? null : merged;
157
```
158
159
## Stream Lifecycle and Event Handling
160
161
The merged stream automatically handles the complete lifecycle of source streams:
162
163
### Automatic Event Management
164
165
- **Data Events**: All data from source streams is forwarded to the merged stream
166
- **End Events**: Source streams are automatically removed when they end
167
- **Error Events**: Errors from source streams are propagated to the merged stream
168
- **Unpipe Events**: Streams that are manually unpiped are removed from tracking
169
170
### End Behavior
171
172
The merged stream ends automatically once all of the following conditions hold:
173
- All source streams have ended
174
- No more source streams are active
175
- The merged stream is readable (not already ended)
176
177
### Error Propagation
178
179
Errors from any source stream are automatically propagated to the merged stream without terminating other active sources.
180
181
```javascript
182
merged.on('error', (err) => {
183
console.error('Stream error:', err.message);
184
// Error came from one of the source streams
185
// Other source streams continue operating normally
186
});
187
```
188
189
## Advanced Usage Patterns
190
191
### Gulp Integration
192
193
merge-stream is commonly used in Gulp build tasks to combine multiple processing pipelines into a single stream that the task can return:
194
195
```javascript
196
const gulp = require('gulp');
197
const htmlValidator = require('gulp-w3c-html-validator');
198
const jsHint = require('gulp-jshint');
199
const mergeStream = require('merge-stream');
200
201
function lint() {
202
return mergeStream(
203
gulp.src('src/*.html')
204
.pipe(htmlValidator())
205
.pipe(htmlValidator.reporter()),
206
gulp.src('src/*.js')
207
.pipe(jsHint())
208
.pipe(jsHint.reporter())
209
);
210
}
211
```
212
213
### Conditional Stream Processing
214
215
Using `isEmpty()` for conditional returns in build systems:
216
217
```javascript
218
function processFiles(patterns) {
219
const stream = mergeStream();
220
221
patterns.forEach(pattern => {
222
if (glob.sync(pattern).length > 0) {
223
stream.add(gulp.src(pattern).pipe(processFile()));
224
}
225
});
226
227
// Return null if no files match, otherwise return the stream
228
return stream.isEmpty() ? null : stream;
229
}
230
```
231
232
### Conditional Stream Addition
233
234
Adding streams based on runtime conditions:
235
236
```javascript
237
const merged = mergeStream();
238
239
// Add streams conditionally
240
if (process.env.NODE_ENV === 'development') {
241
merged.add(devStream);
242
}
243
244
if (enableFeature) {
245
merged.add(featureStream);
246
}
247
248
// Process only if streams were added
249
if (!merged.isEmpty()) {
250
merged.pipe(destination);
251
}
252
```