0
# Stream Operators
1
2
Functional programming methods available on Readable streams for data transformation and processing. These operators provide a chainable API for stream manipulation and are inspired by functional programming patterns.
3
4
## Capabilities
5
6
### Stream-Returning Operators
7
8
Operators that return new stream instances, allowing for chainable transformations.
9
10
#### map
11
12
Transforms each chunk using the provided function, with full type safety.
13
14
```javascript { .api }
15
/**
16
* Transform each chunk using a mapping function
17
* @param fn - Function to transform each chunk
18
* @param options - Optional configuration
19
* @returns New readable stream with transformed data
20
*/
21
map(fn: (chunk: any, options?: any) => any, options?: any): Readable;
22
```
23
24
**Usage Examples:**
25
26
```javascript
27
const { Readable } = require('readable-stream');
28
29
// Create a source stream
30
const source = Readable.from([1, 2, 3, 4, 5]);
31
32
// Transform each number by doubling it
33
const doubled = source.map((num) => num * 2);
34
35
doubled.on('data', (chunk) => {
36
console.log('Doubled:', chunk); // 2, 4, 6, 8, 10
37
});
38
39
// String transformation example
40
const words = Readable.from(['hello', 'world', 'stream']);
41
const uppercase = words.map((word) => word.toUpperCase());
42
43
uppercase.on('data', (chunk) => {
44
console.log('Uppercase:', chunk); // "HELLO", "WORLD", "STREAM"
45
});
46
```
47
48
#### filter
49
50
Filters chunks based on a predicate function.
51
52
```javascript { .api }
53
/**
54
* Filter chunks based on a predicate
55
* @param fn - Predicate function that returns true to keep the chunk
56
* @param options - Optional configuration
57
* @returns New readable stream with filtered data
58
*/
59
filter(fn: (chunk: any, options?: any) => boolean, options?: any): Readable;
60
```
61
62
**Usage Examples:**
63
64
```javascript
65
const { Readable } = require('readable-stream');
66
67
// Filter even numbers
68
const numbers = Readable.from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
69
const evenNumbers = numbers.filter((num) => num % 2 === 0);
70
71
evenNumbers.on('data', (chunk) => {
72
console.log('Even:', chunk); // 2, 4, 6, 8, 10
73
});
74
75
// Filter strings by length
76
const words = Readable.from(['cat', 'elephant', 'dog', 'hippopotamus']);
77
const longWords = words.filter((word) => word.length > 5);
78
79
longWords.on('data', (chunk) => {
80
console.log('Long word:', chunk); // "elephant", "hippopotamus"
81
});
82
```
83
84
#### drop
85
86
Skips the first N chunks from the stream.
87
88
```javascript { .api }
89
/**
90
* Skip the first N chunks
91
* @param number - Number of chunks to skip
92
* @param options - Optional configuration
93
* @returns New readable stream with first N chunks dropped
94
*/
95
drop(number: number, options?: any): Readable;
96
```
97
98
**Usage Examples:**
99
100
```javascript
101
const { Readable } = require('readable-stream');
102
103
// Skip first 3 numbers
104
const numbers = Readable.from([1, 2, 3, 4, 5, 6, 7]);
105
const afterDrop = numbers.drop(3);
106
107
afterDrop.on('data', (chunk) => {
108
console.log('After drop:', chunk); // 4, 5, 6, 7
109
});
110
```
111
112
#### take
113
114
Takes only the first N chunks from the stream.
115
116
```javascript { .api }
117
/**
118
* Take only the first N chunks
119
* @param number - Number of chunks to take
120
* @param options - Optional configuration
121
* @returns New readable stream with only first N chunks
122
*/
123
take(number: number, options?: any): Readable;
124
```
125
126
**Usage Examples:**
127
128
```javascript
129
const { Readable } = require('readable-stream');
130
131
// Take first 3 numbers
132
const numbers = Readable.from([1, 2, 3, 4, 5, 6, 7]);
133
const firstThree = numbers.take(3);
134
135
firstThree.on('data', (chunk) => {
136
console.log('First three:', chunk); // 1, 2, 3
137
});
138
```
139
140
#### asIndexedPairs (Deprecated)
141
142
**⚠️ DEPRECATED:** This operator will be removed in a future version.
143
144
Creates index-value pairs for each chunk in the stream.
145
146
```javascript { .api }
147
/**
148
* @deprecated This operator will be removed in a future version
149
* Map each chunk to [index, value] pairs
150
* @param options - Optional configuration
151
* @returns New readable stream with indexed pairs
152
*/
153
asIndexedPairs(options?: any): Readable;
154
```
155
156
#### flatMap
157
158
Maps each chunk to an iterable and flattens the results.
159
160
```javascript { .api }
161
/**
162
* Map each chunk to an iterable and flatten the results
163
* @param fn - Function that returns an iterable for each chunk
164
* @param options - Optional configuration
165
* @returns New readable stream with flattened results
166
*/
167
flatMap(fn: (chunk: any, options?: any) => Iterable<any>, options?: any): Readable;
168
```
169
170
**Usage Examples:**
171
172
```javascript
173
const { Readable } = require('readable-stream');
174
175
// Split each string into characters
176
const words = Readable.from(['hello', 'world']);
177
const characters = words.flatMap((word) => word.split(''));
178
179
characters.on('data', (chunk) => {
180
console.log('Character:', chunk); // 'h', 'e', 'l', 'l', 'o', 'w', 'o', 'r', 'l', 'd'
181
});
182
183
// Expand numbers into ranges
184
const ranges = Readable.from([2, 3]);
185
const expanded = ranges.flatMap((n) => Array.from({length: n}, (_, i) => i));
186
187
expanded.on('data', (chunk) => {
188
console.log('Expanded:', chunk); // 0, 1, 0, 1, 2
189
});
190
```
191
192
#### compose
193
194
Compose the readable stream with another transform stream.
195
196
```javascript { .api }
197
/**
198
* Compose this readable with a transform stream
199
* @param stream - Transform stream to compose with
200
* @param options - Optional configuration
201
* @returns New readable stream representing the composition
202
*/
203
compose(stream: NodeJS.ReadWriteStream, options?: any): Readable;
204
```
205
206
**Usage Examples:**
207
208
```javascript
209
const { Readable, Transform } = require('readable-stream');
210
211
// Create a transform stream
212
const upperCaseTransform = new Transform({
213
transform(chunk, encoding, callback) {
214
this.push(chunk.toString().toUpperCase());
215
callback();
216
}
217
});
218
219
// Compose with the transform
220
const source = Readable.from(['hello', 'world']);
221
const composed = source.compose(upperCaseTransform);
222
223
composed.on('data', (chunk) => {
224
console.log('Composed result:', chunk.toString()); // "HELLO", "WORLD"
225
});
226
```
227
228
### Promise-Returning Operators
229
230
Operators that return promises, useful for consuming stream data or performing reductions.
231
232
#### reduce
233
234
Reduces the stream to a single value using an accumulator function.
235
236
```javascript { .api }
237
/**
238
* Reduce the stream to a single value
239
* @param fn - Reducer function
240
* @param initial - Initial accumulator value
241
* @param options - Optional configuration
242
* @returns Promise that resolves to the final accumulated value
243
*/
244
reduce(fn: (previous: any, current: any, options?: any) => any, initial?: any, options?: any): Promise<any>;
245
```
246
247
**Usage Examples:**
248
249
```javascript
250
const { Readable } = require('readable-stream');
251
252
// Sum all numbers
253
const numbers = Readable.from([1, 2, 3, 4, 5]);
254
const sum = await numbers.reduce((acc, num) => acc + num, 0);
255
console.log('Sum:', sum); // 15
256
257
// Concatenate strings
258
const words = Readable.from(['hello', ' ', 'world']);
259
const sentence = await words.reduce((acc, word) => acc + word, '');
260
console.log('Sentence:', sentence); // "hello world"
261
262
// Find maximum value
263
const values = Readable.from([3, 7, 2, 9, 1]);
264
const max = await values.reduce((acc, val) => Math.max(acc, val), -Infinity);
265
console.log('Max:', max); // 9
266
```
267
268
#### toArray
269
270
Collects all chunks from the stream into an array.
271
272
```javascript { .api }
273
/**
274
* Collect all chunks into an array
275
* @param options - Optional configuration
276
* @returns Promise that resolves to an array of all chunks
277
*/
278
toArray(options?: any): Promise<any[]>;
279
```
280
281
**Usage Examples:**
282
283
```javascript
284
const { Readable } = require('readable-stream');
285
286
// Collect all numbers
287
const numbers = Readable.from([1, 2, 3, 4, 5]);
288
const array = await numbers.toArray();
289
console.log('Array:', array); // [1, 2, 3, 4, 5]
290
291
// Collect transformed data
292
const doubled = Readable.from([1, 2, 3]).map(x => x * 2);
293
const result = await doubled.toArray();
294
console.log('Doubled array:', result); // [2, 4, 6]
295
```
296
297
#### forEach
298
299
Executes a function for each chunk in the stream.
300
301
```javascript { .api }
302
/**
303
* Execute a function for each chunk
304
* @param fn - Function to execute for each chunk
305
* @param options - Optional configuration
306
* @returns Promise that resolves when all chunks have been processed
307
*/
308
forEach(fn: (chunk: any, options?: any) => void, options?: any): Promise<void>;
309
```
310
311
**Usage Examples:**
312
313
```javascript
314
const { Readable } = require('readable-stream');
315
316
// Log each chunk
317
const numbers = Readable.from([1, 2, 3, 4, 5]);
318
await numbers.forEach((num) => {
319
console.log('Processing:', num);
320
});
321
console.log('All numbers processed');
322
323
// Perform side effects
324
const data = Readable.from(['file1.txt', 'file2.txt']);
325
await data.forEach((filename) => {
326
console.log('Would process file:', filename);
327
// In real code, you might read/process the file here
328
});
329
```
330
331
#### every
332
333
Tests whether all chunks in the stream pass a predicate test.
334
335
```javascript { .api }
336
/**
337
* Test whether all chunks pass a predicate
338
* @param fn - Predicate function to test each chunk
339
* @param options - Optional configuration
340
* @returns Promise that resolves to true if all chunks pass the test
341
*/
342
every(fn: (chunk: any, options?: any) => boolean, options?: any): Promise<boolean>;
343
```
344
345
**Usage Examples:**
346
347
```javascript
348
const { Readable } = require('readable-stream');
349
350
// Check if all numbers are positive
351
const positiveNumbers = Readable.from([1, 2, 3, 4, 5]);
352
const allPositive = await positiveNumbers.every(num => num > 0);
353
console.log('All positive:', allPositive); // true
354
355
// Check if all strings are long enough
356
const words = Readable.from(['hello', 'world', 'stream']);
357
const allLongEnough = await words.every(word => word.length >= 5);
358
console.log('All long enough:', allLongEnough); // true
359
```
360
361
#### some
362
363
Tests whether at least one chunk in the stream passes a predicate test.
364
365
```javascript { .api }
366
/**
367
* Test whether at least one chunk passes a predicate
368
* @param fn - Predicate function to test each chunk
369
* @param options - Optional configuration
370
* @returns Promise that resolves to true if any chunk passes the test
371
*/
372
some(fn: (chunk: any, options?: any) => boolean, options?: any): Promise<boolean>;
373
```
374
375
**Usage Examples:**
376
377
```javascript
378
const { Readable } = require('readable-stream');
379
380
// Check if any number is even
381
const numbers = Readable.from([1, 3, 5, 7, 8]);
382
const hasEven = await numbers.some(num => num % 2 === 0);
383
console.log('Has even number:', hasEven); // true
384
385
// Check if any string contains 'a'
386
const words = Readable.from(['hello', 'world', 'stream']);
387
const hasA = await words.some(word => word.includes('a'));
388
console.log('Has word with "a":', hasA); // true (stream)
389
```
390
391
#### find
392
393
Finds the first chunk that passes a predicate test.
394
395
```javascript { .api }
396
/**
397
* Find the first chunk that passes a predicate
398
* @param fn - Predicate function to test each chunk
399
* @param options - Optional configuration
400
* @returns Promise that resolves to the first matching chunk or undefined
401
*/
402
find(fn: (chunk: any, options?: any) => boolean, options?: any): Promise<any>;
403
```
404
405
**Usage Examples:**
406
407
```javascript
408
const { Readable } = require('readable-stream');
409
410
// Find first even number
411
const numbers = Readable.from([1, 3, 4, 6, 7]);
412
const firstEven = await numbers.find(num => num % 2 === 0);
413
console.log('First even:', firstEven); // 4
414
415
// Find first long word
416
const words = Readable.from(['cat', 'elephant', 'dog']);
417
const firstLongWord = await words.find(word => word.length > 5);
418
console.log('First long word:', firstLongWord); // "elephant"
419
```
420
421
## Chaining Operators
422
423
Stream operators can be chained together to create complex data processing pipelines:
424
425
```javascript
426
const { Readable } = require('readable-stream');
427
428
// Complex processing pipeline
429
const numbers = Readable.from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
430
431
const result = await numbers
432
.filter(n => n % 2 === 0) // Keep only even numbers: [2, 4, 6, 8, 10]
433
.map(n => n * n) // Square them: [4, 16, 36, 64, 100]
434
.drop(1) // Skip first: [16, 36, 64, 100]
435
.take(3) // Take first 3: [16, 36, 64]
436
.reduce((sum, n) => sum + n, 0); // Sum them up
437
438
console.log('Result:', result); // 116
439
```
440
441
## Error Handling
442
443
Stream operators properly propagate errors through the chain:
444
445
```javascript
446
const { Readable } = require('readable-stream');
447
448
const source = Readable.from([1, 2, 3, 4, 5]);
449
450
try {
451
const result = await source
452
.map(n => {
453
if (n === 3) throw new Error('Processing error');
454
return n * 2;
455
})
456
.toArray();
457
} catch (error) {
458
console.error('Pipeline error:', error.message); // "Processing error"
459
}
460
```
461
462
## Types
463
464
```javascript { .api }
465
// All operators accept an optional options parameter
466
interface OperatorOptions {
467
signal?: AbortSignal;
468
highWaterMark?: number;
469
[key: string]: any;
470
}
471
```