0
# Readable Streams
1
2
Readable stream implementation providing data flow control, buffer management, and pipe operations with enhanced error handling.
3
4
## Capabilities
5
6
### Readable Constructor
7
8
Creates a new Readable stream with configuration options.
9
10
```javascript { .api }
11
/**
12
* Creates a new Readable stream
13
* @param opts - Configuration options for the readable stream
14
*/
15
class Readable extends Stream {
16
constructor(opts?: ReadableOptions);
17
}
18
19
interface ReadableOptions extends StreamOptions {
20
/** Maximum buffer size in bytes (default: 16384) */
21
highWaterMark?: number;
22
/** Function to map input data */
23
map?: (data: any) => any;
24
/** Function to calculate byte size of data */
25
byteLength?: (data: any) => number;
26
/** Read function shorthand */
27
read?: (cb: (error?: Error) => void) => void;
28
}
29
```
30
31
**Usage Example:**
32
33
```javascript
34
const { Readable } = require("@mafintosh/streamx");
35
36
const readable = new Readable({
37
highWaterMark: 8192,
38
read(cb) {
39
this.push("Hello World");
40
cb(null);
41
}
42
});
43
```
44
45
### Data Production
46
47
#### _read Method
48
49
Called when the stream wants new data. Override to implement data reading logic.
50
51
```javascript { .api }
52
/**
53
* Called when stream wants new data
54
* @param cb - Callback to call when read operation is complete
55
*/
56
_read(cb: (error?: Error) => void): void;
57
```
58
59
**Usage Example:**
60
61
```javascript
62
class CustomReadable extends Readable {
63
constructor() {
64
super();
65
this.counter = 0;
66
}
67
68
_read(cb) {
69
if (this.counter < 5) {
70
this.push(`data-${this.counter++}`);
71
} else {
72
this.push(null); // End stream
73
}
74
cb(null);
75
}
76
}
77
```
78
79
#### push Method
80
81
Pushes data to the stream buffer.
82
83
```javascript { .api }
84
/**
85
* Push data to stream buffer
86
* @param data - Data to push, or null to end stream
87
* @returns True if buffer is not full and more data can be pushed
88
*/
89
push(data: any): boolean;
90
```
91
92
**Usage Example:**
93
94
```javascript
95
const readable = new Readable({
96
read(cb) {
97
const shouldContinue = this.push("some data");
98
if (shouldContinue) {
99
this.push("more data");
100
}
101
this.push(null); // End stream
102
cb(null);
103
}
104
});
105
```
106
107
### Data Consumption
108
109
#### read Method
110
111
Reads data from the stream buffer.
112
113
```javascript { .api }
114
/**
115
* Read data from stream buffer
116
* @returns Data from buffer, or null if buffer is empty or stream ended
117
*/
118
read(): any;
119
```
120
121
**Usage Example:**
122
123
```javascript
124
const readable = new Readable({
125
read(cb) {
126
this.push("Hello");
127
this.push("World");
128
this.push(null);
129
cb(null);
130
}
131
});
132
133
readable.on('readable', () => {
134
let chunk;
135
while ((chunk = readable.read()) !== null) {
136
console.log('Read:', chunk);
137
}
138
});
139
```
140
141
#### unshift Method
142
143
Adds data to the front of the buffer (useful for putting back over-read data).
144
145
```javascript { .api }
146
/**
147
* Add data to front of buffer
148
* @param data - Data to add to front of buffer
149
*/
150
unshift(data: any): void;
151
```
152
153
**Usage Example:**
154
155
```javascript
156
const readable = new Readable({
157
read(cb) {
158
this.push("Hello World");
159
this.push(null);
160
cb(null);
161
}
162
});
163
164
readable.on('readable', () => {
165
const data = readable.read();
166
if (data === "Hello World") {
167
// Put it back for later processing
168
readable.unshift(data);
169
}
170
});
171
```
172
173
### Flow Control
174
175
#### pause Method
176
177
Pauses the stream (only needed if stream is resumed).
178
179
```javascript { .api }
180
/**
181
* Pause the stream
182
*/
183
pause(): void;
184
```
185
186
#### resume Method
187
188
Resumes/starts consuming the stream as fast as possible.
189
190
```javascript { .api }
191
/**
192
* Resume consuming the stream
193
*/
194
resume(): void;
195
```
196
197
**Usage Example:**
198
199
```javascript
200
const readable = new Readable({
201
read(cb) {
202
this.push(`data-${Date.now()}`);
203
cb(null);
204
}
205
});
206
207
// Start consuming
208
readable.resume();
209
210
// Pause after 1 second
211
setTimeout(() => {
212
readable.pause();
213
}, 1000);
214
215
// Resume after another second
216
setTimeout(() => {
217
readable.resume();
218
}, 2000);
219
```
220
221
### Pipe Operations
222
223
#### pipe Method
224
225
Efficiently pipes the readable stream to a writable stream with error handling.
226
227
```javascript { .api }
228
/**
229
* Pipe readable stream to writable stream
230
* @param destination - Writable stream to pipe to
231
* @param callback - Optional callback called when pipeline completes
232
* @returns The destination stream
233
*/
234
pipe(destination: Writable, callback?: (error?: Error) => void): Writable;
235
```
236
237
**Usage Example:**
238
239
```javascript
240
const { Readable, Writable } = require("@mafintosh/streamx");
241
242
const readable = new Readable({
243
read(cb) {
244
this.push("Hello World");
245
this.push(null);
246
cb(null);
247
}
248
});
249
250
const writable = new Writable({
251
write(data, cb) {
252
console.log("Received:", data.toString());
253
cb(null);
254
}
255
});
256
257
readable.pipe(writable, (err) => {
258
if (err) console.error("Pipeline failed:", err);
259
else console.log("Pipeline completed successfully");
260
});
261
```
262
263
### Events
264
265
#### readable Event
266
267
Emitted when data is available in the buffer and buffer was previously empty.
268
269
```javascript
270
readable.on('readable', () => {
271
// Data is available to read
272
});
273
```
274
275
#### data Event
276
277
Emitted when data is being read from the stream. Attaching this event automatically resumes the stream.
278
279
```javascript
280
readable.on('data', (chunk) => {
281
console.log('Received chunk:', chunk);
282
});
283
```
284
285
#### end Event
286
287
Emitted when the stream has ended and no more data is available.
288
289
```javascript
290
readable.on('end', () => {
291
console.log('Stream ended');
292
});
293
```
294
295
#### close Event
296
297
Emitted when the stream has fully closed.
298
299
```javascript
300
readable.on('close', () => {
301
console.log('Stream closed');
302
});
303
```
304
305
#### error Event
306
307
Emitted when an error occurs.
308
309
```javascript
310
readable.on('error', (err) => {
311
console.error('Stream error:', err);
312
});
313
```
314
315
**Complete Usage Example:**
316
317
```javascript
318
const { Readable } = require("@mafintosh/streamx");
319
320
const readable = new Readable({
321
read(cb) {
322
// Simulate reading data
323
setTimeout(() => {
324
if (Math.random() > 0.8) {
325
this.push(null); // End stream
326
} else {
327
this.push(`data-${Date.now()}`);
328
}
329
cb(null);
330
}, 100);
331
}
332
});
333
334
readable.on('data', (chunk) => {
335
console.log('Data:', chunk);
336
});
337
338
readable.on('end', () => {
339
console.log('Stream ended');
340
});
341
342
readable.on('close', () => {
343
console.log('Stream closed');
344
});
345
346
readable.on('error', (err) => {
347
console.error('Error:', err);
348
});
349
```