# Streams

Readable and writable stream implementations for efficient handling of large files with position control, encoding options, and event handling. Provides Node.js compatible stream interfaces for memory filesystem operations.

## Capabilities

### Readable Streams

Create readable streams for efficient reading of large files with precise control over reading position and encoding.

```javascript { .api }
/**
 * Create a readable stream from a file
 * @param path - File path to read from
 * @param options - Stream options including encoding, position control, and file descriptor
 * @returns Readable stream instance
 */
createReadStream(path: string | Buffer, options?: ReadStreamOptions): ReadStream;
createReadStream(path: string | Buffer, encoding: string): ReadStream;

interface ReadStreamOptions {
  /** File open flags */
  flags?: string;
  /** Text encoding for string output */
  encoding?: string;
  /** Use existing file descriptor instead of opening file */
  fd?: number;
  /** File mode for newly created files */
  mode?: number;
  /** Auto-close file descriptor when stream ends */
  autoClose?: boolean;
  /** Starting byte position in file */
  start?: number;
  /** Ending byte position in file (inclusive) */
  end?: number;
  /** Stream buffer size */
  highWaterMark?: number;
}

interface ReadStream extends NodeJS.ReadableStream {
  /** File path being read */
  path: string | Buffer;
  /** Number of bytes read so far */
  bytesRead: number;
  /** File descriptor (if opened) */
  fd?: number;

  // Event emitters
  on(event: 'open', listener: (fd: number) => void): this;
  on(event: 'close', listener: () => void): this;
  on(event: 'data', listener: (chunk: Buffer | string) => void): this;
  on(event: 'end', listener: () => void): this;
  on(event: 'error', listener: (err: Error) => void): this;
}
```

**Usage Examples:**

```javascript
// Basic file reading stream
const readStream = fs.createReadStream('/large-file.txt');
readStream.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes`);
});
readStream.on('end', () => {
  console.log('File reading completed');
});

// Read with encoding
const textStream = fs.createReadStream('/document.txt', 'utf8');
textStream.on('data', (chunk) => {
  console.log('Text chunk:', chunk); // String chunks
});

// Read specific file range
const rangeStream = fs.createReadStream('/data.bin', {
  start: 1000,    // Start at byte 1000
  end: 2000,      // End at byte 2000 (inclusive)
  encoding: 'hex' // Output as hex string
});

// Use existing file descriptor
const fd = fs.openSync('/file.txt', 'r');
const fdStream = fs.createReadStream('/file.txt', { fd });
fdStream.on('open', (openedFd) => {
  console.log('Stream opened with fd:', openedFd);
});

// Stream with custom buffer size
const bufferedStream = fs.createReadStream('/big-file.txt', {
  highWaterMark: 64 * 1024, // 64KB buffer
  encoding: 'utf8'
});

// Handle stream events
const stream = fs.createReadStream('/log.txt');
stream.on('open', (fd) => {
  console.log('File opened, fd:', fd);
});
stream.on('close', () => {
  console.log('File closed');
});
stream.on('error', (err) => {
  console.error('Stream error:', err);
});

// Pipe to another stream
const srcStream = fs.createReadStream('/input.txt');
const destStream = fs.createWriteStream('/output.txt');
srcStream.pipe(destStream);
```

### Writable Streams

Create writable streams for efficient writing to files with position control and buffering.

```javascript { .api }
/**
 * Create a writable stream to a file
 * @param path - File path to write to
 * @param options - Stream options including encoding, position control, and file descriptor
 * @returns Writable stream instance
 */
createWriteStream(path: string | Buffer, options?: WriteStreamOptions): WriteStream;
createWriteStream(path: string | Buffer, encoding: string): WriteStream;

interface WriteStreamOptions {
  /** File open flags */
  flags?: string;
  /** Default text encoding for string writes */
  encoding?: string;
  /** Use existing file descriptor instead of opening file */
  fd?: number;
  /** File mode for newly created files */
  mode?: number;
  /** Auto-close file descriptor when stream finishes */
  autoClose?: boolean;
  /** Starting byte position in file */
  start?: number;
  /** Emit 'close' event when stream closes */
  emitClose?: boolean;
}

interface WriteStream extends NodeJS.WritableStream {
  /** File path being written to */
  path: string | Buffer;
  /** Number of bytes written so far */
  bytesWritten: number;
  /** File descriptor (if opened) */
  fd?: number;

