0
# Stream Classes
1
2
Core stream classes that provide the foundation for all streaming operations in readable-stream. These classes mirror Node.js native streams with enhanced stability and cross-version compatibility.
3
4
## Capabilities
5
6
### Readable
7
8
Base class for readable streams that produce data for consumption.
9
10
```javascript { .api }
11
/**
12
* Creates a readable stream
13
* @param options - Configuration options for the readable stream
14
*/
15
class Readable extends Stream {
16
constructor(options?: ReadableOptions);
17
18
/**
19
* Reads data from the stream
20
* @param size - Optional amount of data to read
21
* @returns Data chunk or null if no data available
22
*/
23
read(size?: number): any;
24
25
/**
26
* Pushes data into the stream buffer
27
* @param chunk - Data to push (null to end stream)
28
* @param encoding - Encoding for string chunks
29
* @returns true if more data can be written
30
*/
31
push(chunk: any, encoding?: BufferEncoding): boolean;
32
33
/**
34
* Unshifts data back to the front of the stream
35
* @param chunk - Data to unshift
36
* @param encoding - Encoding for string chunks
37
*/
38
unshift(chunk: any, encoding?: BufferEncoding): void;
39
40
/**
41
* Pauses the stream from emitting 'data' events
42
* @returns this stream instance
43
*/
44
pause(): this;
45
46
/**
47
* Resumes emitting 'data' events
48
* @returns this stream instance
49
*/
50
resume(): this;
51
52
/**
53
* Sets the encoding for string data
54
* @param encoding - The encoding to use
55
* @returns this stream instance
56
*/
57
setEncoding(encoding: BufferEncoding): this;
58
59
/**
60
* Pipes this readable to a writable stream
61
* @param destination - The writable stream to pipe to
62
* @param options - Piping options
63
* @returns the destination stream
64
*/
65
pipe<T extends NodeJS.WritableStream>(destination: T, options?: { end?: boolean }): T;
66
67
/**
68
* Removes a pipe destination
69
* @param destination - The destination to unpipe
70
* @returns this stream instance
71
*/
72
unpipe(destination?: NodeJS.WritableStream): this;
73
74
/**
75
* Wraps an old-style stream in a Readable
76
* @param stream - Old-style readable stream
77
* @param options - Wrap options
78
* @returns Readable stream
79
*/
80
static wrap(stream: NodeJS.ReadableStream, options?: ReadableOptions): Readable;
81
82
/**
83
* Creates a Readable from iterable/async iterable
84
* @param iterable - Source iterable
85
* @param options - Stream options
86
* @returns Readable stream
87
*/
88
static from(iterable: Iterable<any> | AsyncIterable<any>, options?: ReadableOptions): Readable;
89
}
90
```
91
92
**Usage Examples:**
93
94
```javascript
95
const { Readable } = require('readable-stream');
96
97
// Create a simple readable stream
98
const readable = new Readable({
99
read() {
100
this.push('chunk 1');
101
this.push('chunk 2');
102
this.push(null); // End the stream
103
}
104
});
105
106
// Create a readable from an array
107
const fromArray = Readable.from(['a', 'b', 'c']);
108
109
// Handle data events
110
readable.on('data', (chunk) => {
111
console.log('Received:', chunk.toString());
112
});
113
114
readable.on('end', () => {
115
console.log('Stream ended');
116
});
117
```
118
119
### Writable
120
121
Base class for writable streams that consume data.
122
123
```javascript { .api }
124
/**
125
* Creates a writable stream
126
* @param options - Configuration options for the writable stream
127
*/
128
class Writable extends Stream {
129
constructor(options?: WritableOptions);
130
131
/**
132
* Writes data to the stream
133
* @param chunk - Data to write
134
* @param encoding - Encoding for string chunks
135
* @param callback - Callback when write completes
136
* @returns false if buffer is full, true otherwise
137
*/
138
write(chunk: any, encoding?: BufferEncoding, callback?: (error: Error | null | undefined) => void): boolean;
139
140
/**
141
* Ends the writable stream
142
* @param chunk - Optional final chunk to write
143
* @param encoding - Encoding for string chunks
144
* @param callback - Callback when stream ends
145
* @returns this stream instance
146
*/
147
end(chunk?: any, encoding?: BufferEncoding, callback?: () => void): this;
148
149
/**
150
* Sets default encoding for string writes
151
* @param encoding - The default encoding
152
* @returns this stream instance
153
*/
154
setDefaultEncoding(encoding: BufferEncoding): this;
155
156
/**
157
* Destroys the stream
158
* @param error - Optional error to emit
159
* @returns this stream instance
160
*/
161
destroy(error?: Error): this;
162
163
/**
164
* Corked state - temporarily buffer writes
165
*/
166
cork(): void;
167
168
/**
169
* Uncork the stream - flush buffered writes
170
*/
171
uncork(): void;
172
}
173
```
174
175
**Usage Examples:**
176
177
```javascript
178
const { Writable } = require('readable-stream');
179
180
// Create a writable that logs data
181
const writable = new Writable({
182
write(chunk, encoding, callback) {
183
console.log('Writing:', chunk.toString());
184
callback();
185
}
186
});
187
188
// Write data
189
writable.write('hello');
190
writable.write('world');
191
writable.end();
192
193
// Handle events
194
writable.on('finish', () => {
195
console.log('All writes completed');
196
});
197
```
198
199
### Duplex
200
201
Stream that is both readable and writable, allowing bidirectional data flow.
202
203
```javascript { .api }
204
/**
205
* Creates a duplex stream
206
* @param options - Configuration options combining readable and writable options
207
*/
208
class Duplex extends Readable {
209
constructor(options?: DuplexOptions);
210
211
// Inherits all Readable methods
212
// Plus Writable interface:
213
214
write(chunk: any, encoding?: BufferEncoding, callback?: (error: Error | null | undefined) => void): boolean;
215
end(chunk?: any, encoding?: BufferEncoding, callback?: () => void): this;
216
setDefaultEncoding(encoding: BufferEncoding): this;
217
cork(): void;
218
uncork(): void;
219
}
220
```
221
222
**Usage Examples:**
223
224
```javascript
225
const { Duplex } = require('readable-stream');
226
227
// Create a duplex stream
228
const duplex = new Duplex({
229
read() {
230
this.push('data from readable side');
231
this.push(null);
232
},
233
write(chunk, encoding, callback) {
234
console.log('Received on writable side:', chunk.toString());
235
callback();
236
}
237
});
238
239
// Use both sides
240
duplex.write('hello');
241
duplex.on('data', (chunk) => {
242
console.log('Read:', chunk.toString());
243
});
244
```
245
246
### Transform
247
248
Duplex stream where the output is computed from the input, providing data transformation capabilities.
249
250
```javascript { .api }
251
/**
252
* Creates a transform stream
253
* @param options - Configuration options including transform function
254
*/
255
class Transform extends Duplex {
256
constructor(options?: TransformOptions);
257
258
/**
259
* Transform implementation - override this method
260
* @param chunk - Input data chunk
261
* @param encoding - Encoding of the chunk
262
* @param callback - Callback to call when transform is complete
263
*/
264
_transform(chunk: any, encoding: BufferEncoding, callback: TransformCallback): void;
265
266
/**
267
* Flush implementation - called before stream ends
268
* @param callback - Callback to call when flush is complete
269
*/
270
_flush(callback: TransformCallback): void;
271
}
272
```
273
274
**Usage Examples:**
275
276
```javascript
277
const { Transform } = require('readable-stream');
278
279
// Create an uppercase transform
280
const upperCaseTransform = new Transform({
281
transform(chunk, encoding, callback) {
282
this.push(chunk.toString().toUpperCase());
283
callback();
284
}
285
});
286
287
// Use the transform
288
upperCaseTransform.write('hello');
289
upperCaseTransform.write('world');
290
upperCaseTransform.end();
291
292
upperCaseTransform.on('data', (chunk) => {
293
console.log('Transformed:', chunk.toString()); // "HELLO", "WORLD"
294
});
295
```
296
297
### PassThrough
298
299
Transform stream that passes input through to output unchanged, useful for testing and stream composition.
300
301
```javascript { .api }
302
/**
303
* Creates a passthrough stream
304
* @param options - Configuration options
305
*/
306
class PassThrough extends Transform {
307
constructor(options?: PassThroughOptions);
308
// Automatically passes all input to output unchanged
309
}
310
```
311
312
**Usage Examples:**
313
314
```javascript
315
const { PassThrough } = require('readable-stream');
316
317
// Create a passthrough for monitoring
318
const monitor = new PassThrough();
319
320
monitor.on('data', (chunk) => {
321
console.log('Data passing through:', chunk.toString());
322
});
323
324
// Data flows through unchanged
325
monitor.write('test data');
326
monitor.end();
327
```
328
329
## Stream Events
330
331
All streams emit various events during their lifecycle:
332
333
### Readable Events
334
- `'data'` - Emitted when data is available to read
335
- `'end'` - Emitted when no more data will be provided
336
- `'readable'` - Emitted when data is available to be read
337
- `'close'` - Emitted when the stream is closed
338
- `'error'` - Emitted when an error occurs
339
340
### Writable Events
341
- `'drain'` - Emitted when it's safe to write again after buffer was full
342
- `'finish'` - Emitted after `end()` is called and all data has been processed
343
- `'pipe'` - Emitted when a readable is piped to this writable
344
- `'unpipe'` - Emitted when a readable is unpiped from this writable
345
- `'close'` - Emitted when the stream is closed
346
- `'error'` - Emitted when an error occurs
347
348
## Types
349
350
```javascript { .api }
351
interface ReadableOptions {
352
highWaterMark?: number;
353
encoding?: BufferEncoding;
354
objectMode?: boolean;
355
read?(this: Readable, size: number): void;
356
destroy?(this: Readable, error: Error | null, callback: (error: Error | null) => void): void;
357
autoDestroy?: boolean;
358
signal?: AbortSignal;
359
}
360
361
interface WritableOptions {
362
highWaterMark?: number;
363
decodeStrings?: boolean;
364
defaultEncoding?: BufferEncoding;
365
objectMode?: boolean;
366
emitClose?: boolean;
367
write?(this: Writable, chunk: any, encoding: BufferEncoding, callback: (error?: Error | null) => void): void;
368
writev?(this: Writable, chunks: Array<{ chunk: any, encoding: BufferEncoding }>, callback: (error?: Error | null) => void): void;
369
destroy?(this: Writable, error: Error | null, callback: (error: Error | null) => void): void;
370
final?(this: Writable, callback: (error?: Error | null) => void): void;
371
autoDestroy?: boolean;
372
signal?: AbortSignal;
373
}
374
375
interface DuplexOptions extends ReadableOptions, WritableOptions {
376
allowHalfOpen?: boolean;
377
readableObjectMode?: boolean;
378
writableObjectMode?: boolean;
379
readableHighWaterMark?: number;
380
writableHighWaterMark?: number;
381
}
382
383
interface TransformOptions extends DuplexOptions {
384
transform?(this: Transform, chunk: any, encoding: BufferEncoding, callback: TransformCallback): void;
385
flush?(this: Transform, callback: TransformCallback): void;
386
}
387
388
interface PassThroughOptions extends TransformOptions {}
389
390
type TransformCallback = (error?: Error | null, data?: any) => void;
391
```