# Readable Streams

Readable streams in StreamX provide lifecycle hooks (`_open`, `_destroy`, `_predestroy`), proper backpressure handling, and improved error handling compared to Node.js core streams. They support both flowing and non-flowing modes with consistent behavior.

## Capabilities

### Readable Class

Creates a readable stream with lifecycle hooks and explicit resource cleanup.

```javascript { .api }
/**
 * Creates a new readable stream with enhanced lifecycle support
 * @param options - Configuration options for the readable stream
 */
class Readable extends Stream {
  constructor(options?: ReadableOptions);

  /** Override this method to implement custom read logic */
  _read(cb: () => void): void;

  /** Lifecycle hook called before the first read operation */
  _open(cb: (err?: Error) => void): void;

  /** Cleanup hook called when the stream is destroyed */
  _destroy(cb: (err?: Error) => void): void;

  /** Hook called immediately when destroy() is first invoked */
  _predestroy(): void;

  /** Push data to the stream buffer */
  push(data: any): boolean;

  /** Read data from the stream buffer */
  read(): any;

  /** Add data to the front of the buffer */
  unshift(data: any): void;

  /** Pause the stream */
  pause(): Readable;

  /** Resume the stream */
  resume(): Readable;

  /** Pipe to a writable stream with callback support */
  pipe(destination: Writable, callback?: (err?: Error) => void): Writable;

  /** Set text encoding for automatic string decoding */
  setEncoding(encoding: string): Readable;

  /** Forcefully destroy the stream */
  destroy(err?: Error): void;
}

interface ReadableOptions {
  /** Maximum buffer size in bytes (default: 16384) */
  highWaterMark?: number;

  /** Optional function to map input data */
  map?: (data: any) => any;

  /** Optional function to calculate byte size of data */
  byteLength?: (data: any) => number;

  /** AbortSignal that triggers destroy when aborted */
  signal?: AbortSignal;

  /** Eagerly open the stream (default: false) */
  eagerOpen?: boolean;

  /** Shorthand for _read method */
  read?: (cb: () => void) => void;

  /** Shorthand for _open method */
  open?: (cb: (err?: Error) => void) => void;

  /** Shorthand for _destroy method */
  destroy?: (cb: (err?: Error) => void) => void;

  /** Shorthand for _predestroy method */
  predestroy?: () => void;

  /** Text encoding for automatic string decoding */
  encoding?: string;
}
```

**Usage Examples:**

```javascript
const { Readable } = require('streamx');

// Basic readable stream
const readable = new Readable({
  read(cb) {
    // Push some data
    this.push('Hello, ');
    this.push('World!');
    this.push(null); // End the stream
    cb();
  }
});

// Listen for data
readable.on('data', (chunk) => {
  console.log('Received:', chunk.toString());
});

readable.on('end', () => {
  console.log('Stream ended');
});

// Readable with lifecycle hooks
const fileReader = new Readable({
  open(cb) {
    console.log('Opening file...');
    // Open file or resource
    cb();
  },

  read(cb) {
    // Read from file
    this.push('File content...');
    this.push(null);
    cb();
  },

  destroy(cb) {
    console.log('Closing file...');
    // Clean up resources
    cb();
  }
});
```

### Static Methods

StreamX provides static utility methods for readable stream inspection and manipulation.

```javascript { .api }
/**
 * Check if a readable stream is currently paused
 * @param stream - The readable stream to check
 * @returns True if the stream is paused
 */
static isPaused(stream: Readable): boolean;

/**
 * Check if a readable stream is under backpressure
 * @param stream - The readable stream to check
 * @returns True if the stream is backpressured
 */
static isBackpressured(stream: Readable): boolean;

/**
 * Create a readable stream from various data sources
 * @param source - Array, buffer, string, or async iterator
 * @returns New readable stream
 */
static from(source: any[] | Buffer | string | AsyncIterable<any>): Readable;
```

**Static Method Examples:**

```javascript
const { Readable } = require('streamx');

// Create from array
const arrayStream = Readable.from([1, 2, 3, 4, 5]);

// Create from string
const stringStream = Readable.from('Hello World');

// Create from async iterator
async function* generator() {
  yield 'first';
  yield 'second';
  yield 'third';
}
const iteratorStream = Readable.from(generator());

// Check stream state
if (Readable.isPaused(arrayStream)) {
  console.log('Stream is paused');
}

if (Readable.isBackpressured(arrayStream)) {
  console.log('Stream is under backpressure');
}
```

### Events

Readable streams emit various events during their lifecycle.

```javascript { .api }
interface ReadableEvents {
  /** Emitted when data is available to read */
  'readable': () => void;

  /** Emitted when data is being read from the stream */
  'data': (chunk: any) => void;

  /** Emitted when the stream has ended and no more data is available */
  'end': () => void;

  /** Emitted when the stream has been fully closed */
  'close': () => void;

  /** Emitted when an error occurs */
  'error': (err: Error) => void;

  /** Emitted when the stream is piped to a destination */
  'piping': (dest: Writable) => void;
}
```

### Properties

```javascript { .api }
interface ReadableProperties {
  /** Boolean indicating whether the stream has been destroyed */
  destroyed: boolean;
}
```

### Advanced Configuration

StreamX readable streams support advanced configuration for specialized use cases.

**Map Function:**

Transform data as it's pushed to the stream:

```javascript
const { Readable } = require('streamx');

const readable = new Readable({
  map: (data) => {
    // Transform strings to uppercase
    return typeof data === 'string' ? data.toUpperCase() : data;
  },

  read(cb) {
    this.push('hello');
    this.push('world');
    this.push(null);
    cb();
  }
});
```

**ByteLength Function:**

Custom byte length calculation for backpressure:

```javascript
const { Readable } = require('streamx');

const readable = new Readable({
  byteLength: (data) => {
    // Custom size calculation
    // Buffer.byteLength counts encoded bytes; string.length counts UTF-16 code units
    if (typeof data === 'string') return Buffer.byteLength(data);
    if (Buffer.isBuffer(data)) return data.length;
    return 1024; // Default size for objects
  },

  highWaterMark: 8192, // 8KB buffer

  read(cb) {
    this.push({ large: 'object' });
    cb();
  }
});
```

**AbortSignal Integration:**

```javascript
const { Readable } = require('streamx');

const controller = new AbortController();

const readable = new Readable({
  signal: controller.signal,

  read(cb) {
    // Simulate a slow source; skip the push if the signal was aborted in the meantime
    setTimeout(() => {
      if (!controller.signal.aborted) {
        this.push('data');
        cb();
      }
    }, 1000);
  }
});

// Aborting destroys the stream; listen for 'error' in case the abort surfaces as one
readable.on('error', (err) => console.log('Stream aborted:', err.message));

// Cancel the stream after 500ms
setTimeout(() => controller.abort(), 500);
```