0
# Promise API
1
2
Promise-based versions of utility functions that integrate seamlessly with modern async/await patterns. These functions provide the same functionality as their callback-based counterparts but return promises for cleaner async code.
3
4
## Capabilities
5
6
### promises.pipeline
7
8
Promise-based version of the pipeline utility function for composing streams.
9
10
```javascript { .api }
11
/**
12
* Promise-based pipeline for composing multiple streams
13
* @param streams - Sequence of streams to pipe together
14
* @returns Promise that resolves when the pipeline completes successfully
15
*/
16
const promises = {
17
pipeline: (...streams: Array<NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream>) => Promise<void>;
18
};
19
```
20
21
**Usage Examples:**
22
23
```javascript
24
const { promises, Transform } = require('readable-stream');
25
const fs = require('fs');
26
27
// Async/await pipeline
28
async function processFile() {
29
try {
30
await promises.pipeline(
31
fs.createReadStream('input.txt'),
32
new Transform({
33
transform(chunk, encoding, callback) {
34
// Convert to uppercase
35
this.push(chunk.toString().toUpperCase());
36
callback();
37
}
38
}),
39
fs.createWriteStream('output.txt')
40
);
41
console.log('File processed successfully');
42
} catch (error) {
43
console.error('Pipeline failed:', error);
44
}
45
}
46
47
// Multiple transform pipeline
48
async function complexProcessing() {
49
const upperCase = new Transform({
50
transform(chunk, encoding, callback) {
51
this.push(chunk.toString().toUpperCase());
52
callback();
53
}
54
});
55
56
const addLineNumbers = new Transform({
57
objectMode: true,
58
transform(chunk, encoding, callback) {
59
const lines = chunk.toString().split('\n');
60
lines.forEach((line, index) => {
61
if (line.trim()) {
62
this.push(`${index + 1}: ${line}\n`);
63
}
64
});
65
callback();
66
}
67
});
68
69
try {
70
await promises.pipeline(
71
fs.createReadStream('source.txt'),
72
upperCase,
73
addLineNumbers,
74
fs.createWriteStream('numbered.txt')
75
);
76
console.log('Complex processing completed');
77
} catch (error) {
78
console.error('Processing failed:', error);
79
}
80
}
81
```
82
83
### promises.finished
84
85
Promise-based version of the finished utility function for monitoring stream completion.
86
87
```javascript { .api }
88
/**
89
* Promise-based stream completion monitoring
90
* @param stream - Stream to monitor for completion
91
* @param options - Optional configuration for what events to wait for
92
* @returns Promise that resolves when the stream is finished
93
*/
94
const promises = {
95
finished: (
96
stream: NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream,
97
options?: FinishedOptions
98
) => Promise<void>;
99
};
100
```
101
102
**Usage Examples:**
103
104
```javascript
105
const { promises, Readable, Writable } = require('readable-stream');
106
107
// Monitor readable stream completion
108
async function monitorReadable() {
109
const readable = new Readable({
110
read() {
111
this.push('chunk 1');
112
this.push('chunk 2');
113
this.push(null); // End stream
114
}
115
});
116
117
// Start reading the stream
118
readable.on('data', (chunk) => {
119
console.log('Read:', chunk.toString());
120
});
121
122
try {
123
await promises.finished(readable);
124
console.log('Readable stream finished successfully');
125
} catch (error) {
126
console.error('Readable stream error:', error);
127
}
128
}
129
130
// Monitor writable stream completion
131
async function monitorWritable() {
132
const writable = new Writable({
133
write(chunk, encoding, callback) {
134
console.log('Writing:', chunk.toString());
135
// Simulate async operation
136
setTimeout(callback, 100);
137
}
138
});
139
140
// Start writing to the stream
141
writable.write('data 1');
142
writable.write('data 2');
143
writable.end();
144
145
try {
146
await promises.finished(writable);
147
console.log('Writable stream finished successfully');
148
} catch (error) {
149
console.error('Writable stream error:', error);
150
}
151
}
152
153
// Monitor with specific options
154
async function monitorWithOptions() {
155
const readable = new Readable({
156
read() {
157
// Simulate data production
158
setTimeout(() => {
159
this.push('data');
160
}, 100);
161
}
162
});
163
164
try {
165
await promises.finished(readable, {
166
readable: true,
167
writable: false, // Don't wait for writable events
168
error: true // Wait for error events
169
});
170
console.log('Stream monitoring completed');
171
} catch (error) {
172
console.error('Stream monitoring failed:', error);
173
}
174
}
175
```
176
177
## Combining Promise API with Stream Operators
178
179
The Promise API works seamlessly with stream operators:
180
181
```javascript
182
const { promises, Readable } = require('readable-stream');
183
184
async function processDataWithOperators() {
185
// Create a readable stream
186
const dataSource = new Readable({
187
objectMode: true,
188
read() {
189
// Simulate data source
190
const data = [1, 2, 3, 4, 5];
191
data.forEach(item => this.push(item));
192
this.push(null);
193
}
194
});
195
196
// Transform with operators
197
const transformed = dataSource
198
.map(x => x * 2)
199
.filter(x => x > 5);
200
201
// Use promises to collect results
202
try {
203
const results = await transformed.toArray();
204
console.log('Results:', results); // [6, 8, 10]
205
} catch (error) {
206
console.error('Processing failed:', error);
207
}
208
}
209
210
// Pipeline with operator-transformed streams
211
async function pipelineWithOperators() {
212
const source = Readable.from([1, 2, 3, 4, 5]);
213
const doubled = source.map(x => x * 2);
214
215
const writable = new Writable({
216
objectMode: true,
217
write(chunk, encoding, callback) {
218
console.log('Received:', chunk);
219
callback();
220
}
221
});
222
223
try {
224
await promises.pipeline(doubled, writable);
225
console.log('Pipeline with operators completed');
226
} catch (error) {
227
console.error('Pipeline failed:', error);
228
}
229
}
230
```
231
232
## Error Handling Patterns
233
234
The Promise API provides clean error handling patterns:
235
236
```javascript
237
const { promises, Readable, Transform } = require('readable-stream');
238
239
// Graceful error handling
240
async function robustProcessing() {
241
const source = new Readable({
242
read() {
243
this.push('valid data');
244
this.push('invalid data');
245
this.push(null);
246
}
247
});
248
249
const validator = new Transform({
250
transform(chunk, encoding, callback) {
251
const data = chunk.toString();
252
if (data === 'invalid data') {
253
callback(new Error('Data validation failed'));
254
return;
255
}
256
this.push(data.toUpperCase());
257
callback();
258
}
259
});
260
261
const output = new Writable({
262
write(chunk, encoding, callback) {
263
console.log('Processed:', chunk.toString());
264
callback();
265
}
266
});
267
268
try {
269
await promises.pipeline(source, validator, output);
270
console.log('Processing completed successfully');
271
} catch (error) {
272
console.error('Processing failed:', error.message);
273
// Handle cleanup or recovery here
274
}
275
}
276
277
// Multiple pipeline error handling
278
async function multipleOperations() {
279
const operations = [
280
() => promises.pipeline(/* pipeline 1 */),
281
() => promises.pipeline(/* pipeline 2 */),
282
() => promises.pipeline(/* pipeline 3 */)
283
];
284
285
const results = await Promise.allSettled(
286
operations.map(op => op())
287
);
288
289
results.forEach((result, index) => {
290
if (result.status === 'fulfilled') {
291
console.log(`Operation ${index + 1} succeeded`);
292
} else {
293
console.error(`Operation ${index + 1} failed:`, result.reason);
294
}
295
});
296
}
297
```
298
299
## AbortSignal Support
300
301
The Promise API supports AbortSignal for cancellation:
302
303
```javascript
304
const { promises, Readable, Transform } = require('readable-stream');
305
306
async function cancellableOperation() {
307
const controller = new AbortController();
308
const { signal } = controller;
309
310
// Cancel after 5 seconds
311
setTimeout(() => {
312
controller.abort();
313
}, 5000);
314
315
const slowSource = new Readable({
316
read() {
317
// Simulate slow data production
318
setTimeout(() => {
319
this.push('data');
320
}, 1000);
321
}
322
});
323
324
const slowTransform = new Transform({
325
transform(chunk, encoding, callback) {
326
// Simulate slow processing
327
setTimeout(() => {
328
this.push(chunk.toString().toUpperCase());
329
callback();
330
}, 2000);
331
}
332
});
333
334
try {
335
await promises.finished(slowSource, { signal });
336
console.log('Operation completed');
337
} catch (error) {
338
if (error.name === 'AbortError') {
339
console.log('Operation was cancelled');
340
} else {
341
console.error('Operation failed:', error);
342
}
343
}
344
}
345
```
346
347
## Integration with Other APIs
348
349
The Promise API integrates well with other Node.js Promise APIs:
350
351
```javascript
352
const { promises, Readable } = require('readable-stream');
353
const { promises: fs } = require('fs');
354
355
async function fileProcessingWorkflow() {
356
try {
357
// Read file list
358
const files = await fs.readdir('./data');
359
360
// Create stream from file list
361
const fileStream = Readable.from(files);
362
363
// Process each file
364
const processedFiles = await fileStream
365
.filter(filename => filename.endsWith('.txt'))
366
.map(async (filename) => {
367
const content = await fs.readFile(`./data/${filename}`, 'utf8');
368
return { filename, content, size: content.length };
369
})
370
.toArray();
371
372
console.log('Processed files:', processedFiles);
373
} catch (error) {
374
console.error('Workflow failed:', error);
375
}
376
}
377
```
378
379
## Types
380
381
```javascript { .api }
382
interface FinishedOptions {
383
error?: boolean; // Wait for error event (default: true)
384
readable?: boolean; // Wait for readable to end (default: true)
385
writable?: boolean; // Wait for writable to finish (default: true)
386
signal?: AbortSignal; // AbortSignal for cancellation
387
}
388
389
// Promise API namespace
390
interface StreamPromises {
391
pipeline: (...streams: Array<NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream>) => Promise<void>;
392
finished: (stream: NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream, options?: FinishedOptions) => Promise<void>;
393
}
394
```