0
# Pipeline Functions
1
2
StreamX provides powerful pipeline utilities for connecting multiple streams with automatic error handling, cleanup, and callback support. Pipelines handle stream lifecycle management and ensure proper resource cleanup.
3
4
## Capabilities
5
6
### Pipeline Function
7
8
Connects multiple streams together with comprehensive error handling and automatic cleanup.
9
10
```javascript { .api }
11
/**
12
* Pipe multiple streams together with error handling and cleanup
13
* @param streams - Streams to connect, with optional callback as last argument
14
* @returns The last stream in the pipeline
15
*/
16
function pipeline(...streams: (Stream | ((err?: Error) => void))[]): Stream;
17
```
18
19
**Usage Examples:**
20
21
```javascript
22
const { pipeline, Readable, Transform, Writable } = require('streamx');
23
24
// Basic pipeline with callback
25
const lastStream = pipeline(
26
Readable.from(['hello', 'world', 'from', 'streamx']),
27
new Transform({
28
transform(data, cb) {
29
cb(null, data.toString().toUpperCase());
30
}
31
}),
32
new Writable({
33
write(data, cb) {
34
console.log('Output:', data.toString());
35
cb();
36
}
37
}),
38
(err) => {
39
if (err) {
40
console.error('Pipeline failed:', err);
41
} else {
42
console.log('Pipeline completed successfully');
43
}
44
}
45
);
46
47
// Pipeline without callback
48
const result = pipeline(
49
source,
50
transformer1,
51
transformer2,
52
destination
53
);
54
55
// Monitor the last stream
56
result.on('finish', () => {
57
console.log('Pipeline finished');
58
});
59
```
60
61
### Pipeline Promise
62
63
Promise-based version of pipeline for async/await workflows.
64
65
```javascript { .api }
66
/**
67
* Promise-based pipeline that resolves when complete or rejects on error
68
* @param streams - Streams to connect in pipeline
69
* @returns Promise that resolves when pipeline completes
70
*/
71
function pipelinePromise(...streams: Stream[]): Promise<void>;
72
```
73
74
**Usage Examples:**
75
76
```javascript
77
const { pipelinePromise, Readable, Transform, Writable } = require('streamx');
78
79
// Using async/await
80
async function processData() {
81
try {
82
await pipelinePromise(
83
Readable.from(['data1', 'data2', 'data3']),
84
new Transform({
85
transform(data, cb) {
86
const processed = `Processed: ${data}`;
87
cb(null, processed);
88
}
89
}),
90
new Writable({
91
write(data, cb) {
92
console.log(data.toString());
93
cb();
94
}
95
})
96
);
97
98
console.log('Pipeline completed successfully');
99
} catch (err) {
100
console.error('Pipeline failed:', err);
101
}
102
}
103
104
// Multiple pipelines in sequence
105
async function sequentialProcessing() {
106
await pipelinePromise(sourceA, transformA, destinationA);
107
await pipelinePromise(sourceB, transformB, destinationB);
108
console.log('All pipelines completed');
109
}
110
111
// Parallel pipelines
112
async function parallelProcessing() {
113
await Promise.all([
114
pipelinePromise(source1, transform1, dest1),
115
pipelinePromise(source2, transform2, dest2),
116
pipelinePromise(source3, transform3, dest3)
117
]);
118
console.log('All parallel pipelines completed');
119
}
120
```
121
122
### Utility Functions
123
124
Helper functions for stream inspection and state checking.
125
126
```javascript { .api }
127
/**
128
* Check if an object is a stream (Node.js or StreamX)
129
* @param obj - Object to check
130
* @returns True if object is a stream
131
*/
132
function isStream(obj: any): boolean;
133
134
/**
135
* Check if an object is specifically a StreamX stream
136
* @param obj - Object to check
137
* @returns True if object is a StreamX stream
138
*/
139
function isStreamx(obj: any): boolean;
140
141
/**
142
* Check if a readable stream has ended
143
* @param stream - Readable stream to check
144
* @returns True if stream has ended
145
*/
146
function isEnded(stream: Readable): boolean;
147
148
/**
149
* Check if a writable stream has finished
150
* @param stream - Writable stream to check
151
* @returns True if stream has finished
152
*/
153
function isFinished(stream: Writable): boolean;
154
155
/**
156
* Check if a readable stream has been disturbed (data read from it)
157
* @param stream - Readable stream to check
158
* @returns True if stream has been disturbed
159
*/
160
function isDisturbed(stream: Readable): boolean;
161
162
/**
163
* Get error from a stream if any exists
164
* @param stream - Stream to check for errors
165
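* @param opts - Optional configuration; pass `{ all: true }` to also return the implicit "stream destroyed" error, which is hidden by default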
166
* @returns Error object or null if no error
167
*/
168
function getStreamError(stream: Stream, opts?: object): Error | null;
169
```
170
171
**Utility Function Examples:**
172
173
```javascript
174
const {
175
isStream,
176
isStreamx,
177
isEnded,
178
isFinished,
179
isDisturbed,
180
getStreamError,
181
Readable,
182
Writable
183
} = require('streamx');
184
const fs = require('fs');
185
186
// Check stream types
187
const streamxReadable = new Readable();
188
const nodeReadable = fs.createReadStream('file.txt');
189
190
console.log(isStream(streamxReadable)); // true
191
console.log(isStream(nodeReadable)); // true
192
console.log(isStreamx(streamxReadable)); // true
193
console.log(isStreamx(nodeReadable)); // false
194
195
// Check stream states
196
const readable = Readable.from(['data1', 'data2']);
197
198
console.log(isEnded(readable)); // false
199
console.log(isDisturbed(readable)); // false
200
201
readable.read(); // Disturb the stream
202
console.log(isDisturbed(readable)); // true
203
204
// Monitor writable stream
205
const writable = new Writable({
206
write(data, cb) {
207
console.log('Written:', data.toString());
208
cb();
209
}
210
});
211
212
writable.write('test');
213
writable.end();
214
215
writable.on('finish', () => {
216
console.log(isFinished(writable)); // true
217
});
218
219
// Error checking
220
const errorStream = new Readable({
221
read(cb) {
222
cb(new Error('Read failed'));
223
}
224
});
225
226
setTimeout(() => {
227
const error = getStreamError(errorStream);
228
if (error) {
229
console.log('Stream has error:', error.message);
230
}
231
}, 100);
232
```
233
234
### Advanced Pipeline Patterns
235
236
StreamX pipelines support advanced patterns for complex stream processing workflows.
237
238
**Conditional Pipeline:**
239
240
```javascript
241
async function conditionalPipeline(data, shouldTransform) {
242
const streams = [
243
Readable.from(data)
244
];
245
246
if (shouldTransform) {
247
streams.push(new Transform({
248
transform(chunk, cb) {
249
cb(null, chunk.toString().toUpperCase());
250
}
251
}));
252
}
253
254
streams.push(new Writable({
255
write(chunk, cb) {
256
console.log('Result:', chunk.toString());
257
cb();
258
}
259
}));
260
261
await pipelinePromise(...streams);
262
}
263
```
264
265
**Pipeline with Error Recovery:**
266
267
```javascript
268
async function resilientPipeline() {
269
const maxRetries = 3;
270
let attempt = 0;
271
272
while (attempt < maxRetries) {
273
try {
274
await pipelinePromise(
275
createSource(),
276
createTransform(),
277
createDestination()
278
);
279
280
console.log('Pipeline succeeded');
281
break;
282
} catch (err) {
283
attempt++;
284
console.log(`Attempt ${attempt} failed:`, err.message);
285
286
if (attempt >= maxRetries) {
287
throw new Error(`Pipeline failed after ${maxRetries} attempts`);
288
}
289
290
// Wait before retry
291
await new Promise(resolve => setTimeout(resolve, 1000 * attempt));
292
}
293
}
294
}
295
```
296
297
**Pipeline with Monitoring:**
298
299
```javascript
300
function createMonitoredPipeline() {
301
const source = Readable.from(['data1', 'data2', 'data3']);
302
const transform = new Transform({
303
transform(data, cb) {
304
console.log('Transforming:', data.toString());
305
cb(null, data.toString().toUpperCase());
306
}
307
});
308
const destination = new Writable({
309
write(data, cb) {
310
console.log('Writing:', data.toString());
311
cb();
312
}
313
});
314
315
// Monitor each stream
316
[source, transform, destination].forEach((stream, index) => {
317
stream.on('error', (err) => {
318
console.error(`Stream ${index} error:`, err.message);
319
});
320
321
stream.on('close', () => {
322
console.log(`Stream ${index} closed`);
323
});
324
});
325
326
return pipeline(source, transform, destination, (err) => {
327
if (err) {
328
console.error('Pipeline error:', err.message);
329
} else {
330
console.log('Pipeline completed successfully');
331
}
332
});
333
}
334
```
335
336
**Dynamic Pipeline Construction:**
337
338
```javascript
339
function createDynamicPipeline(config) {
340
const streams = [
341
Readable.from(config.data)
342
];
343
344
// Add transforms based on configuration
345
if (config.uppercase) {
346
streams.push(new Transform({
347
transform(data, cb) {
348
cb(null, data.toString().toUpperCase());
349
}
350
}));
351
}
352
353
if (config.prefix) {
354
streams.push(new Transform({
355
transform(data, cb) {
356
cb(null, `${config.prefix}: ${data}`);
357
}
358
}));
359
}
360
361
if (config.filter) {
362
streams.push(new Transform({
363
transform(data, cb) {
364
if (data.toString().includes(config.filter)) {
365
cb(null, data);
366
} else {
367
cb(); // Skip this data
368
}
369
}
370
}));
371
}
372
373
// Add destination
374
streams.push(new Writable({
375
write(data, cb) {
376
console.log('Final output:', data.toString());
377
cb();
378
}
379
}));
380
381
return pipelinePromise(...streams);
382
}
383
384
// Usage
385
createDynamicPipeline({
386
data: ['hello', 'world', 'test'],
387
uppercase: true,
388
prefix: 'OUTPUT',
389
filter: 'WORLD' // the filter runs after the uppercase transform, so it must match the uppercased text
390
});
391
```
392
393
### Error Handling
394
395
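Pipelines provide comprehensive error handling: when any stream in the pipeline fails, all streams in the pipeline are destroyed automatically and the error is passed to the callback (or rejects the promise returned by `pipelinePromise`).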
396
397
```javascript
398
const { pipeline, Readable, Transform, Writable } = require('streamx');
399
400
// Pipeline with error-prone transform
401
pipeline(
402
Readable.from(['good', 'error', 'data']),
403
new Transform({
404
transform(data, cb) {
405
if (data.toString() === 'error') {
406
return cb(new Error('Transform failed'));
407
}
408
cb(null, data.toString().toUpperCase());
409
}
410
}),
411
new Writable({
412
write(data, cb) {
413
console.log('Success:', data.toString());
414
cb();
415
}
416
}),
417
(err) => {
418
if (err) {
419
console.error('Pipeline failed:', err.message);
420
// All streams are automatically cleaned up
421
}
422
}
423
);
424
```