0
# Utility Functions
1
2
Essential utilities for stream composition, error handling, and lifecycle management. These functions provide robust patterns for working with multiple streams and are crucial for building reliable streaming applications.
3
4
## Capabilities
5
6
### pipeline
7
8
Pipes between streams forwarding errors and cleaning up properly, providing a robust way to compose multiple streams.
9
10
```javascript { .api }
11
/**
12
* Pipe between streams, handling errors and cleanup automatically
13
* @param streams - Sequence of streams to pipe together
14
* @param callback - Called when pipeline completes or errors
15
* @returns The last stream in the pipeline
16
*/
17
function pipeline(...streams: Array<NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream>, callback: (err: NodeJS.ErrnoException | null) => void): NodeJS.ReadableStream;
18
19
/**
20
* Promise-based pipeline (available via promises.pipeline)
21
* @param streams - Sequence of streams to pipe together
22
* @returns Promise that resolves when pipeline completes
23
*/
24
function pipeline(...streams: Array<NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream>): Promise<void>;
25
```
26
27
**Usage Examples:**
28
29
```javascript
30
const { pipeline, Readable, Transform } = require('readable-stream');
31
const fs = require('fs');
32
33
// Callback-based pipeline
34
pipeline(
35
fs.createReadStream('input.txt'),
36
new Transform({
37
transform(chunk, encoding, callback) {
38
this.push(chunk.toString().toUpperCase());
39
callback();
40
}
41
}),
42
fs.createWriteStream('output.txt'),
43
(err) => {
44
if (err) {
45
console.error('Pipeline failed:', err);
46
} else {
47
console.log('Pipeline succeeded');
48
}
49
}
50
);
51
52
// Promise-based pipeline
53
const { pipeline: pipelineAsync } = require('readable-stream').promises;
54
55
async function processFile() {
56
try {
57
await pipelineAsync(
58
fs.createReadStream('input.txt'),
59
new Transform({
60
transform(chunk, encoding, callback) {
61
this.push(chunk.toString().toLowerCase());
62
callback();
63
}
64
}),
65
fs.createWriteStream('output.txt')
66
);
67
console.log('Pipeline completed successfully');
68
} catch (error) {
69
console.error('Pipeline failed:', error);
70
}
71
}
72
```
73
74
### finished
75
76
Get notified when a stream is no longer readable, writable, or has experienced an error or premature close.
77
78
```javascript { .api }
79
/**
80
* Get notified when stream is finished
81
* @param stream - Stream to monitor
82
* @param options - Options for what conditions to wait for
83
* @param callback - Called when stream is finished or errors
84
* @returns Cleanup function to remove listeners
85
*/
86
function finished(
87
stream: NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream,
88
options: FinishedOptions,
89
callback: (err?: NodeJS.ErrnoException | null) => void
90
): () => void;
91
92
/**
93
* Simplified version with just callback
94
* @param stream - Stream to monitor
95
* @param callback - Called when stream is finished or errors
96
* @returns Cleanup function to remove listeners
97
*/
98
function finished(
99
stream: NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream,
100
callback: (err?: NodeJS.ErrnoException | null) => void
101
): () => void;
102
103
/**
104
* Promise-based finished (available via promises.finished)
105
* @param stream - Stream to monitor
106
* @param options - Options for what conditions to wait for
107
* @returns Promise that resolves when stream is finished
108
*/
109
function finished(
110
stream: NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream,
111
options?: FinishedOptions
112
): Promise<void>;
113
```
114
115
**Usage Examples:**
116
117
```javascript
118
const { finished, Readable } = require('readable-stream');
119
120
// Monitor a stream with callback
121
const readable = new Readable({
122
read() {
123
this.push('data');
124
this.push(null);
125
}
126
});
127
128
const cleanup = finished(readable, (err) => {
129
if (err) {
130
console.error('Stream error:', err);
131
} else {
132
console.log('Stream finished successfully');
133
}
134
});
135
136
// Promise-based monitoring
137
const { finished: finishedAsync } = require('readable-stream').promises;
138
139
async function monitorStream(stream) {
140
try {
141
await finishedAsync(stream);
142
console.log('Stream completed');
143
} catch (error) {
144
console.error('Stream failed:', error);
145
}
146
}
147
```
148
149
### compose
150
151
Compose multiple transform streams into a single transform stream, useful for creating reusable transformation pipelines.
152
153
```javascript { .api }
154
/**
155
* Compose multiple transform streams into a single transform
156
* @param streams - Transform streams to compose
157
* @returns A single transform stream representing the composition
158
*/
159
function compose(...streams: Array<NodeJS.ReadWriteStream>): NodeJS.ReadWriteStream;
160
```
161
162
**Usage Examples:**
163
164
```javascript
165
const { compose, Transform } = require('readable-stream');
166
167
// Create individual transforms
168
const upperCase = new Transform({
169
transform(chunk, encoding, callback) {
170
this.push(chunk.toString().toUpperCase());
171
callback();
172
}
173
});
174
175
const addPrefix = new Transform({
176
transform(chunk, encoding, callback) {
177
this.push('PREFIX: ' + chunk.toString());
178
callback();
179
}
180
});
181
182
// Compose them into a single transform
183
const composedTransform = compose(upperCase, addPrefix);
184
185
// Use the composed transform
186
composedTransform.write('hello world');
187
composedTransform.end();
188
189
composedTransform.on('data', (chunk) => {
190
console.log('Result:', chunk.toString()); // "PREFIX: HELLO WORLD"
191
});
192
```
193
194
### addAbortSignal
195
196
Add AbortSignal support to a stream, allowing for cancellation of stream operations.
197
198
```javascript { .api }
199
/**
200
* Add abort signal support to a stream
201
* @param signal - AbortSignal to use for cancellation
202
* @param stream - Stream to add abort support to
203
* @returns The stream with abort signal attached
204
*/
205
function addAbortSignal<T extends NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream>(
206
signal: AbortSignal,
207
stream: T
208
): T;
209
```
210
211
**Usage Examples:**
212
213
```javascript
214
const { addAbortSignal, Readable } = require('readable-stream');
215
216
// Create an abort controller
217
const controller = new AbortController();
218
const { signal } = controller;
219
220
// Create a stream with abort support
221
const readable = addAbortSignal(signal, new Readable({
222
read() {
223
// Simulate slow reading
224
setTimeout(() => {
225
this.push('chunk');
226
}, 1000);
227
}
228
}));
229
230
// Abort after 500ms
231
setTimeout(() => {
232
controller.abort();
233
}, 500);
234
235
readable.on('error', (err) => {
236
if (err.name === 'AbortError') {
237
console.log('Stream was aborted');
238
}
239
});
240
```
241
242
### destroy
243
244
Destroy a stream, calling its destroy method if available, or emitting an error.
245
246
```javascript { .api }
247
/**
248
* Destroy a stream
249
* @param stream - Stream to destroy
250
* @param error - Optional error to emit
251
*/
252
function destroy(stream: NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream, error?: Error): void;
253
```
254
255
**Usage Examples:**
256
257
```javascript
258
const { destroy, Readable } = require('readable-stream');
259
260
const readable = new Readable({
261
read() {
262
this.push('data');
263
}
264
});
265
266
// Destroy the stream with an error
267
destroy(readable, new Error('Something went wrong'));
268
269
readable.on('error', (err) => {
270
console.error('Stream destroyed:', err.message);
271
});
272
```
273
274
### Stream State Utilities
275
276
Utility functions for checking stream states.
277
278
```javascript { .api }
279
/**
280
* Check if a stream has been read from or disturbed
281
* @param stream - Stream to check
282
* @returns true if stream has been disturbed
283
*/
284
function isDisturbed(stream: NodeJS.ReadableStream): boolean;
285
286
/**
287
* Check if a stream has errored
288
* @param stream - Stream to check
289
* @returns true if stream has errored
290
*/
291
function isErrored(stream: NodeJS.ReadableStream | NodeJS.WritableStream): boolean;
292
```
293
294
**Usage Examples:**
295
296
```javascript
297
const { isDisturbed, isErrored, Readable } = require('readable-stream');
298
299
const readable = new Readable({
300
read() {
301
this.push('data');
302
this.push(null);
303
}
304
});
305
306
console.log(isDisturbed(readable)); // false
307
308
readable.read(); // Read some data
309
310
console.log(isDisturbed(readable)); // true
311
console.log(isErrored(readable)); // false
312
```
313
314
### High Water Mark Configuration
315
316
Functions for configuring the default high water mark for streams.
317
318
```javascript { .api }
319
/**
320
* Set the default high water mark for streams
321
* @param objectMode - Whether this is for object mode streams
322
* @param value - The high water mark value
323
*/
324
function setDefaultHighWaterMark(objectMode: boolean, value: number): void;
325
326
/**
327
* Get the default high water mark for streams
328
* @param objectMode - Whether this is for object mode streams
329
* @returns The current default high water mark
330
*/
331
function getDefaultHighWaterMark(objectMode: boolean): number;
332
```
333
334
**Usage Examples:**
335
336
```javascript
337
const { setDefaultHighWaterMark, getDefaultHighWaterMark } = require('readable-stream');
338
339
// Get current defaults
340
console.log('Current buffer HWM:', getDefaultHighWaterMark(false)); // 16384
341
console.log('Current object HWM:', getDefaultHighWaterMark(true)); // 16
342
343
// Set new defaults
344
setDefaultHighWaterMark(false, 32768); // Increase buffer default
345
setDefaultHighWaterMark(true, 32); // Increase object default
346
```
347
348
### Internal Utilities
349
350
Internal utility functions exported for compatibility with Node.js streams.
351
352
```javascript { .api }
353
/**
354
* Convert Uint8Array to Buffer (internal utility)
355
* @param chunk - Uint8Array to convert
356
* @returns Buffer representation
357
*/
358
function _uint8ArrayToBuffer(chunk: Uint8Array): Buffer;
359
360
/**
361
* Check if value is Uint8Array (internal utility)
362
* @param value - Value to check
363
* @returns true if value is Uint8Array
364
*/
365
function _isUint8Array(value: any): boolean;
366
```
367
368
**Usage Examples:**
369
370
```javascript
371
const { _uint8ArrayToBuffer, _isUint8Array } = require('readable-stream');
372
373
// Check if value is Uint8Array
374
const buffer = new Uint8Array([1, 2, 3]);
375
console.log(_isUint8Array(buffer)); // true
376
console.log(_isUint8Array([1, 2, 3])); // false
377
378
// Convert Uint8Array to Buffer
379
const convertedBuffer = _uint8ArrayToBuffer(buffer);
380
console.log(Buffer.isBuffer(convertedBuffer)); // true
381
```
382
383
## Types
384
385
```javascript { .api }
386
interface FinishedOptions {
387
error?: boolean; // Wait for error event (default: true)
388
readable?: boolean; // Wait for readable to end (default: true)
389
writable?: boolean; // Wait for writable to finish (default: true)
390
signal?: AbortSignal; // AbortSignal for cancellation
391
}
392
```