  // Event emitters
  on(event: 'open', listener: (fd: number) => void): this;
  on(event: 'close', listener: () => void): this;
  on(event: 'finish', listener: () => void): this;
  on(event: 'error', listener: (err: Error) => void): this;

  // Write methods
  write(chunk: string | Buffer, encoding?: string): boolean;
  write(chunk: string | Buffer, callback?: (error?: Error) => void): boolean;
  write(chunk: string | Buffer, encoding?: string, callback?: (error?: Error) => void): boolean;
  end(): void;
  end(chunk: string | Buffer): void;
  end(chunk: string | Buffer, encoding?: string): void;
  end(chunk: string | Buffer, callback?: () => void): void;
  end(chunk: string | Buffer, encoding?: string, callback?: () => void): void;
}
```

**Usage Examples:**

```javascript
// Basic file writing stream
const writeStream = fs.createWriteStream('/output.txt');
writeStream.write('Hello ');
writeStream.write('World!');
writeStream.end();

writeStream.on('finish', () => {
  console.log('Writing completed');
});

// Write with encoding
const textStream = fs.createWriteStream('/document.txt', 'utf8');
textStream.write('Unicode content: ');
textStream.write('Hello 世界');
textStream.end();

// Write to specific position
const positionStream = fs.createWriteStream('/patchable.txt', {
  flags: 'r+', // Open for reading and writing
  start: 100   // Start writing at byte 100
});
positionStream.write('Inserted text');
positionStream.end();

// Use existing file descriptor
const fd = fs.openSync('/target.txt', 'w');
const fdStream = fs.createWriteStream('/target.txt', {
  fd,
  emitClose: true
});
fdStream.on('open', (openedFd) => {
  console.log('Stream opened with fd:', openedFd);
});

// Stream with callback handling
const callbackStream = fs.createWriteStream('/callback.txt');
callbackStream.write('First chunk', (err) => {
  if (!err) {
    console.log('First chunk written');
  }
});
callbackStream.write('Second chunk', 'utf8', (err) => {
  if (!err) {
    console.log('Second chunk written');
  }
});
callbackStream.end('Final chunk', () => {
  console.log('Stream ended');
});

// Handle stream events
const stream = fs.createWriteStream('/log.txt');
stream.on('open', (fd) => {
  console.log('File opened for writing, fd:', fd);
});
stream.on('finish', () => {
  console.log('All data written to file');
});
stream.on('close', () => {
  console.log('File closed');
});
stream.on('error', (err) => {
  console.error('Write error:', err);
});
```

### Stream Processing Patterns

Common patterns for working with streams in Metro Memory FS.

```javascript
// Copy file using streams
function copyFileWithStreams(src, dest) {
  return new Promise((resolve, reject) => {
    const readStream = fs.createReadStream(src);
    const writeStream = fs.createWriteStream(dest);

    readStream.on('error', reject);
    writeStream.on('error', reject);
    writeStream.on('finish', resolve);

    readStream.pipe(writeStream);
  });
}

// Process large file line by line
function processFileLineByLine(filePath, processor) {
  return new Promise((resolve, reject) => {
    const stream = fs.createReadStream(filePath, 'utf8');
    let buffer = '';

    stream.on('data', (chunk) => {
      buffer += chunk;
      const lines = buffer.split('\n');
      buffer = lines.pop() || ''; // Keep incomplete line in buffer

      lines.forEach(line => {
        processor(line);
      });
    });

    stream.on('end', () => {
      if (buffer) {
        processor(buffer); // Process final line
      }
      resolve();
    });

    stream.on('error', reject);
  });
}

// Transform stream data
function transformStream(inputPath, outputPath, transformer) {
  return new Promise((resolve, reject) => {
    const readStream = fs.createReadStream(inputPath);
    const writeStream = fs.createWriteStream(outputPath);

    readStream.on('data', (chunk) => {
      const transformed = transformer(chunk);
      writeStream.write(transformed);
    });

    readStream.on('end', () => {
      writeStream.end();
    });

    readStream.on('error', reject);
    writeStream.on('error', reject);
    writeStream.on('finish', resolve);
  });
}

// Append to file using stream
function appendToFile(filePath, data) {
  return new Promise((resolve, reject) => {
    const stream = fs.createWriteStream(filePath, { flags: 'a' });

    stream.write(data, (err) => {
      if (err) {
        reject(err);
      } else {
        stream.end();
      }
    });

    stream.on('finish', resolve);
    stream.on('error', reject);
  });
}
```

### Stream Buffering and Flow Control

Control stream buffering and handle backpressure.

```javascript
// Handle backpressure in write streams
function writeWithBackpressure(stream, data) {
  return new Promise((resolve, reject) => {
    if (!stream.write(data)) {
      // Stream buffer is full, wait for drain
      stream.once('drain', resolve);
      stream.once('error', reject);
    } else {
      // Data was written immediately
      resolve();
    }
  });
}

// Read file in controlled chunks
async function readFileInChunks(filePath, chunkSize = 1024) {
  const chunks = [];
  const stream = fs.createReadStream(filePath, {
    highWaterMark: chunkSize
  });

  return new Promise((resolve, reject) => {
    stream.on('data', (chunk) => {
      chunks.push(chunk);
      console.log(`Read chunk of ${chunk.length} bytes`);
    });

    stream.on('end', () => {
      resolve(Buffer.concat(chunks));
    });

    stream.on('error', reject);
  });
}

// Pause and resume streams
function pauseResumeExample(filePath) {
  const stream = fs.createReadStream(filePath);

  stream.on('data', (chunk) => {
    console.log(`Received ${chunk.length} bytes`);

    // Pause stream for processing
    stream.pause();

    // Simulate processing time
    setTimeout(() => {
      console.log('Processing complete, resuming...');
      stream.resume();
    }, 100);
  });

  stream.on('end', () => {
    console.log('Stream ended');
  });
}
```

### File Descriptor Management with Streams

Proper handling of file descriptors in stream operations.

```javascript
// Manual file descriptor management
function streamWithManualFd(filePath) {
  const fd = fs.openSync(filePath, 'r');

  const stream = fs.createReadStream(filePath, {
    fd,
    autoClose: false // Don't auto-close the fd
  });

  stream.on('end', () => {
    // Manually close the file descriptor
    fs.closeSync(fd);
    console.log('File descriptor closed manually');
  });

  stream.on('error', () => {
    fs.closeSync(fd); // Close on error too
  });

  return stream;
}

// Stream without auto-close
function createNonClosingStream(filePath) {
  return fs.createReadStream(filePath, {
    autoClose: false
  });
}

// Check stream state
function checkStreamState(stream) {
  console.log('Stream readable:', stream.readable);
  console.log('Stream destroyed:', stream.destroyed);
  console.log('Bytes read:', stream.bytesRead);

  if (stream.fd !== undefined) {
    console.log('File descriptor:', stream.fd);
  }
}
```

## Performance Considerations

```javascript
// Optimize buffer sizes for large files
const largeFileStream = fs.createReadStream('/huge-file.dat', {
  highWaterMark: 1024 * 1024 // 1MB buffer for large files
});

// Use smaller buffers for many small operations
const smallFileStream = fs.createReadStream('/config.json', {
  highWaterMark: 1024 // 1KB buffer for small files
});

// Efficient file copying
function efficientCopy(src, dest) {
  const readStream = fs.createReadStream(src, {
    highWaterMark: 64 * 1024 // 64KB chunks
  });
  const writeStream = fs.createWriteStream(dest, {
    highWaterMark: 64 * 1024
  });

  return new Promise((resolve, reject) => {
    readStream.pipe(writeStream);
    writeStream.on('finish', resolve);
    writeStream.on('error', reject);
    readStream.on('error', reject);
  });
}
```

## Error Handling

Common stream errors and handling patterns:

```javascript
// Comprehensive error handling
function robustStreamOperation(inputPath, outputPath) {
  return new Promise((resolve, reject) => {
    const readStream = fs.createReadStream(inputPath);
    const writeStream = fs.createWriteStream(outputPath);

    let finished = false;

    function cleanup(err) {
      if (finished) return;
      finished = true;

      readStream.destroy();
      writeStream.destroy();

      if (err) {
        // Clean up partial output file on error
        try {
          fs.unlinkSync(outputPath);
        } catch (cleanupErr) {
          // Ignore cleanup errors
        }
        reject(err);
      } else {
        resolve();
      }
    }

    readStream.on('error', cleanup);
    writeStream.on('error', cleanup);
    writeStream.on('finish', () => cleanup(null));

    readStream.pipe(writeStream);
  });
}
```

Common stream errors:
- `ENOENT` - Source file doesn't exist (read stream)
- `EISDIR` - Trying to read/write a directory as a file
- `EMFILE` - Too many open files
- `ENOSPC` - No space left on device (simulated in memory constraints)